You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nw...@apache.org on 2019/02/08 23:06:10 UTC

[incubator-heron] branch master updated: init (#3191)

This is an automated email from the ASF dual-hosted git repository.

nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new c066c76  init (#3191)
c066c76 is described below

commit c066c7626d38dd31ec984a95be7dabcee32ec422
Author: Xiaoyao Qian <qi...@illinois.edu>
AuthorDate: Fri Feb 8 15:06:05 2019 -0800

    init (#3191)
---
 .../binpacking/FirstFitDecreasingPacking.java      | 19 +++++----
 .../heron/packing/builder/PackingPlanBuilder.java  | 45 +++++++++++++++-------
 .../roundrobin/ResourceCompliantRRPacking.java     | 23 +++++++----
 .../apache/heron/packing/CommonPackingTests.java   |  4 +-
 .../apache/heron/packing/PackingTestHelper.java    | 10 ++---
 .../packing/builder/PackingPlanBuilderTest.java    | 15 ++++----
 6 files changed, 74 insertions(+), 42 deletions(-)

diff --git a/heron/packing/src/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPacking.java b/heron/packing/src/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPacking.java
index d889abe..ab9b650 100644
--- a/heron/packing/src/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPacking.java
+++ b/heron/packing/src/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPacking.java
@@ -36,6 +36,9 @@ import org.apache.heron.packing.builder.HomogeneityScorer;
 import org.apache.heron.packing.builder.InstanceCountScorer;
 import org.apache.heron.packing.builder.PackingPlanBuilder;
 import org.apache.heron.packing.builder.Scorer;
+import org.apache.heron.packing.constraints.MinRamConstraint;
+import org.apache.heron.packing.constraints.ResourceConstraint;
+import org.apache.heron.packing.exceptions.ConstraintViolationException;
 import org.apache.heron.packing.exceptions.ResourceExceededException;
 import org.apache.heron.packing.utils.PackingUtils;
 import org.apache.heron.spi.common.Config;
@@ -161,7 +164,9 @@ public class FirstFitDecreasingPacking implements IPacking, IRepacking {
         .setMaxContainerResource(maxContainerResources)
         .setDefaultInstanceResource(defaultInstanceResources)
         .setRequestedContainerPadding(paddingPercentage)
-        .setRequestedComponentRam(TopologyUtils.getComponentRamMapConfig(topology));
+        .setRequestedComponentRam(TopologyUtils.getComponentRamMapConfig(topology))
+        .setInstanceConstraints(Collections.singletonList(new MinRamConstraint()))
+        .setPackingConstraints(Collections.singletonList(new ResourceConstraint()));
   }
 
   /**
@@ -176,7 +181,7 @@ public class FirstFitDecreasingPacking implements IPacking, IRepacking {
     // Get the instances using FFD allocation
     try {
       planBuilder = getFFDAllocation(planBuilder);
-    } catch (ResourceExceededException e) {
+    } catch (ConstraintViolationException e) {
       throw new PackingException("Could not allocate all instances to packing plan", e);
     }
 
@@ -193,7 +198,7 @@ public class FirstFitDecreasingPacking implements IPacking, IRepacking {
     // Get the instances using FFD allocation
     try {
       planBuilder = getFFDAllocation(planBuilder, currentPackingPlan, componentChanges);
-    } catch (ResourceExceededException e) {
+    } catch (ConstraintViolationException e) {
       throw new PackingException("Could not repack instances into existing packing plan", e);
     }
 
@@ -239,7 +244,7 @@ public class FirstFitDecreasingPacking implements IPacking, IRepacking {
    * @return Map &lt; containerId, list of InstanceId belonging to this container &gt;
    */
   private PackingPlanBuilder getFFDAllocation(PackingPlanBuilder planBuilder)
-      throws ResourceExceededException {
+      throws ConstraintViolationException {
     Map<String, Integer> parallelismMap = TopologyUtils.getComponentParallelism(topology);
     assignInstancesToContainers(planBuilder, parallelismMap);
     return planBuilder;
@@ -253,7 +258,7 @@ public class FirstFitDecreasingPacking implements IPacking, IRepacking {
   private PackingPlanBuilder getFFDAllocation(PackingPlanBuilder packingPlanBuilder,
                                               PackingPlan currentPackingPlan,
                                               Map<String, Integer> componentChanges)
-      throws ResourceExceededException {
+      throws ConstraintViolationException {
     this.numContainers = currentPackingPlan.getContainers().size();
 
     Map<String, Integer> componentsToScaleDown =
@@ -279,7 +284,7 @@ public class FirstFitDecreasingPacking implements IPacking, IRepacking {
    * @param parallelismMap component parallelism
    */
   private void assignInstancesToContainers(PackingPlanBuilder planBuilder,
-      Map<String, Integer> parallelismMap) throws ResourceExceededException {
+      Map<String, Integer> parallelismMap) throws ConstraintViolationException {
     ArrayList<RamRequirement> ramRequirements = getSortedRAMInstances(parallelismMap.keySet());
     for (RamRequirement ramRequirement : ramRequirements) {
       String componentName = ramRequirement.getComponentName();
@@ -326,7 +331,7 @@ public class FirstFitDecreasingPacking implements IPacking, IRepacking {
    *
    */
   private void placeFFDInstance(PackingPlanBuilder planBuilder,
-                                String componentName) throws ResourceExceededException {
+                                String componentName) throws ConstraintViolationException {
     if (this.numContainers == 0) {
       planBuilder.updateNumContainers(++numContainers);
     }
diff --git a/heron/packing/src/java/org/apache/heron/packing/builder/PackingPlanBuilder.java b/heron/packing/src/java/org/apache/heron/packing/builder/PackingPlanBuilder.java
index 633a0f4..64f4b8e 100644
--- a/heron/packing/src/java/org/apache/heron/packing/builder/PackingPlanBuilder.java
+++ b/heron/packing/src/java/org/apache/heron/packing/builder/PackingPlanBuilder.java
@@ -38,6 +38,9 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 
 import org.apache.heron.common.basics.ByteAmount;
+import org.apache.heron.packing.constraints.InstanceConstraint;
+import org.apache.heron.packing.constraints.PackingConstraint;
+import org.apache.heron.packing.exceptions.ConstraintViolationException;
 import org.apache.heron.packing.exceptions.ResourceExceededException;
 import org.apache.heron.packing.utils.PackingUtils;
 import org.apache.heron.spi.packing.InstanceId;
@@ -58,6 +61,8 @@ public class PackingPlanBuilder {
   private Map<String, ByteAmount> componentRamMap;
   private int requestedContainerPadding;
   private int numContainers;
+  private List<PackingConstraint> packingConstraints;
+  private List<InstanceConstraint> instanceConstraints;
 
   private Map<Integer, Container> containers;
   private TreeSet<Integer> taskIds; // globally unique ids assigned to instances
@@ -73,6 +78,8 @@ public class PackingPlanBuilder {
     this.numContainers = 0;
     this.requestedContainerPadding = 0;
     this.componentRamMap = new HashMap<>();
+    this.packingConstraints = new ArrayList<>();
+    this.instanceConstraints = new ArrayList<>();
   }
 
   // set resource settings
@@ -96,6 +103,16 @@ public class PackingPlanBuilder {
     return this;
   }
 
+  public PackingPlanBuilder setPackingConstraints(List<PackingConstraint> packingConstraintLst) {
+    this.packingConstraints = packingConstraintLst;
+    return this;
+  }
+
+  public PackingPlanBuilder setInstanceConstraints(List<InstanceConstraint> instanceConstraintLst) {
+    this.instanceConstraints = instanceConstraintLst;
+    return this;
+  }
+
   // Calling updateNumContainers will produce that many containers, starting with id 1. The build()
   // method will prune out empty containers from the plan.
   public PackingPlanBuilder updateNumContainers(int count) {
@@ -107,7 +124,7 @@ public class PackingPlanBuilder {
   // be lazily initialized, which could result in more containers than those requested using the
   // updateNumContainers method
   public PackingPlanBuilder addInstance(Integer containerId,
-                                        String componentName) throws ResourceExceededException {
+                                        String componentName) throws ConstraintViolationException {
     initContainer(containerId);
 
     Integer taskId = taskIds.isEmpty() ? 1 : taskIds.last() + 1;
@@ -120,15 +137,18 @@ public class PackingPlanBuilder {
         componentName, this.componentRamMap, this.defaultInstanceResource,
         this.maxContainerResource, this.requestedContainerPadding);
 
-    try {
-      addToContainer(containers.get(containerId),
-          new PackingPlan.InstancePlan(instanceId, instanceResource),
-          this.componentIndexes, this.taskIds);
-    } catch (ResourceExceededException e) {
-      throw new ResourceExceededException(String.format(
-          "Insufficient container resources to add instance %s with resources %s to container %d.",
-          instanceId, instanceResource, containerId), e);
+    Container container = containers.get(containerId);
+    PackingPlan.InstancePlan instancePlan
+        = new PackingPlan.InstancePlan(instanceId, instanceResource);
+
+    // Check constraints
+    for (InstanceConstraint constraint : instanceConstraints) {
+      constraint.test(instancePlan);
     }
+    for (PackingConstraint constraint : packingConstraints) {
+      constraint.test(container, instancePlan);
+    }
+    addToContainer(container, instancePlan, this.componentIndexes, this.taskIds);
 
     LOG.finest(String.format("Added to container %d instance %s", containerId, instanceId));
     return this;
@@ -161,7 +181,7 @@ public class PackingPlanBuilder {
       try {
         addInstance(container.getContainerId(), componentName);
         return container.getContainerId();
-      } catch (ResourceExceededException e) {
+      } catch (ConstraintViolationException e) {
         // ignore since we'll continue trying
       }
     }
@@ -423,9 +443,8 @@ public class PackingPlanBuilder {
     container.add(instancePlan);
     String componentName = instancePlan.getComponentName();
 
-    if (componentIndexes.get(componentName) == null) {
-      componentIndexes.put(componentName, new TreeSet<Integer>());
-    }
+    // update componentIndex and taskIds
+    componentIndexes.computeIfAbsent(componentName, k -> new TreeSet<Integer>());
     componentIndexes.get(componentName).add(instancePlan.getComponentIndex());
     taskIds.add(instancePlan.getTaskId());
   }
diff --git a/heron/packing/src/java/org/apache/heron/packing/roundrobin/ResourceCompliantRRPacking.java b/heron/packing/src/java/org/apache/heron/packing/roundrobin/ResourceCompliantRRPacking.java
index 1dcb900..df9718e 100644
--- a/heron/packing/src/java/org/apache/heron/packing/roundrobin/ResourceCompliantRRPacking.java
+++ b/heron/packing/src/java/org/apache/heron/packing/roundrobin/ResourceCompliantRRPacking.java
@@ -20,6 +20,7 @@
 package org.apache.heron.packing.roundrobin;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.logging.Logger;
@@ -33,6 +34,9 @@ import org.apache.heron.packing.builder.HomogeneityScorer;
 import org.apache.heron.packing.builder.InstanceCountScorer;
 import org.apache.heron.packing.builder.PackingPlanBuilder;
 import org.apache.heron.packing.builder.Scorer;
+import org.apache.heron.packing.constraints.MinRamConstraint;
+import org.apache.heron.packing.constraints.ResourceConstraint;
+import org.apache.heron.packing.exceptions.ConstraintViolationException;
 import org.apache.heron.packing.exceptions.ResourceExceededException;
 import org.apache.heron.packing.utils.PackingUtils;
 import org.apache.heron.spi.common.Config;
@@ -165,7 +169,9 @@ public class ResourceCompliantRRPacking implements IPacking, IRepacking {
         .setMaxContainerResource(maxContainerResources)
         .setDefaultInstanceResource(defaultInstanceResources)
         .setRequestedContainerPadding(paddingPercentage)
-        .setRequestedComponentRam(TopologyUtils.getComponentRamMapConfig(topology));
+        .setRequestedComponentRam(TopologyUtils.getComponentRamMapConfig(topology))
+        .setInstanceConstraints(Collections.singletonList(new MinRamConstraint()))
+        .setPackingConstraints(Collections.singletonList(new ResourceConstraint()));
   }
 
   @Override
@@ -179,7 +185,7 @@ public class ResourceCompliantRRPacking implements IPacking, IRepacking {
 
         return planBuilder.build();
 
-      } catch (ResourceExceededException e) {
+      } catch (ConstraintViolationException e) {
         //Not enough containers. Adjust the number of containers.
         LOG.finest(String.format(
             "%s Increasing the number of containers to %s and attempting to place again.",
@@ -216,7 +222,7 @@ public class ResourceCompliantRRPacking implements IPacking, IRepacking {
 
         return planBuilder.build();
 
-      } catch (ResourceExceededException e) {
+      } catch (ConstraintViolationException e) {
         //Not enough containers. Adjust the number of containers.
         increaseNumContainers(1);
         resetToFirstContainer();
@@ -258,7 +264,7 @@ public class ResourceCompliantRRPacking implements IPacking, IRepacking {
   }
 
   private PackingPlanBuilder getResourceCompliantRRAllocation(
-      PackingPlanBuilder planBuilder) throws ResourceExceededException {
+      PackingPlanBuilder planBuilder) throws ConstraintViolationException {
 
     Map<String, Integer> parallelismMap = TopologyUtils.getComponentParallelism(topology);
     int totalInstances = TopologyUtils.getTotalInstance(topology);
@@ -282,7 +288,7 @@ public class ResourceCompliantRRPacking implements IPacking, IRepacking {
    */
   private PackingPlanBuilder getResourceCompliantRRAllocation(
       PackingPlanBuilder planBuilder, Map<String, Integer> componentChanges)
-      throws ResourceExceededException {
+      throws ConstraintViolationException {
 
     Map<String, Integer> componentsToScaleDown =
         PackingUtils.getComponentsToScale(componentChanges, PackingUtils.ScalingDirection.DOWN);
@@ -309,7 +315,8 @@ public class ResourceCompliantRRPacking implements IPacking, IRepacking {
    */
   private void assignInstancesToContainers(PackingPlanBuilder planBuilder,
                                            Map<String, Integer> parallelismMap,
-                                           PolicyType policyType) throws ResourceExceededException {
+                                           PolicyType policyType)
+      throws ConstraintViolationException {
     for (String componentName : parallelismMap.keySet()) {
       int numInstance = parallelismMap.get(componentName);
       for (int i = 0; i < numInstance; ++i) {
@@ -326,7 +333,7 @@ public class ResourceCompliantRRPacking implements IPacking, IRepacking {
    * @throws ResourceExceededException if there is no room on the current container for the instance
    */
   private void strictRRpolicy(PackingPlanBuilder planBuilder,
-                              String componentName) throws ResourceExceededException {
+                              String componentName) throws ConstraintViolationException {
     planBuilder.addInstance(this.containerId, componentName);
     this.containerId = nextContainerId(this.containerId);
   }
@@ -383,7 +390,7 @@ public class ResourceCompliantRRPacking implements IPacking, IRepacking {
     private void assignInstance(PackingPlanBuilder planBuilder,
                                 String componentName,
                                 ResourceCompliantRRPacking packing)
-        throws ResourceExceededException, RuntimeException {
+        throws ConstraintViolationException, RuntimeException {
       switch (this) {
         case STRICT:
           packing.strictRRpolicy(planBuilder, componentName);
diff --git a/heron/packing/tests/java/org/apache/heron/packing/CommonPackingTests.java b/heron/packing/tests/java/org/apache/heron/packing/CommonPackingTests.java
index 98e16f9..97aaeec 100644
--- a/heron/packing/tests/java/org/apache/heron/packing/CommonPackingTests.java
+++ b/heron/packing/tests/java/org/apache/heron/packing/CommonPackingTests.java
@@ -29,7 +29,7 @@ import org.apache.heron.api.generated.TopologyAPI;
 import org.apache.heron.common.basics.ByteAmount;
 import org.apache.heron.common.basics.Pair;
 import org.apache.heron.common.utils.topology.TopologyTests;
-import org.apache.heron.packing.exceptions.ResourceExceededException;
+import org.apache.heron.packing.exceptions.ConstraintViolationException;
 import org.apache.heron.spi.common.Config;
 import org.apache.heron.spi.common.Context;
 import org.apache.heron.spi.packing.IPacking;
@@ -365,7 +365,7 @@ public abstract class CommonPackingTests {
   protected void doScaleDownTest(Pair<Integer, InstanceId>[] initialComponentInstances,
                                Map<String, Integer> componentChanges,
                                Pair<Integer, InstanceId>[] expectedComponentInstances)
-      throws ResourceExceededException {
+      throws ConstraintViolationException {
     String topologyId = this.topology.getId();
 
     // The padding percentage used in repack() must be <= one as used in pack(), otherwise we can't
diff --git a/heron/packing/tests/java/org/apache/heron/packing/PackingTestHelper.java b/heron/packing/tests/java/org/apache/heron/packing/PackingTestHelper.java
index 530b5d2..d4dc6fe 100644
--- a/heron/packing/tests/java/org/apache/heron/packing/PackingTestHelper.java
+++ b/heron/packing/tests/java/org/apache/heron/packing/PackingTestHelper.java
@@ -22,7 +22,7 @@ package org.apache.heron.packing;
 import org.apache.heron.common.basics.ByteAmount;
 import org.apache.heron.common.basics.Pair;
 import org.apache.heron.packing.builder.PackingPlanBuilder;
-import org.apache.heron.packing.exceptions.ResourceExceededException;
+import org.apache.heron.packing.exceptions.ConstraintViolationException;
 import org.apache.heron.spi.packing.InstanceId;
 import org.apache.heron.spi.packing.PackingPlan;
 import org.apache.heron.spi.packing.Resource;
@@ -37,7 +37,7 @@ public final class PackingTestHelper {
   public static PackingPlan createTestPackingPlan(String topologyName,
                                                   Pair<Integer, String>[] instances,
                                                   int containerPadding)
-      throws ResourceExceededException {
+      throws ConstraintViolationException {
     return generateTestPackingPlan(topologyName, null, instances, null, containerPadding);
   }
 
@@ -45,7 +45,7 @@ public final class PackingTestHelper {
                                                  PackingPlan previousPackingPlan,
                                                  Pair<Integer, String>[] instances,
                                                  int containerPadding)
-      throws ResourceExceededException {
+      throws ConstraintViolationException {
     return generateTestPackingPlan(
         topologyName, previousPackingPlan, instances, null, containerPadding);
   }
@@ -54,7 +54,7 @@ public final class PackingTestHelper {
                                                       PackingPlan previousPackingPlan,
                                                       Pair<Integer, String>[] instances,
                                                       int containerPadding)
-      throws ResourceExceededException {
+      throws ConstraintViolationException {
     return generateTestPackingPlan(
         topologyName, previousPackingPlan, null, instances, containerPadding);
   }
@@ -68,7 +68,7 @@ public final class PackingTestHelper {
                                                      Pair<Integer, String>[] addInstances,
                                                      Pair<Integer, String>[] removeInstances,
                                                      int containerPadding)
-      throws ResourceExceededException {
+      throws ConstraintViolationException {
     PackingPlanBuilder builder = new PackingPlanBuilder(topologyName, previousPackingPlan);
 
     int instanceCount = 0;
diff --git a/heron/packing/tests/java/org/apache/heron/packing/builder/PackingPlanBuilderTest.java b/heron/packing/tests/java/org/apache/heron/packing/builder/PackingPlanBuilderTest.java
index 24123ce..dfd0c85 100644
--- a/heron/packing/tests/java/org/apache/heron/packing/builder/PackingPlanBuilderTest.java
+++ b/heron/packing/tests/java/org/apache/heron/packing/builder/PackingPlanBuilderTest.java
@@ -35,6 +35,7 @@ import org.apache.heron.common.basics.ByteAmount;
 import org.apache.heron.common.basics.Pair;
 import org.apache.heron.packing.AssertPacking;
 import org.apache.heron.packing.PackingTestHelper;
+import org.apache.heron.packing.exceptions.ConstraintViolationException;
 import org.apache.heron.packing.exceptions.ResourceExceededException;
 import org.apache.heron.spi.packing.InstanceId;
 import org.apache.heron.spi.packing.PackingException;
@@ -190,12 +191,12 @@ public class PackingPlanBuilderTest {
       PackingTestHelper.toContainerIdComponentNames(testContainerInstances);
 
   @Test
-  public void testBuildPackingPlan() throws ResourceExceededException {
+  public void testBuildPackingPlan() throws ConstraintViolationException {
     doCreatePackingPlanTest(testContainerInstances);
   }
 
   @Test
-  public void testAddToPackingPlan() throws ResourceExceededException {
+  public void testAddToPackingPlan() throws ConstraintViolationException {
     PackingPlan plan = doCreatePackingPlanTest(testContainerInstances);
 
     @SuppressWarnings({"unchecked", "rawtypes"})
@@ -211,7 +212,7 @@ public class PackingPlanBuilderTest {
   }
 
   @Test(expected = ResourceExceededException.class)
-  public void testExceededCapacityAddingToPackingPlan() throws ResourceExceededException {
+  public void testExceededCapacityAddingToPackingPlan() throws ConstraintViolationException {
     PackingPlan plan = doCreatePackingPlanTest(testContainerInstances);
 
     @SuppressWarnings({"unchecked", "rawtypes"})
@@ -223,7 +224,7 @@ public class PackingPlanBuilderTest {
   }
 
   @Test
-  public void testRemoveFromPackingPlan() throws ResourceExceededException {
+  public void testRemoveFromPackingPlan() throws ConstraintViolationException {
     PackingPlan plan = doCreatePackingPlanTest(testContainerInstances);
 
     @SuppressWarnings({"unchecked", "rawtypes"})
@@ -242,7 +243,7 @@ public class PackingPlanBuilderTest {
   }
 
   @Test(expected = PackingException.class)
-  public void testInvalidContainerRemoveFromPackingPlan() throws ResourceExceededException {
+  public void testInvalidContainerRemoveFromPackingPlan() throws ConstraintViolationException {
     PackingPlan plan = doCreatePackingPlanTest(testContainerInstances);
 
     @SuppressWarnings({"unchecked", "rawtypes"})
@@ -253,7 +254,7 @@ public class PackingPlanBuilderTest {
   }
 
   @Test(expected = PackingException.class)
-  public void testInvalidComponentRemoveFromPackingPlan() throws ResourceExceededException {
+  public void testInvalidComponentRemoveFromPackingPlan() throws ConstraintViolationException {
     PackingPlan plan = doCreatePackingPlanTest(testContainerInstances);
 
     @SuppressWarnings({"unchecked", "rawtypes"})
@@ -264,7 +265,7 @@ public class PackingPlanBuilderTest {
   }
 
   private static PackingPlan doCreatePackingPlanTest(
-      Pair<Integer, InstanceId>[] instances) throws ResourceExceededException {
+      Pair<Integer, InstanceId>[] instances) throws ConstraintViolationException {
     PackingPlan plan = PackingTestHelper.createTestPackingPlan(
         TOPOLOGY_ID, PackingTestHelper.toContainerIdComponentNames(instances), 0);
     AssertPacking.assertPackingPlan(TOPOLOGY_ID, instances, plan);