You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by GitBox <gi...@apache.org> on 2019/01/11 22:10:00 UTC

[GitHub] nwangtw closed pull request #3152: Fix RoundRobinPacking repack with no specified numContainers

nwangtw closed pull request #3152: Fix RoundRobinPacking repack with no specified numContainers
URL: https://github.com/apache/incubator-heron/pull/3152
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
index 0a80440de8..1056504134 100644
--- a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
+++ b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
@@ -390,10 +390,18 @@ private void validatePackingPlan(PackingPlan plan) throws PackingException {
     }
   }
 
-  /*
-   * read the current packing plan with update parallelism to calculate a new packing plan
-   * the packing algorithm packInternal() is shared with pack()
+  /**
+   * Read the current packing plan with update parallelism to calculate a new packing plan.
+   * This method should determine a new number of containers based on the updated parallelism
+   * while keeping the number of instances per container <= that of the old packing plan.
+   * The packing algorithm packInternal() is shared with pack()
    * delegate to packInternal() with the new container count and component parallelism
+   *
+   * @param currentPackingPlan Existing packing plan
+   * @param componentChanges Map &lt; componentName, new component parallelism &gt;
+   * that contains the parallelism for each component whose parallelism has changed.
+   * @return new packing plan
+   * @throws PackingException
    */
   @Override
   public PackingPlan repack(PackingPlan currentPackingPlan, Map<String, Integer> componentChanges)
@@ -402,11 +410,10 @@ public PackingPlan repack(PackingPlan currentPackingPlan, Map<String, Integer> c
     int initialNumInstance = TopologyUtils.getTotalInstance(topology);
     double initialNumInstancePerContainer = (double) initialNumInstance / initialNumContainer;
 
-    Map<String, Integer> currentComponentParallelism = currentPackingPlan.getComponentCounts();
     Map<String, Integer> newComponentParallelism =
         getNewComponentParallelism(currentPackingPlan, componentChanges);
 
-    int newNumInstance = TopologyUtils.getTotalInstance(currentComponentParallelism);
+    int newNumInstance = TopologyUtils.getTotalInstance(newComponentParallelism);
     int newNumContainer = (int) Math.ceil(newNumInstance / initialNumInstancePerContainer);
     return packInternal(newNumContainer, newComponentParallelism);
   }
@@ -421,6 +428,20 @@ public PackingPlan repack(PackingPlan currentPackingPlan, Map<String, Integer> c
     return currentComponentParallelism;
   }
 
+  /**
+   * Read the current packing plan with update parallelism and number of containers
+   * to calculate a new packing plan.
+   * The packing algorithm packInternal() is shared with pack()
+   * delegate to packInternal() with the new container count and component parallelism
+   *
+   * @param currentPackingPlan Existing packing plan
+   * @param containers the new number of containers for the topology
+   * specified by the user
+   * @param componentChanges Map &lt; componentName, new component parallelism &gt;
+   * that contains the parallelism for each component whose parallelism has changed.
+   * @return new packing plan
+   * @throws PackingException
+   */
   @Override
   public PackingPlan repack(PackingPlan currentPackingPlan, int containers, Map<String, Integer>
       componentChanges) throws PackingException {
diff --git a/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
index 742c6a4987..62f030ed0d 100644
--- a/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
+++ b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
@@ -346,10 +346,10 @@ public void testEvenPacking() throws Exception {
 
 
   /**
-   * test re-packing of instances
+   * test re-packing with same total instances
    */
   @Test
-  public void testRePacking() throws Exception {
+  public void testRepackingWithSameTotalInstances() throws Exception {
     int numContainers = 2;
     int componentParallelism = 4;
 
@@ -388,6 +388,94 @@ public void testRePacking() throws Exception {
     Assert.assertEquals(componentParallelism + 1, boltCount);
   }
 
+  /**
+   * test re-packing with more total instances
+   */
+  @Test
+  public void testRepackingWithMoreTotalInstances() throws Exception {
+    int numContainers = 2;
+    int componentParallelism = 4;
+
+    // Set up the topology and its config
+    org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
+    topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
+
+    TopologyAPI.Topology topology =
+        getTopology(componentParallelism, componentParallelism, topologyConfig);
+
+    int numInstance = TopologyUtils.getTotalInstance(topology);
+    // Two components
+    Assert.assertEquals(2 * componentParallelism, numInstance);
+
+    Map<String, Integer> componentChanges = new HashMap<>();
+    componentChanges.put(SPOUT_NAME, +1);
+    componentChanges.put(BOLT_NAME,  +1);
+    PackingPlan output = getRoundRobinRePackingPlan(topology, componentChanges);
+    Assert.assertEquals(numContainers + 1, output.getContainers().size());
+    Assert.assertEquals((Integer) (numInstance + 2), output.getInstanceCount());
+
+    int spoutCount = 0;
+    int boltCount = 0;
+    for (PackingPlan.ContainerPlan container : output.getContainers()) {
+      Assert.assertTrue((double) container.getInstances().size()
+          <= (double) numInstance / numContainers);
+
+      for (PackingPlan.InstancePlan instancePlan : container.getInstances()) {
+        if (SPOUT_NAME.equals(instancePlan.getComponentName())) {
+          spoutCount++;
+        } else if (BOLT_NAME.equals(instancePlan.getComponentName())) {
+          boltCount++;
+        }
+      }
+    }
+    Assert.assertEquals(componentParallelism + 1, spoutCount);
+    Assert.assertEquals(componentParallelism + 1, boltCount);
+  }
+
+  /**
+   * test re-packing with fewer total instances
+   */
+  @Test
+  public void testRepackingWithFewerTotalInstances() throws Exception {
+    int numContainers = 2;
+    int componentParallelism = 4;
+
+    // Set up the topology and its config
+    org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
+    topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
+
+    TopologyAPI.Topology topology =
+        getTopology(componentParallelism, componentParallelism, topologyConfig);
+
+    int numInstance = TopologyUtils.getTotalInstance(topology);
+    // Two components
+    Assert.assertEquals(2 * componentParallelism, numInstance);
+
+    Map<String, Integer> componentChanges = new HashMap<>();
+    componentChanges.put(SPOUT_NAME, -2);
+    componentChanges.put(BOLT_NAME,  -2);
+    PackingPlan output = getRoundRobinRePackingPlan(topology, componentChanges);
+    Assert.assertEquals(numContainers - 1, output.getContainers().size());
+    Assert.assertEquals((Integer) (numInstance - 4), output.getInstanceCount());
+
+    int spoutCount = 0;
+    int boltCount = 0;
+    for (PackingPlan.ContainerPlan container : output.getContainers()) {
+      Assert.assertTrue((double) container.getInstances().size()
+          <= (double) numInstance / numContainers);
+
+      for (PackingPlan.InstancePlan instancePlan : container.getInstances()) {
+        if (SPOUT_NAME.equals(instancePlan.getComponentName())) {
+          spoutCount++;
+        } else if (BOLT_NAME.equals(instancePlan.getComponentName())) {
+          boltCount++;
+        }
+      }
+    }
+    Assert.assertEquals(componentParallelism - 2, spoutCount);
+    Assert.assertEquals(componentParallelism - 2, boltCount);
+  }
+
   private static void assertComponentCount(
       PackingPlan.ContainerPlan containerPlan, String componentName, int expectedCount) {
     int count = 0;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services