You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nw...@apache.org on 2019/01/29 16:18:15 UTC
[incubator-heron] branch master updated: Soften padding constraint
(#3178)
This is an automated email from the ASF dual-hosted git repository.
nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 3ebfff7 Soften padding constraint (#3178)
3ebfff7 is described below
commit 3ebfff7e87d8f26c337f805c6619b2f0d1986955
Author: Xiaoyao Qian <qi...@illinois.edu>
AuthorDate: Tue Jan 29 08:18:10 2019 -0800
Soften padding constraint (#3178)
* Soften padding constraint
* address comments
---
.../packing/roundrobin/RoundRobinPacking.java | 87 +++++++++++++---
.../packing/roundrobin/RoundRobinPackingTest.java | 113 ++++++++++++++++++++-
2 files changed, 182 insertions(+), 18 deletions(-)
diff --git a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
index c48d6a2..84bc667 100644
--- a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
+++ b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
@@ -20,12 +20,14 @@
package org.apache.heron.packing.roundrobin;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
@@ -34,6 +36,7 @@ import org.apache.heron.api.utils.TopologyUtils;
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.common.basics.CPUShare;
import org.apache.heron.common.basics.ResourceMeasure;
+import org.apache.heron.packing.RamRequirement;
import org.apache.heron.spi.common.Config;
import org.apache.heron.spi.common.Context;
import org.apache.heron.spi.packing.IPacking;
@@ -140,12 +143,16 @@ public class RoundRobinPacking implements IPacking, IRepacking {
Map<Integer, List<InstanceId>> roundRobinAllocation =
getRoundRobinAllocation(numContainer, parallelismMap);
+ ByteAmount containerDiskInBytes = getContainerDiskHint(roundRobinAllocation);
+ double containerCpuHint = getContainerCpuHint(roundRobinAllocation);
+ ByteAmount containerRamHint = getContainerRamHint(roundRobinAllocation);
+
// Get the RAM map for every instance
Map<Integer, Map<InstanceId, ByteAmount>> instancesRamMap =
calculateInstancesResourceMapInContainer(
roundRobinAllocation,
TopologyUtils.getComponentRamMapConfig(topology),
- getContainerRamHint(roundRobinAllocation),
+ containerRamHint,
instanceRamDefault,
containerRamPadding,
ByteAmount.ZERO,
@@ -157,17 +164,13 @@ public class RoundRobinPacking implements IPacking, IRepacking {
calculateInstancesResourceMapInContainer(
roundRobinAllocation,
CPUShare.convertDoubleMapToCpuShareMap(TopologyUtils.getComponentCpuMapConfig(topology)),
- CPUShare.fromDouble(getContainerCpuHint(roundRobinAllocation)),
+ CPUShare.fromDouble(containerCpuHint),
CPUShare.fromDouble(instanceCpuDefault),
CPUShare.fromDouble(containerCpuPadding),
CPUShare.fromDouble(0.0),
CPUShare.fromDouble(NOT_SPECIFIED_CPU_SHARE),
CPU);
- ByteAmount containerDiskInBytes = getContainerDiskHint(roundRobinAllocation);
- double containerCpuHint = getContainerCpuHint(roundRobinAllocation);
- ByteAmount containerRamHint = getContainerRamHint(roundRobinAllocation);
-
LOG.info(String.format("Pack internal: container CPU hint: %.3f, RAM hint: %s, disk hint: %s.",
containerCpuHint,
containerRamHint.toString(),
@@ -180,8 +183,8 @@ public class RoundRobinPacking implements IPacking, IRepacking {
// Calculate the resource required for single instance
Map<InstanceId, PackingPlan.InstancePlan> instancePlanMap = new HashMap<>();
- ByteAmount containerRam = containerRamPadding;
- double containerCpu = containerCpuPadding;
+ ByteAmount containerRam = ByteAmount.ZERO;
+ double containerCpu = 0.0;
for (InstanceId instanceId : instanceList) {
ByteAmount instanceRam = instancesRamMap.get(containerId).get(instanceId);
@@ -198,6 +201,20 @@ public class RoundRobinPacking implements IPacking, IRepacking {
containerCpu += instanceCpu;
}
+ // finalize container resource
+ if (!containerRamHint.equals(NOT_SPECIFIED_BYTE_AMOUNT)) {
+ containerRam = ByteAmount.fromBytes(
+ Math.min(containerRam.plus(containerRamPadding).asBytes(), containerRamHint.asBytes()));
+ } else {
+ containerRam = containerRam.plus(containerRamPadding);
+ }
+
+ if (containerCpuHint != NOT_SPECIFIED_CPU_SHARE) {
+ containerCpu = Math.min(containerCpu + containerCpuPadding, containerCpuHint);
+ } else {
+ containerCpu += containerCpuPadding;
+ }
+
Resource resource = new Resource(Math.max(containerCpu, containerCpuHint),
containerRam, containerDiskInBytes);
PackingPlan.ContainerPlan containerPlan = new PackingPlan.ContainerPlan(
@@ -280,13 +297,25 @@ public class RoundRobinPacking implements IPacking, IRepacking {
}
}
- // Validate instance resources specified so far don't violate container-level constraint
+ // Soft padding constraint validation: warn if padding amount cannot be accommodated
+ boolean paddingThrottling = false;
if (!containerResHint.equals(notSpecified)
&& usedRes.greaterThan(containerResHint.minus(containerResPadding))) {
- throw new PackingException(String.format("Invalid packing plan generated. "
- + "Total instance %s in a container (%s) + padding(%s) have exceeded "
- + "the container-level constraint of %s.",
- resourceType, usedRes.toString(), containerResPadding.toString(), containerResHint));
+ // Validate instance resources specified so far don't violate container-level constraint
+ if (usedRes.greaterThan(containerResHint)) {
+ throw new PackingException(String.format("Invalid packing plan generated. "
+ + "Total instance %s (%s) in container#%d have exceeded "
+ + "the container-level constraint of %s.",
+ resourceType, usedRes.toString(), containerId, containerResHint.toString()));
+ }
+
+ paddingThrottling = true;
+ LOG.warning(String.format("Container#%d (max %s: %s) is now hosting instances that "
+ + "take up to %s %s. The container may not have enough resource to accommodate "
+ + "internal processes which take up to %s %s.",
+ containerId, resourceType, containerResHint.toString(),
+ usedRes.toString(), resourceType,
+ containerResPadding.toString(), resourceType));
}
// calculate resource for the remaining unspecified instances if any
@@ -296,7 +325,12 @@ public class RoundRobinPacking implements IPacking, IRepacking {
// If container resource is specified
if (!containerResHint.equals(notSpecified)) {
// discount resource for heron internal process (padding) and used (usedRes)
- T remainingRes = (T) containerResHint.minus(containerResPadding).minus(usedRes);
+ T remainingRes;
+ if (paddingThrottling) {
+ remainingRes = (T) containerResHint.minus(usedRes);
+ } else {
+ remainingRes = (T) containerResHint.minus(containerResPadding).minus(usedRes);
+ }
if (remainingRes.lessOrEqual(zero)) {
throw new PackingException(String.format("Invalid packing plan generated. "
@@ -336,7 +370,12 @@ public class RoundRobinPacking implements IPacking, IRepacking {
int index = 1;
int globalTaskIndex = 1;
- for (String component : parallelismMap.keySet()) {
+
+ // To ensure we spread out the big instances first
+ // Only sorting by RAM here because only RAM can be explicitly capped by JVM processes
+ List<String> sortedInstances = getSortedRAMComponents(parallelismMap.keySet()).stream()
+ .map(RamRequirement::getComponentName).collect(Collectors.toList());
+ for (String component : sortedInstances) {
int numInstance = parallelismMap.get(component);
for (int i = 0; i < numInstance; ++i) {
allocation.get(index).add(new InstanceId(component, globalTaskIndex, i));
@@ -347,6 +386,24 @@ public class RoundRobinPacking implements IPacking, IRepacking {
return allocation;
}
+
+ /**
+ * Sort the components in decreasing order based on their RAM requirements
+ *
+ * @return The sorted list of components and their RAM requirements
+ */
+ private ArrayList<RamRequirement> getSortedRAMComponents(Set<String> componentNames) {
+ ArrayList<RamRequirement> ramRequirements = new ArrayList<>();
+ Map<String, ByteAmount> ramMap = TopologyUtils.getComponentRamMapConfig(topology);
+
+ for (String componentName : componentNames) {
+ ramRequirements.add(new RamRequirement(componentName,
+ ramMap.getOrDefault(componentName, ByteAmount.ZERO)));
+ }
+ Collections.sort(ramRequirements, Collections.reverseOrder());
+ return ramRequirements;
+ }
+
/**
* Get # of instances in the largest container
*
diff --git a/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
index 244fa88..109ae47 100644
--- a/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
+++ b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
@@ -250,7 +250,73 @@ public class RoundRobinPackingTest {
* Test the scenario RAM map config is completely set
*/
@Test
- public void testCompleteRamMapRequested() throws Exception {
+ public void testCompleteRamMapRequestedWithExactlyEnoughResource() throws Exception {
+ int numContainers = 2;
+ int spoutParallelism = 4;
+ int boltParallelism = 3;
+ Integer totalInstances = spoutParallelism + boltParallelism;
+
+ // Set up the topology and its config
+ org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
+ topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
+
+ // Explicit set resources for container
+ // the value should be ignored, since we set the complete component RAM map
+ ByteAmount containerRam = ByteAmount.fromGigabytes(8);
+
+ // Explicit set component RAM map
+ ByteAmount boltRam = ByteAmount.fromGigabytes(1);
+ ByteAmount spoutRam = ByteAmount.fromGigabytes(2);
+
+ topologyConfig.setContainerRamRequested(containerRam);
+ topologyConfig.setComponentRam(BOLT_NAME, boltRam);
+ topologyConfig.setComponentRam(SPOUT_NAME, spoutRam);
+
+ TopologyAPI.Topology topologyExplicitRamMap =
+ getTopology(spoutParallelism, boltParallelism, topologyConfig);
+ PackingPlan packingPlanExplicitRamMap =
+ getRoundRobinPackingPlan(topologyExplicitRamMap);
+
+ AssertPacking.assertContainers(packingPlanExplicitRamMap.getContainers(),
+ BOLT_NAME, SPOUT_NAME, boltRam, spoutRam, null);
+ Assert.assertEquals(totalInstances, packingPlanExplicitRamMap.getInstanceCount());
+ }
+
+ /**
+ * Test the scenario RAM map config is completely set
+ */
+ @Test(expected = PackingException.class)
+ public void testCompleteRamMapRequestedWithLessThanEnoughResource() throws Exception {
+ int numContainers = 2;
+ int spoutParallelism = 4;
+ int boltParallelism = 3;
+
+ // Set up the topology and its config
+ org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
+ topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
+
+ // Explicit set resources for container
+ // the value should be ignored, since we set the complete component RAM map
+ ByteAmount containerRam = ByteAmount.fromGigabytes(2);
+
+ // Explicit set component RAM map
+ ByteAmount boltRam = ByteAmount.fromGigabytes(1);
+ ByteAmount spoutRam = ByteAmount.fromGigabytes(2);
+
+ topologyConfig.setContainerRamRequested(containerRam);
+ topologyConfig.setComponentRam(BOLT_NAME, boltRam);
+ topologyConfig.setComponentRam(SPOUT_NAME, spoutRam);
+
+ TopologyAPI.Topology topologyExplicitRamMap =
+ getTopology(spoutParallelism, boltParallelism, topologyConfig);
+ getRoundRobinPackingPlan(topologyExplicitRamMap);
+ }
+
+ /**
+ * Test the scenario RAM map config is completely set
+ */
+ @Test
+ public void testCompleteRamMapRequestedWithMoreThanEnoughResource() throws Exception {
int numContainers = 2;
int spoutParallelism = 4;
int boltParallelism = 3;
@@ -283,6 +349,42 @@ public class RoundRobinPackingTest {
}
/**
+ * Test the scenario RAM map config is completely set
+ */
+ @Test
+ public void testCompleteRamMapRequestedWithoutPaddingResource() throws Exception {
+ int numContainers = 2;
+ int spoutParallelism = 4;
+ int boltParallelism = 3;
+ Integer totalInstances = spoutParallelism + boltParallelism;
+
+ // Set up the topology and its config
+ org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
+ topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
+
+ // Explicit set resources for container
+ // the value should be ignored, since we set the complete component RAM map
+ ByteAmount containerRam = ByteAmount.fromGigabytes(6);
+
+ // Explicit set component RAM map
+ ByteAmount boltRam = ByteAmount.fromGigabytes(1);
+ ByteAmount spoutRam = ByteAmount.fromGigabytes(2);
+
+ topologyConfig.setContainerRamRequested(containerRam);
+ topologyConfig.setComponentRam(BOLT_NAME, boltRam);
+ topologyConfig.setComponentRam(SPOUT_NAME, spoutRam);
+
+ TopologyAPI.Topology topologyExplicitRamMap =
+ getTopology(spoutParallelism, boltParallelism, topologyConfig);
+ PackingPlan packingPlanExplicitRamMap =
+ getRoundRobinPackingPlan(topologyExplicitRamMap);
+
+ AssertPacking.assertContainers(packingPlanExplicitRamMap.getContainers(),
+ BOLT_NAME, SPOUT_NAME, boltRam, spoutRam, null);
+ Assert.assertEquals(totalInstances, packingPlanExplicitRamMap.getInstanceCount());
+ }
+
+ /**
* Test the scenario CPU map config is completely set and there are exactly enough resource
*/
@Test
@@ -360,6 +462,7 @@ public class RoundRobinPackingTest {
int numContainers = 2;
int spoutParallelism = 4;
int boltParallelism = 3;
+ Integer totalInstances = spoutParallelism + boltParallelism;
// Set up the topology and its config
org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
@@ -378,14 +481,18 @@ public class RoundRobinPackingTest {
TopologyAPI.Topology topologyExplicitCpuMap =
getTopology(spoutParallelism, boltParallelism, topologyConfig);
- getRoundRobinPackingPlan(topologyExplicitCpuMap);
+ PackingPlan packingPlanExplicitCpuMap = getRoundRobinPackingPlan(topologyExplicitCpuMap);
+
+ AssertPacking.assertContainers(packingPlanExplicitCpuMap.getContainers(),
+ BOLT_NAME, SPOUT_NAME, boltCpu, spoutCpu, null);
+ Assert.assertEquals(totalInstances, packingPlanExplicitCpuMap.getInstanceCount());
}
/**
* Test the scenario CPU map config is completely set
* and there are exactly enough resource for instances, but not enough for padding
*/
- @Test(expected = PackingException.class)
+ @Test
public void testCompleteCpuMapRequestedWithoutPaddingResource() throws Exception {
int numContainers = 2;
int spoutParallelism = 4;