You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nw...@apache.org on 2019/02/08 18:11:17 UTC
[incubator-heron] branch master updated: Revamp Packing tests
(#3188)
This is an automated email from the ASF dual-hosted git repository.
nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new fe39a46 Revamp Packing tests (#3188)
fe39a46 is described below
commit fe39a460bb2714955ce5c957d16093ffd53a1ae4
Author: Xiaoyao Qian <qi...@illinois.edu>
AuthorDate: Fri Feb 8 10:11:11 2019 -0800
Revamp Packing tests (#3188)
* Consolidate packing tests
* remove code duplicates
---
.../packing/roundrobin/RoundRobinPacking.java | 89 +--
.../org/apache/heron/packing/AssertPacking.java | 93 ++-
.../apache/heron/packing/CommonPackingTests.java | 419 +++++++------
.../binpacking/FirstFitDecreasingPackingTest.java | 192 ++++++
.../roundrobin/ResourceCompliantRRPackingTest.java | 192 ++++++
.../packing/roundrobin/RoundRobinPackingTest.java | 667 +++++++--------------
.../org/apache/heron/spi/packing/Resource.java | 8 +
7 files changed, 907 insertions(+), 753 deletions(-)
diff --git a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
index 84bc667..e3508a6 100644
--- a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
+++ b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
@@ -46,6 +46,10 @@ import org.apache.heron.spi.packing.PackingException;
import org.apache.heron.spi.packing.PackingPlan;
import org.apache.heron.spi.packing.Resource;
+import static org.apache.heron.api.Config.TOPOLOGY_CONTAINER_CPU_REQUESTED;
+import static org.apache.heron.api.Config.TOPOLOGY_CONTAINER_DISK_REQUESTED;
+import static org.apache.heron.api.Config.TOPOLOGY_CONTAINER_RAM_REQUESTED;
+
/**
* Round-robin packing algorithm
* <p>
@@ -143,16 +147,14 @@ public class RoundRobinPacking implements IPacking, IRepacking {
Map<Integer, List<InstanceId>> roundRobinAllocation =
getRoundRobinAllocation(numContainer, parallelismMap);
- ByteAmount containerDiskInBytes = getContainerDiskHint(roundRobinAllocation);
- double containerCpuHint = getContainerCpuHint(roundRobinAllocation);
- ByteAmount containerRamHint = getContainerRamHint(roundRobinAllocation);
+ Resource containerResourceHint = getContainerResourceHint(roundRobinAllocation);
// Get the RAM map for every instance
Map<Integer, Map<InstanceId, ByteAmount>> instancesRamMap =
calculateInstancesResourceMapInContainer(
roundRobinAllocation,
TopologyUtils.getComponentRamMapConfig(topology),
- containerRamHint,
+ containerResourceHint.getRam(),
instanceRamDefault,
containerRamPadding,
ByteAmount.ZERO,
@@ -164,7 +166,7 @@ public class RoundRobinPacking implements IPacking, IRepacking {
calculateInstancesResourceMapInContainer(
roundRobinAllocation,
CPUShare.convertDoubleMapToCpuShareMap(TopologyUtils.getComponentCpuMapConfig(topology)),
- CPUShare.fromDouble(containerCpuHint),
+ CPUShare.fromDouble(containerResourceHint.getCpu()),
CPUShare.fromDouble(instanceCpuDefault),
CPUShare.fromDouble(containerCpuPadding),
CPUShare.fromDouble(0.0),
@@ -172,9 +174,9 @@ public class RoundRobinPacking implements IPacking, IRepacking {
CPU);
LOG.info(String.format("Pack internal: container CPU hint: %.3f, RAM hint: %s, disk hint: %s.",
- containerCpuHint,
- containerRamHint.toString(),
- containerDiskInBytes.toString()));
+ containerResourceHint.getCpu(),
+ containerResourceHint.getRam().toString(),
+ containerResourceHint.getDisk().toString()));
// Construct the PackingPlan
Set<PackingPlan.ContainerPlan> containerPlans = new HashSet<>();
@@ -202,21 +204,22 @@ public class RoundRobinPacking implements IPacking, IRepacking {
}
// finalize container resource
- if (!containerRamHint.equals(NOT_SPECIFIED_BYTE_AMOUNT)) {
+ if (!containerResourceHint.getRam().equals(NOT_SPECIFIED_BYTE_AMOUNT)) {
containerRam = ByteAmount.fromBytes(
- Math.min(containerRam.plus(containerRamPadding).asBytes(), containerRamHint.asBytes()));
+ Math.min(containerRam.plus(containerRamPadding).asBytes(),
+ containerResourceHint.getRam().asBytes()));
} else {
containerRam = containerRam.plus(containerRamPadding);
}
- if (containerCpuHint != NOT_SPECIFIED_CPU_SHARE) {
- containerCpu = Math.min(containerCpu + containerCpuPadding, containerCpuHint);
+ if (containerResourceHint.getCpu() != NOT_SPECIFIED_CPU_SHARE) {
+ containerCpu = Math.min(containerCpu + containerCpuPadding, containerResourceHint.getCpu());
} else {
containerCpu += containerCpuPadding;
}
- Resource resource = new Resource(Math.max(containerCpu, containerCpuHint),
- containerRam, containerDiskInBytes);
+ Resource resource = new Resource(Math.max(containerCpu, containerResourceHint.getCpu()),
+ containerRam, containerResourceHint.getDisk());
PackingPlan.ContainerPlan containerPlan = new PackingPlan.ContainerPlan(
containerId, new HashSet<>(instancePlanMap.values()), resource);
@@ -420,54 +423,18 @@ public class RoundRobinPacking implements IPacking, IRepacking {
return max;
}
- /**
- * Provide CPU per container.
- *
- * @param allocation packing output.
- * @return CPU per container.
- */
- private double getContainerCpuHint(Map<Integer, List<InstanceId>> allocation) {
- List<TopologyAPI.Config.KeyValue> topologyConfig = topology.getTopologyConfig().getKvsList();
- double defaultContainerCpu =
- DEFAULT_CPU_PADDING_PER_CONTAINER + getLargestContainerSize(allocation);
-
- String cpuHint = TopologyUtils.getConfigWithDefault(
- topologyConfig, org.apache.heron.api.Config.TOPOLOGY_CONTAINER_CPU_REQUESTED,
- Double.toString(defaultContainerCpu));
-
- return Double.parseDouble(cpuHint);
- }
-
- /**
- * Provide disk per container.
- *
- * @param allocation packing output.
- * @return disk per container.
- */
- private ByteAmount getContainerDiskHint(Map<Integer, List<InstanceId>> allocation) {
- ByteAmount defaultContainerDisk = instanceDiskDefault
- .multiply(getLargestContainerSize(allocation))
- .plus(DEFAULT_DISK_PADDING_PER_CONTAINER);
-
- List<TopologyAPI.Config.KeyValue> topologyConfig = topology.getTopologyConfig().getKvsList();
-
- return TopologyUtils.getConfigWithDefault(topologyConfig,
- org.apache.heron.api.Config.TOPOLOGY_CONTAINER_DISK_REQUESTED,
- defaultContainerDisk);
- }
-
- /**
- * Provide RAM per container.
- *
- * @param allocation packing
- * @return Container RAM requirement
- */
- private ByteAmount getContainerRamHint(Map<Integer, List<InstanceId>> allocation) {
+ private Resource getContainerResourceHint(Map<Integer, List<InstanceId>> allocation) {
List<TopologyAPI.Config.KeyValue> topologyConfig = topology.getTopologyConfig().getKvsList();
-
- return TopologyUtils.getConfigWithDefault(
- topologyConfig, org.apache.heron.api.Config.TOPOLOGY_CONTAINER_RAM_REQUESTED,
- NOT_SPECIFIED_BYTE_AMOUNT);
+ int largestContainerSize = getLargestContainerSize(allocation);
+
+ return new Resource(
+ TopologyUtils.getConfigWithDefault(topologyConfig, TOPOLOGY_CONTAINER_CPU_REQUESTED,
+ (double) Math.round(instanceCpuDefault * largestContainerSize + containerCpuPadding)),
+ TopologyUtils.getConfigWithDefault(topologyConfig, TOPOLOGY_CONTAINER_RAM_REQUESTED,
+ instanceRamDefault.multiply(largestContainerSize).plus(containerRamPadding)),
+ TopologyUtils.getConfigWithDefault(topologyConfig, TOPOLOGY_CONTAINER_DISK_REQUESTED,
+ instanceDiskDefault.multiply(largestContainerSize)
+ .plus(DEFAULT_DISK_PADDING_PER_CONTAINER)));
}
/**
diff --git a/heron/packing/tests/java/org/apache/heron/packing/AssertPacking.java b/heron/packing/tests/java/org/apache/heron/packing/AssertPacking.java
index 356ff1a..3858bfe 100644
--- a/heron/packing/tests/java/org/apache/heron/packing/AssertPacking.java
+++ b/heron/packing/tests/java/org/apache/heron/packing/AssertPacking.java
@@ -42,7 +42,7 @@ import static org.junit.Assert.assertTrue;
*/
public final class AssertPacking {
- private static final double DELTA = 0.1;
+ public static final double DELTA = 0.1;
private AssertPacking() { }
@@ -88,42 +88,63 @@ public final class AssertPacking {
}
/**
+ * Verifies that every instance of boltName found in the container plans has RAM equal to
+ * expectedBoltRam, and likewise for spoutName and expectedSpoutRam. A component that is
+ * absent from every container plan is not checked.
+ */
+ public static void assertInstanceRam(Set<PackingPlan.ContainerPlan> containerPlans,
+ String boltName, String spoutName,
+ ByteAmount expectedBoltRam, ByteAmount expectedSpoutRam) {
+ // RAM for bolt should be the value in component RAM map
+ for (PackingPlan.ContainerPlan containerPlan : containerPlans) {
+ for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
+ if (instancePlan.getComponentName().equals(boltName)) {
+ assertEquals("Unexpected bolt RAM",
+ expectedBoltRam, instancePlan.getResource().getRam());
+ }
+ if (instancePlan.getComponentName().equals(spoutName)) {
+ assertEquals("Unexpected spout RAM",
+ expectedSpoutRam, instancePlan.getResource().getRam());
+ }
+ }
+ }
+ }
+
+ /**
* Verifies that the containerPlan has at least one bolt named boltName with CPU equal to
* expectedBoltCpu and likewise for spouts. If notExpectedContainerCpu is not null, verifies that
* the container CPU is not that.
*/
- public static void assertContainers(Set<PackingPlan.ContainerPlan> containerPlans,
- String boltName, String spoutName,
- Double expectedBoltCpu, Double expectedSpoutCpu,
- Double notExpectedContainerCpu) {
- boolean boltFound = false;
- boolean spoutFound = false;
- List<Integer> expectedInstanceIndices = new ArrayList<>();
- List<Integer> foundInstanceIndices = new ArrayList<>();
- int expectedInstanceIndex = 1;
+ public static void assertInstanceCpu(Set<PackingPlan.ContainerPlan> containerPlans,
+ String boltName, String spoutName,
+ Double expectedBoltCpu, Double expectedSpoutCpu) {
// CPU for bolt should be the value in component CPU map
for (PackingPlan.ContainerPlan containerPlan : containerPlans) {
- if (notExpectedContainerCpu != null) {
- assertNotEquals(notExpectedContainerCpu, containerPlan.getRequiredResource().getCpu());
- }
for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
- expectedInstanceIndices.add(expectedInstanceIndex++);
- foundInstanceIndices.add(instancePlan.getTaskId());
if (instancePlan.getComponentName().equals(boltName)) {
assertEquals("Unexpected bolt CPU",
- expectedBoltCpu.doubleValue(), instancePlan.getResource().getCpu(), DELTA);
- boltFound = true;
+ expectedBoltCpu, instancePlan.getResource().getCpu(), DELTA);
}
if (instancePlan.getComponentName().equals(spoutName)) {
- assertEquals(
- "Unexpected spout CPU",
- expectedSpoutCpu.doubleValue(), instancePlan.getResource().getCpu(), DELTA);
- spoutFound = true;
+ assertEquals("Unexpected spout CPU",
+ expectedSpoutCpu, instancePlan.getResource().getCpu(), DELTA);
}
}
}
- assertTrue("Bolt not found in any of the container plans: " + boltName, boltFound);
- assertTrue("Spout not found in any of the container plans: " + spoutName, spoutFound);
+ }
+
+ public static void assertInstanceIndices(Set<PackingPlan.ContainerPlan> containerPlans,
+ String boltName, String spoutName) {
+ List<Integer> expectedInstanceIndices = new ArrayList<>();
+ List<Integer> foundInstanceIndices = new ArrayList<>();
+ int expectedInstanceIndex = 1;
+ // Collect every instance's task id alongside the expected sequence 1, 2, ..., N
+ for (PackingPlan.ContainerPlan containerPlan : containerPlans) {
+ for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
+ expectedInstanceIndices.add(expectedInstanceIndex++);
+ foundInstanceIndices.add(instancePlan.getTaskId());
+ }
+ }
Collections.sort(foundInstanceIndices);
assertEquals("Unexpected instance global id set found.",
@@ -135,14 +156,10 @@ public final class AssertPacking {
*/
public static void assertNumInstances(Set<PackingPlan.ContainerPlan> containerPlans,
String component, int numInstances) {
- int instancesFound = 0;
- for (PackingPlan.ContainerPlan containerPlan : containerPlans) {
- for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
- if (instancePlan.getComponentName().equals(component)) {
- instancesFound++;
- }
- }
- }
+ int instancesFound = (int) containerPlans.stream()
+ .flatMap(containerPlan -> containerPlan.getInstances().stream())
+ .filter(instancePlan -> instancePlan.getComponentName().equals(component))
+ .count();
assertEquals(numInstances, instancesFound);
}
@@ -160,6 +177,20 @@ public final class AssertPacking {
}
}
+ /**
+ * Verifies that the CPU required by every container in a packing plan does not exceed a given
+ * maximum value.
+ */
+ public static void assertContainerCpu(Set<PackingPlan.ContainerPlan> containerPlans,
+ double maxCpuforResources) {
+ for (PackingPlan.ContainerPlan containerPlan : containerPlans) {
+ assertTrue(String.format("Container with id %d requires more CPU (%.3f) than"
+ + " the maximum CPU allowed (%.3f)", containerPlan.getId(),
+ containerPlan.getRequiredResource().getCpu(), maxCpuforResources),
+ containerPlan.getRequiredResource().getCpu() <= maxCpuforResources);
+ }
+ }
+
public static void assertPackingPlan(String expectedTopologyName,
Pair<Integer, InstanceId>[] expectedComponentInstances,
PackingPlan plan) {
diff --git a/heron/packing/tests/java/org/apache/heron/packing/CommonPackingTests.java b/heron/packing/tests/java/org/apache/heron/packing/CommonPackingTests.java
index b74d0ef..98e16f9 100644
--- a/heron/packing/tests/java/org/apache/heron/packing/CommonPackingTests.java
+++ b/heron/packing/tests/java/org/apache/heron/packing/CommonPackingTests.java
@@ -19,12 +19,11 @@
package org.apache.heron.packing;
-import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Test;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.common.basics.ByteAmount;
@@ -36,17 +35,18 @@ import org.apache.heron.spi.common.Context;
import org.apache.heron.spi.packing.IPacking;
import org.apache.heron.spi.packing.IRepacking;
import org.apache.heron.spi.packing.InstanceId;
-import org.apache.heron.spi.packing.PackingException;
import org.apache.heron.spi.packing.PackingPlan;
import org.apache.heron.spi.packing.Resource;
import org.apache.heron.spi.utils.PackingTestUtils;
+import static org.apache.heron.packing.AssertPacking.DELTA;
+
/**
* There is some common functionality in multiple packing plans. This class contains common tests.
*/
public abstract class CommonPackingTests {
- private static final String A = "A";
- private static final String B = "B";
+ protected static final String A = "A";
+ protected static final String B = "B";
protected static final String BOLT_NAME = "bolt";
protected static final String SPOUT_NAME = "spout";
@@ -57,6 +57,7 @@ public abstract class CommonPackingTests {
protected org.apache.heron.api.Config topologyConfig;
protected TopologyAPI.Topology topology;
protected Resource instanceDefaultResources;
+ protected int numContainers;
protected abstract IPacking getPackingImpl();
protected abstract IRepacking getRepackingImpl();
@@ -92,7 +93,7 @@ public abstract class CommonPackingTests {
return packing.pack();
}
- private PackingPlan repack(TopologyAPI.Topology testTopology,
+ protected PackingPlan repack(TopologyAPI.Topology testTopology,
PackingPlan initialPackingPlan,
Map<String, Integer> componentChanges) {
IRepacking repacking = getRepackingImpl();
@@ -100,6 +101,15 @@ public abstract class CommonPackingTests {
return repacking.repack(initialPackingPlan, componentChanges);
}
+ protected Resource getDefaultUnspecifiedContainerResource(int testNumInstances,
+ int testNumContainers,
+ Resource padding) {
+ int largestContainerSize = (int) Math.ceil((double) testNumInstances / testNumContainers);
+ return new Resource(largestContainerSize + padding.getCpu(),
+ ByteAmount.fromGigabytes(largestContainerSize).plus(padding.getRam()),
+ ByteAmount.fromGigabytes(largestContainerSize).plus(padding.getDisk()));
+ }
+
protected PackingPlan doDefaultScalingTest(Map<String, Integer> componentChanges,
int numContainersBeforeRepack) {
return doScalingTest(topology, componentChanges,
@@ -108,6 +118,117 @@ public abstract class CommonPackingTests {
numContainersBeforeRepack, totalInstances);
}
+ protected PackingPlan doPackingTest(TopologyAPI.Topology testTopology,
+ Resource boltRes, int testBoltParallelism,
+ Resource spoutRes, int testSpoutParallelism,
+ int testNumContainers,
+ Resource maxContainerResource) {
+ PackingPlan packingPlan = pack(testTopology);
+
+ Assert.assertEquals(testNumContainers, packingPlan.getContainers().size());
+ Assert.assertEquals(testBoltParallelism + testSpoutParallelism,
+ (int) packingPlan.getInstanceCount());
+ AssertPacking.assertNumInstances(packingPlan.getContainers(), BOLT_NAME, testBoltParallelism);
+ AssertPacking.assertNumInstances(packingPlan.getContainers(), SPOUT_NAME, testSpoutParallelism);
+
+ AssertPacking.assertInstanceRam(packingPlan.getContainers(), BOLT_NAME, SPOUT_NAME,
+ boltRes.getRam(), spoutRes.getRam());
+ AssertPacking.assertInstanceCpu(packingPlan.getContainers(), BOLT_NAME, SPOUT_NAME,
+ boltRes.getCpu(), spoutRes.getCpu());
+ AssertPacking.assertInstanceIndices(packingPlan.getContainers(), BOLT_NAME, SPOUT_NAME);
+
+ AssertPacking.assertContainerRam(packingPlan.getContainers(), maxContainerResource.getRam());
+ AssertPacking.assertContainerCpu(packingPlan.getContainers(), maxContainerResource.getCpu());
+
+ return packingPlan;
+ }
+
+ protected PackingPlan doPackingTestWithPartialResource(TopologyAPI.Topology testTopology,
+ Optional<ByteAmount> boltRam,
+ Optional<Double> boltCpu,
+ int testBoltParallelism,
+ Optional<ByteAmount> spoutRam,
+ Optional<Double> spoutCpu,
+ int testSpoutParallelism,
+ int testNumContainers,
+ Resource padding,
+ Resource maxContainerResource) {
+ PackingPlan packingPlan = pack(testTopology);
+ Assert.assertEquals(testNumContainers, packingPlan.getContainers().size());
+ Assert.assertEquals(testBoltParallelism + testSpoutParallelism,
+ (int) packingPlan.getInstanceCount());
+ AssertPacking.assertNumInstances(packingPlan.getContainers(), BOLT_NAME, testBoltParallelism);
+ AssertPacking.assertNumInstances(packingPlan.getContainers(), SPOUT_NAME, testSpoutParallelism);
+
+ for (PackingPlan.ContainerPlan containerPlan : packingPlan.getContainers()) {
+ int instancesCount = containerPlan.getInstances().size();
+
+ if (!boltRam.isPresent() && !spoutRam.isPresent()) {
+ ByteAmount instanceRam = maxContainerResource.getRam()
+ .minus(padding.getRam()).divide(instancesCount);
+ for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
+ Assert.assertEquals(instanceRam, instancePlan.getResource().getRam());
+ }
+ } else if (!boltRam.isPresent() || !spoutRam.isPresent()) {
+ String explicitComponent = boltRam.isPresent() ? BOLT_NAME : SPOUT_NAME;
+ String implicitComponent = boltRam.isPresent() ? SPOUT_NAME : BOLT_NAME;
+ ByteAmount explicitRam = boltRam.orElseGet(spoutRam::get);
+ int explicitCount = 0;
+ for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
+ if (instancePlan.getComponentName().equals(explicitComponent)) {
+ Assert.assertEquals(explicitRam, instancePlan.getResource().getRam());
+ explicitCount++;
+ }
+ }
+ int implicitCount = instancesCount - explicitCount;
+ for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
+ if (instancePlan.getComponentName().equals(implicitComponent)) {
+ Assert.assertEquals(
+ maxContainerResource.getRam()
+ .minus(explicitRam.multiply(explicitCount))
+ .minus(padding.getRam())
+ .divide(implicitCount),
+ instancePlan.getResource().getRam());
+ }
+ }
+ }
+
+ if (!boltCpu.isPresent() && !spoutCpu.isPresent()) {
+ double instanceCpu = (maxContainerResource.getCpu() - padding.getCpu()) / instancesCount;
+ for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
+ Assert.assertEquals(instanceCpu, instancePlan.getResource().getCpu(), DELTA);
+ }
+ } else if (!boltCpu.isPresent() || !spoutCpu.isPresent()) {
+ String explicitComponent = boltCpu.isPresent() ? BOLT_NAME : SPOUT_NAME;
+ String implicitComponent = boltCpu.isPresent() ? SPOUT_NAME : BOLT_NAME;
+ double explicitCpu = boltCpu.orElseGet(spoutCpu::get);
+ int explicitCount = 0;
+ for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
+ if (instancePlan.getComponentName().equals(explicitComponent)) {
+ Assert.assertEquals(explicitCpu, instancePlan.getResource().getCpu(), DELTA);
+ explicitCount++;
+ }
+ }
+ int implicitCount = instancesCount - explicitCount;
+ for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
+ if (instancePlan.getComponentName().equals(implicitComponent)) {
+ Assert.assertEquals(
+ (maxContainerResource.getCpu()
+ - explicitCpu * explicitCount
+ - padding.getCpu()) / implicitCount,
+ instancePlan.getResource().getCpu(), DELTA);
+ }
+ }
+ }
+ }
+
+ AssertPacking.assertInstanceIndices(packingPlan.getContainers(), BOLT_NAME, SPOUT_NAME);
+ AssertPacking.assertContainerRam(packingPlan.getContainers(), maxContainerResource.getRam());
+ AssertPacking.assertContainerCpu(packingPlan.getContainers(), maxContainerResource.getCpu());
+
+ return packingPlan;
+ }
+
/**
* Performs a scaling test for a specific topology. It first
* computes an initial packing plan as a basis for scaling.
@@ -147,7 +268,101 @@ public abstract class CommonPackingTests {
return newPackingPlan;
}
- private void doScaleDownTest(Pair<Integer, InstanceId>[] initialComponentInstances,
+ protected PackingPlan doScalingTestWithPartialResource(TopologyAPI.Topology testTopology,
+ PackingPlan packingPlan,
+ Map<String, Integer> componentChanges,
+ Optional<ByteAmount> boltRam,
+ Optional<Double> boltCpu,
+ int testBoltParallelism,
+ Optional<ByteAmount> spoutRam,
+ Optional<Double> spoutCpu,
+ int testSpoutParallelism,
+ int testNumContainers,
+ Resource padding,
+ Resource maxContainerResource) {
+ System.out.println(packingPlan);
+ PackingPlan newPackingPlan = repack(testTopology, packingPlan, componentChanges);
+ System.out.println(newPackingPlan);
+ Assert.assertEquals(testNumContainers, newPackingPlan.getContainers().size());
+ Assert.assertEquals(testBoltParallelism + testSpoutParallelism
+ + componentChanges.getOrDefault(BOLT_NAME, 0)
+ + componentChanges.getOrDefault(SPOUT_NAME, 0),
+ (int) newPackingPlan.getInstanceCount());
+ AssertPacking.assertNumInstances(newPackingPlan.getContainers(), BOLT_NAME,
+ testBoltParallelism + componentChanges.getOrDefault(BOLT_NAME, 0));
+ AssertPacking.assertNumInstances(newPackingPlan.getContainers(), SPOUT_NAME,
+ testSpoutParallelism + componentChanges.getOrDefault(SPOUT_NAME, 0));
+
+ for (PackingPlan.ContainerPlan containerPlan : newPackingPlan.getContainers()) {
+ int instancesCount = containerPlan.getInstances().size();
+
+ if (!boltRam.isPresent() && !spoutRam.isPresent()) {
+ ByteAmount instanceRam = maxContainerResource.getRam()
+ .minus(padding.getRam()).divide(instancesCount);
+ for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
+ Assert.assertEquals(instanceRam, instancePlan.getResource().getRam());
+ }
+ } else if (!boltRam.isPresent() || !spoutRam.isPresent()) {
+ String explicitComponent = boltRam.isPresent() ? BOLT_NAME : SPOUT_NAME;
+ String implicitComponent = boltRam.isPresent() ? SPOUT_NAME : BOLT_NAME;
+ ByteAmount explicitRam = boltRam.orElseGet(spoutRam::get);
+ int explicitCount = 0;
+ for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
+ if (instancePlan.getComponentName().equals(explicitComponent)) {
+ Assert.assertEquals(explicitRam, instancePlan.getResource().getRam());
+ explicitCount++;
+ }
+ }
+ int implicitCount = instancesCount - explicitCount;
+ for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
+ if (instancePlan.getComponentName().equals(implicitComponent)) {
+ Assert.assertEquals(
+ maxContainerResource.getRam()
+ .minus(explicitRam.multiply(explicitCount))
+ .minus(padding.getRam())
+ .divide(implicitCount),
+ instancePlan.getResource().getRam());
+ }
+ }
+ }
+
+ if (!boltCpu.isPresent() && !spoutCpu.isPresent()) {
+ double instanceCpu = (maxContainerResource.getCpu() - padding.getCpu()) / instancesCount;
+ for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
+ Assert.assertEquals(instanceCpu, instancePlan.getResource().getCpu(), DELTA);
+ }
+ } else if (!boltCpu.isPresent() || !spoutCpu.isPresent()) {
+ String explicitComponent = boltCpu.isPresent() ? BOLT_NAME : SPOUT_NAME;
+ String implicitComponent = boltCpu.isPresent() ? SPOUT_NAME : BOLT_NAME;
+ double explicitCpu = boltCpu.orElseGet(spoutCpu::get);
+ int explicitCount = 0;
+ for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
+ if (instancePlan.getComponentName().equals(explicitComponent)) {
+ Assert.assertEquals(explicitCpu, instancePlan.getResource().getCpu(), DELTA);
+ explicitCount++;
+ }
+ }
+ int implicitCount = instancesCount - explicitCount;
+ for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
+ if (instancePlan.getComponentName().equals(implicitComponent)) {
+ Assert.assertEquals(
+ (maxContainerResource.getCpu()
+ - explicitCpu * explicitCount
+ - padding.getCpu()) / implicitCount,
+ instancePlan.getResource().getCpu(), DELTA);
+ }
+ }
+ }
+ }
+
+ AssertPacking.assertInstanceIndices(newPackingPlan.getContainers(), BOLT_NAME, SPOUT_NAME);
+ AssertPacking.assertContainerRam(newPackingPlan.getContainers(), maxContainerResource.getRam());
+ AssertPacking.assertContainerCpu(newPackingPlan.getContainers(), maxContainerResource.getCpu());
+
+ return newPackingPlan;
+ }
+
+ protected void doScaleDownTest(Pair<Integer, InstanceId>[] initialComponentInstances,
Map<String, Integer> componentChanges,
Pair<Integer, InstanceId>[] expectedComponentInstances)
throws ResourceExceededException {
@@ -164,194 +379,4 @@ public abstract class CommonPackingTests {
AssertPacking.assertPackingPlan(topologyId, expectedComponentInstances, newPackingPlan);
}
-
- /**
- * Test the scenario where scaling down removes instances from containers that are most imbalanced
- * (i.e., tending towards homogeneity) first. If there is a tie (e.g. AABB, AB), chooses from the
- * container with the fewest instances, to favor ultimately removing containers. If there is
- * still a tie, favor removing from higher numbered containers
- */
- @Test
- public void testScaleDownOneComponentRemoveContainer() throws Exception {
- @SuppressWarnings({"unchecked", "rawtypes"})
- Pair<Integer, InstanceId>[] initialComponentInstances = new Pair[] {
- new Pair<>(1, new InstanceId(A, 1, 0)),
- new Pair<>(1, new InstanceId(A, 2, 1)),
- new Pair<>(1, new InstanceId(B, 3, 0)),
- new Pair<>(3, new InstanceId(B, 4, 1)),
- new Pair<>(3, new InstanceId(B, 5, 2)),
- new Pair<>(4, new InstanceId(B, 6, 3)),
- new Pair<>(4, new InstanceId(B, 7, 4))
- };
-
- Map<String, Integer> componentChanges = new HashMap<>();
- componentChanges.put(B, -2);
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- Pair<Integer, InstanceId>[] expectedComponentInstances = new Pair[] {
- new Pair<>(1, new InstanceId(A, 1, 0)),
- new Pair<>(1, new InstanceId(A, 2, 1)),
- new Pair<>(1, new InstanceId(B, 3, 0)),
- new Pair<>(3, new InstanceId(B, 4, 1)),
- new Pair<>(3, new InstanceId(B, 5, 2)),
- };
-
- doScaleDownTest(initialComponentInstances, componentChanges, expectedComponentInstances);
- }
-
- @Test
- public void testScaleDownTwoComponentsRemoveContainer() throws Exception {
- @SuppressWarnings({"unchecked", "rawtypes"})
- Pair<Integer, InstanceId>[] initialComponentInstances = new Pair[] {
- new Pair<>(1, new InstanceId(A, 1, 0)),
- new Pair<>(1, new InstanceId(A, 2, 1)),
- new Pair<>(1, new InstanceId(B, 3, 0)),
- new Pair<>(1, new InstanceId(B, 4, 1)),
- new Pair<>(3, new InstanceId(A, 5, 2)),
- new Pair<>(3, new InstanceId(A, 6, 3)),
- new Pair<>(3, new InstanceId(B, 7, 2)),
- new Pair<>(3, new InstanceId(B, 8, 3))
- };
-
- Map<String, Integer> componentChanges = new HashMap<>();
- componentChanges.put(A, -2);
- componentChanges.put(B, -2);
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- Pair<Integer, InstanceId>[] expectedComponentInstances = new Pair[] {
- new Pair<>(1, new InstanceId(A, 1, 0)),
- new Pair<>(1, new InstanceId(A, 2, 1)),
- new Pair<>(1, new InstanceId(B, 3, 0)),
- new Pair<>(1, new InstanceId(B, 4, 1)),
- };
-
- doScaleDownTest(initialComponentInstances, componentChanges, expectedComponentInstances);
- }
-
- @Test
- public void testScaleDownHomogenousFirst() throws Exception {
- @SuppressWarnings({"unchecked", "rawtypes"})
- Pair<Integer, InstanceId>[] initialComponentInstances = new Pair[] {
- new Pair<>(1, new InstanceId(A, 1, 0)),
- new Pair<>(1, new InstanceId(A, 2, 1)),
- new Pair<>(1, new InstanceId(B, 3, 0)),
- new Pair<>(3, new InstanceId(B, 4, 1)),
- new Pair<>(3, new InstanceId(B, 5, 2)),
- new Pair<>(3, new InstanceId(B, 6, 3)),
- new Pair<>(3, new InstanceId(B, 7, 4))
- };
-
- Map<String, Integer> componentChanges = new HashMap<>();
- componentChanges.put(B, -4);
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- Pair<Integer, InstanceId>[] expectedComponentInstances = new Pair[] {
- new Pair<>(1, new InstanceId(A, 1, 0)),
- new Pair<>(1, new InstanceId(A, 2, 1)),
- new Pair<>(1, new InstanceId(B, 3, 0))
- };
-
- doScaleDownTest(initialComponentInstances, componentChanges, expectedComponentInstances);
- }
-
- /**
- * Test the scenario where scaling down and up is simultaneously requested
- */
- @Test
- public void scaleDownAndUp() throws Exception {
- int spoutScalingDown = -4;
- int boltScalingUp = 6;
-
- Map<String, Integer> componentChanges = new HashMap<>();
- componentChanges.put(SPOUT_NAME, spoutScalingDown); // 0 spouts
- componentChanges.put(BOLT_NAME, boltScalingUp); // 9 bolts
- int numContainersBeforeRepack = 2;
- PackingPlan newPackingPlan = doDefaultScalingTest(componentChanges, numContainersBeforeRepack);
-
- Assert.assertEquals(3, newPackingPlan.getContainers().size());
- Assert.assertEquals((Integer) (totalInstances + spoutScalingDown + boltScalingUp),
- newPackingPlan.getInstanceCount());
- AssertPacking.assertNumInstances(newPackingPlan.getContainers(),
- BOLT_NAME, 9);
- AssertPacking.assertNumInstances(newPackingPlan.getContainers(),
- SPOUT_NAME, 0);
- }
-
- @Test(expected = PackingException.class)
- public void testScaleDownInvalidScaleFactor() throws Exception {
-
- //try to remove more spout instances than possible
- int spoutScalingDown = -5;
- Map<String, Integer> componentChanges = new HashMap<>();
- componentChanges.put(SPOUT_NAME, spoutScalingDown);
-
- int numContainersBeforeRepack = 2;
- doDefaultScalingTest(componentChanges, numContainersBeforeRepack);
- }
-
- @Test(expected = PackingException.class)
- public void testScaleDownInvalidComponent() throws Exception {
- Map<String, Integer> componentChanges = new HashMap<>();
- componentChanges.put("SPOUT_FAKE", -10); //try to remove a component that does not exist
- int numContainersBeforeRepack = 2;
- doDefaultScalingTest(componentChanges, numContainersBeforeRepack);
- }
-
- /**
- * Test invalid RAM for instance
- */
- @Test(expected = PackingException.class)
- public void testInvalidRamInstance() throws Exception {
- ByteAmount maxContainerRam = ByteAmount.fromGigabytes(10);
- int defaultNumInstancesperContainer = 4;
-
- // Explicit set component RAM map
- ByteAmount boltRam = ByteAmount.ZERO;
-
- topologyConfig.setContainerMaxRamHint(maxContainerRam);
- topologyConfig.setComponentRam(BOLT_NAME, boltRam);
-
- TopologyAPI.Topology topologyExplicitRamMap =
- getTopology(spoutParallelism, boltParallelism, topologyConfig);
- PackingPlan packingPlanExplicitRamMap = pack(topologyExplicitRamMap);
- Assert.assertEquals(totalInstances, packingPlanExplicitRamMap.getInstanceCount());
- AssertPacking.assertNumInstances(packingPlanExplicitRamMap.getContainers(), BOLT_NAME, 3);
- AssertPacking.assertNumInstances(packingPlanExplicitRamMap.getContainers(), SPOUT_NAME, 4);
- AssertPacking.assertContainerRam(packingPlanExplicitRamMap.getContainers(),
- instanceDefaultResources.getRam().multiply(defaultNumInstancesperContainer));
- }
-
- @Test
- public void testTwoContainersRequested() throws Exception {
- doTestContainerCountRequested(2, 2);
- }
-
- /**
- * Test the scenario where container level resource config are set
- */
- protected void doTestContainerCountRequested(int requestedContainers,
- int expectedContainer) throws Exception {
-
- // Explicit set resources for container
- topologyConfig.setContainerRamRequested(ByteAmount.fromGigabytes(10));
- topologyConfig.setContainerDiskRequested(ByteAmount.fromGigabytes(20));
- topologyConfig.setContainerCpuRequested(30);
- topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, requestedContainers);
-
- TopologyAPI.Topology topologyExplicitResourcesConfig =
- getTopology(spoutParallelism, boltParallelism, topologyConfig);
- PackingPlan packingPlanExplicitResourcesConfig = pack(topologyExplicitResourcesConfig);
-
- Assert.assertEquals(expectedContainer,
- packingPlanExplicitResourcesConfig.getContainers().size());
- Assert.assertEquals(totalInstances, packingPlanExplicitResourcesConfig.getInstanceCount());
-
- // RAM for bolt/spout should be the value in component RAM map
- for (PackingPlan.ContainerPlan containerPlan
- : packingPlanExplicitResourcesConfig.getContainers()) {
- for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
- Assert.assertEquals(instanceDefaultResources, instancePlan.getResource());
- }
- }
- }
}
diff --git a/heron/packing/tests/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPackingTest.java b/heron/packing/tests/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPackingTest.java
index dcb22d7..1b688f6 100644
--- a/heron/packing/tests/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPackingTest.java
+++ b/heron/packing/tests/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPackingTest.java
@@ -29,11 +29,13 @@ import org.junit.Test;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.common.basics.ByteAmount;
+import org.apache.heron.common.basics.Pair;
import org.apache.heron.packing.AssertPacking;
import org.apache.heron.packing.CommonPackingTests;
import org.apache.heron.packing.utils.PackingUtils;
import org.apache.heron.spi.packing.IPacking;
import org.apache.heron.spi.packing.IRepacking;
+import org.apache.heron.spi.packing.InstanceId;
import org.apache.heron.spi.packing.PackingException;
import org.apache.heron.spi.packing.PackingPlan;
import org.apache.heron.spi.packing.Resource;
@@ -513,4 +515,194 @@ public class FirstFitDecreasingPackingTest extends CommonPackingTests {
AssertPacking.assertNumInstances(newPackingPlan.getContainers(),
SPOUT_NAME, noSpouts + spoutScalingUp);
}
+
+ /**
+ * Test the scenario where scaling down removes instances from containers that are most imbalanced
+ * (i.e., tending towards homogeneity) first. If there is a tie (e.g. AABB, AB), chooses from the
+ * container with the fewest instances, to favor ultimately removing containers. If there is
+ * still a tie, favor removing from higher numbered containers
+ */
+ @Test
+ public void testScaleDownOneComponentRemoveContainer() throws Exception {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Pair<Integer, InstanceId>[] initialComponentInstances = new Pair[] {
+ new Pair<>(1, new InstanceId(A, 1, 0)),
+ new Pair<>(1, new InstanceId(A, 2, 1)),
+ new Pair<>(1, new InstanceId(B, 3, 0)),
+ new Pair<>(3, new InstanceId(B, 4, 1)),
+ new Pair<>(3, new InstanceId(B, 5, 2)),
+ new Pair<>(4, new InstanceId(B, 6, 3)),
+ new Pair<>(4, new InstanceId(B, 7, 4))
+ };
+
+ Map<String, Integer> componentChanges = new HashMap<>();
+ componentChanges.put(B, -2);
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Pair<Integer, InstanceId>[] expectedComponentInstances = new Pair[] {
+ new Pair<>(1, new InstanceId(A, 1, 0)),
+ new Pair<>(1, new InstanceId(A, 2, 1)),
+ new Pair<>(1, new InstanceId(B, 3, 0)),
+ new Pair<>(3, new InstanceId(B, 4, 1)),
+ new Pair<>(3, new InstanceId(B, 5, 2)),
+ };
+
+ doScaleDownTest(initialComponentInstances, componentChanges, expectedComponentInstances);
+ }
+
+ @Test
+ public void testScaleDownTwoComponentsRemoveContainer() throws Exception {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Pair<Integer, InstanceId>[] initialComponentInstances = new Pair[] {
+ new Pair<>(1, new InstanceId(A, 1, 0)),
+ new Pair<>(1, new InstanceId(A, 2, 1)),
+ new Pair<>(1, new InstanceId(B, 3, 0)),
+ new Pair<>(1, new InstanceId(B, 4, 1)),
+ new Pair<>(3, new InstanceId(A, 5, 2)),
+ new Pair<>(3, new InstanceId(A, 6, 3)),
+ new Pair<>(3, new InstanceId(B, 7, 2)),
+ new Pair<>(3, new InstanceId(B, 8, 3))
+ };
+
+ Map<String, Integer> componentChanges = new HashMap<>();
+ componentChanges.put(A, -2);
+ componentChanges.put(B, -2);
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Pair<Integer, InstanceId>[] expectedComponentInstances = new Pair[] {
+ new Pair<>(1, new InstanceId(A, 1, 0)),
+ new Pair<>(1, new InstanceId(A, 2, 1)),
+ new Pair<>(1, new InstanceId(B, 3, 0)),
+ new Pair<>(1, new InstanceId(B, 4, 1)),
+ };
+
+ doScaleDownTest(initialComponentInstances, componentChanges, expectedComponentInstances);
+ }
+
+ @Test
+ public void testScaleDownHomogenousFirst() throws Exception {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Pair<Integer, InstanceId>[] initialComponentInstances = new Pair[] {
+ new Pair<>(1, new InstanceId(A, 1, 0)),
+ new Pair<>(1, new InstanceId(A, 2, 1)),
+ new Pair<>(1, new InstanceId(B, 3, 0)),
+ new Pair<>(3, new InstanceId(B, 4, 1)),
+ new Pair<>(3, new InstanceId(B, 5, 2)),
+ new Pair<>(3, new InstanceId(B, 6, 3)),
+ new Pair<>(3, new InstanceId(B, 7, 4))
+ };
+
+ Map<String, Integer> componentChanges = new HashMap<>();
+ componentChanges.put(B, -4);
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Pair<Integer, InstanceId>[] expectedComponentInstances = new Pair[] {
+ new Pair<>(1, new InstanceId(A, 1, 0)),
+ new Pair<>(1, new InstanceId(A, 2, 1)),
+ new Pair<>(1, new InstanceId(B, 3, 0))
+ };
+
+ doScaleDownTest(initialComponentInstances, componentChanges, expectedComponentInstances);
+ }
+
+ /**
+ * Test the scenario where scaling down and up is simultaneously requested
+ */
+ @Test
+ public void scaleDownAndUp() throws Exception {
+ int spoutScalingDown = -4;
+ int boltScalingUp = 6;
+
+ Map<String, Integer> componentChanges = new HashMap<>();
+ componentChanges.put(SPOUT_NAME, spoutScalingDown); // 0 spouts
+ componentChanges.put(BOLT_NAME, boltScalingUp); // 9 bolts
+ int numContainersBeforeRepack = 2;
+ PackingPlan newPackingPlan = doDefaultScalingTest(componentChanges, numContainersBeforeRepack);
+
+ Assert.assertEquals(3, newPackingPlan.getContainers().size());
+ Assert.assertEquals((Integer) (totalInstances + spoutScalingDown + boltScalingUp),
+ newPackingPlan.getInstanceCount());
+ AssertPacking.assertNumInstances(newPackingPlan.getContainers(),
+ BOLT_NAME, 9);
+ AssertPacking.assertNumInstances(newPackingPlan.getContainers(),
+ SPOUT_NAME, 0);
+ }
+
+ @Test(expected = PackingException.class)
+ public void testScaleDownInvalidScaleFactor() throws Exception {
+
+ //try to remove more spout instances than possible
+ int spoutScalingDown = -5;
+ Map<String, Integer> componentChanges = new HashMap<>();
+ componentChanges.put(SPOUT_NAME, spoutScalingDown);
+
+ int numContainersBeforeRepack = 2;
+ doDefaultScalingTest(componentChanges, numContainersBeforeRepack);
+ }
+
+ @Test(expected = PackingException.class)
+ public void testScaleDownInvalidComponent() throws Exception {
+ Map<String, Integer> componentChanges = new HashMap<>();
+ componentChanges.put("SPOUT_FAKE", -10); //try to remove a component that does not exist
+ int numContainersBeforeRepack = 2;
+ doDefaultScalingTest(componentChanges, numContainersBeforeRepack);
+ }
+
+ /**
+ * Test that packing fails with a PackingException when a component's RAM is set to zero
+ */
+ @Test(expected = PackingException.class)
+ public void testInvalidRamInstance() throws Exception {
+ ByteAmount maxContainerRam = ByteAmount.fromGigabytes(10);
+ int defaultNumInstancesperContainer = 4;
+
+ // Explicitly set the component RAM map
+ ByteAmount boltRam = ByteAmount.ZERO;
+
+ topologyConfig.setContainerMaxRamHint(maxContainerRam);
+ topologyConfig.setComponentRam(BOLT_NAME, boltRam);
+
+ TopologyAPI.Topology topologyExplicitRamMap =
+ getTopology(spoutParallelism, boltParallelism, topologyConfig);
+ PackingPlan packingPlanExplicitRamMap = pack(topologyExplicitRamMap);
+ Assert.assertEquals(totalInstances, packingPlanExplicitRamMap.getInstanceCount());
+ AssertPacking.assertNumInstances(packingPlanExplicitRamMap.getContainers(), BOLT_NAME, 3);
+ AssertPacking.assertNumInstances(packingPlanExplicitRamMap.getContainers(), SPOUT_NAME, 4);
+ AssertPacking.assertContainerRam(packingPlanExplicitRamMap.getContainers(),
+ instanceDefaultResources.getRam().multiply(defaultNumInstancesperContainer));
+ }
+
+ @Test
+ public void testTwoContainersRequested() throws Exception {
+ doTestContainerCountRequested(2, 2);
+ }
+
+ /**
+ * Test the scenario where container-level resource configs are set
+ */
+ protected void doTestContainerCountRequested(int requestedContainers,
+ int expectedContainer) throws Exception {
+
+ // Explicitly set resources for the container
+ topologyConfig.setContainerRamRequested(ByteAmount.fromGigabytes(10));
+ topologyConfig.setContainerDiskRequested(ByteAmount.fromGigabytes(20));
+ topologyConfig.setContainerCpuRequested(30);
+ topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, requestedContainers);
+
+ TopologyAPI.Topology topologyExplicitResourcesConfig =
+ getTopology(spoutParallelism, boltParallelism, topologyConfig);
+ PackingPlan packingPlanExplicitResourcesConfig = pack(topologyExplicitResourcesConfig);
+
+ Assert.assertEquals(expectedContainer,
+ packingPlanExplicitResourcesConfig.getContainers().size());
+ Assert.assertEquals(totalInstances, packingPlanExplicitResourcesConfig.getInstanceCount());
+
+ // No component RAM map is set, so every instance should get the default instance resources
+ for (PackingPlan.ContainerPlan containerPlan
+ : packingPlanExplicitResourcesConfig.getContainers()) {
+ for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
+ Assert.assertEquals(instanceDefaultResources, instancePlan.getResource());
+ }
+ }
+ }
}
diff --git a/heron/packing/tests/java/org/apache/heron/packing/roundrobin/ResourceCompliantRRPackingTest.java b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/ResourceCompliantRRPackingTest.java
index b927997..4aa4ce0 100644
--- a/heron/packing/tests/java/org/apache/heron/packing/roundrobin/ResourceCompliantRRPackingTest.java
+++ b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/ResourceCompliantRRPackingTest.java
@@ -30,11 +30,13 @@ import org.junit.Test;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.api.utils.TopologyUtils;
import org.apache.heron.common.basics.ByteAmount;
+import org.apache.heron.common.basics.Pair;
import org.apache.heron.packing.AssertPacking;
import org.apache.heron.packing.CommonPackingTests;
import org.apache.heron.packing.utils.PackingUtils;
import org.apache.heron.spi.packing.IPacking;
import org.apache.heron.spi.packing.IRepacking;
+import org.apache.heron.spi.packing.InstanceId;
import org.apache.heron.spi.packing.PackingException;
import org.apache.heron.spi.packing.PackingPlan;
import org.apache.heron.spi.packing.Resource;
@@ -555,4 +557,194 @@ public class ResourceCompliantRRPackingTest extends CommonPackingTests {
AssertPacking.assertNumInstances(newPackingPlan.getContainers(),
SPOUT_NAME, spoutParallelism + spoutScalingUp);
}
+
+ /**
+ * Test the scenario where scaling down removes instances from containers that are most imbalanced
+ * (i.e., tending towards homogeneity) first. If there is a tie (e.g. AABB, AB), chooses from the
+ * container with the fewest instances, to favor ultimately removing containers. If there is
+ * still a tie, favor removing from higher numbered containers
+ */
+ @Test
+ public void testScaleDownOneComponentRemoveContainer() throws Exception {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Pair<Integer, InstanceId>[] initialComponentInstances = new Pair[] {
+ new Pair<>(1, new InstanceId(A, 1, 0)),
+ new Pair<>(1, new InstanceId(A, 2, 1)),
+ new Pair<>(1, new InstanceId(B, 3, 0)),
+ new Pair<>(3, new InstanceId(B, 4, 1)),
+ new Pair<>(3, new InstanceId(B, 5, 2)),
+ new Pair<>(4, new InstanceId(B, 6, 3)),
+ new Pair<>(4, new InstanceId(B, 7, 4))
+ };
+
+ Map<String, Integer> componentChanges = new HashMap<>();
+ componentChanges.put(B, -2);
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Pair<Integer, InstanceId>[] expectedComponentInstances = new Pair[] {
+ new Pair<>(1, new InstanceId(A, 1, 0)),
+ new Pair<>(1, new InstanceId(A, 2, 1)),
+ new Pair<>(1, new InstanceId(B, 3, 0)),
+ new Pair<>(3, new InstanceId(B, 4, 1)),
+ new Pair<>(3, new InstanceId(B, 5, 2)),
+ };
+
+ doScaleDownTest(initialComponentInstances, componentChanges, expectedComponentInstances);
+ }
+
+ @Test
+ public void testScaleDownTwoComponentsRemoveContainer() throws Exception {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Pair<Integer, InstanceId>[] initialComponentInstances = new Pair[] {
+ new Pair<>(1, new InstanceId(A, 1, 0)),
+ new Pair<>(1, new InstanceId(A, 2, 1)),
+ new Pair<>(1, new InstanceId(B, 3, 0)),
+ new Pair<>(1, new InstanceId(B, 4, 1)),
+ new Pair<>(3, new InstanceId(A, 5, 2)),
+ new Pair<>(3, new InstanceId(A, 6, 3)),
+ new Pair<>(3, new InstanceId(B, 7, 2)),
+ new Pair<>(3, new InstanceId(B, 8, 3))
+ };
+
+ Map<String, Integer> componentChanges = new HashMap<>();
+ componentChanges.put(A, -2);
+ componentChanges.put(B, -2);
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Pair<Integer, InstanceId>[] expectedComponentInstances = new Pair[] {
+ new Pair<>(1, new InstanceId(A, 1, 0)),
+ new Pair<>(1, new InstanceId(A, 2, 1)),
+ new Pair<>(1, new InstanceId(B, 3, 0)),
+ new Pair<>(1, new InstanceId(B, 4, 1)),
+ };
+
+ doScaleDownTest(initialComponentInstances, componentChanges, expectedComponentInstances);
+ }
+
+ @Test
+ public void testScaleDownHomogenousFirst() throws Exception {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Pair<Integer, InstanceId>[] initialComponentInstances = new Pair[] {
+ new Pair<>(1, new InstanceId(A, 1, 0)),
+ new Pair<>(1, new InstanceId(A, 2, 1)),
+ new Pair<>(1, new InstanceId(B, 3, 0)),
+ new Pair<>(3, new InstanceId(B, 4, 1)),
+ new Pair<>(3, new InstanceId(B, 5, 2)),
+ new Pair<>(3, new InstanceId(B, 6, 3)),
+ new Pair<>(3, new InstanceId(B, 7, 4))
+ };
+
+ Map<String, Integer> componentChanges = new HashMap<>();
+ componentChanges.put(B, -4);
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Pair<Integer, InstanceId>[] expectedComponentInstances = new Pair[] {
+ new Pair<>(1, new InstanceId(A, 1, 0)),
+ new Pair<>(1, new InstanceId(A, 2, 1)),
+ new Pair<>(1, new InstanceId(B, 3, 0))
+ };
+
+ doScaleDownTest(initialComponentInstances, componentChanges, expectedComponentInstances);
+ }
+
+ /**
+ * Test the scenario where scaling down and up is simultaneously requested
+ */
+ @Test
+ public void scaleDownAndUp() throws Exception {
+ int spoutScalingDown = -4;
+ int boltScalingUp = 6;
+
+ Map<String, Integer> componentChanges = new HashMap<>();
+ componentChanges.put(SPOUT_NAME, spoutScalingDown); // 0 spouts
+ componentChanges.put(BOLT_NAME, boltScalingUp); // 9 bolts
+ int numContainersBeforeRepack = 2;
+ PackingPlan newPackingPlan = doDefaultScalingTest(componentChanges, numContainersBeforeRepack);
+
+ Assert.assertEquals(3, newPackingPlan.getContainers().size());
+ Assert.assertEquals((Integer) (totalInstances + spoutScalingDown + boltScalingUp),
+ newPackingPlan.getInstanceCount());
+ AssertPacking.assertNumInstances(newPackingPlan.getContainers(),
+ BOLT_NAME, 9);
+ AssertPacking.assertNumInstances(newPackingPlan.getContainers(),
+ SPOUT_NAME, 0);
+ }
+
+ @Test(expected = PackingException.class)
+ public void testScaleDownInvalidScaleFactor() throws Exception {
+
+ //try to remove more spout instances than possible
+ int spoutScalingDown = -5;
+ Map<String, Integer> componentChanges = new HashMap<>();
+ componentChanges.put(SPOUT_NAME, spoutScalingDown);
+
+ int numContainersBeforeRepack = 2;
+ doDefaultScalingTest(componentChanges, numContainersBeforeRepack);
+ }
+
+ @Test(expected = PackingException.class)
+ public void testScaleDownInvalidComponent() throws Exception {
+ Map<String, Integer> componentChanges = new HashMap<>();
+ componentChanges.put("SPOUT_FAKE", -10); //try to remove a component that does not exist
+ int numContainersBeforeRepack = 2;
+ doDefaultScalingTest(componentChanges, numContainersBeforeRepack);
+ }
+
+ /**
+ * Test that packing fails with a PackingException when a component's RAM is set to zero
+ */
+ @Test(expected = PackingException.class)
+ public void testInvalidRamInstance() throws Exception {
+ ByteAmount maxContainerRam = ByteAmount.fromGigabytes(10);
+ int defaultNumInstancesperContainer = 4;
+
+ // Explicitly set the component RAM map
+ ByteAmount boltRam = ByteAmount.ZERO;
+
+ topologyConfig.setContainerMaxRamHint(maxContainerRam);
+ topologyConfig.setComponentRam(BOLT_NAME, boltRam);
+
+ TopologyAPI.Topology topologyExplicitRamMap =
+ getTopology(spoutParallelism, boltParallelism, topologyConfig);
+ PackingPlan packingPlanExplicitRamMap = pack(topologyExplicitRamMap);
+ Assert.assertEquals(totalInstances, packingPlanExplicitRamMap.getInstanceCount());
+ AssertPacking.assertNumInstances(packingPlanExplicitRamMap.getContainers(), BOLT_NAME, 3);
+ AssertPacking.assertNumInstances(packingPlanExplicitRamMap.getContainers(), SPOUT_NAME, 4);
+ AssertPacking.assertContainerRam(packingPlanExplicitRamMap.getContainers(),
+ instanceDefaultResources.getRam().multiply(defaultNumInstancesperContainer));
+ }
+
+ @Test
+ public void testTwoContainersRequested() throws Exception {
+ doTestContainerCountRequested(2, 2);
+ }
+
+ /**
+ * Test the scenario where container-level resource configs are set
+ */
+ protected void doTestContainerCountRequested(int requestedContainers,
+ int expectedContainer) throws Exception {
+
+ // Explicitly set resources for the container
+ topologyConfig.setContainerRamRequested(ByteAmount.fromGigabytes(10));
+ topologyConfig.setContainerDiskRequested(ByteAmount.fromGigabytes(20));
+ topologyConfig.setContainerCpuRequested(30);
+ topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, requestedContainers);
+
+ TopologyAPI.Topology topologyExplicitResourcesConfig =
+ getTopology(spoutParallelism, boltParallelism, topologyConfig);
+ PackingPlan packingPlanExplicitResourcesConfig = pack(topologyExplicitResourcesConfig);
+
+ Assert.assertEquals(expectedContainer,
+ packingPlanExplicitResourcesConfig.getContainers().size());
+ Assert.assertEquals(totalInstances, packingPlanExplicitResourcesConfig.getInstanceCount());
+
+ // No component RAM map is set, so every instance should get the default instance resources
+ for (PackingPlan.ContainerPlan containerPlan
+ : packingPlanExplicitResourcesConfig.getContainers()) {
+ for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
+ Assert.assertEquals(instanceDefaultResources, instancePlan.getResource());
+ }
+ }
+ }
}
diff --git a/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
index 109ae47..e56b79d 100644
--- a/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
+++ b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
@@ -22,110 +22,83 @@ package org.apache.heron.packing.roundrobin;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
-import org.apache.heron.api.generated.TopologyAPI;
-import org.apache.heron.api.utils.TopologyUtils;
import org.apache.heron.common.basics.ByteAmount;
-import org.apache.heron.common.utils.topology.TopologyTests;
-import org.apache.heron.packing.AssertPacking;
-import org.apache.heron.spi.common.Config;
+import org.apache.heron.packing.CommonPackingTests;
+import org.apache.heron.spi.packing.IPacking;
+import org.apache.heron.spi.packing.IRepacking;
import org.apache.heron.spi.packing.PackingException;
import org.apache.heron.spi.packing.PackingPlan;
import org.apache.heron.spi.packing.Resource;
-import org.apache.heron.spi.utils.PackingTestUtils;
-
-public class RoundRobinPackingTest {
- private static final String BOLT_NAME = "bolt";
- private static final String SPOUT_NAME = "spout";
- private static final double DELTA = 0.1;
-
- private TopologyAPI.Topology getTopology(
- int spoutParallelism, int boltParallelism,
- org.apache.heron.api.Config topologyConfig) {
- return TopologyTests.createTopology("testTopology", topologyConfig, SPOUT_NAME, BOLT_NAME,
- spoutParallelism, boltParallelism);
- }
- private PackingPlan getRoundRobinPackingPlan(TopologyAPI.Topology topology) {
- Config config = PackingTestUtils.newTestConfig(topology);
+import static org.apache.heron.packing.AssertPacking.DELTA;
+
+public class RoundRobinPackingTest extends CommonPackingTests {
+ @Override
+ protected IPacking getPackingImpl() {
+ return new RoundRobinPacking();
+ }
- RoundRobinPacking packing = new RoundRobinPacking();
- packing.initialize(config, topology);
- return packing.pack();
+ @Override
+ protected IRepacking getRepackingImpl() {
+ return new RoundRobinPacking();
}
- private PackingPlan getRoundRobinRePackingPlan(
- TopologyAPI.Topology topology, Map<String, Integer> componentChanges) {
- Config config = PackingTestUtils.newTestConfig(topology);
+ private Resource getDefaultPadding() {
+ return new Resource(RoundRobinPacking.DEFAULT_CPU_PADDING_PER_CONTAINER,
+ RoundRobinPacking.DEFAULT_RAM_PADDING_PER_CONTAINER, ByteAmount.ZERO);
+ }
- RoundRobinPacking packing = new RoundRobinPacking();
- packing.initialize(config, topology);
- PackingPlan pp = packing.pack();
- return packing.repack(pp, componentChanges);
+ @Before
+ public void setUp() {
+ super.setUp();
+ numContainers = 2;
+ topologyConfig.setNumStmgrs(numContainers);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
}
@Test(expected = PackingException.class)
public void testCheckInsufficientRamFailure() throws Exception {
- int numContainers = 2;
- int spoutParallelism = 4;
- int boltParallelism = 3;
-
- // Set up the topology and its config
- org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
- topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
-
// Explicit set insufficient RAM for container
- ByteAmount containerRam = ByteAmount.fromGigabytes(0);
-
+ ByteAmount containerRam = ByteAmount.ZERO;
topologyConfig.setContainerRamRequested(containerRam);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
- TopologyAPI.Topology topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
- getRoundRobinPackingPlan(topology);
+ doPackingTest(topology, instanceDefaultResources, boltParallelism,
+ instanceDefaultResources, spoutParallelism,
+ numContainers,
+ getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()).cloneWithRam(containerRam));
}
@Test(expected = PackingException.class)
public void testCheckInsufficientCpuFailure() throws Exception {
- int numContainers = 2;
- int spoutParallelism = 4;
- int boltParallelism = 3;
-
- // Set up the topology and its config
- org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
- topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
-
// Explicit set insufficient CPU for container
double containerCpu = 1.0;
-
topologyConfig.setContainerCpuRequested(containerCpu);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
- TopologyAPI.Topology topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
- getRoundRobinPackingPlan(topology);
+ doPackingTest(topology, instanceDefaultResources, boltParallelism,
+ instanceDefaultResources, spoutParallelism,
+ numContainers,
+ getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()).cloneWithCpu(containerCpu));
}
@Test
public void testDefaultResources() throws Exception {
- int numContainers = 2;
- int spoutParallelism = 4;
- int boltParallelism = 3;
- Integer totalInstances = spoutParallelism + boltParallelism;
-
- // Set up the topology and its config
- org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
- topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
-
- // No explicit resources required
- TopologyAPI.Topology topologyNoExplicitResourcesConfig =
- getTopology(spoutParallelism, boltParallelism, topologyConfig);
- PackingPlan packingPlanNoExplicitResourcesConfig =
- getRoundRobinPackingPlan(topologyNoExplicitResourcesConfig);
-
- Assert.assertEquals(numContainers,
- packingPlanNoExplicitResourcesConfig.getContainers().size());
- Assert.assertEquals(totalInstances, packingPlanNoExplicitResourcesConfig.getInstanceCount());
+ doPackingTestWithPartialResource(topology,
+ Optional.empty(), Optional.empty(), boltParallelism,
+ Optional.empty(), Optional.empty(), spoutParallelism,
+ numContainers, getDefaultPadding(),
+ getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()));
}
/**
@@ -133,56 +106,39 @@ public class RoundRobinPackingTest {
*/
@Test
public void testContainerRequestedResources() throws Exception {
- int numContainers = 2;
- int spoutParallelism = 4;
- int boltParallelism = 3;
- Integer totalInstances = spoutParallelism + boltParallelism;
-
- // Set up the topology and its config
- org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
- topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
// Explicit set resources for container
ByteAmount containerRam = ByteAmount.fromGigabytes(10);
ByteAmount containerDisk = ByteAmount.fromGigabytes(20);
double containerCpu = 30;
+ Resource containerResource = new Resource(containerCpu, containerRam, containerDisk);
topologyConfig.setContainerRamRequested(containerRam);
topologyConfig.setContainerDiskRequested(containerDisk);
topologyConfig.setContainerCpuRequested(containerCpu);
- TopologyAPI.Topology topologyExplicitResourcesConfig =
- getTopology(spoutParallelism, boltParallelism, topologyConfig);
- PackingPlan packingPlanExplicitResourcesConfig =
- getRoundRobinPackingPlan(topologyExplicitResourcesConfig);
-
- Assert.assertEquals(numContainers,
- packingPlanExplicitResourcesConfig.getContainers().size());
- Assert.assertEquals(totalInstances, packingPlanExplicitResourcesConfig.getInstanceCount());
-
- for (PackingPlan.ContainerPlan containerPlan
- : packingPlanExplicitResourcesConfig.getContainers()) {
- Assert.assertEquals(containerCpu, containerPlan.getRequiredResource().getCpu(), DELTA);
- Assert.assertTrue(String.format(// due to round-off when using divide()
- "expected: %s but was: %s", containerRam, containerPlan.getRequiredResource().getRam()),
- Math.abs(
- containerRam.minus(containerPlan.getRequiredResource().getRam()).asBytes()) <= 1);
- Assert.assertEquals(containerDisk, containerPlan.getRequiredResource().getDisk());
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ PackingPlan packingPlan = doPackingTestWithPartialResource(topology,
+ Optional.empty(), Optional.empty(), boltParallelism,
+ Optional.empty(), Optional.empty(), spoutParallelism,
+ numContainers, getDefaultPadding(), containerResource);
+ for (PackingPlan.ContainerPlan containerPlan : packingPlan.getContainers()) {
// All instances' resource requirement should be equal
// So the size of set should be 1
- Set<Resource> resources = new HashSet<>();
+ Set<Resource> differentResources = new HashSet<>();
for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
- resources.add(instancePlan.getResource());
+ differentResources.add(instancePlan.getResource());
}
- Assert.assertEquals(1, resources.size());
+ Assert.assertEquals(1, differentResources.size());
int instancesCount = containerPlan.getInstances().size();
Assert.assertEquals(containerRam
- .minus(RoundRobinPacking.DEFAULT_RAM_PADDING_PER_CONTAINER).divide(instancesCount),
- resources.iterator().next().getRam());
+ .minus(RoundRobinPacking.DEFAULT_RAM_PADDING_PER_CONTAINER).divide(instancesCount),
+ differentResources.iterator().next().getRam());
Assert.assertEquals(
(containerCpu - RoundRobinPacking.DEFAULT_CPU_PADDING_PER_CONTAINER) / instancesCount,
- resources.iterator().next().getCpu(), DELTA);
+ differentResources.iterator().next().getCpu(), DELTA);
}
}
@@ -191,42 +147,25 @@ public class RoundRobinPackingTest {
*/
@Test
public void testContainerRequestedResourcesWhenRamPaddingSet() throws Exception {
- int numContainers = 2;
- int spoutParallelism = 4;
- int boltParallelism = 3;
- Integer totalInstances = spoutParallelism + boltParallelism;
-
- // Set up the topology and its config
- org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
- topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
// Explicit set resources for container
ByteAmount containerRam = ByteAmount.fromGigabytes(10);
ByteAmount containerDisk = ByteAmount.fromGigabytes(20);
- ByteAmount containerRamPadding = ByteAmount.fromMegabytes(512);
double containerCpu = 30;
+ ByteAmount containerRamPadding = ByteAmount.fromMegabytes(512);
+ Resource containerResource = new Resource(containerCpu, containerRam, containerDisk);
topologyConfig.setContainerRamRequested(containerRam);
topologyConfig.setContainerDiskRequested(containerDisk);
topologyConfig.setContainerCpuRequested(containerCpu);
topologyConfig.setContainerRamPadding(containerRamPadding);
- TopologyAPI.Topology topologyExplicitResourcesConfig =
- getTopology(spoutParallelism, boltParallelism, topologyConfig);
- PackingPlan packingPlanExplicitResourcesConfig =
- getRoundRobinPackingPlan(topologyExplicitResourcesConfig);
-
- Assert.assertEquals(numContainers,
- packingPlanExplicitResourcesConfig.getContainers().size());
- Assert.assertEquals(totalInstances, packingPlanExplicitResourcesConfig.getInstanceCount());
-
- for (PackingPlan.ContainerPlan containerPlan
- : packingPlanExplicitResourcesConfig.getContainers()) {
- Assert.assertEquals(containerCpu, containerPlan.getRequiredResource().getCpu(), DELTA);
- Assert.assertTrue(String.format(// due to round-off when using divide()
- "expected: %s but was: %s", containerRam, containerPlan.getRequiredResource().getRam()),
- Math.abs(
- containerRam.minus(containerPlan.getRequiredResource().getRam()).asBytes()) <= 1);
- Assert.assertEquals(containerDisk, containerPlan.getRequiredResource().getDisk());
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+ PackingPlan packingPlan = doPackingTestWithPartialResource(topology,
+ Optional.empty(), Optional.empty(), boltParallelism,
+ Optional.empty(), Optional.empty(), spoutParallelism,
+ numContainers, getDefaultPadding().cloneWithRam(containerRamPadding), containerResource);
+
+ for (PackingPlan.ContainerPlan containerPlan : packingPlan.getContainers()) {
// All instances' resource requirement should be equal
// So the size of set should be 1
Set<Resource> resources = new HashSet<>();
@@ -251,15 +190,6 @@ public class RoundRobinPackingTest {
*/
@Test
public void testCompleteRamMapRequestedWithExactlyEnoughResource() throws Exception {
- int numContainers = 2;
- int spoutParallelism = 4;
- int boltParallelism = 3;
- Integer totalInstances = spoutParallelism + boltParallelism;
-
- // Set up the topology and its config
- org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
- topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
-
// Explicit set resources for container
// the value should be ignored, since we set the complete component RAM map
ByteAmount containerRam = ByteAmount.fromGigabytes(8);
@@ -271,15 +201,14 @@ public class RoundRobinPackingTest {
topologyConfig.setContainerRamRequested(containerRam);
topologyConfig.setComponentRam(BOLT_NAME, boltRam);
topologyConfig.setComponentRam(SPOUT_NAME, spoutRam);
-
- TopologyAPI.Topology topologyExplicitRamMap =
- getTopology(spoutParallelism, boltParallelism, topologyConfig);
- PackingPlan packingPlanExplicitRamMap =
- getRoundRobinPackingPlan(topologyExplicitRamMap);
-
- AssertPacking.assertContainers(packingPlanExplicitRamMap.getContainers(),
- BOLT_NAME, SPOUT_NAME, boltRam, spoutRam, null);
- Assert.assertEquals(totalInstances, packingPlanExplicitRamMap.getInstanceCount());
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ doPackingTestWithPartialResource(topology,
+ Optional.of(boltRam), Optional.empty(), boltParallelism,
+ Optional.of(spoutRam), Optional.empty(), spoutParallelism,
+ numContainers, getDefaultPadding(),
+ getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()).cloneWithRam(containerRam));
}
/**
@@ -287,14 +216,6 @@ public class RoundRobinPackingTest {
*/
@Test(expected = PackingException.class)
public void testCompleteRamMapRequestedWithLessThanEnoughResource() throws Exception {
- int numContainers = 2;
- int spoutParallelism = 4;
- int boltParallelism = 3;
-
- // Set up the topology and its config
- org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
- topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
-
// Explicit set resources for container
// the value should be ignored, since we set the complete component RAM map
ByteAmount containerRam = ByteAmount.fromGigabytes(2);
@@ -306,10 +227,14 @@ public class RoundRobinPackingTest {
topologyConfig.setContainerRamRequested(containerRam);
topologyConfig.setComponentRam(BOLT_NAME, boltRam);
topologyConfig.setComponentRam(SPOUT_NAME, spoutRam);
-
- TopologyAPI.Topology topologyExplicitRamMap =
- getTopology(spoutParallelism, boltParallelism, topologyConfig);
- getRoundRobinPackingPlan(topologyExplicitRamMap);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ doPackingTestWithPartialResource(topology,
+ Optional.of(boltRam), Optional.empty(), boltParallelism,
+ Optional.of(spoutRam), Optional.empty(), spoutParallelism,
+ numContainers, getDefaultPadding(),
+ getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()).cloneWithRam(containerRam));
}
/**
@@ -317,15 +242,6 @@ public class RoundRobinPackingTest {
*/
@Test
public void testCompleteRamMapRequestedWithMoreThanEnoughResource() throws Exception {
- int numContainers = 2;
- int spoutParallelism = 4;
- int boltParallelism = 3;
- Integer totalInstances = spoutParallelism + boltParallelism;
-
- // Set up the topology and its config
- org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
- topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
-
// Explicit set resources for container
// the value should be ignored, since we set the complete component RAM map
ByteAmount containerRam = ByteAmount.fromBytes(Long.MAX_VALUE);
@@ -337,15 +253,14 @@ public class RoundRobinPackingTest {
topologyConfig.setContainerRamRequested(containerRam);
topologyConfig.setComponentRam(BOLT_NAME, boltRam);
topologyConfig.setComponentRam(SPOUT_NAME, spoutRam);
-
- TopologyAPI.Topology topologyExplicitRamMap =
- getTopology(spoutParallelism, boltParallelism, topologyConfig);
- PackingPlan packingPlanExplicitRamMap =
- getRoundRobinPackingPlan(topologyExplicitRamMap);
-
- AssertPacking.assertContainers(packingPlanExplicitRamMap.getContainers(),
- BOLT_NAME, SPOUT_NAME, boltRam, spoutRam, containerRam);
- Assert.assertEquals(totalInstances, packingPlanExplicitRamMap.getInstanceCount());
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ doPackingTestWithPartialResource(topology,
+ Optional.of(boltRam), Optional.empty(), boltParallelism,
+ Optional.of(spoutRam), Optional.empty(), spoutParallelism,
+ numContainers, getDefaultPadding(),
+ getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()).cloneWithRam(containerRam));
}
/**
@@ -353,15 +268,6 @@ public class RoundRobinPackingTest {
*/
@Test
public void testCompleteRamMapRequestedWithoutPaddingResource() throws Exception {
- int numContainers = 2;
- int spoutParallelism = 4;
- int boltParallelism = 3;
- Integer totalInstances = spoutParallelism + boltParallelism;
-
- // Set up the topology and its config
- org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
- topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
-
// Explicit set resources for container
// the value should be ignored, since we set the complete component RAM map
ByteAmount containerRam = ByteAmount.fromGigabytes(6);
@@ -373,15 +279,14 @@ public class RoundRobinPackingTest {
topologyConfig.setContainerRamRequested(containerRam);
topologyConfig.setComponentRam(BOLT_NAME, boltRam);
topologyConfig.setComponentRam(SPOUT_NAME, spoutRam);
-
- TopologyAPI.Topology topologyExplicitRamMap =
- getTopology(spoutParallelism, boltParallelism, topologyConfig);
- PackingPlan packingPlanExplicitRamMap =
- getRoundRobinPackingPlan(topologyExplicitRamMap);
-
- AssertPacking.assertContainers(packingPlanExplicitRamMap.getContainers(),
- BOLT_NAME, SPOUT_NAME, boltRam, spoutRam, null);
- Assert.assertEquals(totalInstances, packingPlanExplicitRamMap.getInstanceCount());
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ doPackingTestWithPartialResource(topology,
+ Optional.of(boltRam), Optional.empty(), boltParallelism,
+ Optional.of(spoutRam), Optional.empty(), spoutParallelism,
+ numContainers, getDefaultPadding(),
+ getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()).cloneWithRam(containerRam));
}
/**
@@ -389,15 +294,6 @@ public class RoundRobinPackingTest {
*/
@Test
public void testCompleteCpuMapRequestedWithExactlyEnoughResource() throws Exception {
- int numContainers = 2;
- int spoutParallelism = 4;
- int boltParallelism = 3;
- Integer totalInstances = spoutParallelism + boltParallelism;
-
- // Set up the topology and its config
- org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
- topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
-
// Explicit set resources for container
double containerCpu = 17;
@@ -408,15 +304,14 @@ public class RoundRobinPackingTest {
topologyConfig.setContainerCpuRequested(containerCpu);
topologyConfig.setComponentCpu(BOLT_NAME, boltCpu);
topologyConfig.setComponentCpu(SPOUT_NAME, spoutCpu);
-
- TopologyAPI.Topology topologyExplicitCpuMap =
- getTopology(spoutParallelism, boltParallelism, topologyConfig);
- PackingPlan packingPlanExplicitCpuMap =
- getRoundRobinPackingPlan(topologyExplicitCpuMap);
-
- AssertPacking.assertContainers(packingPlanExplicitCpuMap.getContainers(),
- BOLT_NAME, SPOUT_NAME, boltCpu, spoutCpu, null);
- Assert.assertEquals(totalInstances, packingPlanExplicitCpuMap.getInstanceCount());
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ doPackingTestWithPartialResource(topology,
+ Optional.empty(), Optional.of(boltCpu), boltParallelism,
+ Optional.empty(), Optional.of(spoutCpu), spoutParallelism,
+ numContainers, getDefaultPadding(),
+ getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()).cloneWithCpu(containerCpu));
}
/**
@@ -424,15 +319,6 @@ public class RoundRobinPackingTest {
*/
@Test
public void testCompleteCpuMapRequestedWithMoreThanEnoughResource() throws Exception {
- int numContainers = 2;
- int spoutParallelism = 4;
- int boltParallelism = 3;
- Integer totalInstances = spoutParallelism + boltParallelism;
-
- // Set up the topology and its config
- org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
- topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
-
// Explicit set resources for container
double containerCpu = 30;
@@ -443,15 +329,14 @@ public class RoundRobinPackingTest {
topologyConfig.setContainerCpuRequested(containerCpu);
topologyConfig.setComponentCpu(BOLT_NAME, boltCpu);
topologyConfig.setComponentCpu(SPOUT_NAME, spoutCpu);
-
- TopologyAPI.Topology topologyExplicitCpuMap =
- getTopology(spoutParallelism, boltParallelism, topologyConfig);
- PackingPlan packingPlanExplicitCpuMap =
- getRoundRobinPackingPlan(topologyExplicitCpuMap);
-
- AssertPacking.assertContainers(packingPlanExplicitCpuMap.getContainers(),
- BOLT_NAME, SPOUT_NAME, boltCpu, spoutCpu, null);
- Assert.assertEquals(totalInstances, packingPlanExplicitCpuMap.getInstanceCount());
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ doPackingTestWithPartialResource(topology,
+ Optional.empty(), Optional.of(boltCpu), boltParallelism,
+ Optional.empty(), Optional.of(spoutCpu), spoutParallelism,
+ numContainers, getDefaultPadding(),
+ getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()).cloneWithCpu(containerCpu));
}
/**
@@ -459,15 +344,6 @@ public class RoundRobinPackingTest {
*/
@Test(expected = PackingException.class)
public void testCompleteCpuMapRequestedWithLessThanEnoughResource() throws Exception {
- int numContainers = 2;
- int spoutParallelism = 4;
- int boltParallelism = 3;
- Integer totalInstances = spoutParallelism + boltParallelism;
-
- // Set up the topology and its config
- org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
- topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
-
// Explicit set resources for container
double containerCpu = 10;
@@ -478,14 +354,14 @@ public class RoundRobinPackingTest {
topologyConfig.setContainerCpuRequested(containerCpu);
topologyConfig.setComponentCpu(BOLT_NAME, boltCpu);
topologyConfig.setComponentCpu(SPOUT_NAME, spoutCpu);
-
- TopologyAPI.Topology topologyExplicitCpuMap =
- getTopology(spoutParallelism, boltParallelism, topologyConfig);
- PackingPlan packingPlanExplicitCpuMap = getRoundRobinPackingPlan(topologyExplicitCpuMap);
-
- AssertPacking.assertContainers(packingPlanExplicitCpuMap.getContainers(),
- BOLT_NAME, SPOUT_NAME, boltCpu, spoutCpu, null);
- Assert.assertEquals(totalInstances, packingPlanExplicitCpuMap.getInstanceCount());
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ doPackingTestWithPartialResource(topology,
+ Optional.empty(), Optional.of(boltCpu), boltParallelism,
+ Optional.empty(), Optional.of(spoutCpu), spoutParallelism,
+ numContainers, getDefaultPadding(),
+ getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()).cloneWithCpu(containerCpu));
}
/**
@@ -494,14 +370,6 @@ public class RoundRobinPackingTest {
*/
@Test
public void testCompleteCpuMapRequestedWithoutPaddingResource() throws Exception {
- int numContainers = 2;
- int spoutParallelism = 4;
- int boltParallelism = 3;
-
- // Set up the topology and its config
- org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
- topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
-
// Explicit set resources for container
double containerCpu = 16;
@@ -512,10 +380,14 @@ public class RoundRobinPackingTest {
topologyConfig.setContainerCpuRequested(containerCpu);
topologyConfig.setComponentCpu(BOLT_NAME, boltCpu);
topologyConfig.setComponentCpu(SPOUT_NAME, spoutCpu);
-
- TopologyAPI.Topology topologyExplicitCpuMap =
- getTopology(spoutParallelism, boltParallelism, topologyConfig);
- getRoundRobinPackingPlan(topologyExplicitCpuMap);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ doPackingTestWithPartialResource(topology,
+ Optional.empty(), Optional.of(boltCpu), boltParallelism,
+ Optional.empty(), Optional.of(spoutCpu), spoutParallelism,
+ numContainers, getDefaultPadding(),
+ getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()).cloneWithCpu(containerCpu));
}
/**
@@ -523,15 +395,6 @@ public class RoundRobinPackingTest {
*/
@Test
public void testPartialRamMap() throws Exception {
- int numContainers = 2;
- int spoutParallelism = 4;
- int boltParallelism = 3;
- Integer totalInstances = spoutParallelism + boltParallelism;
-
- // Set up the topology and its config
- org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
- topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
-
// Explicit set resources for container
ByteAmount containerRam = ByteAmount.fromGigabytes(10);
@@ -540,39 +403,14 @@ public class RoundRobinPackingTest {
topologyConfig.setContainerRamRequested(containerRam);
topologyConfig.setComponentRam(BOLT_NAME, boltRam);
-
- TopologyAPI.Topology topologyExplicitRamMap =
- getTopology(spoutParallelism, boltParallelism, topologyConfig);
- PackingPlan packingPlanExplicitRamMap =
- getRoundRobinPackingPlan(topologyExplicitRamMap);
- Assert.assertEquals(totalInstances, packingPlanExplicitRamMap.getInstanceCount());
-
- // RAM for bolt should be the value in component RAM map
- for (PackingPlan.ContainerPlan containerPlan : packingPlanExplicitRamMap.getContainers()) {
- Assert.assertEquals(containerRam, containerPlan.getRequiredResource().getRam());
- int boltCount = 0;
- int instancesCount = containerPlan.getInstances().size();
- for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
- if (instancePlan.getComponentName().equals(BOLT_NAME)) {
- Assert.assertEquals(boltRam, instancePlan.getResource().getRam());
- boltCount++;
- }
- }
-
- // Ram for spout should be:
- // (containerRam - all RAM for bolt - RAM for padding) / (# of spouts)
- int spoutCount = instancesCount - boltCount;
- for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
- if (instancePlan.getComponentName().equals(SPOUT_NAME)) {
- Assert.assertEquals(
- containerRam
- .minus(boltRam.multiply(boltCount))
- .minus(RoundRobinPacking.DEFAULT_RAM_PADDING_PER_CONTAINER)
- .divide(spoutCount),
- instancePlan.getResource().getRam());
- }
- }
- }
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ doPackingTestWithPartialResource(topology,
+ Optional.of(boltRam), Optional.empty(), boltParallelism,
+ Optional.empty(), Optional.empty(), spoutParallelism,
+ numContainers, getDefaultPadding(),
+ getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()).cloneWithRam(containerRam));
}
/**
@@ -580,15 +418,6 @@ public class RoundRobinPackingTest {
*/
@Test
public void testPartialCpuMap() throws Exception {
- int numContainers = 2;
- int spoutParallelism = 4;
- int boltParallelism = 3;
- Integer totalInstances = spoutParallelism + boltParallelism;
-
- // Set up the topology and its config
- org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
- topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
-
// Explicit set resources for container
double containerCpu = 17;
@@ -597,39 +426,14 @@ public class RoundRobinPackingTest {
topologyConfig.setContainerCpuRequested(containerCpu);
topologyConfig.setComponentCpu(BOLT_NAME, boltCpu);
-
- TopologyAPI.Topology topologyExplicitCpuMap =
- getTopology(spoutParallelism, boltParallelism, topologyConfig);
- PackingPlan packingPlanExplicitCpuMap =
- getRoundRobinPackingPlan(topologyExplicitCpuMap);
- Assert.assertEquals(totalInstances, packingPlanExplicitCpuMap.getInstanceCount());
-
- // CPU for bolt should be the value in component CPU map
- for (PackingPlan.ContainerPlan containerPlan : packingPlanExplicitCpuMap.getContainers()) {
- Assert.assertEquals(containerCpu, containerPlan.getRequiredResource().getCpu(), DELTA);
- int boltCount = 0;
- int instancesCount = containerPlan.getInstances().size();
- for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
- if (instancePlan.getComponentName().equals(BOLT_NAME)) {
- Assert.assertEquals(boltCpu, instancePlan.getResource().getCpu(), DELTA);
- boltCount++;
- }
- }
-
- // CPU for spout should be:
- // (containerCpu - all CPU for bolt - CPU for padding) / (# of spouts)
- int spoutCount = instancesCount - boltCount;
- for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
- if (instancePlan.getComponentName().equals(SPOUT_NAME)) {
- Assert.assertEquals(
- (containerCpu
- - boltCpu * boltCount
- - RoundRobinPacking.DEFAULT_CPU_PADDING_PER_CONTAINER)
- / spoutCount,
- instancePlan.getResource().getCpu(), DELTA);
- }
- }
- }
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ doPackingTestWithPartialResource(topology,
+ Optional.empty(), Optional.of(boltCpu), boltParallelism,
+ Optional.empty(), Optional.empty(), spoutParallelism,
+ numContainers, getDefaultPadding(),
+ getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()).cloneWithCpu(containerCpu));
}
/**
@@ -637,30 +441,18 @@ public class RoundRobinPackingTest {
*/
@Test
public void testEvenPacking() throws Exception {
- int numContainers = 2;
int componentParallelism = 4;
+ boltParallelism = componentParallelism;
+ spoutParallelism = componentParallelism;
- // Set up the topology and its config
- org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
- topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
-
- TopologyAPI.Topology topology =
- getTopology(componentParallelism, componentParallelism, topologyConfig);
-
- int numInstance = TopologyUtils.getTotalInstance(topology);
- // Two components
- Assert.assertEquals(2 * componentParallelism, numInstance);
- PackingPlan output = getRoundRobinPackingPlan(topology);
- Assert.assertEquals(numContainers, output.getContainers().size());
- Assert.assertEquals((Integer) numInstance, output.getInstanceCount());
-
- for (PackingPlan.ContainerPlan container : output.getContainers()) {
- Assert.assertEquals(numInstance / numContainers, container.getInstances().size());
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
- // Verify each container got 2 spout and 2 bolt and container 1 got
- assertComponentCount(container, "spout", 2);
- assertComponentCount(container, "bolt", 2);
- }
+ doPackingTestWithPartialResource(topology,
+ Optional.empty(), Optional.empty(), boltParallelism,
+ Optional.empty(), Optional.empty(), spoutParallelism,
+ numContainers, getDefaultPadding(),
+ getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()));
}
@@ -669,42 +461,29 @@ public class RoundRobinPackingTest {
*/
@Test
public void testRepackingWithSameTotalInstances() throws Exception {
- int numContainers = 2;
int componentParallelism = 4;
+ boltParallelism = componentParallelism;
+ spoutParallelism = componentParallelism;
- // Set up the topology and its config
- org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
- topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
-
- TopologyAPI.Topology topology =
- getTopology(componentParallelism, componentParallelism, topologyConfig);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
- int numInstance = TopologyUtils.getTotalInstance(topology);
- // Two components
- Assert.assertEquals(2 * componentParallelism, numInstance);
+ PackingPlan packingPlan = doPackingTestWithPartialResource(topology,
+ Optional.empty(), Optional.empty(), boltParallelism,
+ Optional.empty(), Optional.empty(), spoutParallelism,
+ numContainers, getDefaultPadding(),
+ getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()));
Map<String, Integer> componentChanges = new HashMap<>();
componentChanges.put(SPOUT_NAME, -1);
componentChanges.put(BOLT_NAME, +1);
- PackingPlan output = getRoundRobinRePackingPlan(topology, componentChanges);
- Assert.assertEquals(numContainers, output.getContainers().size());
- Assert.assertEquals((Integer) numInstance, output.getInstanceCount());
-
- int spoutCount = 0;
- int boltCount = 0;
- for (PackingPlan.ContainerPlan container : output.getContainers()) {
- Assert.assertEquals(numInstance / numContainers, container.getInstances().size());
-
- for (PackingPlan.InstancePlan instancePlan : container.getInstances()) {
- if (SPOUT_NAME.equals(instancePlan.getComponentName())) {
- spoutCount++;
- } else if (BOLT_NAME.equals(instancePlan.getComponentName())) {
- boltCount++;
- }
- }
- }
- Assert.assertEquals(componentParallelism - 1, spoutCount);
- Assert.assertEquals(componentParallelism + 1, boltCount);
+
+ doScalingTestWithPartialResource(topology, packingPlan, componentChanges,
+ Optional.empty(), Optional.empty(), boltParallelism,
+ Optional.empty(), Optional.empty(), spoutParallelism,
+ numContainers, getDefaultPadding(),
+ getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()));
}
/**
@@ -712,43 +491,29 @@ public class RoundRobinPackingTest {
*/
@Test
public void testRepackingWithMoreTotalInstances() throws Exception {
- int numContainers = 2;
int componentParallelism = 4;
+ boltParallelism = componentParallelism;
+ spoutParallelism = componentParallelism;
- // Set up the topology and its config
- org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
- topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
- TopologyAPI.Topology topology =
- getTopology(componentParallelism, componentParallelism, topologyConfig);
-
- int numInstance = TopologyUtils.getTotalInstance(topology);
- // Two components
- Assert.assertEquals(2 * componentParallelism, numInstance);
+ PackingPlan packingPlan = doPackingTestWithPartialResource(topology,
+ Optional.empty(), Optional.empty(), boltParallelism,
+ Optional.empty(), Optional.empty(), spoutParallelism,
+ numContainers, getDefaultPadding(),
+ getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()));
Map<String, Integer> componentChanges = new HashMap<>();
componentChanges.put(SPOUT_NAME, +1);
componentChanges.put(BOLT_NAME, +1);
- PackingPlan output = getRoundRobinRePackingPlan(topology, componentChanges);
- Assert.assertEquals(numContainers + 1, output.getContainers().size());
- Assert.assertEquals((Integer) (numInstance + 2), output.getInstanceCount());
-
- int spoutCount = 0;
- int boltCount = 0;
- for (PackingPlan.ContainerPlan container : output.getContainers()) {
- Assert.assertTrue((double) container.getInstances().size()
- <= (double) numInstance / numContainers);
-
- for (PackingPlan.InstancePlan instancePlan : container.getInstances()) {
- if (SPOUT_NAME.equals(instancePlan.getComponentName())) {
- spoutCount++;
- } else if (BOLT_NAME.equals(instancePlan.getComponentName())) {
- boltCount++;
- }
- }
- }
- Assert.assertEquals(componentParallelism + 1, spoutCount);
- Assert.assertEquals(componentParallelism + 1, boltCount);
+
+ doScalingTestWithPartialResource(topology, packingPlan, componentChanges,
+ Optional.empty(), Optional.empty(), boltParallelism,
+ Optional.empty(), Optional.empty(), spoutParallelism,
+ numContainers + 1, getDefaultPadding(),
+ getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()));
}
/**
@@ -756,54 +521,28 @@ public class RoundRobinPackingTest {
*/
@Test
public void testRepackingWithFewerTotalInstances() throws Exception {
- int numContainers = 2;
int componentParallelism = 4;
+ boltParallelism = componentParallelism;
+ spoutParallelism = componentParallelism;
- // Set up the topology and its config
- org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
- topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
-
- TopologyAPI.Topology topology =
- getTopology(componentParallelism, componentParallelism, topologyConfig);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
- int numInstance = TopologyUtils.getTotalInstance(topology);
- // Two components
- Assert.assertEquals(2 * componentParallelism, numInstance);
+ PackingPlan packingPlan = doPackingTestWithPartialResource(topology,
+ Optional.empty(), Optional.empty(), boltParallelism,
+ Optional.empty(), Optional.empty(), spoutParallelism,
+ numContainers, getDefaultPadding(),
+ getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()));
Map<String, Integer> componentChanges = new HashMap<>();
componentChanges.put(SPOUT_NAME, -2);
componentChanges.put(BOLT_NAME, -2);
- PackingPlan output = getRoundRobinRePackingPlan(topology, componentChanges);
- Assert.assertEquals(numContainers - 1, output.getContainers().size());
- Assert.assertEquals((Integer) (numInstance - 4), output.getInstanceCount());
-
- int spoutCount = 0;
- int boltCount = 0;
- for (PackingPlan.ContainerPlan container : output.getContainers()) {
- Assert.assertTrue((double) container.getInstances().size()
- <= (double) numInstance / numContainers);
-
- for (PackingPlan.InstancePlan instancePlan : container.getInstances()) {
- if (SPOUT_NAME.equals(instancePlan.getComponentName())) {
- spoutCount++;
- } else if (BOLT_NAME.equals(instancePlan.getComponentName())) {
- boltCount++;
- }
- }
- }
- Assert.assertEquals(componentParallelism - 2, spoutCount);
- Assert.assertEquals(componentParallelism - 2, boltCount);
- }
- private static void assertComponentCount(
- PackingPlan.ContainerPlan containerPlan, String componentName, int expectedCount) {
- int count = 0;
- for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
- if (componentName.equals(instancePlan.getComponentName())) {
- count++;
- }
- }
- Assert.assertEquals(expectedCount, count);
+ doScalingTestWithPartialResource(topology, packingPlan, componentChanges,
+ Optional.empty(), Optional.empty(), boltParallelism,
+ Optional.empty(), Optional.empty(), spoutParallelism,
+ numContainers - 1, getDefaultPadding(),
+ getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()));
}
-
}
diff --git a/heron/spi/src/java/org/apache/heron/spi/packing/Resource.java b/heron/spi/src/java/org/apache/heron/spi/packing/Resource.java
index 385680b..e1b8c1e 100644
--- a/heron/spi/src/java/org/apache/heron/spi/packing/Resource.java
+++ b/heron/spi/src/java/org/apache/heron/spi/packing/Resource.java
@@ -63,6 +63,14 @@ public class Resource {
return new Resource(this.getCpu(), newRam, this.getDisk());
}
+ public Resource cloneWithCpu(double newCpu) {
+ return new Resource(newCpu, this.getRam(), this.getDisk());
+ }
+
+ public Resource cloneWithDisk(ByteAmount newDisk) {
+ return new Resource(this.getCpu(), this.getRam(), newDisk);
+ }
+
/**
* Subtracts a given resource from the current resource. The results is never negative.
*/