You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nw...@apache.org on 2019/01/11 22:10:05 UTC

[incubator-heron] branch master updated: Fix RoundRobinPacking repack with no specified numContainers (#3152)

This is an automated email from the ASF dual-hosted git repository.

nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 94de4fa  Fix RoundRobinPacking repack with no specified numContainers (#3152)
94de4fa is described below

commit 94de4fa1b0b25eaa69ce0798177d198b8da0ef89
Author: Xiaoyao Qian <qi...@illinois.edu>
AuthorDate: Fri Jan 11 14:09:58 2019 -0800

    Fix RoundRobinPacking repack with no specified numContainers (#3152)
---
 .../packing/roundrobin/RoundRobinPacking.java      | 31 ++++++--
 .../packing/roundrobin/RoundRobinPackingTest.java  | 92 +++++++++++++++++++++-
 2 files changed, 116 insertions(+), 7 deletions(-)

diff --git a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
index e4d2cf3..1eef9a6 100644
--- a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
+++ b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
@@ -433,10 +433,18 @@ public class RoundRobinPacking implements IPacking, IRepacking {
     }
   }
 
-  /*
-   * read the current packing plan with update parallelism to calculate a new packing plan
-   * the packing algorithm packInternal() is shared with pack()
+  /**
+   * Read the current packing plan with update parallelism to calculate a new packing plan.
+   * This method should determine a new number of containers based on the updated parallism
+   * while remaining the number of instances per container <= that of the old packing plan.
+   * The packing algorithm packInternal() is shared with pack()
    * delegate to packInternal() with the new container count and component parallelism
+   *
+   * @param currentPackingPlan Existing packing plan
+   * @param componentChanges Map &lt; componentName, new component parallelism &gt;
+   * that contains the parallelism for each component whose parallelism has changed.
+   * @return new packing plan
+   * @throws PackingException
    */
   @Override
   public PackingPlan repack(PackingPlan currentPackingPlan, Map<String, Integer> componentChanges)
@@ -445,11 +453,10 @@ public class RoundRobinPacking implements IPacking, IRepacking {
     int initialNumInstance = TopologyUtils.getTotalInstance(topology);
     double initialNumInstancePerContainer = (double) initialNumInstance / initialNumContainer;
 
-    Map<String, Integer> currentComponentParallelism = currentPackingPlan.getComponentCounts();
     Map<String, Integer> newComponentParallelism =
         getNewComponentParallelism(currentPackingPlan, componentChanges);
 
-    int newNumInstance = TopologyUtils.getTotalInstance(currentComponentParallelism);
+    int newNumInstance = TopologyUtils.getTotalInstance(newComponentParallelism);
     int newNumContainer = (int) Math.ceil(newNumInstance / initialNumInstancePerContainer);
     return packInternal(newNumContainer, newComponentParallelism);
   }
@@ -464,6 +471,20 @@ public class RoundRobinPacking implements IPacking, IRepacking {
     return currentComponentParallelism;
   }
 
+  /**
+   * Read the current packing plan with update parallelism and number of containers
+   * to calculate a new packing plan.
+   * The packing algorithm packInternal() is shared with pack()
+   * delegate to packInternal() with the new container count and component parallelism
+   *
+   * @param currentPackingPlan Existing packing plan
+   * @param containers &lt; the new number of containers for the topology
+   * specified by the user
+   * @param componentChanges Map &lt; componentName, new component parallelism &gt;
+   * that contains the parallelism for each component whose parallelism has changed.
+   * @return new packing plan
+   * @throws PackingException
+   */
   @Override
   public PackingPlan repack(PackingPlan currentPackingPlan, int containers, Map<String, Integer>
       componentChanges) throws PackingException {
diff --git a/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
index 826feaa..244fa88 100644
--- a/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
+++ b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
@@ -558,10 +558,10 @@ public class RoundRobinPackingTest {
 
 
   /**
-   * test re-packing of instances
+   * test re-packing with same total instances
    */
   @Test
-  public void testRePacking() throws Exception {
+  public void testRepackingWithSameTotalInstances() throws Exception {
     int numContainers = 2;
     int componentParallelism = 4;
 
@@ -600,6 +600,94 @@ public class RoundRobinPackingTest {
     Assert.assertEquals(componentParallelism + 1, boltCount);
   }
 
+  /**
+   * test re-packing with more total instances
+   */
+  @Test
+  public void testRepackingWithMoreTotalInstances() throws Exception {
+    int numContainers = 2;
+    int componentParallelism = 4;
+
+    // Set up the topology and its config
+    org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
+    topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
+
+    TopologyAPI.Topology topology =
+        getTopology(componentParallelism, componentParallelism, topologyConfig);
+
+    int numInstance = TopologyUtils.getTotalInstance(topology);
+    // Two components
+    Assert.assertEquals(2 * componentParallelism, numInstance);
+
+    Map<String, Integer> componentChanges = new HashMap<>();
+    componentChanges.put(SPOUT_NAME, +1);
+    componentChanges.put(BOLT_NAME,  +1);
+    PackingPlan output = getRoundRobinRePackingPlan(topology, componentChanges);
+    Assert.assertEquals(numContainers + 1, output.getContainers().size());
+    Assert.assertEquals((Integer) (numInstance + 2), output.getInstanceCount());
+
+    int spoutCount = 0;
+    int boltCount = 0;
+    for (PackingPlan.ContainerPlan container : output.getContainers()) {
+      Assert.assertTrue((double) container.getInstances().size()
+          <= (double) numInstance / numContainers);
+
+      for (PackingPlan.InstancePlan instancePlan : container.getInstances()) {
+        if (SPOUT_NAME.equals(instancePlan.getComponentName())) {
+          spoutCount++;
+        } else if (BOLT_NAME.equals(instancePlan.getComponentName())) {
+          boltCount++;
+        }
+      }
+    }
+    Assert.assertEquals(componentParallelism + 1, spoutCount);
+    Assert.assertEquals(componentParallelism + 1, boltCount);
+  }
+
+  /**
+   * test re-packing with fewer total instances
+   */
+  @Test
+  public void testRepackingWithFewerTotalInstances() throws Exception {
+    int numContainers = 2;
+    int componentParallelism = 4;
+
+    // Set up the topology and its config
+    org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
+    topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
+
+    TopologyAPI.Topology topology =
+        getTopology(componentParallelism, componentParallelism, topologyConfig);
+
+    int numInstance = TopologyUtils.getTotalInstance(topology);
+    // Two components
+    Assert.assertEquals(2 * componentParallelism, numInstance);
+
+    Map<String, Integer> componentChanges = new HashMap<>();
+    componentChanges.put(SPOUT_NAME, -2);
+    componentChanges.put(BOLT_NAME,  -2);
+    PackingPlan output = getRoundRobinRePackingPlan(topology, componentChanges);
+    Assert.assertEquals(numContainers - 1, output.getContainers().size());
+    Assert.assertEquals((Integer) (numInstance - 4), output.getInstanceCount());
+
+    int spoutCount = 0;
+    int boltCount = 0;
+    for (PackingPlan.ContainerPlan container : output.getContainers()) {
+      Assert.assertTrue((double) container.getInstances().size()
+          <= (double) numInstance / numContainers);
+
+      for (PackingPlan.InstancePlan instancePlan : container.getInstances()) {
+        if (SPOUT_NAME.equals(instancePlan.getComponentName())) {
+          spoutCount++;
+        } else if (BOLT_NAME.equals(instancePlan.getComponentName())) {
+          boltCount++;
+        }
+      }
+    }
+    Assert.assertEquals(componentParallelism - 2, spoutCount);
+    Assert.assertEquals(componentParallelism - 2, boltCount);
+  }
+
   private static void assertComponentCount(
       PackingPlan.ContainerPlan containerPlan, String componentName, int expectedCount) {
     int count = 0;