You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nw...@apache.org on 2019/01/29 16:18:15 UTC

[incubator-heron] branch master updated: Soften padding constraint (#3178)

This is an automated email from the ASF dual-hosted git repository.

nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ebfff7  Soften padding constraint (#3178)
3ebfff7 is described below

commit 3ebfff7e87d8f26c337f805c6619b2f0d1986955
Author: Xiaoyao Qian <qi...@illinois.edu>
AuthorDate: Tue Jan 29 08:18:10 2019 -0800

    Soften padding constraint (#3178)
    
    * Soften padding constraint
    
    * address comments
---
 .../packing/roundrobin/RoundRobinPacking.java      |  87 +++++++++++++---
 .../packing/roundrobin/RoundRobinPackingTest.java  | 113 ++++++++++++++++++++-
 2 files changed, 182 insertions(+), 18 deletions(-)

diff --git a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
index c48d6a2..84bc667 100644
--- a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
+++ b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
@@ -20,12 +20,14 @@
 package org.apache.heron.packing.roundrobin;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.logging.Logger;
+import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -34,6 +36,7 @@ import org.apache.heron.api.utils.TopologyUtils;
 import org.apache.heron.common.basics.ByteAmount;
 import org.apache.heron.common.basics.CPUShare;
 import org.apache.heron.common.basics.ResourceMeasure;
+import org.apache.heron.packing.RamRequirement;
 import org.apache.heron.spi.common.Config;
 import org.apache.heron.spi.common.Context;
 import org.apache.heron.spi.packing.IPacking;
@@ -140,12 +143,16 @@ public class RoundRobinPacking implements IPacking, IRepacking {
     Map<Integer, List<InstanceId>> roundRobinAllocation =
         getRoundRobinAllocation(numContainer, parallelismMap);
 
+    ByteAmount containerDiskInBytes = getContainerDiskHint(roundRobinAllocation);
+    double containerCpuHint = getContainerCpuHint(roundRobinAllocation);
+    ByteAmount containerRamHint = getContainerRamHint(roundRobinAllocation);
+
     // Get the RAM map for every instance
     Map<Integer, Map<InstanceId, ByteAmount>> instancesRamMap =
         calculateInstancesResourceMapInContainer(
         roundRobinAllocation,
         TopologyUtils.getComponentRamMapConfig(topology),
-        getContainerRamHint(roundRobinAllocation),
+        containerRamHint,
         instanceRamDefault,
         containerRamPadding,
         ByteAmount.ZERO,
@@ -157,17 +164,13 @@ public class RoundRobinPacking implements IPacking, IRepacking {
         calculateInstancesResourceMapInContainer(
         roundRobinAllocation,
         CPUShare.convertDoubleMapToCpuShareMap(TopologyUtils.getComponentCpuMapConfig(topology)),
-        CPUShare.fromDouble(getContainerCpuHint(roundRobinAllocation)),
+        CPUShare.fromDouble(containerCpuHint),
         CPUShare.fromDouble(instanceCpuDefault),
         CPUShare.fromDouble(containerCpuPadding),
         CPUShare.fromDouble(0.0),
         CPUShare.fromDouble(NOT_SPECIFIED_CPU_SHARE),
         CPU);
 
-    ByteAmount containerDiskInBytes = getContainerDiskHint(roundRobinAllocation);
-    double containerCpuHint = getContainerCpuHint(roundRobinAllocation);
-    ByteAmount containerRamHint = getContainerRamHint(roundRobinAllocation);
-
     LOG.info(String.format("Pack internal: container CPU hint: %.3f, RAM hint: %s, disk hint: %s.",
         containerCpuHint,
         containerRamHint.toString(),
@@ -180,8 +183,8 @@ public class RoundRobinPacking implements IPacking, IRepacking {
 
       // Calculate the resource required for single instance
       Map<InstanceId, PackingPlan.InstancePlan> instancePlanMap = new HashMap<>();
-      ByteAmount containerRam = containerRamPadding;
-      double containerCpu = containerCpuPadding;
+      ByteAmount containerRam = ByteAmount.ZERO;
+      double containerCpu = 0.0;
 
       for (InstanceId instanceId : instanceList) {
         ByteAmount instanceRam = instancesRamMap.get(containerId).get(instanceId);
@@ -198,6 +201,20 @@ public class RoundRobinPacking implements IPacking, IRepacking {
         containerCpu += instanceCpu;
       }
 
+      // finalize container resource
+      if (!containerRamHint.equals(NOT_SPECIFIED_BYTE_AMOUNT)) {
+        containerRam = ByteAmount.fromBytes(
+            Math.min(containerRam.plus(containerRamPadding).asBytes(), containerRamHint.asBytes()));
+      } else {
+        containerRam = containerRam.plus(containerRamPadding);
+      }
+
+      if (containerCpuHint != NOT_SPECIFIED_CPU_SHARE) {
+        containerCpu = Math.min(containerCpu + containerCpuPadding, containerCpuHint);
+      } else {
+        containerCpu += containerCpuPadding;
+      }
+
       Resource resource = new Resource(Math.max(containerCpu, containerCpuHint),
           containerRam, containerDiskInBytes);
       PackingPlan.ContainerPlan containerPlan = new PackingPlan.ContainerPlan(
@@ -280,13 +297,25 @@ public class RoundRobinPacking implements IPacking, IRepacking {
         }
       }
 
-      // Validate instance resources specified so far don't violate container-level constraint
+      // Soft padding constraint validation: warn if padding amount cannot be accommodated
+      boolean paddingThrottling = false;
       if (!containerResHint.equals(notSpecified)
           && usedRes.greaterThan(containerResHint.minus(containerResPadding))) {
-        throw new PackingException(String.format("Invalid packing plan generated. "
-                + "Total instance %s in a container (%s) + padding(%s) have exceeded "
-                + "the container-level constraint of %s.",
-            resourceType, usedRes.toString(), containerResPadding.toString(), containerResHint));
+        // Validate instance resources specified so far don't violate container-level constraint
+        if (usedRes.greaterThan(containerResHint)) {
+          throw new PackingException(String.format("Invalid packing plan generated. "
+                  + "Total instance %s (%s) in container#%d have exceeded "
+                  + "the container-level constraint of %s.",
+              resourceType, usedRes.toString(), containerId, containerResHint.toString()));
+        }
+
+        paddingThrottling = true;
+        LOG.warning(String.format("Container#%d (max %s: %s) is now hosting instances that "
+                + "take up to %s %s. The container may not have enough resource to accommodate "
+                + "internal processes which take up to %s %s.",
+            containerId, resourceType, containerResHint.toString(),
+            usedRes.toString(), resourceType,
+            containerResPadding.toString(), resourceType));
       }
 
       // calculate resource for the remaining unspecified instances if any
@@ -296,7 +325,12 @@ public class RoundRobinPacking implements IPacking, IRepacking {
         // If container resource is specified
         if (!containerResHint.equals(notSpecified)) {
           // discount resource for heron internal process (padding) and used (usedRes)
-          T remainingRes = (T) containerResHint.minus(containerResPadding).minus(usedRes);
+          T remainingRes;
+          if (paddingThrottling) {
+            remainingRes = (T) containerResHint.minus(usedRes);
+          } else {
+            remainingRes = (T) containerResHint.minus(containerResPadding).minus(usedRes);
+          }
 
           if (remainingRes.lessOrEqual(zero)) {
             throw new PackingException(String.format("Invalid packing plan generated. "
@@ -336,7 +370,12 @@ public class RoundRobinPacking implements IPacking, IRepacking {
 
     int index = 1;
     int globalTaskIndex = 1;
-    for (String component : parallelismMap.keySet()) {
+
+    // To ensure we spread out the big instances first
+    // Only sorting by RAM here because only RAM can be explicitly capped by JVM processes
+    List<String> sortedInstances = getSortedRAMComponents(parallelismMap.keySet()).stream()
+        .map(RamRequirement::getComponentName).collect(Collectors.toList());
+    for (String component : sortedInstances) {
       int numInstance = parallelismMap.get(component);
       for (int i = 0; i < numInstance; ++i) {
         allocation.get(index).add(new InstanceId(component, globalTaskIndex, i));
@@ -347,6 +386,24 @@ public class RoundRobinPacking implements IPacking, IRepacking {
     return allocation;
   }
 
+
+  /**
+   * Sort the components in decreasing order based on their RAM requirements
+   *
+   * @return The sorted list of components and their RAM requirements
+   */
+  private ArrayList<RamRequirement> getSortedRAMComponents(Set<String> componentNames) {
+    ArrayList<RamRequirement> ramRequirements = new ArrayList<>();
+    Map<String, ByteAmount> ramMap = TopologyUtils.getComponentRamMapConfig(topology);
+
+    for (String componentName : componentNames) {
+      ramRequirements.add(new RamRequirement(componentName,
+          ramMap.getOrDefault(componentName, ByteAmount.ZERO)));
+    }
+    Collections.sort(ramRequirements, Collections.reverseOrder());
+    return ramRequirements;
+  }
+
   /**
    * Get # of instances in the largest container
    *
diff --git a/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
index 244fa88..109ae47 100644
--- a/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
+++ b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
@@ -250,7 +250,73 @@ public class RoundRobinPackingTest {
    * Test the scenario RAM map config is completely set
    */
   @Test
-  public void testCompleteRamMapRequested() throws Exception {
+  public void testCompleteRamMapRequestedWithExactlyEnoughResource() throws Exception {
+    int numContainers = 2;
+    int spoutParallelism = 4;
+    int boltParallelism = 3;
+    Integer totalInstances = spoutParallelism + boltParallelism;
+
+    // Set up the topology and its config
+    org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
+    topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
+
+    // Explicit set resources for container
+    // the value should be ignored, since we set the complete component RAM map
+    ByteAmount containerRam = ByteAmount.fromGigabytes(8);
+
+    // Explicit set component RAM map
+    ByteAmount boltRam = ByteAmount.fromGigabytes(1);
+    ByteAmount spoutRam = ByteAmount.fromGigabytes(2);
+
+    topologyConfig.setContainerRamRequested(containerRam);
+    topologyConfig.setComponentRam(BOLT_NAME, boltRam);
+    topologyConfig.setComponentRam(SPOUT_NAME, spoutRam);
+
+    TopologyAPI.Topology topologyExplicitRamMap =
+        getTopology(spoutParallelism, boltParallelism, topologyConfig);
+    PackingPlan packingPlanExplicitRamMap =
+        getRoundRobinPackingPlan(topologyExplicitRamMap);
+
+    AssertPacking.assertContainers(packingPlanExplicitRamMap.getContainers(),
+        BOLT_NAME, SPOUT_NAME, boltRam, spoutRam, null);
+    Assert.assertEquals(totalInstances, packingPlanExplicitRamMap.getInstanceCount());
+  }
+
+  /**
+   * Test the scenario RAM map config is completely set, but container RAM is insufficient
+   */
+  @Test(expected = PackingException.class)
+  public void testCompleteRamMapRequestedWithLessThanEnoughResource() throws Exception {
+    int numContainers = 2;
+    int spoutParallelism = 4;
+    int boltParallelism = 3;
+
+    // Set up the topology and its config
+    org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
+    topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
+
+    // Explicit set resources for container
+    // the value is smaller than the RAM the instances require, so packing should fail
+    ByteAmount containerRam = ByteAmount.fromGigabytes(2);
+
+    // Explicit set component RAM map
+    ByteAmount boltRam = ByteAmount.fromGigabytes(1);
+    ByteAmount spoutRam = ByteAmount.fromGigabytes(2);
+
+    topologyConfig.setContainerRamRequested(containerRam);
+    topologyConfig.setComponentRam(BOLT_NAME, boltRam);
+    topologyConfig.setComponentRam(SPOUT_NAME, spoutRam);
+
+    TopologyAPI.Topology topologyExplicitRamMap =
+        getTopology(spoutParallelism, boltParallelism, topologyConfig);
+    getRoundRobinPackingPlan(topologyExplicitRamMap);
+  }
+
+  /**
+   * Test the scenario RAM map config is completely set
+   */
+  @Test
+  public void testCompleteRamMapRequestedWithMoreThanEnoughResource() throws Exception {
     int numContainers = 2;
     int spoutParallelism = 4;
     int boltParallelism = 3;
@@ -283,6 +349,42 @@ public class RoundRobinPackingTest {
   }
 
   /**
+   * Test the scenario RAM map config is completely set, but no RAM is left for padding
+   */
+  @Test
+  public void testCompleteRamMapRequestedWithoutPaddingResource() throws Exception {
+    int numContainers = 2;
+    int spoutParallelism = 4;
+    int boltParallelism = 3;
+    Integer totalInstances = spoutParallelism + boltParallelism;
+
+    // Set up the topology and its config
+    org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
+    topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
+
+    // Explicit set resources for container
+    // the value is expected to fit the instances' RAM only, leaving none for padding
+    ByteAmount containerRam = ByteAmount.fromGigabytes(6);
+
+    // Explicit set component RAM map
+    ByteAmount boltRam = ByteAmount.fromGigabytes(1);
+    ByteAmount spoutRam = ByteAmount.fromGigabytes(2);
+
+    topologyConfig.setContainerRamRequested(containerRam);
+    topologyConfig.setComponentRam(BOLT_NAME, boltRam);
+    topologyConfig.setComponentRam(SPOUT_NAME, spoutRam);
+
+    TopologyAPI.Topology topologyExplicitRamMap =
+        getTopology(spoutParallelism, boltParallelism, topologyConfig);
+    PackingPlan packingPlanExplicitRamMap =
+        getRoundRobinPackingPlan(topologyExplicitRamMap);
+
+    AssertPacking.assertContainers(packingPlanExplicitRamMap.getContainers(),
+        BOLT_NAME, SPOUT_NAME, boltRam, spoutRam, null);
+    Assert.assertEquals(totalInstances, packingPlanExplicitRamMap.getInstanceCount());
+  }
+
+  /**
    * Test the scenario CPU map config is completely set and there are exactly enough resource
    */
   @Test
@@ -360,6 +462,7 @@ public class RoundRobinPackingTest {
     int numContainers = 2;
     int spoutParallelism = 4;
     int boltParallelism = 3;
+    Integer totalInstances = spoutParallelism + boltParallelism;
 
     // Set up the topology and its config
     org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
@@ -378,14 +481,18 @@ public class RoundRobinPackingTest {
 
     TopologyAPI.Topology topologyExplicitCpuMap =
         getTopology(spoutParallelism, boltParallelism, topologyConfig);
-    getRoundRobinPackingPlan(topologyExplicitCpuMap);
+    PackingPlan packingPlanExplicitCpuMap = getRoundRobinPackingPlan(topologyExplicitCpuMap);
+
+    AssertPacking.assertContainers(packingPlanExplicitCpuMap.getContainers(),
+        BOLT_NAME, SPOUT_NAME, boltCpu, spoutCpu, null);
+    Assert.assertEquals(totalInstances, packingPlanExplicitCpuMap.getInstanceCount());
   }
 
   /**
    * Test the scenario CPU map config is completely set
    * and there are exactly enough resource for instances, but not enough for padding
    */
-  @Test(expected = PackingException.class)
+  @Test
   public void testCompleteCpuMapRequestedWithoutPaddingResource() throws Exception {
     int numContainers = 2;
     int spoutParallelism = 4;