You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nw...@apache.org on 2019/01/11 22:10:05 UTC
[incubator-heron] branch master updated: Fix RoundRobinPacking
repack with no specified numContainers (#3152)
This is an automated email from the ASF dual-hosted git repository.
nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 94de4fa Fix RoundRobinPacking repack with no specified numContainers (#3152)
94de4fa is described below
commit 94de4fa1b0b25eaa69ce0798177d198b8da0ef89
Author: Xiaoyao Qian <qi...@illinois.edu>
AuthorDate: Fri Jan 11 14:09:58 2019 -0800
Fix RoundRobinPacking repack with no specified numContainers (#3152)
---
.../packing/roundrobin/RoundRobinPacking.java | 31 ++++++--
.../packing/roundrobin/RoundRobinPackingTest.java | 92 +++++++++++++++++++++-
2 files changed, 116 insertions(+), 7 deletions(-)
diff --git a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
index e4d2cf3..1eef9a6 100644
--- a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
+++ b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
@@ -433,10 +433,18 @@ public class RoundRobinPacking implements IPacking, IRepacking {
}
}
- /*
- * read the current packing plan with update parallelism to calculate a new packing plan
- * the packing algorithm packInternal() is shared with pack()
+ /**
+ * Read the current packing plan with update parallelism to calculate a new packing plan.
+ * This method should determine a new number of containers based on the updated parallism
+ * while remaining the number of instances per container <= that of the old packing plan.
+ * The packing algorithm packInternal() is shared with pack()
* delegate to packInternal() with the new container count and component parallelism
+ *
+ * @param currentPackingPlan Existing packing plan
+ * @param componentChanges Map < componentName, new component parallelism >
+ * that contains the parallelism for each component whose parallelism has changed.
+ * @return new packing plan
+ * @throws PackingException
*/
@Override
public PackingPlan repack(PackingPlan currentPackingPlan, Map<String, Integer> componentChanges)
@@ -445,11 +453,10 @@ public class RoundRobinPacking implements IPacking, IRepacking {
int initialNumInstance = TopologyUtils.getTotalInstance(topology);
double initialNumInstancePerContainer = (double) initialNumInstance / initialNumContainer;
- Map<String, Integer> currentComponentParallelism = currentPackingPlan.getComponentCounts();
Map<String, Integer> newComponentParallelism =
getNewComponentParallelism(currentPackingPlan, componentChanges);
- int newNumInstance = TopologyUtils.getTotalInstance(currentComponentParallelism);
+ int newNumInstance = TopologyUtils.getTotalInstance(newComponentParallelism);
int newNumContainer = (int) Math.ceil(newNumInstance / initialNumInstancePerContainer);
return packInternal(newNumContainer, newComponentParallelism);
}
@@ -464,6 +471,20 @@ public class RoundRobinPacking implements IPacking, IRepacking {
return currentComponentParallelism;
}
+ /**
+ * Read the current packing plan with update parallelism and number of containers
+ * to calculate a new packing plan.
+ * The packing algorithm packInternal() is shared with pack()
+ * delegate to packInternal() with the new container count and component parallelism
+ *
+ * @param currentPackingPlan Existing packing plan
+ * @param containers < the new number of containers for the topology
+ * specified by the user
+ * @param componentChanges Map < componentName, new component parallelism >
+ * that contains the parallelism for each component whose parallelism has changed.
+ * @return new packing plan
+ * @throws PackingException
+ */
@Override
public PackingPlan repack(PackingPlan currentPackingPlan, int containers, Map<String, Integer>
componentChanges) throws PackingException {
diff --git a/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
index 826feaa..244fa88 100644
--- a/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
+++ b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
@@ -558,10 +558,10 @@ public class RoundRobinPackingTest {
/**
- * test re-packing of instances
+ * test re-packing with same total instances
*/
@Test
- public void testRePacking() throws Exception {
+ public void testRepackingWithSameTotalInstances() throws Exception {
int numContainers = 2;
int componentParallelism = 4;
@@ -600,6 +600,94 @@ public class RoundRobinPackingTest {
Assert.assertEquals(componentParallelism + 1, boltCount);
}
+ /**
+ * test re-packing with more total instances
+ */
+ @Test
+ public void testRepackingWithMoreTotalInstances() throws Exception {
+ int numContainers = 2;
+ int componentParallelism = 4;
+
+ // Set up the topology and its config
+ org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
+ topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
+
+ TopologyAPI.Topology topology =
+ getTopology(componentParallelism, componentParallelism, topologyConfig);
+
+ int numInstance = TopologyUtils.getTotalInstance(topology);
+ // Two components
+ Assert.assertEquals(2 * componentParallelism, numInstance);
+
+ Map<String, Integer> componentChanges = new HashMap<>();
+ componentChanges.put(SPOUT_NAME, +1);
+ componentChanges.put(BOLT_NAME, +1);
+ PackingPlan output = getRoundRobinRePackingPlan(topology, componentChanges);
+ Assert.assertEquals(numContainers + 1, output.getContainers().size());
+ Assert.assertEquals((Integer) (numInstance + 2), output.getInstanceCount());
+
+ int spoutCount = 0;
+ int boltCount = 0;
+ for (PackingPlan.ContainerPlan container : output.getContainers()) {
+ Assert.assertTrue((double) container.getInstances().size()
+ <= (double) numInstance / numContainers);
+
+ for (PackingPlan.InstancePlan instancePlan : container.getInstances()) {
+ if (SPOUT_NAME.equals(instancePlan.getComponentName())) {
+ spoutCount++;
+ } else if (BOLT_NAME.equals(instancePlan.getComponentName())) {
+ boltCount++;
+ }
+ }
+ }
+ Assert.assertEquals(componentParallelism + 1, spoutCount);
+ Assert.assertEquals(componentParallelism + 1, boltCount);
+ }
+
+ /**
+ * test re-packing with fewer total instances
+ */
+ @Test
+ public void testRepackingWithFewerTotalInstances() throws Exception {
+ int numContainers = 2;
+ int componentParallelism = 4;
+
+ // Set up the topology and its config
+ org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
+ topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
+
+ TopologyAPI.Topology topology =
+ getTopology(componentParallelism, componentParallelism, topologyConfig);
+
+ int numInstance = TopologyUtils.getTotalInstance(topology);
+ // Two components
+ Assert.assertEquals(2 * componentParallelism, numInstance);
+
+ Map<String, Integer> componentChanges = new HashMap<>();
+ componentChanges.put(SPOUT_NAME, -2);
+ componentChanges.put(BOLT_NAME, -2);
+ PackingPlan output = getRoundRobinRePackingPlan(topology, componentChanges);
+ Assert.assertEquals(numContainers - 1, output.getContainers().size());
+ Assert.assertEquals((Integer) (numInstance - 4), output.getInstanceCount());
+
+ int spoutCount = 0;
+ int boltCount = 0;
+ for (PackingPlan.ContainerPlan container : output.getContainers()) {
+ Assert.assertTrue((double) container.getInstances().size()
+ <= (double) numInstance / numContainers);
+
+ for (PackingPlan.InstancePlan instancePlan : container.getInstances()) {
+ if (SPOUT_NAME.equals(instancePlan.getComponentName())) {
+ spoutCount++;
+ } else if (BOLT_NAME.equals(instancePlan.getComponentName())) {
+ boltCount++;
+ }
+ }
+ }
+ Assert.assertEquals(componentParallelism - 2, spoutCount);
+ Assert.assertEquals(componentParallelism - 2, boltCount);
+ }
+
private static void assertComponentCount(
PackingPlan.ContainerPlan containerPlan, String componentName, int expectedCount) {
int count = 0;