You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nw...@apache.org on 2019/02/15 01:15:42 UTC
[incubator-heron] branch master updated: Consolidate Packing (#3187)
This is an automated email from the ASF dual-hosted git repository.
nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new dc4c733 Consolidate Packing (#3187)
dc4c733 is described below
commit dc4c733afc92bec8c68e7e11a9050433346327ef
Author: Xiaoyao Qian <qi...@illinois.edu>
AuthorDate: Thu Feb 14 17:15:38 2019 -0800
Consolidate Packing (#3187)
* init
* abstract common behaviors in RCRR
---
.../org/apache/heron/packing/AbstractPacking.java | 176 ++++++
.../binpacking/FirstFitDecreasingPacking.java | 91 +---
.../apache/heron/packing/builder/Container.java | 65 +--
.../heron/packing/builder/PackingPlanBuilder.java | 165 ++----
.../packing/{ => builder}/RamRequirement.java | 2 +-
.../roundrobin/ResourceCompliantRRPacking.java | 86 +--
.../packing/roundrobin/RoundRobinPacking.java | 2 +-
.../apache/heron/packing/utils/PackingUtils.java | 103 ++--
.../org/apache/heron/packing/AssertPacking.java | 51 +-
.../apache/heron/packing/CommonPackingTests.java | 118 ++--
.../apache/heron/packing/PackingTestHelper.java | 49 +-
.../binpacking/FirstFitDecreasingPackingTest.java | 600 ++++++++-------------
.../packing/builder/PackingPlanBuilderTest.java | 23 +-
.../apache/heron/packing/builder/ScorerTest.java | 6 +-
.../roundrobin/ResourceCompliantRRPackingTest.java | 567 ++++++++-----------
.../packing/roundrobin/RoundRobinPackingTest.java | 2 +-
.../heron/scheduler/RuntimeManagerMainTest.java | 3 +-
.../org/apache/heron/spi/packing/Resource.java | 3 +
18 files changed, 923 insertions(+), 1189 deletions(-)
diff --git a/heron/packing/src/java/org/apache/heron/packing/AbstractPacking.java b/heron/packing/src/java/org/apache/heron/packing/AbstractPacking.java
new file mode 100644
index 0000000..9aa64fd
--- /dev/null
+++ b/heron/packing/src/java/org/apache/heron/packing/AbstractPacking.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.packing;
+
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.heron.api.generated.TopologyAPI;
+import org.apache.heron.api.utils.TopologyUtils;
+import org.apache.heron.common.basics.ByteAmount;
+import org.apache.heron.packing.utils.PackingUtils;
+import org.apache.heron.spi.common.Config;
+import org.apache.heron.spi.common.Context;
+import org.apache.heron.spi.packing.IPacking;
+import org.apache.heron.spi.packing.IRepacking;
+import org.apache.heron.spi.packing.Resource;
+
+import static org.apache.heron.api.Config.TOPOLOGY_CONTAINER_CPU_PADDING;
+import static org.apache.heron.api.Config.TOPOLOGY_CONTAINER_CPU_REQUESTED;
+import static org.apache.heron.api.Config.TOPOLOGY_CONTAINER_DISK_REQUESTED;
+import static org.apache.heron.api.Config.TOPOLOGY_CONTAINER_MAX_NUM_INSTANCES;
+import static org.apache.heron.api.Config.TOPOLOGY_CONTAINER_PADDING_PERCENTAGE;
+import static org.apache.heron.api.Config.TOPOLOGY_CONTAINER_RAM_PADDING;
+import static org.apache.heron.api.Config.TOPOLOGY_CONTAINER_RAM_REQUESTED;
+
+/**
+ * Common configuration finalization for packing algorithms
+ * Packing algorithms that extend this class should assume that:
+ * <p>
+ * 1. Instance default resources are read from:
+ * heron.resources.instance.ram,
+ * heron.resources.instance.cpu,
+ * heron.resources.instance.disk
+ * <p>
+ * 2. Padding resource percentage is read from:
+ * topology.container.padding.percentage
+ * or taken from PackingUtils.DEFAULT_CONTAINER_PADDING_PERCENTAGE
+ * <p>
+ * 3. Padding resource values are read from:
+ * topology.container.ram.padding or taken from PackingUtils.DEFAULT_CONTAINER_RAM_PADDING,
+ * topology.container.cpu.padding or taken from PackingUtils.DEFAULT_CONTAINER_CPU_PADDING
+ * <p>
+ * 4. Max number of instances per container is read from:
+ * topology.container.max.instances
+ * or taken from PackingUtils.DEFAULT_MAX_NUM_INSTANCES_PER_CONTAINER
+ * <p>
+ * 5. Container resources requirements are read from:
+ * topology.container.cpu or calculated from maxNumInstancesPerContainer * instanceDefaultCpu
+ * topology.container.ram or calculated from maxNumInstancesPerContainer * instanceDefaultRam
+ * topology.container.disk or calculated from maxNumInstancesPerContainer * instanceDefaultDisk
+ * <p>
+ * 6. Padding resource is finalized by:
+ * Math.max(containerResource * paddingPercentage, paddingValue)
+ * <p>
+ *
+ * Subclasses that extend this class should just need to create PackingPlanBuilder with:
+ * defaultInstanceResource,
+ * maxContainerResource,
+ * containerPadding,
+ * componentResourceMap,
+ * instanceConstraints,
+ * and packingConstraints set.
+ */
+public abstract class AbstractPacking implements IPacking, IRepacking {
+ private static final Logger LOG = Logger.getLogger(AbstractPacking.class.getName());
+ protected TopologyAPI.Topology topology;
+
+ // instance & container
+ protected Resource defaultInstanceResources;
+ protected Resource maxContainerResources;
+ protected int maxNumInstancesPerContainer;
+ protected Map<String, Resource> componentResourceMap;
+
+ // padding
+ protected Resource padding;
+
+ @Override
+ public void initialize(Config config, TopologyAPI.Topology inputTopology) {
+ this.topology = inputTopology;
+ setPackingConfigs(config);
+
+ LOG.info(String.format("Initalizing Packing: \n"
+ + "Max number of instances per container: %d \n"
+ + "Default instance resource, CPU: %f, RAM: %s, DISK: %s \n"
+ + "Paddng: %s \n"
+ + "Container resource, CPU: %f, RAM: %s, DISK: %s",
+ this.maxNumInstancesPerContainer,
+ this.defaultInstanceResources.getCpu(),
+ this.defaultInstanceResources.getRam().toString(),
+ this.defaultInstanceResources.getDisk().toString(),
+ this.padding.toString(),
+ this.maxContainerResources.getCpu(),
+ this.maxContainerResources.getRam().toString(),
+ this.maxContainerResources.getDisk().toString()));
+ }
+
+ /**
+ * Instatiate the packing algorithm parameters related to this topology.
+ */
+ private void setPackingConfigs(Config config) {
+ List<TopologyAPI.Config.KeyValue> topologyConfig = topology.getTopologyConfig().getKvsList();
+
+ // instance default resources are acquired from heron system level config
+ this.defaultInstanceResources = new Resource(
+ Context.instanceCpu(config),
+ Context.instanceRam(config),
+ Context.instanceDisk(config));
+
+ int paddingPercentage = TopologyUtils.getConfigWithDefault(topologyConfig,
+ TOPOLOGY_CONTAINER_PADDING_PERCENTAGE, PackingUtils.DEFAULT_CONTAINER_PADDING_PERCENTAGE);
+ ByteAmount ramPadding = TopologyUtils.getConfigWithDefault(topologyConfig,
+ TOPOLOGY_CONTAINER_RAM_PADDING, PackingUtils.DEFAULT_CONTAINER_RAM_PADDING);
+ double cpuPadding = TopologyUtils.getConfigWithDefault(topologyConfig,
+ TOPOLOGY_CONTAINER_CPU_PADDING, PackingUtils.DEFAULT_CONTAINER_CPU_PADDING);
+ Resource preliminaryPadding = new Resource(cpuPadding, ramPadding,
+ PackingUtils.DEFAULT_CONTAINER_DISK_PADDING);
+
+ this.maxNumInstancesPerContainer = TopologyUtils.getConfigWithDefault(topologyConfig,
+ TOPOLOGY_CONTAINER_MAX_NUM_INSTANCES, PackingUtils.DEFAULT_MAX_NUM_INSTANCES_PER_CONTAINER);
+
+ // container default resources are computed as:
+ // max number of instances per container * default instance resources
+ double containerDefaultCpu = this.defaultInstanceResources.getCpu()
+ * maxNumInstancesPerContainer;
+ ByteAmount containerDefaultRam = this.defaultInstanceResources.getRam()
+ .multiply(maxNumInstancesPerContainer);
+ ByteAmount containerDefaultDisk = this.defaultInstanceResources.getDisk()
+ .multiply(maxNumInstancesPerContainer);
+
+ double containerCpu = TopologyUtils.getConfigWithDefault(topologyConfig,
+ TOPOLOGY_CONTAINER_CPU_REQUESTED, containerDefaultCpu);
+ ByteAmount containerRam = TopologyUtils.getConfigWithDefault(topologyConfig,
+ TOPOLOGY_CONTAINER_RAM_REQUESTED, containerDefaultRam);
+ ByteAmount containerDisk = TopologyUtils.getConfigWithDefault(topologyConfig,
+ TOPOLOGY_CONTAINER_DISK_REQUESTED, containerDefaultDisk);
+ Resource containerResource = new Resource(containerCpu,
+ containerRam, containerDisk);
+
+ // finalize padding
+ this.padding = PackingUtils.finalizePadding(containerResource,
+ preliminaryPadding, paddingPercentage);
+
+ // finalize container resources
+ this.maxContainerResources = containerResource;
+
+ this.componentResourceMap = PackingUtils.getComponentResourceMap(
+ TopologyUtils.getComponentParallelism(topology).keySet(),
+ TopologyUtils.getComponentRamMapConfig(topology),
+ TopologyUtils.getComponentCpuMapConfig(topology),
+ TopologyUtils.getComponentDiskMapConfig(topology),
+ defaultInstanceResources
+ );
+ }
+
+ @Override
+ public void close() {
+ }
+}
diff --git a/heron/packing/src/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPacking.java b/heron/packing/src/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPacking.java
index ab9b650..f8e7523 100644
--- a/heron/packing/src/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPacking.java
+++ b/heron/packing/src/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPacking.java
@@ -26,34 +26,24 @@ import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;
-import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.api.utils.TopologyUtils;
-import org.apache.heron.common.basics.ByteAmount;
-import org.apache.heron.packing.RamRequirement;
+import org.apache.heron.packing.AbstractPacking;
import org.apache.heron.packing.builder.Container;
import org.apache.heron.packing.builder.ContainerIdScorer;
import org.apache.heron.packing.builder.HomogeneityScorer;
import org.apache.heron.packing.builder.InstanceCountScorer;
import org.apache.heron.packing.builder.PackingPlanBuilder;
+import org.apache.heron.packing.builder.RamRequirement;
import org.apache.heron.packing.builder.Scorer;
import org.apache.heron.packing.constraints.MinRamConstraint;
import org.apache.heron.packing.constraints.ResourceConstraint;
import org.apache.heron.packing.exceptions.ConstraintViolationException;
import org.apache.heron.packing.exceptions.ResourceExceededException;
import org.apache.heron.packing.utils.PackingUtils;
-import org.apache.heron.spi.common.Config;
-import org.apache.heron.spi.common.Context;
-import org.apache.heron.spi.packing.IPacking;
-import org.apache.heron.spi.packing.IRepacking;
import org.apache.heron.spi.packing.PackingException;
import org.apache.heron.spi.packing.PackingPlan;
import org.apache.heron.spi.packing.Resource;
-import static org.apache.heron.api.Config.TOPOLOGY_CONTAINER_MAX_CPU_HINT;
-import static org.apache.heron.api.Config.TOPOLOGY_CONTAINER_MAX_DISK_HINT;
-import static org.apache.heron.api.Config.TOPOLOGY_CONTAINER_MAX_RAM_HINT;
-import static org.apache.heron.api.Config.TOPOLOGY_CONTAINER_PADDING_PERCENTAGE;
-
/**
* FirstFitDecreasing packing algorithm
* <p>
@@ -99,72 +89,18 @@ import static org.apache.heron.api.Config.TOPOLOGY_CONTAINER_PADDING_PERCENTAGE;
* 10. The pack() return null if PackingPlan fails to pass the safe check, for instance,
* the size of RAM for an instance is less than the minimal required value.
*/
-public class FirstFitDecreasingPacking implements IPacking, IRepacking {
-
- private static final int DEFAULT_CONTAINER_PADDING_PERCENTAGE = 10;
- private static final int DEFAULT_NUMBER_INSTANCES_PER_CONTAINER = 4;
+public class FirstFitDecreasingPacking extends AbstractPacking {
private static final Logger LOG = Logger.getLogger(FirstFitDecreasingPacking.class.getName());
- private TopologyAPI.Topology topology;
- private Resource defaultInstanceResources;
- private Resource maxContainerResources;
- private int paddingPercentage;
-
private int numContainers = 0;
- @Override
- public void initialize(Config config, TopologyAPI.Topology inputTopology) {
- this.topology = inputTopology;
- setPackingConfigs(config);
- LOG.info(String.format("Initalizing FirstFitDecreasingPacking. "
- + "CPU default: %f, RAM default: %s, DISK default: %s, Paddng percentage: %d, "
- + "CPU max: %f, RAM max: %s, DISK max: %s.",
- this.defaultInstanceResources.getCpu(),
- this.defaultInstanceResources.getRam().toString(),
- this.defaultInstanceResources.getDisk().toString(),
- this.paddingPercentage,
- this.maxContainerResources.getCpu(),
- this.maxContainerResources.getRam().toString(),
- this.maxContainerResources.getDisk().toString()));
- }
-
- /**
- * Instatiate the packing algorithm parameters related to this topology.
- */
- private void setPackingConfigs(Config config) {
- List<TopologyAPI.Config.KeyValue> topologyConfig = topology.getTopologyConfig().getKvsList();
-
- this.defaultInstanceResources = new Resource(
- Context.instanceCpu(config),
- Context.instanceRam(config),
- Context.instanceDisk(config));
-
- this.paddingPercentage = TopologyUtils.getConfigWithDefault(topologyConfig,
- TOPOLOGY_CONTAINER_PADDING_PERCENTAGE, DEFAULT_CONTAINER_PADDING_PERCENTAGE);
-
- double defaultCpu = this.defaultInstanceResources.getCpu()
- * DEFAULT_NUMBER_INSTANCES_PER_CONTAINER;
- ByteAmount defaultRam = this.defaultInstanceResources.getRam()
- .multiply(DEFAULT_NUMBER_INSTANCES_PER_CONTAINER);
- ByteAmount defaultDisk = this.defaultInstanceResources.getDisk()
- .multiply(DEFAULT_NUMBER_INSTANCES_PER_CONTAINER);
-
- this.maxContainerResources = new Resource(
- TopologyUtils.getConfigWithDefault(topologyConfig, TOPOLOGY_CONTAINER_MAX_CPU_HINT,
- (double) Math.round(PackingUtils.increaseBy(defaultCpu, paddingPercentage))),
- TopologyUtils.getConfigWithDefault(topologyConfig, TOPOLOGY_CONTAINER_MAX_RAM_HINT,
- defaultRam.increaseBy(paddingPercentage)),
- TopologyUtils.getConfigWithDefault(topologyConfig, TOPOLOGY_CONTAINER_MAX_DISK_HINT,
- defaultDisk.increaseBy(paddingPercentage)));
- }
-
private PackingPlanBuilder newPackingPlanBuilder(PackingPlan existingPackingPlan) {
return new PackingPlanBuilder(topology.getId(), existingPackingPlan)
- .setMaxContainerResource(maxContainerResources)
.setDefaultInstanceResource(defaultInstanceResources)
- .setRequestedContainerPadding(paddingPercentage)
- .setRequestedComponentRam(TopologyUtils.getComponentRamMapConfig(topology))
+ .setMaxContainerResource(maxContainerResources)
+ .setRequestedContainerPadding(padding)
+ .setRequestedComponentResource(componentResourceMap)
.setInstanceConstraints(Collections.singletonList(new MinRamConstraint()))
.setPackingConstraints(Collections.singletonList(new ResourceConstraint()));
}
@@ -213,11 +149,6 @@ public class FirstFitDecreasingPacking implements IPacking, IRepacking {
+ " creating a new packing plan with a new number of containers.");
}
- @Override
- public void close() {
-
- }
-
/**
* Sort the components in decreasing order based on their RAM requirements
*
@@ -225,12 +156,9 @@ public class FirstFitDecreasingPacking implements IPacking, IRepacking {
*/
private ArrayList<RamRequirement> getSortedRAMInstances(Set<String> componentNames) {
ArrayList<RamRequirement> ramRequirements = new ArrayList<>();
- Map<String, ByteAmount> ramMap = TopologyUtils.getComponentRamMapConfig(topology);
-
for (String componentName : componentNames) {
- Resource requiredResource = PackingUtils.getResourceRequirement(
- componentName, ramMap, this.defaultInstanceResources,
- this.maxContainerResources, this.paddingPercentage);
+ Resource requiredResource = this.componentResourceMap.getOrDefault(componentName,
+ defaultInstanceResources);
ramRequirements.add(new RamRequirement(componentName, requiredResource.getRam()));
}
Collections.sort(ramRequirements, Collections.reverseOrder());
@@ -285,7 +213,8 @@ public class FirstFitDecreasingPacking implements IPacking, IRepacking {
*/
private void assignInstancesToContainers(PackingPlanBuilder planBuilder,
Map<String, Integer> parallelismMap) throws ConstraintViolationException {
- ArrayList<RamRequirement> ramRequirements = getSortedRAMInstances(parallelismMap.keySet());
+ ArrayList<RamRequirement> ramRequirements
+ = getSortedRAMInstances(parallelismMap.keySet());
for (RamRequirement ramRequirement : ramRequirements) {
String componentName = ramRequirement.getComponentName();
int numInstance = parallelismMap.get(componentName);
diff --git a/heron/packing/src/java/org/apache/heron/packing/builder/Container.java b/heron/packing/src/java/org/apache/heron/packing/builder/Container.java
index 4ee84cb..806f4ca 100644
--- a/heron/packing/src/java/org/apache/heron/packing/builder/Container.java
+++ b/heron/packing/src/java/org/apache/heron/packing/builder/Container.java
@@ -23,9 +23,6 @@ import java.util.HashSet;
import com.google.common.base.Optional;
-import org.apache.heron.common.basics.ByteAmount;
-import org.apache.heron.packing.exceptions.ResourceExceededException;
-import org.apache.heron.packing.utils.PackingUtils;
import org.apache.heron.spi.packing.PackingException;
import org.apache.heron.spi.packing.PackingPlan;
import org.apache.heron.spi.packing.Resource;
@@ -39,20 +36,20 @@ public class Container {
private int containerId;
private HashSet<PackingPlan.InstancePlan> instances;
private Resource capacity;
- private int paddingPercentage;
+ private Resource padding;
/**
* Creates a container with a specific capacity which will maintain a specific percentage
* of its resources for padding.
*
* @param capacity the capacity of the container in terms of CPU, RAM and disk
- * @param paddingPercentage the padding percentage
+ * @param padding the padding
*/
- Container(int containerId, Resource capacity, int paddingPercentage) {
+ Container(int containerId, Resource capacity, Resource padding) {
this.containerId = containerId;
this.capacity = capacity;
this.instances = new HashSet<PackingPlan.InstancePlan>();
- this.paddingPercentage = paddingPercentage;
+ this.padding = padding;
}
public int getContainerId() {
@@ -67,20 +64,19 @@ public class Container {
return capacity;
}
- int getPaddingPercentage() {
- return paddingPercentage;
+ public Resource getPadding() {
+ return padding;
}
/**
* Update the resources currently used by the container, when a new instance with specific
* resource requirements has been assigned to the container.
*/
- void add(PackingPlan.InstancePlan instancePlan) throws ResourceExceededException {
+ void add(PackingPlan.InstancePlan instancePlan) {
if (this.instances.contains(instancePlan)) {
throw new PackingException(String.format(
"Instance %s already exists in container %s", instancePlan, toString()));
}
- assertHasSpace(instancePlan.getResource());
this.instances.add(instancePlan);
}
@@ -103,8 +99,8 @@ public class Container {
@Override
public String toString() {
- return String.format("{containerId=%s, instances=%s, capacity=%s, paddingPercentage=%s}",
- containerId, instances, capacity, paddingPercentage);
+ return String.format("{containerId=%s, instances=%s, capacity=%s, padding=%s}",
+ containerId, instances, capacity, padding);
}
/**
@@ -151,50 +147,15 @@ public class Container {
}
/**
- * Check whether the container can accommodate a new instance with specific resource requirements
- */
- private void assertHasSpace(Resource resource) throws ResourceExceededException {
- Resource usedResources = this.getTotalUsedResources();
- ByteAmount newRam =
- usedResources.getRam().plus(resource.getRam()).increaseBy(paddingPercentage);
- double newCpu = Math.round(
- PackingUtils.increaseBy(usedResources.getCpu() + resource.getCpu(), paddingPercentage));
- ByteAmount newDisk =
- usedResources.getDisk().plus(resource.getDisk()).increaseBy(paddingPercentage);
-
- if (newRam.greaterThan(this.capacity.getRam())) {
- throw new ResourceExceededException(String.format("Adding %s bytes of RAM to existing %s "
- + "bytes with %d percent padding would exceed capacity %s",
- resource.getRam(), usedResources.getRam(), paddingPercentage, this.capacity.getRam()));
- }
- if (newCpu > this.capacity.getCpu()) {
- throw new ResourceExceededException(String.format("Adding %s cores to existing %s "
- + "cores with %d percent padding would exceed capacity %s",
- resource.getCpu(), usedResources.getCpu(), paddingPercentage, this.capacity.getCpu()));
- }
- if (newDisk.greaterThan(this.capacity.getDisk())) {
- throw new ResourceExceededException(String.format("Adding %s bytes of disk to existing %s "
- + "bytes with %s percent padding would exceed capacity %s",
- resource.getDisk(), usedResources.getDisk(), paddingPercentage, this.capacity.getDisk()));
- }
- }
-
- /**
* Computes the used resources of the container by taking into account the resources
* allocated for each instance.
*
* @return a Resource object that describes the used CPU, RAM and disk in the container.
*/
public Resource getTotalUsedResources() {
- ByteAmount usedRam = ByteAmount.ZERO;
- double usedCpuCores = 0;
- ByteAmount usedDisk = ByteAmount.ZERO;
- for (PackingPlan.InstancePlan instancePlan : this.instances) {
- Resource resource = instancePlan.getResource();
- usedRam = usedRam.plus(resource.getRam());
- usedCpuCores += resource.getCpu();
- usedDisk = usedDisk.plus(resource.getDisk());
- }
- return new Resource(usedCpuCores, usedRam, usedDisk);
+ return getInstances().stream()
+ .map(PackingPlan.InstancePlan::getResource)
+ .reduce(Resource.EMPTY_RESOURCE, Resource::plus)
+ .plus(getPadding());
}
}
diff --git a/heron/packing/src/java/org/apache/heron/packing/builder/PackingPlanBuilder.java b/heron/packing/src/java/org/apache/heron/packing/builder/PackingPlanBuilder.java
index 64f4b8e..d9fe4db 100644
--- a/heron/packing/src/java/org/apache/heron/packing/builder/PackingPlanBuilder.java
+++ b/heron/packing/src/java/org/apache/heron/packing/builder/PackingPlanBuilder.java
@@ -24,7 +24,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
@@ -37,33 +36,33 @@ import java.util.logging.Logger;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
-import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.packing.constraints.InstanceConstraint;
import org.apache.heron.packing.constraints.PackingConstraint;
import org.apache.heron.packing.exceptions.ConstraintViolationException;
import org.apache.heron.packing.exceptions.ResourceExceededException;
-import org.apache.heron.packing.utils.PackingUtils;
import org.apache.heron.spi.packing.InstanceId;
import org.apache.heron.spi.packing.PackingException;
import org.apache.heron.spi.packing.PackingPlan;
import org.apache.heron.spi.packing.Resource;
/**
- * Class the help with building packing plans.
+ * Class that helps with building packing plans.
*/
public class PackingPlanBuilder {
private static final Logger LOG = Logger.getLogger(PackingPlanBuilder.class.getName());
private final String topologyId;
private final PackingPlan existingPacking;
+
private Resource defaultInstanceResource;
private Resource maxContainerResource;
- private Map<String, ByteAmount> componentRamMap;
- private int requestedContainerPadding;
- private int numContainers;
+ private Map<String, Resource> componentResourceMap;
+ private Resource requestedContainerPadding;
private List<PackingConstraint> packingConstraints;
private List<InstanceConstraint> instanceConstraints;
+ private int numContainers;
+
private Map<Integer, Container> containers;
private TreeSet<Integer> taskIds; // globally unique ids assigned to instances
private HashMap<String, TreeSet<Integer>> componentIndexes; // componentName -> componentIndexes
@@ -76,12 +75,15 @@ public class PackingPlanBuilder {
this.topologyId = topologyId;
this.existingPacking = existingPacking;
this.numContainers = 0;
- this.requestedContainerPadding = 0;
- this.componentRamMap = new HashMap<>();
+ this.requestedContainerPadding = Resource.EMPTY_RESOURCE;
this.packingConstraints = new ArrayList<>();
this.instanceConstraints = new ArrayList<>();
}
+ public Map<Integer, Container> getContainers() {
+ return containers;
+ }
+
// set resource settings
public PackingPlanBuilder setDefaultInstanceResource(Resource resource) {
this.defaultInstanceResource = resource;
@@ -93,13 +95,13 @@ public class PackingPlanBuilder {
return this;
}
- public PackingPlanBuilder setRequestedComponentRam(Map<String, ByteAmount> ramMap) {
- this.componentRamMap = ramMap;
+ public PackingPlanBuilder setRequestedComponentResource(Map<String, Resource> resourceMap) {
+ this.componentResourceMap = resourceMap;
return this;
}
- public PackingPlanBuilder setRequestedContainerPadding(int percent) {
- this.requestedContainerPadding = percent;
+ public PackingPlanBuilder setRequestedContainerPadding(Resource padding) {
+ this.requestedContainerPadding = padding;
return this;
}
@@ -125,17 +127,16 @@ public class PackingPlanBuilder {
// updateNumContainers method
public PackingPlanBuilder addInstance(Integer containerId,
String componentName) throws ConstraintViolationException {
+ // create container if not existed
initContainer(containerId);
Integer taskId = taskIds.isEmpty() ? 1 : taskIds.last() + 1;
- Integer componentIndex = componentIndexes.get(componentName) != null
+ Integer componentIndex = componentIndexes.containsKey(componentName)
? componentIndexes.get(componentName).last() + 1 : 0;
-
InstanceId instanceId = new InstanceId(componentName, taskId, componentIndex);
- Resource instanceResource = PackingUtils.getResourceRequirement(
- componentName, this.componentRamMap, this.defaultInstanceResource,
- this.maxContainerResource, this.requestedContainerPadding);
+ Resource instanceResource = componentResourceMap.getOrDefault(componentName,
+ defaultInstanceResource);
Container container = containers.get(containerId);
PackingPlan.InstancePlan instancePlan
@@ -158,23 +159,23 @@ public class PackingPlanBuilder {
/**
* Add an instance to the first container possible ranked by score.
* @return containerId of the container the instance was added to
- * @throws org.apache.heron.packing.ResourceExceededException if the instance could not be added
+ * @throws ResourceExceededException if the instance could not be added
*/
public int addInstance(Scorer<Container> scorer,
String componentName) throws ResourceExceededException {
List<Scorer<Container>> scorers = new LinkedList<>();
scorers.add(scorer);
- return addInstance(scorers, componentName);
+ return addInstanceToExistingContainers(scorers, componentName);
}
@SuppressWarnings("JavadocMethod")
/**
- * Add an instance to the first container possible ranked by score. If a scoring tie exists,
- * uses the next scorer in the scorers list to break the tie.
+ * Add an instance to the first container possible ranked by score.
+ * If a scoring tie exists, uses the next scorer in the scorers list to break the tie.
* @return containerId of the container the instance was added to
- * @throws org.apache.heron.packing.ResourceExceededException if the instance could not be added
+ * @throws ResourceExceededException if no existing container can accommodate the instance
*/
- private int addInstance(List<Scorer<Container>> scorers, String componentName)
+ public int addInstanceToExistingContainers(List<Scorer<Container>> scorers, String componentName)
throws ResourceExceededException {
initContainers();
for (Container container : sortContainers(scorers, this.containers.values())) {
@@ -203,7 +204,7 @@ public class PackingPlanBuilder {
container.removeAnyInstanceOfComponent(componentName);
if (instancePlan.isPresent()) {
taskIds.remove(instancePlan.get().getTaskId());
- if (componentIndexes.get(componentName) != null) {
+ if (componentIndexes.containsKey(componentName)) {
componentIndexes.get(componentName).remove(instancePlan.get().getComponentIndex());
}
} else {
@@ -248,65 +249,52 @@ public class PackingPlanBuilder {
// build container plan sets by summing up instance resources
public PackingPlan build() {
assertResourceSettings();
- Set<PackingPlan.ContainerPlan> containerPlans = buildContainerPlans(
- this.containers, this.componentRamMap,
- this.defaultInstanceResource, this.requestedContainerPadding);
-
+ Set<PackingPlan.ContainerPlan> containerPlans = buildContainerPlans(this.containers);
return new PackingPlan(topologyId, containerPlans);
}
private void initContainers() {
assertResourceSettings();
- Map<Integer, Container> newContainerMap = this.containers;
- HashMap<String, TreeSet<Integer>> newComponentIndexes = this.componentIndexes;
- TreeSet<Integer> newTaskIds = this.taskIds;
- if (newComponentIndexes == null) {
- newComponentIndexes = new HashMap<>();
+ if (this.componentIndexes == null) {
+ this.componentIndexes = new HashMap<>();
}
- if (newTaskIds == null) {
- newTaskIds = new TreeSet<>();
+ if (this.taskIds == null) {
+ this.taskIds = new TreeSet<>();
}
// if this is the first time called, initialize container map with empty or existing containers
- if (newContainerMap == null) {
+ if (this.containers == null) {
if (this.existingPacking == null) {
- newContainerMap = new HashMap<>();
+ this.containers = new HashMap<>();
for (int containerId = 1; containerId <= numContainers; containerId++) {
- newContainerMap.put(containerId, new Container(
+ this.containers.put(containerId, new Container(
containerId, this.maxContainerResource, this.requestedContainerPadding));
}
} else {
- try {
- newContainerMap = getContainers(this.existingPacking, this.requestedContainerPadding,
- newComponentIndexes, newTaskIds);
- } catch (ResourceExceededException e) {
- throw new PackingException(
- "Could not initialize containers using existing packing plan", e);
- }
+ this.containers = getContainers(this.existingPacking,
+ this.maxContainerResource,
+ this.requestedContainerPadding,
+ this.componentIndexes, this.taskIds);
}
}
- if (this.numContainers > newContainerMap.size()) {
+ if (this.numContainers > this.containers.size()) {
List<Scorer<Container>> scorers = new ArrayList<>();
scorers.add(new ContainerIdScorer());
- List<Container> sortedContainers = sortContainers(scorers, newContainerMap.values());
+ List<Container> sortedContainers = sortContainers(scorers, this.containers.values());
int nextContainerId = sortedContainers.get(sortedContainers.size() - 1).getContainerId() + 1;
Resource capacity =
- newContainerMap.get(sortedContainers.get(0).getContainerId()).getCapacity();
+ this.containers.get(sortedContainers.get(0).getContainerId()).getCapacity();
- for (int i = 0; i < numContainers - newContainerMap.size(); i++) {
- newContainerMap.put(nextContainerId,
+ for (int i = 0; i < numContainers - this.containers.size(); i++) {
+ this.containers.put(nextContainerId,
new Container(nextContainerId, capacity, this.requestedContainerPadding));
nextContainerId++;
}
}
-
- this.containers = newContainerMap;
- this.componentIndexes = newComponentIndexes;
- this.taskIds = newTaskIds;
}
private void initContainer(int containerId) {
@@ -335,10 +323,7 @@ public class PackingPlanBuilder {
* @return container plans
*/
private static Set<PackingPlan.ContainerPlan> buildContainerPlans(
- Map<Integer, Container> containerInstances,
- Map<String, ByteAmount> ramMap,
- Resource instanceDefaults,
- int paddingPercentage) {
+ Map<Integer, Container> containerInstances) {
Set<PackingPlan.ContainerPlan> containerPlans = new LinkedHashSet<>();
for (Integer containerId : containerInstances.keySet()) {
@@ -347,46 +332,13 @@ public class PackingPlanBuilder {
continue;
}
- ByteAmount containerRam = ByteAmount.ZERO;
- ByteAmount containerDiskInBytes = ByteAmount.ZERO;
- double containerCpu = 0;
-
- // Calculate the resource required for single instance
- Set<PackingPlan.InstancePlan> instancePlans = new HashSet<>();
-
- for (PackingPlan.InstancePlan instancePlan : container.getInstances()) {
- InstanceId instanceId = new InstanceId(instancePlan.getComponentName(),
- instancePlan.getTaskId(), instancePlan.getComponentIndex());
- ByteAmount instanceRam;
- if (ramMap.containsKey(instanceId.getComponentName())) {
- instanceRam = ramMap.get(instanceId.getComponentName());
- } else {
- instanceRam = instanceDefaults.getRam();
- }
- containerRam = containerRam.plus(instanceRam);
-
- // Currently not yet support disk or CPU config for different components,
- // so just use the default value.
- ByteAmount instanceDisk = instanceDefaults.getDisk();
- containerDiskInBytes = containerDiskInBytes.plus(instanceDisk);
-
- double instanceCpu = instanceDefaults.getCpu();
- containerCpu += instanceCpu;
-
- // Insert it into the map
- instancePlans.add(new PackingPlan.InstancePlan(instanceId,
- new Resource(instanceCpu, instanceRam, instanceDisk)));
- }
-
- containerCpu += (paddingPercentage * containerCpu) / 100;
- containerRam = containerRam.increaseBy(paddingPercentage);
- containerDiskInBytes = containerDiskInBytes.increaseBy(paddingPercentage);
-
- Resource resource =
- new Resource(Math.round(containerCpu), containerRam, containerDiskInBytes);
+ Resource totalUsedResources = container.getTotalUsedResources();
+ Resource resource = new Resource(
+ Math.round(totalUsedResources.getCpu()),
+ totalUsedResources.getRam(), totalUsedResources.getDisk());
PackingPlan.ContainerPlan containerPlan =
- new PackingPlan.ContainerPlan(containerId, instancePlans, resource);
+ new PackingPlan.ContainerPlan(containerId, container.getInstances(), resource);
containerPlans.add(containerPlan);
}
@@ -402,23 +354,16 @@ public class PackingPlanBuilder {
*/
@VisibleForTesting
static Map<Integer, Container> getContainers(
- PackingPlan currentPackingPlan, int paddingPercentage,
- Map<String, TreeSet<Integer>> componentIndexes, TreeSet<Integer> taskIds)
- throws ResourceExceededException {
+ PackingPlan currentPackingPlan, Resource maxContainerResource, Resource padding,
+ Map<String, TreeSet<Integer>> componentIndexes, TreeSet<Integer> taskIds) {
Map<Integer, Container> containers = new HashMap<>();
- Resource capacity = currentPackingPlan.getMaxContainerResources();
+ Resource capacity = maxContainerResource;
for (PackingPlan.ContainerPlan currentContainerPlan : currentPackingPlan.getContainers()) {
Container container =
- new Container(currentContainerPlan.getId(), capacity, paddingPercentage);
+ new Container(currentContainerPlan.getId(), capacity, padding);
for (PackingPlan.InstancePlan instancePlan : currentContainerPlan.getInstances()) {
- try {
- addToContainer(container, instancePlan, componentIndexes, taskIds);
- } catch (ResourceExceededException e) {
- throw new ResourceExceededException(String.format(
- "Insufficient container resources to add instancePlan %s to container %s",
- instancePlan, container), e);
- }
+ addToContainer(container, instancePlan, componentIndexes, taskIds);
}
containers.put(currentContainerPlan.getId(), container);
}
@@ -439,7 +384,7 @@ public class PackingPlanBuilder {
private static void addToContainer(Container container,
PackingPlan.InstancePlan instancePlan,
Map<String, TreeSet<Integer>> componentIndexes,
- Set<Integer> taskIds) throws ResourceExceededException {
+ Set<Integer> taskIds) {
container.add(instancePlan);
String componentName = instancePlan.getComponentName();
diff --git a/heron/packing/src/java/org/apache/heron/packing/RamRequirement.java b/heron/packing/src/java/org/apache/heron/packing/builder/RamRequirement.java
similarity index 97%
rename from heron/packing/src/java/org/apache/heron/packing/RamRequirement.java
rename to heron/packing/src/java/org/apache/heron/packing/builder/RamRequirement.java
index c57ab11..5c1035d 100644
--- a/heron/packing/src/java/org/apache/heron/packing/RamRequirement.java
+++ b/heron/packing/src/java/org/apache/heron/packing/builder/RamRequirement.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.heron.packing;
+package org.apache.heron.packing.builder;
import org.apache.heron.common.basics.ByteAmount;
diff --git a/heron/packing/src/java/org/apache/heron/packing/roundrobin/ResourceCompliantRRPacking.java b/heron/packing/src/java/org/apache/heron/packing/roundrobin/ResourceCompliantRRPacking.java
index df9718e..da49960 100644
--- a/heron/packing/src/java/org/apache/heron/packing/roundrobin/ResourceCompliantRRPacking.java
+++ b/heron/packing/src/java/org/apache/heron/packing/roundrobin/ResourceCompliantRRPacking.java
@@ -27,7 +27,7 @@ import java.util.logging.Logger;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.api.utils.TopologyUtils;
-import org.apache.heron.common.basics.ByteAmount;
+import org.apache.heron.packing.AbstractPacking;
import org.apache.heron.packing.builder.Container;
import org.apache.heron.packing.builder.ContainerIdScorer;
import org.apache.heron.packing.builder.HomogeneityScorer;
@@ -40,18 +40,10 @@ import org.apache.heron.packing.exceptions.ConstraintViolationException;
import org.apache.heron.packing.exceptions.ResourceExceededException;
import org.apache.heron.packing.utils.PackingUtils;
import org.apache.heron.spi.common.Config;
-import org.apache.heron.spi.common.Context;
-import org.apache.heron.spi.packing.IPacking;
-import org.apache.heron.spi.packing.IRepacking;
import org.apache.heron.spi.packing.PackingException;
import org.apache.heron.spi.packing.PackingPlan;
import org.apache.heron.spi.packing.Resource;
-import static org.apache.heron.api.Config.TOPOLOGY_CONTAINER_CPU_REQUESTED;
-import static org.apache.heron.api.Config.TOPOLOGY_CONTAINER_DISK_REQUESTED;
-import static org.apache.heron.api.Config.TOPOLOGY_CONTAINER_PADDING_PERCENTAGE;
-import static org.apache.heron.api.Config.TOPOLOGY_CONTAINER_RAM_REQUESTED;
-
/**
* ResourceCompliantRoundRobin packing algorithm
* <p>
@@ -95,16 +87,9 @@ import static org.apache.heron.api.Config.TOPOLOGY_CONTAINER_RAM_REQUESTED;
* 10. The pack() return null if PackingPlan fails to pass the safe check, for instance,
* the size of RAM for an instance is less than the minimal required value.
*/
-public class ResourceCompliantRRPacking implements IPacking, IRepacking {
-
- static final int DEFAULT_CONTAINER_PADDING_PERCENTAGE = 10;
- private static final int DEFAULT_NUMBER_INSTANCES_PER_CONTAINER = 4;
-
+public class ResourceCompliantRRPacking extends AbstractPacking {
private static final Logger LOG = Logger.getLogger(ResourceCompliantRRPacking.class.getName());
- private TopologyAPI.Topology topology;
- private Resource defaultInstanceResources;
-
private int numContainers;
//ContainerId to examine next. It is set to 1 when the
//algorithm restarts with a new number of containers
@@ -124,52 +109,17 @@ public class ResourceCompliantRRPacking implements IPacking, IRepacking {
@Override
public void initialize(Config config, TopologyAPI.Topology inputTopology) {
- this.topology = inputTopology;
+ super.initialize(config, inputTopology);
this.numContainers = TopologyUtils.getNumContainers(topology);
- this.defaultInstanceResources = new Resource(
- Context.instanceCpu(config),
- Context.instanceRam(config),
- Context.instanceDisk(config));
resetToFirstContainer();
-
- LOG.info(String.format("Initializing ResourceCompliantRRPacking. "
- + "CPU default: %f, RAM default: %s, DISK default: %s.",
- this.defaultInstanceResources.getCpu(),
- this.defaultInstanceResources.getRam().toString(),
- this.defaultInstanceResources.getDisk().toString()));
}
private PackingPlanBuilder newPackingPlanBuilder(PackingPlan existingPackingPlan) {
- List<TopologyAPI.Config.KeyValue> topologyConfig = topology.getTopologyConfig().getKvsList();
-
- double defaultCpu = this.defaultInstanceResources.getCpu()
- * DEFAULT_NUMBER_INSTANCES_PER_CONTAINER;
- ByteAmount defaultRam = this.defaultInstanceResources.getRam()
- .multiply(DEFAULT_NUMBER_INSTANCES_PER_CONTAINER);
- ByteAmount defaultDisk = this.defaultInstanceResources.getDisk()
- .multiply(DEFAULT_NUMBER_INSTANCES_PER_CONTAINER);
- int paddingPercentage = TopologyUtils.getConfigWithDefault(topologyConfig,
- TOPOLOGY_CONTAINER_PADDING_PERCENTAGE, DEFAULT_CONTAINER_PADDING_PERCENTAGE);
-
- Resource maxContainerResources = new Resource(
- TopologyUtils.getConfigWithDefault(topologyConfig, TOPOLOGY_CONTAINER_CPU_REQUESTED,
- (double) Math.round(PackingUtils.increaseBy(defaultCpu, paddingPercentage))),
- TopologyUtils.getConfigWithDefault(topologyConfig, TOPOLOGY_CONTAINER_RAM_REQUESTED,
- defaultRam.increaseBy(paddingPercentage)),
- TopologyUtils.getConfigWithDefault(topologyConfig, TOPOLOGY_CONTAINER_DISK_REQUESTED,
- defaultDisk.increaseBy(paddingPercentage)));
-
- LOG.info(String.format("ResourceCompliantRRPacking newPackingPlanBuilder. "
- + "CPU max: %f, RAMmaxMax: %s, DISK max: %s, Padding percentage: %d.",
- maxContainerResources.getCpu(),
- maxContainerResources.getRam().toString(),
- maxContainerResources.getDisk().toString(),
- paddingPercentage));
return new PackingPlanBuilder(topology.getId(), existingPackingPlan)
- .setMaxContainerResource(maxContainerResources)
.setDefaultInstanceResource(defaultInstanceResources)
- .setRequestedContainerPadding(paddingPercentage)
- .setRequestedComponentRam(TopologyUtils.getComponentRamMapConfig(topology))
+ .setMaxContainerResource(maxContainerResources)
+ .setRequestedContainerPadding(padding)
+ .setRequestedComponentResource(componentResourceMap)
.setInstanceConstraints(Collections.singletonList(new MinRamConstraint()))
.setPackingConstraints(Collections.singletonList(new ResourceConstraint()));
}
@@ -190,8 +140,7 @@ public class ResourceCompliantRRPacking implements IPacking, IRepacking {
LOG.finest(String.format(
"%s Increasing the number of containers to %s and attempting to place again.",
e.getMessage(), this.numContainers + 1));
- increaseNumContainers(1);
- resetToFirstContainer();
+ retryWithAdditionalContainer();
}
}
}
@@ -224,15 +173,24 @@ public class ResourceCompliantRRPacking implements IPacking, IRepacking {
} catch (ConstraintViolationException e) {
//Not enough containers. Adjust the number of containers.
- increaseNumContainers(1);
- resetToFirstContainer();
LOG.info(String.format(
- "%s Increasing the number of containers to %s and attempting packing again.",
- e.getMessage(), this.numContainers));
+ "%s Increasing the number of containers to %s and attempting to repack again.",
+ e.getMessage(), this.numContainers + 1));
+ retryWithAdditionalContainer();
}
}
}
+ private void retryWithAdditionalContainer() {
+ increaseNumContainers(1);
+ resetToFirstContainer();
+
+ int totalInstances = TopologyUtils.getTotalInstance(topology);
+ if (numContainers > totalInstances) {
+ throw new PackingException("Cannot add to that container");
+ }
+ }
+
@Override
public PackingPlan repack(PackingPlan currentPackingPlan, int containers,
Map<String, Integer> componentChanges)
@@ -241,10 +199,6 @@ public class ResourceCompliantRRPacking implements IPacking, IRepacking {
+ "currently support creating a new packing plan with a new number of containers.");
}
- @Override
- public void close() {
- }
-
/**
* Computes the additional number of containers needed to accommodate a scale up/down operation
*
diff --git a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
index e3508a6..3af3092 100644
--- a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
+++ b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
@@ -36,7 +36,7 @@ import org.apache.heron.api.utils.TopologyUtils;
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.common.basics.CPUShare;
import org.apache.heron.common.basics.ResourceMeasure;
-import org.apache.heron.packing.RamRequirement;
+import org.apache.heron.packing.builder.RamRequirement;
import org.apache.heron.spi.common.Config;
import org.apache.heron.spi.common.Context;
import org.apache.heron.spi.packing.IPacking;
diff --git a/heron/packing/src/java/org/apache/heron/packing/utils/PackingUtils.java b/heron/packing/src/java/org/apache/heron/packing/utils/PackingUtils.java
index 9f9f55d..36ede38 100644
--- a/heron/packing/src/java/org/apache/heron/packing/utils/PackingUtils.java
+++ b/heron/packing/src/java/org/apache/heron/packing/utils/PackingUtils.java
@@ -21,12 +21,12 @@ package org.apache.heron.packing.utils;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import java.util.logging.Logger;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.api.utils.TopologyUtils;
import org.apache.heron.common.basics.ByteAmount;
-import org.apache.heron.spi.packing.PackingException;
import org.apache.heron.spi.packing.Resource;
/**
@@ -34,66 +34,45 @@ import org.apache.heron.spi.packing.Resource;
*/
public final class PackingUtils {
private static final Logger LOG = Logger.getLogger(PackingUtils.class.getName());
- private static final ByteAmount MIN_RAM_PER_INSTANCE = ByteAmount.fromMegabytes(192);
+
+ // default
+ public static final int DEFAULT_CONTAINER_PADDING_PERCENTAGE = 10;
+ public static final ByteAmount DEFAULT_CONTAINER_RAM_PADDING = ByteAmount.fromGigabytes(1);
+ public static final ByteAmount DEFAULT_CONTAINER_DISK_PADDING = ByteAmount.fromGigabytes(1);
+ public static final double DEFAULT_CONTAINER_CPU_PADDING = 1.0;
+ public static final int DEFAULT_MAX_NUM_INSTANCES_PER_CONTAINER = 4;
private PackingUtils() {
}
/**
- * Verifies the Instance has enough RAM and that it can fit within the container limits.
+ * Compose the component resource map by reading from user configs or default
*
- * @param instanceResources The resources allocated to the instance
- * @throws PackingException if the instance is invalid
+ * @param components component names
+ * @param componentRamMap user configured component ram map
+ * @param componentCpuMap user configured component cpu map
+ * @param componentDiskMap user configured component disk map
+ * @param defaultInstanceResource default instance resources
+ * @return component resource map
*/
- private static void assertIsValidInstance(Resource instanceResources,
- ByteAmount minInstanceRam,
- Resource maxContainerResources,
- int paddingPercentage) throws PackingException {
-
- if (instanceResources.getRam().lessThan(minInstanceRam)) {
- throw new PackingException(String.format(
- "Instance requires RAM %s which is less than the minimum RAM per instance of %s",
- instanceResources.getRam(), minInstanceRam));
- }
-
- ByteAmount instanceRam = instanceResources.getRam().increaseBy(paddingPercentage);
- if (instanceRam.greaterThan(maxContainerResources.getRam())) {
- throw new PackingException(String.format(
- "This instance requires containers of at least %s RAM. The current max container "
- + "size is %s",
- instanceRam, maxContainerResources.getRam()));
- }
-
- double instanceCpu = Math.round(PackingUtils.increaseBy(
- instanceResources.getCpu(), paddingPercentage));
- if (instanceCpu > maxContainerResources.getCpu()) {
- throw new PackingException(String.format(
- "This instance requires containers with at least %s CPU cores. The current max container"
- + "size is %s cores",
- instanceCpu, maxContainerResources.getCpu()));
- }
-
- ByteAmount instanceDisk = instanceResources.getDisk().increaseBy(paddingPercentage);
- if (instanceDisk.greaterThan(maxContainerResources.getDisk())) {
- throw new PackingException(String.format(
- "This instance requires containers of at least %s disk. The current max container"
- + "size is %s",
- instanceDisk, maxContainerResources.getDisk()));
+ public static Map<String, Resource> getComponentResourceMap(
+ Set<String> components,
+ Map<String, ByteAmount> componentRamMap,
+ Map<String, Double> componentCpuMap,
+ Map<String, ByteAmount> componentDiskMap,
+ Resource defaultInstanceResource) {
+ Map<String, Resource> componentResourceMap = new HashMap<>();
+ for (String component : components) {
+ ByteAmount instanceRam = componentRamMap.getOrDefault(component,
+ defaultInstanceResource.getRam());
+ double instanceCpu = componentCpuMap.getOrDefault(component,
+ defaultInstanceResource.getCpu());
+ ByteAmount instanceDisk = componentDiskMap.getOrDefault(component,
+ defaultInstanceResource.getDisk());
+ componentResourceMap.put(component, new Resource(instanceCpu, instanceRam, instanceDisk));
}
- }
- public static Resource getResourceRequirement(String component,
- Map<String, ByteAmount> componentRamMap,
- Resource defaultInstanceResource,
- Resource maxContainerResource,
- int paddingPercentage) {
- ByteAmount instanceRam = defaultInstanceResource.getRam();
- if (componentRamMap.containsKey(component)) {
- instanceRam = componentRamMap.get(component);
- }
- assertIsValidInstance(defaultInstanceResource.cloneWithRam(instanceRam),
- MIN_RAM_PER_INSTANCE, maxContainerResource, paddingPercentage);
- return defaultInstanceResource.cloneWithRam(instanceRam);
+ return componentResourceMap;
}
public static long increaseBy(long value, int paddingPercentage) {
@@ -105,6 +84,26 @@ public final class PackingUtils {
}
/**
+ * Finalize padding by taking Math.max(containerResource * paddingPercent, paddingValue)
+ *
+ * @param containerResource max container resource
+ * @param padding padding value
+ * @param paddingPercentage padding percent
+ * @return finalized padding amount
+ */
+ public static Resource finalizePadding(
+ Resource containerResource, Resource padding, int paddingPercentage) {
+ double cpuPadding = Math.max(padding.getCpu(),
+ containerResource.getCpu() * paddingPercentage / 100);
+ ByteAmount ramPadding = ByteAmount.fromBytes(Math.max(padding.getRam().asBytes(),
+ containerResource.getRam().asBytes() * paddingPercentage / 100));
+ ByteAmount diskPadding = ByteAmount.fromBytes(Math.max(padding.getDisk().asBytes(),
+ containerResource.getDisk().asBytes() * paddingPercentage / 100));
+
+ return new Resource(cpuPadding, ramPadding, diskPadding);
+ }
+
+ /**
* Identifies which components need to be scaled given specific scaling direction
*
* @return Map < component name, scale factor >
diff --git a/heron/packing/tests/java/org/apache/heron/packing/AssertPacking.java b/heron/packing/tests/java/org/apache/heron/packing/AssertPacking.java
index 3858bfe..d561c9e 100644
--- a/heron/packing/tests/java/org/apache/heron/packing/AssertPacking.java
+++ b/heron/packing/tests/java/org/apache/heron/packing/AssertPacking.java
@@ -33,7 +33,6 @@ import org.apache.heron.spi.packing.InstanceId;
import org.apache.heron.spi.packing.PackingPlan;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -51,50 +50,9 @@ public final class AssertPacking {
* expectedBoltRam and likewise for spouts. If notExpectedContainerRam is not null, verifies that
* the container RAM is not that.
*/
- public static void assertContainers(Set<PackingPlan.ContainerPlan> containerPlans,
- String boltName, String spoutName,
- ByteAmount expectedBoltRam, ByteAmount expectedSpoutRam,
- ByteAmount notExpectedContainerRam) {
- boolean boltFound = false;
- boolean spoutFound = false;
- List<Integer> expectedInstanceIndices = new ArrayList<>();
- List<Integer> foundInstanceIndices = new ArrayList<>();
- int expectedInstanceIndex = 1;
- // RAM for bolt should be the value in component RAM map
- for (PackingPlan.ContainerPlan containerPlan : containerPlans) {
- if (notExpectedContainerRam != null) {
- assertNotEquals(notExpectedContainerRam, containerPlan.getRequiredResource().getRam());
- }
- for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
- expectedInstanceIndices.add(expectedInstanceIndex++);
- foundInstanceIndices.add(instancePlan.getTaskId());
- if (instancePlan.getComponentName().equals(boltName)) {
- assertEquals("Unexpected bolt RAM", expectedBoltRam, instancePlan.getResource().getRam());
- boltFound = true;
- }
- if (instancePlan.getComponentName().equals(spoutName)) {
- assertEquals(
- "Unexpected spout RAM", expectedSpoutRam, instancePlan.getResource().getRam());
- spoutFound = true;
- }
- }
- }
- assertTrue("Bolt not found in any of the container plans: " + boltName, boltFound);
- assertTrue("Spout not found in any of the container plans: " + spoutName, spoutFound);
-
- Collections.sort(foundInstanceIndices);
- assertEquals("Unexpected instance global id set found.",
- expectedInstanceIndices, foundInstanceIndices);
- }
-
- /**
- * Verifies that the containerPlan has at least one bolt named boltName with RAM equal to
- * expectedBoltRam and likewise for spouts. If notExpectedContainerRam is not null, verifies that
- * the container RAM is not that.
- */
public static void assertInstanceRam(Set<PackingPlan.ContainerPlan> containerPlans,
- String boltName, String spoutName,
- ByteAmount expectedBoltRam, ByteAmount expectedSpoutRam) {
+ String boltName, String spoutName,
+ ByteAmount expectedBoltRam, ByteAmount expectedSpoutRam) {
// RAM for bolt should be the value in component RAM map
for (PackingPlan.ContainerPlan containerPlan : containerPlans) {
for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
@@ -246,10 +204,7 @@ public final class AssertPacking {
instance.getComponentIndex() != instancePlan.getComponentIndex());
}
}
- if (componentInstances.get(instancePlan.getComponentName()) == null) {
- componentInstances.put(instancePlan.getComponentName(),
- new HashSet<PackingPlan.InstancePlan>());
- }
+ componentInstances.computeIfAbsent(instancePlan.getComponentName(), k -> new HashSet<>());
componentInstances.get(instancePlan.getComponentName()).add(instancePlan);
}
}
diff --git a/heron/packing/tests/java/org/apache/heron/packing/CommonPackingTests.java b/heron/packing/tests/java/org/apache/heron/packing/CommonPackingTests.java
index 97aaeec..d99ac92 100644
--- a/heron/packing/tests/java/org/apache/heron/packing/CommonPackingTests.java
+++ b/heron/packing/tests/java/org/apache/heron/packing/CommonPackingTests.java
@@ -30,6 +30,7 @@ import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.common.basics.Pair;
import org.apache.heron.common.utils.topology.TopologyTests;
import org.apache.heron.packing.exceptions.ConstraintViolationException;
+import org.apache.heron.packing.utils.PackingUtils;
import org.apache.heron.spi.common.Config;
import org.apache.heron.spi.common.Context;
import org.apache.heron.spi.packing.IPacking;
@@ -45,12 +46,9 @@ import static org.apache.heron.packing.AssertPacking.DELTA;
* There is some common functionality in multiple packing plans. This class contains common tests.
*/
public abstract class CommonPackingTests {
- protected static final String A = "A";
- protected static final String B = "B";
-
protected static final String BOLT_NAME = "bolt";
protected static final String SPOUT_NAME = "spout";
- protected static final int DEFAULT_CONTAINER_PADDING = 10;
+ protected static final int DEFAULT_CONTAINER_PADDING_PERCENT = 10;
protected int spoutParallelism;
protected int boltParallelism;
protected Integer totalInstances;
@@ -80,9 +78,8 @@ public abstract class CommonPackingTests {
Context.instanceCpu(config), Context.instanceRam(config), Context.instanceDisk(config));
}
- protected static TopologyAPI.Topology getTopology(
- int spoutParallelism, int boltParallelism,
- org.apache.heron.api.Config topologyConfig) {
+ protected static TopologyAPI.Topology getTopology(int spoutParallelism, int boltParallelism,
+ org.apache.heron.api.Config topologyConfig) {
return TopologyTests.createTopology("testTopology", topologyConfig, SPOUT_NAME, BOLT_NAME,
spoutParallelism, boltParallelism);
}
@@ -93,7 +90,7 @@ public abstract class CommonPackingTests {
return packing.pack();
}
- protected PackingPlan repack(TopologyAPI.Topology testTopology,
+ private PackingPlan repack(TopologyAPI.Topology testTopology,
PackingPlan initialPackingPlan,
Map<String, Integer> componentChanges) {
IRepacking repacking = getRepackingImpl();
@@ -101,6 +98,17 @@ public abstract class CommonPackingTests {
return repacking.repack(initialPackingPlan, componentChanges);
}
+ protected Resource getDefaultMaxContainerResource() {
+ return getDefaultMaxContainerResource(
+ PackingUtils.DEFAULT_MAX_NUM_INSTANCES_PER_CONTAINER);
+ }
+
+ protected Resource getDefaultMaxContainerResource(int maxNumInstancesPerContainer) {
+ return new Resource(this.instanceDefaultResources.getCpu() * maxNumInstancesPerContainer,
+ this.instanceDefaultResources.getRam().multiply(maxNumInstancesPerContainer),
+ this.instanceDefaultResources.getDisk().multiply(maxNumInstancesPerContainer));
+ }
+
protected Resource getDefaultUnspecifiedContainerResource(int testNumInstances,
int testNumContainers,
Resource padding) {
@@ -111,11 +119,43 @@ public abstract class CommonPackingTests {
}
protected PackingPlan doDefaultScalingTest(Map<String, Integer> componentChanges,
- int numContainersBeforeRepack) {
- return doScalingTest(topology, componentChanges,
- instanceDefaultResources.getRam(), boltParallelism,
- instanceDefaultResources.getRam(), spoutParallelism,
- numContainersBeforeRepack, totalInstances);
+ int numContainersBeforeRepack,
+ int numContainersAfterRepack,
+ Resource maxContainerResource) {
+ return doPackingAndScalingTest(topology, componentChanges,
+ instanceDefaultResources, boltParallelism,
+ instanceDefaultResources, spoutParallelism,
+ numContainersBeforeRepack, numContainersAfterRepack, maxContainerResource);
+ }
+
+ /**
+ * Performs a scaling test for a specific topology. It first
+ * computes an initial packing plan as a basis for scaling.
+ * Given specific component parallelism changes, a new packing plan is produced.
+ *
+ * @param testTopology Input topology
+ * @param componentChanges parallelism changes for scale up/down
+ * @param boltRes RAM allocated to bolts
+ * @param testBoltParallelism bolt parallelism
+ * @param spoutRes RAM allocated to spouts
+ * @param testSpoutParallelism spout parallelism
+ * @param numContainersBeforeRepack number of containers that the initial packing plan should use
+ * @param numContainersAfterRepack number of instances expected before scaling
+ * @return the new packing plan
+ */
+ protected PackingPlan doPackingAndScalingTest(TopologyAPI.Topology testTopology,
+ Map<String, Integer> componentChanges,
+ Resource boltRes, int testBoltParallelism,
+ Resource spoutRes, int testSpoutParallelism,
+ int numContainersBeforeRepack,
+ int numContainersAfterRepack,
+ Resource maxContainerResource) {
+ PackingPlan packingPlan = doPackingTest(testTopology, boltRes, testBoltParallelism,
+ spoutRes, testSpoutParallelism, numContainersBeforeRepack, maxContainerResource);
+ PackingPlan newPackingPlan = doScalingTest(testTopology, packingPlan, componentChanges,
+ boltRes, testBoltParallelism, spoutRes, testSpoutParallelism,
+ numContainersAfterRepack, maxContainerResource);
+ return newPackingPlan;
}
protected PackingPlan doPackingTest(TopologyAPI.Topology testTopology,
@@ -229,41 +269,27 @@ public abstract class CommonPackingTests {
return packingPlan;
}
- /**
- * Performs a scaling test for a specific topology. It first
- * computes an initial packing plan as a basis for scaling.
- * Given specific component parallelism changes, a new packing plan is produced.
- *
- * @param testTopology Input topology
- * @param componentChanges parallelism changes for scale up/down
- * @param boltRam RAM allocated to bolts
- * @param testBoltParallelism bolt parallelism
- * @param spoutRam RAM allocated to spouts
- * @param testSpoutParallelism spout parallelism
- * @param numContainersBeforeRepack number of containers that the initial packing plan should use
- * @param totalInstancesExpected number of instances expected before scaling
- * @return the new packing plan
- */
protected PackingPlan doScalingTest(TopologyAPI.Topology testTopology,
+ PackingPlan packingPlan,
Map<String, Integer> componentChanges,
- ByteAmount boltRam,
- int testBoltParallelism, ByteAmount spoutRam,
- int testSpoutParallelism,
- int numContainersBeforeRepack,
- int totalInstancesExpected) {
- PackingPlan packingPlan = pack(testTopology);
-
- Assert.assertEquals(numContainersBeforeRepack, packingPlan.getContainers().size());
- Assert.assertEquals(totalInstancesExpected, (int) packingPlan.getInstanceCount());
- AssertPacking.assertContainers(packingPlan.getContainers(),
- BOLT_NAME, SPOUT_NAME, boltRam, spoutRam, null);
- AssertPacking.assertNumInstances(packingPlan.getContainers(), BOLT_NAME, testBoltParallelism);
- AssertPacking.assertNumInstances(packingPlan.getContainers(), SPOUT_NAME, testSpoutParallelism);
+ Resource boltRes, int testBoltParallelism,
+ Resource spoutRes, int testSpoutParallelism,
+ int testNumContainers,
+ Resource maxContainerResource) {
+ PackingPlan newPackingPlan = repack(testTopology, packingPlan, componentChanges);
+ Assert.assertEquals(testNumContainers, newPackingPlan.getContainers().size());
+ AssertPacking.assertInstanceRam(newPackingPlan.getContainers(), BOLT_NAME, SPOUT_NAME,
+ boltRes.getRam(), spoutRes.getRam());
+ AssertPacking.assertInstanceCpu(newPackingPlan.getContainers(), BOLT_NAME, SPOUT_NAME,
+ boltRes.getCpu(), spoutRes.getCpu());
+// AssertPacking.assertInstanceIndices(newPackingPlan.getContainers(), BOLT_NAME, SPOUT_NAME);
+ AssertPacking.assertNumInstances(newPackingPlan.getContainers(), BOLT_NAME,
+ testBoltParallelism + componentChanges.getOrDefault(BOLT_NAME, 0));
+ AssertPacking.assertNumInstances(newPackingPlan.getContainers(), SPOUT_NAME,
+ testSpoutParallelism + componentChanges.getOrDefault(SPOUT_NAME, 0));
- PackingPlan newPackingPlan =
- repack(testTopology, packingPlan, componentChanges);
- AssertPacking.assertContainerRam(newPackingPlan.getContainers(),
- packingPlan.getMaxContainerResources().getRam());
+ AssertPacking.assertContainerRam(newPackingPlan.getContainers(), maxContainerResource.getRam());
+ AssertPacking.assertContainerCpu(newPackingPlan.getContainers(), maxContainerResource.getCpu());
return newPackingPlan;
}
@@ -372,7 +398,7 @@ public abstract class CommonPackingTests {
// reconstruct the PackingPlan, see https://github.com/apache/incubator-heron/issues/1577
PackingPlan initialPackingPlan = PackingTestHelper.addToTestPackingPlan(
topologyId, null, PackingTestHelper.toContainerIdComponentNames(initialComponentInstances),
- DEFAULT_CONTAINER_PADDING);
+ DEFAULT_CONTAINER_PADDING_PERCENT);
AssertPacking.assertPackingPlan(topologyId, initialComponentInstances, initialPackingPlan);
PackingPlan newPackingPlan = repack(this.topology, initialPackingPlan, componentChanges);
diff --git a/heron/packing/tests/java/org/apache/heron/packing/PackingTestHelper.java b/heron/packing/tests/java/org/apache/heron/packing/PackingTestHelper.java
index d4dc6fe..f0198d7 100644
--- a/heron/packing/tests/java/org/apache/heron/packing/PackingTestHelper.java
+++ b/heron/packing/tests/java/org/apache/heron/packing/PackingTestHelper.java
@@ -19,10 +19,17 @@
package org.apache.heron.packing;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.common.basics.Pair;
import org.apache.heron.packing.builder.PackingPlanBuilder;
+import org.apache.heron.packing.constraints.MinRamConstraint;
+import org.apache.heron.packing.constraints.ResourceConstraint;
import org.apache.heron.packing.exceptions.ConstraintViolationException;
+import org.apache.heron.packing.utils.PackingUtils;
import org.apache.heron.spi.packing.InstanceId;
import org.apache.heron.spi.packing.PackingPlan;
import org.apache.heron.spi.packing.Resource;
@@ -81,15 +88,21 @@ public final class PackingTestHelper {
// use basic default resource to allow all instances to fit on a single container, if that's
// what the tester desired. We can extend this to permit passing custom resource requirements
// as needed.
- builder.setDefaultInstanceResource(
- new Resource(1, ByteAmount.fromMegabytes(192), ByteAmount.fromMegabytes(1)));
+ Resource defaultInstanceResource = new Resource(1,
+ ByteAmount.fromMegabytes(192), ByteAmount.fromMegabytes(1));
+
+ Map<String, Resource> componentResourceMap = PackingUtils.getComponentResourceMap(
+ toParallelismMap(previousPackingPlan, addInstances, removeInstances).keySet(),
+ new HashMap<>(), new HashMap<>(), new HashMap<>(), defaultInstanceResource);
+
+ builder.setDefaultInstanceResource(defaultInstanceResource);
+ builder.setRequestedComponentResource(componentResourceMap);
builder.setMaxContainerResource(new Resource(
instanceCount,
ByteAmount.fromMegabytes(192).multiply(instanceCount),
ByteAmount.fromMegabytes(instanceCount)));
-
- // This setting is important, see https://github.com/apache/incubator-heron/issues/1577
- builder.setRequestedContainerPadding(containerPadding);
+ builder.setInstanceConstraints(Collections.singletonList(new MinRamConstraint()));
+ builder.setPackingConstraints(Collections.singletonList(new ResourceConstraint()));
if (addInstances != null) {
for (Pair<Integer, String> componentInstance : addInstances) {
@@ -117,4 +130,30 @@ public final class PackingTestHelper {
}
return containerIdComponentNames;
}
+
+ public static Map<String, Integer> toParallelismMap(
+ PackingPlan packingPlan,
+ Pair<Integer, String>[] containerIdInstanceIdsToAdd,
+ Pair<Integer, String>[] containerIdInstanceIdsToRemove) {
+ Map<String, Integer> parallelismMap = new HashMap<>();
+ if (packingPlan != null) {
+ parallelismMap = new HashMap<>(packingPlan.getComponentCounts());
+ }
+ if (containerIdInstanceIdsToAdd != null) {
+ for (Pair<Integer, String> containerIdInstanceId : containerIdInstanceIdsToAdd) {
+ String componentName = containerIdInstanceId.second;
+ parallelismMap.put(componentName, parallelismMap.getOrDefault(componentName, 0) + 1);
+ }
+ }
+ if (containerIdInstanceIdsToRemove != null) {
+ for (Pair<Integer, String> containerIdInstanceId : containerIdInstanceIdsToRemove) {
+ String componentName = containerIdInstanceId.second;
+ if (parallelismMap.containsKey(componentName)) {
+ parallelismMap.put(componentName, parallelismMap.get(componentName) - 1);
+ }
+ }
+ }
+
+ return parallelismMap;
+ }
}
diff --git a/heron/packing/tests/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPackingTest.java b/heron/packing/tests/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPackingTest.java
index 1b688f6..78245ff 100644
--- a/heron/packing/tests/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPackingTest.java
+++ b/heron/packing/tests/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPackingTest.java
@@ -27,10 +27,8 @@ import java.util.Set;
import org.junit.Assert;
import org.junit.Test;
-import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.common.basics.Pair;
-import org.apache.heron.packing.AssertPacking;
import org.apache.heron.packing.CommonPackingTests;
import org.apache.heron.packing.utils.PackingUtils;
import org.apache.heron.spi.packing.IPacking;
@@ -53,8 +51,14 @@ public class FirstFitDecreasingPackingTest extends CommonPackingTests {
}
@Test (expected = PackingException.class)
- public void testFailureInsufficientContainerRamHint() throws Exception {
- topologyConfig.setContainerMaxRamHint(ByteAmount.ZERO);
+ public void testFailureInsufficientContainerRam() throws Exception {
+ topologyConfig.setContainerRamRequested(ByteAmount.ZERO);
+ pack(getTopology(spoutParallelism, boltParallelism, topologyConfig));
+ }
+
+ @Test (expected = PackingException.class)
+ public void testFailureInsufficientContainerCpu() throws Exception {
+ topologyConfig.setContainerCpuRequested(1.0);
pack(getTopology(spoutParallelism, boltParallelism, topologyConfig));
}
@@ -63,17 +67,10 @@ public class FirstFitDecreasingPackingTest extends CommonPackingTests {
*/
@Test
public void testDefaultContainerSize() throws Exception {
- int defaultNumInstancesperContainer = 4;
- PackingPlan packingPlan = pack(topology);
-
- Assert.assertEquals(2, packingPlan.getContainers().size());
- Assert.assertEquals(totalInstances, packingPlan.getInstanceCount());
- ByteAmount defaultRam = instanceDefaultResources.getRam()
- .multiply(defaultNumInstancesperContainer).increaseBy(DEFAULT_CONTAINER_PADDING);
-
- AssertPacking.assertContainerRam(packingPlan.getContainers(), defaultRam);
- AssertPacking.assertNumInstances(packingPlan.getContainers(), BOLT_NAME, 3);
- AssertPacking.assertNumInstances(packingPlan.getContainers(), SPOUT_NAME, 4);
+ doPackingTest(topology,
+ instanceDefaultResources, boltParallelism,
+ instanceDefaultResources, spoutParallelism,
+ 3, getDefaultMaxContainerResource());
}
/**
@@ -82,21 +79,14 @@ public class FirstFitDecreasingPackingTest extends CommonPackingTests {
@Test
public void testDefaultContainerSizeWithPadding() throws Exception {
int padding = 50;
- int defaultNumInstancesperContainer = 4;
topologyConfig.setContainerPaddingPercentage(padding);
- TopologyAPI.Topology newTopology =
- getTopology(spoutParallelism, boltParallelism, topologyConfig);
- PackingPlan packingPlan = pack(newTopology);
-
- Assert.assertEquals(2, packingPlan.getContainers().size());
- Assert.assertEquals(totalInstances, packingPlan.getInstanceCount());
- ByteAmount defaultRam = instanceDefaultResources.getRam()
- .multiply(defaultNumInstancesperContainer).increaseBy(padding);
- AssertPacking.assertContainerRam(packingPlan.getContainers(),
- defaultRam);
- AssertPacking.assertNumInstances(packingPlan.getContainers(), BOLT_NAME, 3);
- AssertPacking.assertNumInstances(packingPlan.getContainers(), SPOUT_NAME, 4);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ doPackingTest(topology,
+ instanceDefaultResources, boltParallelism,
+ instanceDefaultResources, spoutParallelism,
+ 4, getDefaultMaxContainerResource());
}
/**
@@ -108,36 +98,38 @@ public class FirstFitDecreasingPackingTest extends CommonPackingTests {
ByteAmount containerRam = ByteAmount.fromGigabytes(10);
ByteAmount containerDisk = ByteAmount.fromGigabytes(20);
double containerCpu = 30;
-
- topologyConfig.setContainerMaxRamHint(containerRam);
- topologyConfig.setContainerMaxDiskHint(containerDisk);
- topologyConfig.setContainerMaxCpuHint(containerCpu);
-
- TopologyAPI.Topology topologyExplicitResourcesConfig =
- getTopology(spoutParallelism, boltParallelism, topologyConfig);
- PackingPlan packingPlanExplicitResourcesConfig = pack(topologyExplicitResourcesConfig);
-
- Assert.assertEquals(1, packingPlanExplicitResourcesConfig.getContainers().size());
- Assert.assertEquals(totalInstances, packingPlanExplicitResourcesConfig.getInstanceCount());
- AssertPacking.assertNumInstances(packingPlanExplicitResourcesConfig.getContainers(),
- BOLT_NAME, 3);
- AssertPacking.assertNumInstances(packingPlanExplicitResourcesConfig.getContainers(),
- SPOUT_NAME, 4);
-
- for (PackingPlan.ContainerPlan containerPlan
- : packingPlanExplicitResourcesConfig.getContainers()) {
- Assert.assertEquals(Math.round(PackingUtils.increaseBy(
- totalInstances * instanceDefaultResources.getCpu(), DEFAULT_CONTAINER_PADDING)),
+ Resource containerResource = new Resource(containerCpu, containerRam, containerDisk);
+
+ Resource padding = PackingUtils.finalizePadding(
+ new Resource(containerCpu, containerRam, containerDisk),
+ new Resource(PackingUtils.DEFAULT_CONTAINER_CPU_PADDING,
+ PackingUtils.DEFAULT_CONTAINER_RAM_PADDING,
+ PackingUtils.DEFAULT_CONTAINER_RAM_PADDING),
+ PackingUtils.DEFAULT_CONTAINER_PADDING_PERCENTAGE);
+
+ topologyConfig.setContainerRamRequested(containerRam);
+ topologyConfig.setContainerDiskRequested(containerDisk);
+ topologyConfig.setContainerCpuRequested(containerCpu);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ PackingPlan packingPlan = doPackingTest(topology,
+ instanceDefaultResources, boltParallelism,
+ instanceDefaultResources, spoutParallelism,
+ 1, containerResource);
+
+ for (PackingPlan.ContainerPlan containerPlan : packingPlan.getContainers()) {
+ Assert.assertEquals(Math.round(totalInstances * instanceDefaultResources.getCpu()
+ + padding.getCpu()),
(long) containerPlan.getRequiredResource().getCpu());
Assert.assertEquals(instanceDefaultResources.getRam()
.multiply(totalInstances)
- .increaseBy(DEFAULT_CONTAINER_PADDING),
+ .plus(padding.getRam()),
containerPlan.getRequiredResource().getRam());
Assert.assertEquals(instanceDefaultResources.getDisk()
.multiply(totalInstances)
- .increaseBy(DEFAULT_CONTAINER_PADDING),
+ .plus(padding.getDisk()),
containerPlan.getRequiredResource().getDisk());
// All instances' resource requirement should be equal
@@ -159,32 +151,26 @@ public class FirstFitDecreasingPackingTest extends CommonPackingTests {
public void testCompleteRamMapRequested() throws Exception {
// Explicit set max resources for container
// the value should be ignored, since we set the complete component RAM map
- ByteAmount maxContainerRam = ByteAmount.fromGigabytes(15);
- ByteAmount maxContainerDisk = ByteAmount.fromGigabytes(20);
- double maxContainerCpu = 30;
+ ByteAmount containerRam = ByteAmount.fromGigabytes(15);
+ ByteAmount containerDisk = ByteAmount.fromGigabytes(20);
+ double containerCpu = 30;
+ Resource containerResource = new Resource(containerCpu, containerRam, containerDisk);
// Explicit set component RAM map
ByteAmount boltRam = ByteAmount.fromGigabytes(1);
ByteAmount spoutRam = ByteAmount.fromGigabytes(2);
- topologyConfig.setContainerMaxRamHint(maxContainerRam);
- topologyConfig.setContainerMaxDiskHint(maxContainerDisk);
- topologyConfig.setContainerMaxCpuHint(maxContainerCpu);
+ topologyConfig.setContainerRamRequested(containerRam);
+ topologyConfig.setContainerDiskRequested(containerDisk);
+ topologyConfig.setContainerCpuRequested(containerCpu);
topologyConfig.setComponentRam(BOLT_NAME, boltRam);
topologyConfig.setComponentRam(SPOUT_NAME, spoutRam);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
- TopologyAPI.Topology topologyExplicitRamMap =
- getTopology(spoutParallelism, boltParallelism, topologyConfig);
- PackingPlan packingPlanExplicitRamMap = pack(topologyExplicitRamMap);
-
- Assert.assertEquals(1, packingPlanExplicitRamMap.getContainers().size());
- Assert.assertEquals(totalInstances, packingPlanExplicitRamMap.getInstanceCount());
- AssertPacking.assertNumInstances(packingPlanExplicitRamMap.getContainers(), BOLT_NAME, 3);
- AssertPacking.assertNumInstances(packingPlanExplicitRamMap.getContainers(), SPOUT_NAME, 4);
- AssertPacking.assertContainers(packingPlanExplicitRamMap.getContainers(),
- BOLT_NAME, SPOUT_NAME, boltRam, spoutRam, maxContainerRam);
- AssertPacking.assertContainerRam(packingPlanExplicitRamMap.getContainers(),
- maxContainerRam);
+ doPackingTest(topology,
+ instanceDefaultResources.cloneWithRam(boltRam), boltParallelism,
+ instanceDefaultResources.cloneWithRam(spoutRam), spoutParallelism,
+ 1, containerResource);
}
/**
@@ -198,23 +184,15 @@ public class FirstFitDecreasingPackingTest extends CommonPackingTests {
ByteAmount boltRam = ByteAmount.fromGigabytes(1);
ByteAmount spoutRam = ByteAmount.fromGigabytes(2);
- topologyConfig.setContainerMaxRamHint(maxContainerRam);
+ topologyConfig.setContainerRamRequested(maxContainerRam);
topologyConfig.setComponentRam(BOLT_NAME, boltRam);
topologyConfig.setComponentRam(SPOUT_NAME, spoutRam);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
- TopologyAPI.Topology topologyExplicitRamMap =
- getTopology(spoutParallelism, boltParallelism, topologyConfig);
- PackingPlan packingPlanExplicitRamMap = pack(topologyExplicitRamMap);
-
- Assert.assertEquals(2, packingPlanExplicitRamMap.getContainers().size());
- Assert.assertEquals(totalInstances, packingPlanExplicitRamMap.getInstanceCount());
- AssertPacking.assertNumInstances(packingPlanExplicitRamMap.getContainers(), BOLT_NAME, 3);
- AssertPacking.assertNumInstances(packingPlanExplicitRamMap.getContainers(), SPOUT_NAME, 4);
-
- AssertPacking.assertContainers(packingPlanExplicitRamMap.getContainers(),
- BOLT_NAME, SPOUT_NAME, boltRam, spoutRam, maxContainerRam);
- AssertPacking.assertContainerRam(packingPlanExplicitRamMap.getContainers(),
- maxContainerRam);
+ doPackingTest(topology,
+ instanceDefaultResources.cloneWithRam(boltRam), boltParallelism,
+ instanceDefaultResources.cloneWithRam(spoutRam), spoutParallelism,
+ 3, getDefaultMaxContainerResource().cloneWithRam(maxContainerRam));
}
/**
@@ -228,22 +206,14 @@ public class FirstFitDecreasingPackingTest extends CommonPackingTests {
// Explicit set component RAM map
ByteAmount boltRam = ByteAmount.fromGigabytes(4);
- topologyConfig.setContainerMaxRamHint(maxContainerRam);
+ topologyConfig.setContainerRamRequested(maxContainerRam);
topologyConfig.setComponentRam(BOLT_NAME, boltRam);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
- TopologyAPI.Topology topologyExplicitRamMap =
- getTopology(spoutParallelism, boltParallelism, topologyConfig);
- PackingPlan packingPlanExplicitRamMap = pack(topologyExplicitRamMap);
-
- Assert.assertEquals(2, packingPlanExplicitRamMap.getContainers().size());
- Assert.assertEquals(totalInstances, packingPlanExplicitRamMap.getInstanceCount());
- AssertPacking.assertNumInstances(packingPlanExplicitRamMap.getContainers(), BOLT_NAME, 3);
- AssertPacking.assertNumInstances(packingPlanExplicitRamMap.getContainers(), SPOUT_NAME, 4);
-
- AssertPacking.assertContainers(packingPlanExplicitRamMap.getContainers(),
- BOLT_NAME, SPOUT_NAME, boltRam, instanceDefaultResources.getRam(), maxContainerRam);
- AssertPacking.assertContainerRam(packingPlanExplicitRamMap.getContainers(),
- maxContainerRam);
+ doPackingTest(topology,
+ instanceDefaultResources.cloneWithRam(boltRam), boltParallelism,
+ instanceDefaultResources, spoutParallelism,
+ 3, getDefaultMaxContainerResource().cloneWithRam(maxContainerRam));
}
/**
@@ -258,27 +228,14 @@ public class FirstFitDecreasingPackingTest extends CommonPackingTests {
// Explicit set component RAM map
ByteAmount boltRam = ByteAmount.fromGigabytes(4);
- topologyConfig.setContainerMaxRamHint(maxContainerRam);
+ topologyConfig.setContainerRamRequested(maxContainerRam);
topologyConfig.setComponentRam(BOLT_NAME, boltRam);
- TopologyAPI.Topology topologyExplicitRamMap =
- getTopology(spoutParallelism, boltParallelism, topologyConfig);
- PackingPlan packingPlanExplicitRamMap = pack(topologyExplicitRamMap);
-
- Assert.assertEquals(2, packingPlanExplicitRamMap.getContainers().size());
- Assert.assertEquals(totalInstances, packingPlanExplicitRamMap.getInstanceCount());
- AssertPacking.assertNumInstances(packingPlanExplicitRamMap.getContainers(), BOLT_NAME, 3);
- AssertPacking.assertNumInstances(packingPlanExplicitRamMap.getContainers(), SPOUT_NAME, 4);
-
- AssertPacking.assertContainers(packingPlanExplicitRamMap.getContainers(),
- BOLT_NAME, SPOUT_NAME, boltRam, instanceDefaultResources.getRam(), null);
- AssertPacking.assertContainerRam(packingPlanExplicitRamMap.getContainers(),
- maxContainerRam);
- }
-
- @Test
- public void testContainersRequestedExceedsInstanceCount() throws Exception {
- doTestContainerCountRequested(8, 2); // instances will fit into 2 containers
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+ doPackingTest(topology,
+ instanceDefaultResources.cloneWithRam(boltRam), boltParallelism,
+ instanceDefaultResources, spoutParallelism,
+ 3, getDefaultMaxContainerResource().cloneWithRam(maxContainerRam));
}
/**
@@ -287,34 +244,13 @@ public class FirstFitDecreasingPackingTest extends CommonPackingTests {
*/
@Test
public void testDefaultContainerSizeRepack() throws Exception {
- int defaultNumInstancesperContainer = 4;
int numScalingInstances = 5;
Map<String, Integer> componentChanges = new HashMap<>();
componentChanges.put(BOLT_NAME, numScalingInstances);
- int numContainersBeforeRepack = 2;
- PackingPlan newPackingPlan = doDefaultScalingTest(componentChanges, numContainersBeforeRepack);
- Assert.assertEquals(3, newPackingPlan.getContainers().size());
- Assert.assertEquals((Integer) (totalInstances + numScalingInstances),
- newPackingPlan.getInstanceCount());
- AssertPacking.assertContainers(newPackingPlan.getContainers(),
- BOLT_NAME, SPOUT_NAME, instanceDefaultResources.getRam(),
- instanceDefaultResources.getRam(), null);
- for (PackingPlan.ContainerPlan containerPlan
- : newPackingPlan.getContainers()) {
- Assert.assertEquals(Math.round(PackingUtils.increaseBy(
- defaultNumInstancesperContainer * instanceDefaultResources.getCpu(),
- DEFAULT_CONTAINER_PADDING)), (long) containerPlan.getRequiredResource().getCpu());
-
- Assert.assertEquals(instanceDefaultResources.getRam()
- .multiply(defaultNumInstancesperContainer)
- .increaseBy(DEFAULT_CONTAINER_PADDING),
- containerPlan.getRequiredResource().getRam());
-
- Assert.assertEquals(instanceDefaultResources.getDisk()
- .multiply(defaultNumInstancesperContainer)
- .increaseBy(DEFAULT_CONTAINER_PADDING),
- containerPlan.getRequiredResource().getDisk());
- }
+ int numContainersBeforeRepack = 3;
+ int numContainersAfterRepack = 4;
+ doDefaultScalingTest(componentChanges, numContainersBeforeRepack, numContainersAfterRepack,
+ getDefaultMaxContainerResource());
}
/**
@@ -328,39 +264,21 @@ public class FirstFitDecreasingPackingTest extends CommonPackingTests {
ByteAmount boltRam = ByteAmount.fromGigabytes(4);
ByteAmount maxContainerRam = ByteAmount.fromGigabytes(10);
topologyConfig.setComponentRam(BOLT_NAME, boltRam);
- topologyConfig.setContainerMaxRamHint(maxContainerRam);
-
- TopologyAPI.Topology topologyExplicitRamMap =
- getTopology(spoutParallelism, boltParallelism, topologyConfig);
+ topologyConfig.setContainerRamRequested(maxContainerRam);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
int numScalingInstances = 3;
Map<String, Integer> componentChanges = new HashMap<>();
componentChanges.put(BOLT_NAME, numScalingInstances);
- int numContainersBeforeRepack = 3;
- PackingPlan newPackingPlan =
- doScalingTest(topologyExplicitRamMap, componentChanges, boltRam,
- boltParallelism, instanceDefaultResources.getRam(), spoutParallelism,
- numContainersBeforeRepack, totalInstances);
-
- Assert.assertEquals(6, newPackingPlan.getContainers().size());
- Assert.assertEquals((Integer) (totalInstances + numScalingInstances),
- newPackingPlan.getInstanceCount());
- AssertPacking.assertContainers(newPackingPlan.getContainers(),
- BOLT_NAME, SPOUT_NAME, boltRam, instanceDefaultResources.getRam(), null);
-
- for (PackingPlan.ContainerPlan containerPlan : newPackingPlan.getContainers()) {
- //Each container either contains a single bolt or 1 bolt and 2 spouts
- if (containerPlan.getInstances().size() == 1) {
- Assert.assertEquals(boltRam.increaseBy(paddingPercentage),
- containerPlan.getRequiredResource().getRam());
- }
- if (containerPlan.getInstances().size() == 3) {
- ByteAmount resourceRam = boltRam.plus(instanceDefaultResources.getRam().multiply(2));
- Assert.assertEquals(resourceRam.increaseBy(paddingPercentage),
- containerPlan.getRequiredResource().getRam());
- }
- }
+ int numContainersBeforeRepack = 4;
+ int numContainersAfterRepack = 6;
+
+ doPackingAndScalingTest(topology, componentChanges,
+ instanceDefaultResources.cloneWithRam(boltRam), boltParallelism,
+ instanceDefaultResources, spoutParallelism,
+ numContainersBeforeRepack, numContainersAfterRepack,
+ getDefaultMaxContainerResource().cloneWithRam(maxContainerRam));
}
/**
@@ -368,29 +286,24 @@ public class FirstFitDecreasingPackingTest extends CommonPackingTests {
*/
@Test
public void testPartialRamMapScaling() throws Exception {
-
ByteAmount boltRam = ByteAmount.fromGigabytes(4);
ByteAmount maxContainerRam = ByteAmount.fromGigabytes(10);
- topologyConfig.setContainerMaxRamHint(maxContainerRam);
+ topologyConfig.setContainerRamRequested(maxContainerRam);
topologyConfig.setComponentRam(BOLT_NAME, boltRam);
- TopologyAPI.Topology topologyExplicitRamMap =
- getTopology(spoutParallelism, boltParallelism, topologyConfig);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
int numScalingInstances = 3;
Map<String, Integer> componentChanges = new HashMap<>();
componentChanges.put(BOLT_NAME, numScalingInstances);
- int numContainersBeforeRepack = 2;
- PackingPlan newPackingPlan = doScalingTest(topologyExplicitRamMap, componentChanges, boltRam,
- boltParallelism, instanceDefaultResources.getRam(), spoutParallelism,
- numContainersBeforeRepack, totalInstances);
-
- Assert.assertEquals(4, newPackingPlan.getContainers().size());
- Assert.assertEquals((Integer) (totalInstances + numScalingInstances),
- newPackingPlan.getInstanceCount());
- AssertPacking.assertContainers(newPackingPlan.getContainers(),
- BOLT_NAME, SPOUT_NAME, boltRam, instanceDefaultResources.getRam(), null);
+ int numContainersBeforeRepack = 3;
+ int numContainersAfterRepack = 4;
+ doPackingAndScalingTest(topology, componentChanges,
+ instanceDefaultResources.cloneWithRam(boltRam), boltParallelism,
+ instanceDefaultResources, spoutParallelism,
+ numContainersBeforeRepack, numContainersAfterRepack,
+ getDefaultMaxContainerResource().cloneWithRam(maxContainerRam));
}
/**
@@ -400,20 +313,13 @@ public class FirstFitDecreasingPackingTest extends CommonPackingTests {
public void testScaleDown() throws Exception {
int spoutScalingDown = -3;
int boltScalingDown = -2;
-
Map<String, Integer> componentChanges = new HashMap<>();
componentChanges.put(SPOUT_NAME, spoutScalingDown); //leave 1 spout
componentChanges.put(BOLT_NAME, boltScalingDown); //leave 1 bolt
- int numContainersBeforeRepack = 2;
- PackingPlan newPackingPlan = doDefaultScalingTest(componentChanges, numContainersBeforeRepack);
-
- Assert.assertEquals(2, newPackingPlan.getContainers().size());
- Assert.assertEquals((Integer) (totalInstances + spoutScalingDown + boltScalingDown),
- newPackingPlan.getInstanceCount());
- AssertPacking.assertNumInstances(newPackingPlan.getContainers(),
- BOLT_NAME, 1);
- AssertPacking.assertNumInstances(newPackingPlan.getContainers(),
- SPOUT_NAME, 1);
+ int numContainersBeforeRepack = 3;
+ int numContainersAfterRepack = 1;
+ doDefaultScalingTest(componentChanges, numContainersBeforeRepack, numContainersAfterRepack,
+ getDefaultMaxContainerResource());
}
/**
@@ -421,21 +327,13 @@ public class FirstFitDecreasingPackingTest extends CommonPackingTests {
*/
@Test
public void removeFirstContainer() throws Exception {
- /* The packing plan consists of two containers. The first one contains 4 spouts and
- the second one contains 3 bolts. During scaling we remove 4 spouts and thus the f
- first container is removed.
- */
int spoutScalingDown = -4;
Map<String, Integer> componentChanges = new HashMap<>();
componentChanges.put(SPOUT_NAME, spoutScalingDown);
- int numContainersBeforeRepack = 2;
- PackingPlan newPackingPlan = doDefaultScalingTest(componentChanges, numContainersBeforeRepack);
-
- Assert.assertEquals(1, newPackingPlan.getContainers().size());
- Assert.assertEquals((Integer) (totalInstances + spoutScalingDown),
- newPackingPlan.getInstanceCount());
- AssertPacking.assertNumInstances(newPackingPlan.getContainers(), BOLT_NAME, 3);
- AssertPacking.assertNumInstances(newPackingPlan.getContainers(), SPOUT_NAME, 0);
+ int numContainersBeforeRepack = 3;
+ int numContainersAfterRepack = 2;
+ doDefaultScalingTest(componentChanges, numContainersBeforeRepack, numContainersAfterRepack,
+ getDefaultMaxContainerResource());
}
/**
@@ -448,14 +346,12 @@ public class FirstFitDecreasingPackingTest extends CommonPackingTests {
topologyConfig.setContainerPaddingPercentage(paddingPercentage);
ByteAmount spoutRam = ByteAmount.fromGigabytes(2);
ByteAmount maxContainerRam = ByteAmount.fromGigabytes(12);
- topologyConfig.setContainerMaxRamHint(maxContainerRam);
+ topologyConfig.setContainerRamRequested(maxContainerRam);
topologyConfig.setComponentRam(SPOUT_NAME, spoutRam);
- int noBolts = 2;
- int noSpouts = 1;
-
- TopologyAPI.Topology topologyExplicitRamMap =
- getTopology(noSpouts, noBolts, topologyConfig);
+ boltParallelism = 2;
+ spoutParallelism = 1;
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
int spoutScalingUp = 1;
int boltScalingDown = -2;
@@ -463,18 +359,14 @@ public class FirstFitDecreasingPackingTest extends CommonPackingTests {
Map<String, Integer> componentChanges = new HashMap<>();
componentChanges.put(SPOUT_NAME, spoutScalingUp); // 2 spouts
componentChanges.put(BOLT_NAME, boltScalingDown); // 0 bolts
- int numContainersBeforeRepack = 1;
- PackingPlan newPackingPlan = doScalingTest(topologyExplicitRamMap, componentChanges,
- instanceDefaultResources.getRam(), noBolts, spoutRam, noSpouts,
- numContainersBeforeRepack, noSpouts + noBolts);
-
- Assert.assertEquals(1, newPackingPlan.getContainers().size());
- Assert.assertEquals((Integer) (noSpouts + noBolts + spoutScalingUp + boltScalingDown),
- newPackingPlan.getInstanceCount());
- AssertPacking.assertNumInstances(newPackingPlan.getContainers(),
- BOLT_NAME, noBolts + boltScalingDown);
- AssertPacking.assertNumInstances(newPackingPlan.getContainers(),
- SPOUT_NAME, noSpouts + spoutScalingUp);
+ int numContainersBeforeRepack = 2;
+ int numContainersAfterRepack = 1;
+
+ doPackingAndScalingTest(topology, componentChanges,
+ instanceDefaultResources, boltParallelism,
+ instanceDefaultResources.cloneWithRam(spoutRam), spoutParallelism,
+ numContainersBeforeRepack, numContainersAfterRepack,
+ getDefaultMaxContainerResource().cloneWithRam(maxContainerRam));
}
/**
@@ -487,14 +379,13 @@ public class FirstFitDecreasingPackingTest extends CommonPackingTests {
topologyConfig.setContainerPaddingPercentage(paddingPercentage);
ByteAmount spoutRam = ByteAmount.fromGigabytes(4);
ByteAmount maxContainerRam = ByteAmount.fromGigabytes(12);
- topologyConfig.setContainerMaxRamHint(maxContainerRam);
+ topologyConfig.setContainerRamRequested(maxContainerRam);
topologyConfig.setComponentRam(SPOUT_NAME, spoutRam);
- int noBolts = 3;
- int noSpouts = 1;
+ boltParallelism = 3;
+ spoutParallelism = 1;
- TopologyAPI.Topology topologyExplicitRamMap =
- getTopology(noSpouts, noBolts, topologyConfig);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
int spoutScalingUp = 1;
int boltScalingDown = -1;
@@ -502,18 +393,70 @@ public class FirstFitDecreasingPackingTest extends CommonPackingTests {
Map<String, Integer> componentChanges = new HashMap<>();
componentChanges.put(SPOUT_NAME, spoutScalingUp); // 2 spouts
componentChanges.put(BOLT_NAME, boltScalingDown); // 2 bolts
- int numContainersBeforeRepack = 1;
- PackingPlan newPackingPlan = doScalingTest(topologyExplicitRamMap, componentChanges,
- instanceDefaultResources.getRam(), noBolts, spoutRam, noSpouts,
- numContainersBeforeRepack, noSpouts + noBolts);
-
- Assert.assertEquals(2, newPackingPlan.getContainers().size());
- Assert.assertEquals((Integer) (noSpouts + noBolts + spoutScalingUp + boltScalingDown),
- newPackingPlan.getInstanceCount());
- AssertPacking.assertNumInstances(newPackingPlan.getContainers(),
- BOLT_NAME, noBolts + boltScalingDown);
- AssertPacking.assertNumInstances(newPackingPlan.getContainers(),
- SPOUT_NAME, noSpouts + spoutScalingUp);
+ int numContainersBeforeRepack = 2;
+ int numContainersAfterRepack = 2;
+
+ doPackingAndScalingTest(topology, componentChanges,
+ instanceDefaultResources, boltParallelism,
+ instanceDefaultResources.cloneWithRam(spoutRam), spoutParallelism,
+ numContainersBeforeRepack, numContainersAfterRepack,
+ getDefaultMaxContainerResource().cloneWithRam(maxContainerRam));
+ }
+
+ /**
+ * Test the scenario where scaling down and up is simultaneously requested
+ */
+ @Test
+ public void scaleDownAndUp() throws Exception {
+ int spoutScalingDown = -4;
+ int boltScalingUp = 6;
+
+ Map<String, Integer> componentChanges = new HashMap<>();
+ componentChanges.put(SPOUT_NAME, spoutScalingDown); // 0 spouts
+ componentChanges.put(BOLT_NAME, boltScalingUp); // 9 bolts
+ int numContainersBeforeRepack = 3;
+ int numContainersAfterRepack = 3;
+ doDefaultScalingTest(componentChanges, numContainersBeforeRepack, numContainersAfterRepack,
+ getDefaultMaxContainerResource());
+ }
+
+ @Test(expected = PackingException.class)
+ public void testScaleDownInvalidScaleFactor() throws Exception {
+ //try to remove more spout instances than possible
+ int spoutScalingDown = -5;
+ Map<String, Integer> componentChanges = new HashMap<>();
+ componentChanges.put(SPOUT_NAME, spoutScalingDown);
+
+ int numContainersBeforeRepack = 3;
+ doDefaultScalingTest(componentChanges, numContainersBeforeRepack, numContainersBeforeRepack,
+ getDefaultMaxContainerResource());
+ }
+
+ @Test(expected = PackingException.class)
+ public void testScaleDownInvalidComponent() throws Exception {
+ //try to remove a component that does not exist
+ Map<String, Integer> componentChanges = new HashMap<>();
+ componentChanges.put("SPOUT_FAKE", -10);
+ int numContainersBeforeRepack = 3;
+ doDefaultScalingTest(componentChanges, numContainersBeforeRepack, numContainersBeforeRepack,
+ getDefaultMaxContainerResource());
+ }
+
+ /**
+ * Test invalid RAM for instance
+ */
+ @Test(expected = PackingException.class)
+ public void testInvalidRamInstance() throws Exception {
+ ByteAmount maxContainerRam = ByteAmount.fromGigabytes(10);
+ ByteAmount boltRam = ByteAmount.ZERO;
+ topologyConfig.setContainerRamRequested(maxContainerRam);
+ topologyConfig.setComponentRam(BOLT_NAME, boltRam);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ doPackingTest(topology,
+ instanceDefaultResources.cloneWithRam(boltRam), boltParallelism,
+ instanceDefaultResources, spoutParallelism,
+ 0, getDefaultMaxContainerResource().cloneWithRam(maxContainerRam));
}
/**
@@ -526,25 +469,25 @@ public class FirstFitDecreasingPackingTest extends CommonPackingTests {
public void testScaleDownOneComponentRemoveContainer() throws Exception {
@SuppressWarnings({"unchecked", "rawtypes"})
Pair<Integer, InstanceId>[] initialComponentInstances = new Pair[] {
- new Pair<>(1, new InstanceId(A, 1, 0)),
- new Pair<>(1, new InstanceId(A, 2, 1)),
- new Pair<>(1, new InstanceId(B, 3, 0)),
- new Pair<>(3, new InstanceId(B, 4, 1)),
- new Pair<>(3, new InstanceId(B, 5, 2)),
- new Pair<>(4, new InstanceId(B, 6, 3)),
- new Pair<>(4, new InstanceId(B, 7, 4))
+ new Pair<>(1, new InstanceId(SPOUT_NAME, 1, 0)),
+ new Pair<>(1, new InstanceId(SPOUT_NAME, 2, 1)),
+ new Pair<>(1, new InstanceId(BOLT_NAME, 3, 0)),
+ new Pair<>(3, new InstanceId(BOLT_NAME, 4, 1)),
+ new Pair<>(3, new InstanceId(BOLT_NAME, 5, 2)),
+ new Pair<>(4, new InstanceId(BOLT_NAME, 6, 3)),
+ new Pair<>(4, new InstanceId(BOLT_NAME, 7, 4))
};
Map<String, Integer> componentChanges = new HashMap<>();
- componentChanges.put(B, -2);
+ componentChanges.put(BOLT_NAME, -2);
@SuppressWarnings({"unchecked", "rawtypes"})
Pair<Integer, InstanceId>[] expectedComponentInstances = new Pair[] {
- new Pair<>(1, new InstanceId(A, 1, 0)),
- new Pair<>(1, new InstanceId(A, 2, 1)),
- new Pair<>(1, new InstanceId(B, 3, 0)),
- new Pair<>(3, new InstanceId(B, 4, 1)),
- new Pair<>(3, new InstanceId(B, 5, 2)),
+ new Pair<>(1, new InstanceId(SPOUT_NAME, 1, 0)),
+ new Pair<>(1, new InstanceId(SPOUT_NAME, 2, 1)),
+ new Pair<>(1, new InstanceId(BOLT_NAME, 3, 0)),
+ new Pair<>(3, new InstanceId(BOLT_NAME, 4, 1)),
+ new Pair<>(3, new InstanceId(BOLT_NAME, 5, 2)),
};
doScaleDownTest(initialComponentInstances, componentChanges, expectedComponentInstances);
@@ -554,26 +497,26 @@ public class FirstFitDecreasingPackingTest extends CommonPackingTests {
public void testScaleDownTwoComponentsRemoveContainer() throws Exception {
@SuppressWarnings({"unchecked", "rawtypes"})
Pair<Integer, InstanceId>[] initialComponentInstances = new Pair[] {
- new Pair<>(1, new InstanceId(A, 1, 0)),
- new Pair<>(1, new InstanceId(A, 2, 1)),
- new Pair<>(1, new InstanceId(B, 3, 0)),
- new Pair<>(1, new InstanceId(B, 4, 1)),
- new Pair<>(3, new InstanceId(A, 5, 2)),
- new Pair<>(3, new InstanceId(A, 6, 3)),
- new Pair<>(3, new InstanceId(B, 7, 2)),
- new Pair<>(3, new InstanceId(B, 8, 3))
+ new Pair<>(1, new InstanceId(SPOUT_NAME, 1, 0)),
+ new Pair<>(1, new InstanceId(SPOUT_NAME, 2, 1)),
+ new Pair<>(1, new InstanceId(BOLT_NAME, 3, 0)),
+ new Pair<>(1, new InstanceId(BOLT_NAME, 4, 1)),
+ new Pair<>(3, new InstanceId(SPOUT_NAME, 5, 2)),
+ new Pair<>(3, new InstanceId(SPOUT_NAME, 6, 3)),
+ new Pair<>(3, new InstanceId(BOLT_NAME, 7, 2)),
+ new Pair<>(3, new InstanceId(BOLT_NAME, 8, 3))
};
Map<String, Integer> componentChanges = new HashMap<>();
- componentChanges.put(A, -2);
- componentChanges.put(B, -2);
+ componentChanges.put(SPOUT_NAME, -2);
+ componentChanges.put(BOLT_NAME, -2);
@SuppressWarnings({"unchecked", "rawtypes"})
Pair<Integer, InstanceId>[] expectedComponentInstances = new Pair[] {
- new Pair<>(1, new InstanceId(A, 1, 0)),
- new Pair<>(1, new InstanceId(A, 2, 1)),
- new Pair<>(1, new InstanceId(B, 3, 0)),
- new Pair<>(1, new InstanceId(B, 4, 1)),
+ new Pair<>(1, new InstanceId(SPOUT_NAME, 1, 0)),
+ new Pair<>(1, new InstanceId(SPOUT_NAME, 2, 1)),
+ new Pair<>(1, new InstanceId(BOLT_NAME, 3, 0)),
+ new Pair<>(1, new InstanceId(BOLT_NAME, 4, 1)),
};
doScaleDownTest(initialComponentInstances, componentChanges, expectedComponentInstances);
@@ -583,126 +526,25 @@ public class FirstFitDecreasingPackingTest extends CommonPackingTests {
public void testScaleDownHomogenousFirst() throws Exception {
@SuppressWarnings({"unchecked", "rawtypes"})
Pair<Integer, InstanceId>[] initialComponentInstances = new Pair[] {
- new Pair<>(1, new InstanceId(A, 1, 0)),
- new Pair<>(1, new InstanceId(A, 2, 1)),
- new Pair<>(1, new InstanceId(B, 3, 0)),
- new Pair<>(3, new InstanceId(B, 4, 1)),
- new Pair<>(3, new InstanceId(B, 5, 2)),
- new Pair<>(3, new InstanceId(B, 6, 3)),
- new Pair<>(3, new InstanceId(B, 7, 4))
+ new Pair<>(1, new InstanceId(SPOUT_NAME, 1, 0)),
+ new Pair<>(1, new InstanceId(SPOUT_NAME, 2, 1)),
+ new Pair<>(1, new InstanceId(BOLT_NAME, 3, 0)),
+ new Pair<>(3, new InstanceId(BOLT_NAME, 4, 1)),
+ new Pair<>(3, new InstanceId(BOLT_NAME, 5, 2)),
+ new Pair<>(3, new InstanceId(BOLT_NAME, 6, 3)),
+ new Pair<>(3, new InstanceId(BOLT_NAME, 7, 4))
};
Map<String, Integer> componentChanges = new HashMap<>();
- componentChanges.put(B, -4);
+ componentChanges.put(BOLT_NAME, -4);
@SuppressWarnings({"unchecked", "rawtypes"})
Pair<Integer, InstanceId>[] expectedComponentInstances = new Pair[] {
- new Pair<>(1, new InstanceId(A, 1, 0)),
- new Pair<>(1, new InstanceId(A, 2, 1)),
- new Pair<>(1, new InstanceId(B, 3, 0))
+ new Pair<>(1, new InstanceId(SPOUT_NAME, 1, 0)),
+ new Pair<>(1, new InstanceId(SPOUT_NAME, 2, 1)),
+ new Pair<>(1, new InstanceId(BOLT_NAME, 3, 0))
};
doScaleDownTest(initialComponentInstances, componentChanges, expectedComponentInstances);
}
-
- /**
- * Test the scenario where scaling down and up is simultaneously requested
- */
- @Test
- public void scaleDownAndUp() throws Exception {
- int spoutScalingDown = -4;
- int boltScalingUp = 6;
-
- Map<String, Integer> componentChanges = new HashMap<>();
- componentChanges.put(SPOUT_NAME, spoutScalingDown); // 0 spouts
- componentChanges.put(BOLT_NAME, boltScalingUp); // 9 bolts
- int numContainersBeforeRepack = 2;
- PackingPlan newPackingPlan = doDefaultScalingTest(componentChanges, numContainersBeforeRepack);
-
- Assert.assertEquals(3, newPackingPlan.getContainers().size());
- Assert.assertEquals((Integer) (totalInstances + spoutScalingDown + boltScalingUp),
- newPackingPlan.getInstanceCount());
- AssertPacking.assertNumInstances(newPackingPlan.getContainers(),
- BOLT_NAME, 9);
- AssertPacking.assertNumInstances(newPackingPlan.getContainers(),
- SPOUT_NAME, 0);
- }
-
- @Test(expected = PackingException.class)
- public void testScaleDownInvalidScaleFactor() throws Exception {
-
- //try to remove more spout instances than possible
- int spoutScalingDown = -5;
- Map<String, Integer> componentChanges = new HashMap<>();
- componentChanges.put(SPOUT_NAME, spoutScalingDown);
-
- int numContainersBeforeRepack = 2;
- doDefaultScalingTest(componentChanges, numContainersBeforeRepack);
- }
-
- @Test(expected = PackingException.class)
- public void testScaleDownInvalidComponent() throws Exception {
- Map<String, Integer> componentChanges = new HashMap<>();
- componentChanges.put("SPOUT_FAKE", -10); //try to remove a component that does not exist
- int numContainersBeforeRepack = 2;
- doDefaultScalingTest(componentChanges, numContainersBeforeRepack);
- }
-
- /**
- * Test invalid RAM for instance
- */
- @Test(expected = PackingException.class)
- public void testInvalidRamInstance() throws Exception {
- ByteAmount maxContainerRam = ByteAmount.fromGigabytes(10);
- int defaultNumInstancesperContainer = 4;
-
- // Explicit set component RAM map
- ByteAmount boltRam = ByteAmount.ZERO;
-
- topologyConfig.setContainerMaxRamHint(maxContainerRam);
- topologyConfig.setComponentRam(BOLT_NAME, boltRam);
-
- TopologyAPI.Topology topologyExplicitRamMap =
- getTopology(spoutParallelism, boltParallelism, topologyConfig);
- PackingPlan packingPlanExplicitRamMap = pack(topologyExplicitRamMap);
- Assert.assertEquals(totalInstances, packingPlanExplicitRamMap.getInstanceCount());
- AssertPacking.assertNumInstances(packingPlanExplicitRamMap.getContainers(), BOLT_NAME, 3);
- AssertPacking.assertNumInstances(packingPlanExplicitRamMap.getContainers(), SPOUT_NAME, 4);
- AssertPacking.assertContainerRam(packingPlanExplicitRamMap.getContainers(),
- instanceDefaultResources.getRam().multiply(defaultNumInstancesperContainer));
- }
-
- @Test
- public void testTwoContainersRequested() throws Exception {
- doTestContainerCountRequested(2, 2);
- }
-
- /**
- * Test the scenario where container level resource config are set
- */
- protected void doTestContainerCountRequested(int requestedContainers,
- int expectedContainer) throws Exception {
-
- // Explicit set resources for container
- topologyConfig.setContainerRamRequested(ByteAmount.fromGigabytes(10));
- topologyConfig.setContainerDiskRequested(ByteAmount.fromGigabytes(20));
- topologyConfig.setContainerCpuRequested(30);
- topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, requestedContainers);
-
- TopologyAPI.Topology topologyExplicitResourcesConfig =
- getTopology(spoutParallelism, boltParallelism, topologyConfig);
- PackingPlan packingPlanExplicitResourcesConfig = pack(topologyExplicitResourcesConfig);
-
- Assert.assertEquals(expectedContainer,
- packingPlanExplicitResourcesConfig.getContainers().size());
- Assert.assertEquals(totalInstances, packingPlanExplicitResourcesConfig.getInstanceCount());
-
- // RAM for bolt/spout should be the value in component RAM map
- for (PackingPlan.ContainerPlan containerPlan
- : packingPlanExplicitResourcesConfig.getContainers()) {
- for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
- Assert.assertEquals(instanceDefaultResources, instancePlan.getResource());
- }
- }
- }
}
diff --git a/heron/packing/tests/java/org/apache/heron/packing/builder/PackingPlanBuilderTest.java b/heron/packing/tests/java/org/apache/heron/packing/builder/PackingPlanBuilderTest.java
index dfd0c85..ec852aa 100644
--- a/heron/packing/tests/java/org/apache/heron/packing/builder/PackingPlanBuilderTest.java
+++ b/heron/packing/tests/java/org/apache/heron/packing/builder/PackingPlanBuilderTest.java
@@ -53,9 +53,12 @@ public class PackingPlanBuilderTest {
@Before
public void init() {
testContainers = new ArrayList<>();
- testContainers.add(new Container(3, null, 5));
- testContainers.add(new Container(6, null, 20));
- testContainers.add(new Container(4, null, 20));
+ testContainers.add(new Container(3, null,
+ new Resource(5, ByteAmount.fromGigabytes(5), ByteAmount.fromGigabytes(5))));
+ testContainers.add(new Container(6, null,
+ new Resource(20, ByteAmount.fromGigabytes(20), ByteAmount.fromGigabytes(20))));
+ testContainers.add(new Container(4, null,
+ new Resource(20, ByteAmount.fromGigabytes(20), ByteAmount.fromGigabytes(20))));
}
@Test
@@ -118,9 +121,8 @@ public class PackingPlanBuilderTest {
* Tests the getContainers method.
*/
@Test
- public void testGetContainers() throws ResourceExceededException {
-
- int paddingPercentage = 10;
+ public void testGetContainers() {
+ Resource padding = new Resource(1.0, ByteAmount.fromGigabytes(1), ByteAmount.fromGigabytes(1));
Map<Integer, List<InstanceId>> packing = new HashMap<>();
packing.put(7, Arrays.asList(
new InstanceId("spout", 1, 0),
@@ -131,12 +133,12 @@ public class PackingPlanBuilderTest {
PackingPlan packingPlan = generatePacking(packing);
Map<Integer, Container> containers = PackingPlanBuilder.getContainers(
- packingPlan, paddingPercentage,
+ packingPlan, packingPlan.getMaxContainerResources(), padding,
new HashMap<String, TreeSet<Integer>>(), new TreeSet<Integer>());
assertEquals(packing.size(), containers.size());
for (Integer containerId : packing.keySet()) {
Container foundContainer = containers.get(containerId);
- assertEquals(paddingPercentage, foundContainer.getPaddingPercentage());
+ assertEquals(padding, foundContainer.getPadding());
assertEquals(packingPlan.getMaxContainerResources(), foundContainer.getCapacity());
assertEquals(2, foundContainer.getInstances().size());
}
@@ -217,7 +219,8 @@ public class PackingPlanBuilderTest {
@SuppressWarnings({"unchecked", "rawtypes"})
Pair<Integer, InstanceId>[] added = new Pair[] {
- new Pair<>(3, new InstanceId("componentB", 4, 1))
+ new Pair<>(3, new InstanceId("componentB", 4, 1)),
+ new Pair<>(3, new InstanceId("componentB", 5, 2))
};
PackingTestHelper.addToTestPackingPlan(
TOPOLOGY_ID, plan, PackingTestHelper.toContainerIdComponentNames(added), 0);
@@ -292,7 +295,7 @@ public class PackingPlanBuilderTest {
@Override
public double getScore(Container container) {
- return container.getPaddingPercentage();
+ return container.getPadding().getCpu();
}
}
}
diff --git a/heron/packing/tests/java/org/apache/heron/packing/builder/ScorerTest.java b/heron/packing/tests/java/org/apache/heron/packing/builder/ScorerTest.java
index 5845ba3..084e664 100644
--- a/heron/packing/tests/java/org/apache/heron/packing/builder/ScorerTest.java
+++ b/heron/packing/tests/java/org/apache/heron/packing/builder/ScorerTest.java
@@ -41,9 +41,9 @@ public class ScorerTest {
Resource containerCapacity
= new Resource(1000, ByteAmount.fromGigabytes(100), ByteAmount.fromGigabytes(100));
testContainers = new Container[] {
- new Container(1, containerCapacity, 0),
- new Container(3, containerCapacity, 0),
- new Container(4, containerCapacity, 0),
+ new Container(1, containerCapacity, Resource.EMPTY_RESOURCE),
+ new Container(3, containerCapacity, Resource.EMPTY_RESOURCE),
+ new Container(4, containerCapacity, Resource.EMPTY_RESOURCE),
};
addInstance(testContainers[0], "A", 0);
diff --git a/heron/packing/tests/java/org/apache/heron/packing/roundrobin/ResourceCompliantRRPackingTest.java b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/ResourceCompliantRRPackingTest.java
index 4aa4ce0..179ce13 100644
--- a/heron/packing/tests/java/org/apache/heron/packing/roundrobin/ResourceCompliantRRPackingTest.java
+++ b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/ResourceCompliantRRPackingTest.java
@@ -28,10 +28,8 @@ import org.junit.Assert;
import org.junit.Test;
import org.apache.heron.api.generated.TopologyAPI;
-import org.apache.heron.api.utils.TopologyUtils;
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.common.basics.Pair;
-import org.apache.heron.packing.AssertPacking;
import org.apache.heron.packing.CommonPackingTests;
import org.apache.heron.packing.utils.PackingUtils;
import org.apache.heron.spi.packing.IPacking;
@@ -53,19 +51,15 @@ public class ResourceCompliantRRPackingTest extends CommonPackingTests {
return new ResourceCompliantRRPacking();
}
- private int countComponent(String component, Set<PackingPlan.InstancePlan> instances) {
- int count = 0;
- for (PackingPlan.InstancePlan instancePlan : instances) {
- if (component.equals(instancePlan.getComponentName())) {
- count++;
- }
- }
- return count;
+ @Test (expected = PackingException.class)
+ public void testFailureInsufficientContainerRam() throws Exception {
+ topologyConfig.setContainerRamRequested(ByteAmount.ZERO);
+ pack(getTopology(spoutParallelism, boltParallelism, topologyConfig));
}
- @Test(expected = PackingException.class)
- public void testFailureInsufficientContainerRamRequested() throws Exception {
- topologyConfig.setContainerRamRequested(ByteAmount.ZERO);
+ @Test (expected = PackingException.class)
+ public void testFailureInsufficientContainerCpu() throws Exception {
+ topologyConfig.setContainerCpuRequested(1.0);
pack(getTopology(spoutParallelism, boltParallelism, topologyConfig));
}
@@ -74,11 +68,10 @@ public class ResourceCompliantRRPackingTest extends CommonPackingTests {
*/
@Test
public void testDefaultResources() throws Exception {
- int numContainers = 2;
- PackingPlan packingPlanNoExplicitResourcesConfig = pack(topology);
-
- Assert.assertEquals(numContainers, packingPlanNoExplicitResourcesConfig.getContainers().size());
- Assert.assertEquals(totalInstances, packingPlanNoExplicitResourcesConfig.getInstanceCount());
+ doPackingTest(topology,
+ instanceDefaultResources, boltParallelism,
+ instanceDefaultResources, spoutParallelism,
+ 3, getDefaultMaxContainerResource());
}
/**
@@ -86,16 +79,14 @@ public class ResourceCompliantRRPackingTest extends CommonPackingTests {
*/
@Test
public void testDefaultContainerSizeWithPadding() throws Exception {
- int numContainers = 2;
int padding = 50;
topologyConfig.setContainerPaddingPercentage(padding);
- TopologyAPI.Topology newTopology = getTopology(spoutParallelism, boltParallelism,
- topologyConfig);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
- PackingPlan packingPlan = pack(newTopology);
-
- Assert.assertEquals(numContainers, packingPlan.getContainers().size());
- Assert.assertEquals(totalInstances, packingPlan.getInstanceCount());
+ doPackingTest(topology,
+ instanceDefaultResources, boltParallelism,
+ instanceDefaultResources, spoutParallelism,
+ 4, getDefaultMaxContainerResource());
}
/**
@@ -106,36 +97,43 @@ public class ResourceCompliantRRPackingTest extends CommonPackingTests {
int numContainers = 1;
// Set up the topology and its config
- topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
+ topologyConfig.setNumStmgrs(numContainers);
// Explicit set resources for container
ByteAmount containerRam = ByteAmount.fromGigabytes(10);
ByteAmount containerDisk = ByteAmount.fromGigabytes(20);
double containerCpu = 30;
+ Resource containerResource = new Resource(containerCpu, containerRam, containerDisk);
+
+ Resource padding = PackingUtils.finalizePadding(
+ new Resource(containerCpu, containerRam, containerDisk),
+ new Resource(PackingUtils.DEFAULT_CONTAINER_CPU_PADDING,
+ PackingUtils.DEFAULT_CONTAINER_RAM_PADDING,
+ PackingUtils.DEFAULT_CONTAINER_RAM_PADDING),
+ PackingUtils.DEFAULT_CONTAINER_PADDING_PERCENTAGE);
topologyConfig.setContainerRamRequested(containerRam);
topologyConfig.setContainerDiskRequested(containerDisk);
topologyConfig.setContainerCpuRequested(containerCpu);
- TopologyAPI.Topology topologyExplicitResourcesConfig =
- getTopology(spoutParallelism, boltParallelism, topologyConfig);
- PackingPlan packingPlanExplicitResourcesConfig = pack(topologyExplicitResourcesConfig);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
- Assert.assertEquals(numContainers, packingPlanExplicitResourcesConfig.getContainers().size());
- Assert.assertEquals(totalInstances, packingPlanExplicitResourcesConfig.getInstanceCount());
+ PackingPlan packingPlan = doPackingTest(topology,
+ instanceDefaultResources, boltParallelism,
+ instanceDefaultResources, spoutParallelism,
+ numContainers, containerResource);
- for (PackingPlan.ContainerPlan containerPlan
- : packingPlanExplicitResourcesConfig.getContainers()) {
- Assert.assertEquals(Math.round(PackingUtils.increaseBy(totalInstances
- * instanceDefaultResources.getCpu(), DEFAULT_CONTAINER_PADDING)),
+ for (PackingPlan.ContainerPlan containerPlan : packingPlan.getContainers()) {
+ Assert.assertEquals(Math.round(totalInstances * instanceDefaultResources.getCpu()
+ + padding.getCpu()),
(long) containerPlan.getRequiredResource().getCpu());
Assert.assertEquals(instanceDefaultResources.getRam()
.multiply(totalInstances)
- .increaseBy(DEFAULT_CONTAINER_PADDING),
+ .plus(padding.getRam()),
containerPlan.getRequiredResource().getRam());
Assert.assertEquals(instanceDefaultResources.getDisk()
- .multiply(totalInstances)
- .increaseBy(DEFAULT_CONTAINER_PADDING),
+ .multiply(totalInstances)
+ .plus(padding.getDisk()),
containerPlan.getRequiredResource().getDisk());
// All instances' resource requirement should be equal
@@ -151,7 +149,8 @@ public class ResourceCompliantRRPackingTest extends CommonPackingTests {
@Test
public void testContainersRequestedExceedsInstanceCount() throws Exception {
- doTestContainerCountRequested(8, 7); // each of the 7 instances will get their own container
+ // each of the 7 instances will get their own container
+ doTestContainerCountRequested(8, 7);
}
/**
@@ -159,7 +158,7 @@ public class ResourceCompliantRRPackingTest extends CommonPackingTests {
*/
@Test
public void testCompleteRamMapRequested() throws Exception {
- int numContainers = 2;
+ int numContainers = 3;
// Explicit set resources for container
// the value should be ignored, since we set the complete component RAM map
@@ -171,14 +170,12 @@ public class ResourceCompliantRRPackingTest extends CommonPackingTests {
topologyConfig.setContainerRamRequested(containerRam);
topologyConfig.setComponentRam(BOLT_NAME, boltRam);
- TopologyAPI.Topology topologyExplicitRamMap =
- getTopology(spoutParallelism, boltParallelism, topologyConfig);
- PackingPlan packingPlanExplicitRamMap = pack(topologyExplicitRamMap);
- Assert.assertEquals(totalInstances, packingPlanExplicitRamMap.getInstanceCount());
- Assert.assertEquals(numContainers, packingPlanExplicitRamMap.getContainers().size());
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
- AssertPacking.assertContainers(packingPlanExplicitRamMap.getContainers(),
- BOLT_NAME, SPOUT_NAME, boltRam, instanceDefaultResources.getRam(), containerRam);
+ doPackingTest(topology,
+ instanceDefaultResources.cloneWithRam(boltRam), boltParallelism,
+ instanceDefaultResources, spoutParallelism,
+ numContainers, getDefaultMaxContainerResource().cloneWithRam(containerRam));
}
/**
@@ -186,7 +183,7 @@ public class ResourceCompliantRRPackingTest extends CommonPackingTests {
*/
@Test
public void testPartialRamMap() throws Exception {
- int numContainers = 2;
+ int numContainers = 3;
// Explicit set resources for container
ByteAmount containerRam = ByteAmount.fromGigabytes(10);
@@ -199,14 +196,12 @@ public class ResourceCompliantRRPackingTest extends CommonPackingTests {
topologyConfig.setComponentRam(BOLT_NAME, boltRam);
topologyConfig.setComponentRam(SPOUT_NAME, spoutRam);
- TopologyAPI.Topology topologyExplicitRamMap =
- getTopology(spoutParallelism, boltParallelism, topologyConfig);
- PackingPlan packingPlanExplicitRamMap = pack(topologyExplicitRamMap);
- Assert.assertEquals(totalInstances, packingPlanExplicitRamMap.getInstanceCount());
- Assert.assertEquals(numContainers, packingPlanExplicitRamMap.getContainers().size());
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
- AssertPacking.assertContainers(packingPlanExplicitRamMap.getContainers(),
- BOLT_NAME, SPOUT_NAME, boltRam, spoutRam, containerRam);
+ doPackingTest(topology,
+ instanceDefaultResources.cloneWithRam(boltRam), boltParallelism,
+ instanceDefaultResources.cloneWithRam(spoutRam), spoutParallelism,
+ numContainers, getDefaultMaxContainerResource().cloneWithRam(containerRam));
}
/**
@@ -217,18 +212,17 @@ public class ResourceCompliantRRPackingTest extends CommonPackingTests {
int numContainers = 1;
// Set up the topology and its config
- topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
+ topologyConfig.setNumStmgrs(numContainers);
// Explicit set resources for container
ByteAmount containerRam = ByteAmount.fromGigabytes(2);
-
topologyConfig.setContainerRamRequested(containerRam);
- TopologyAPI.Topology newTopology =
- getTopology(spoutParallelism, boltParallelism, topologyConfig);
- PackingPlan packingPlan = pack(newTopology);
- Assert.assertEquals(7, packingPlan.getContainers().size());
- Assert.assertEquals(totalInstances, packingPlan.getInstanceCount());
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+ doPackingTest(topology,
+ instanceDefaultResources, boltParallelism,
+ instanceDefaultResources, spoutParallelism,
+ 7, getDefaultMaxContainerResource().cloneWithRam(containerRam));
}
/**
@@ -239,7 +233,7 @@ public class ResourceCompliantRRPackingTest extends CommonPackingTests {
int numContainers = 1;
// Set up the topology and its config
- topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
+ topologyConfig.setNumStmgrs(numContainers);
// Explicit set resources for container
ByteAmount containerRam = ByteAmount.fromGigabytes(3);
@@ -252,11 +246,11 @@ public class ResourceCompliantRRPackingTest extends CommonPackingTests {
topologyConfig.setComponentRam(BOLT_NAME, boltRam);
topologyConfig.setComponentRam(SPOUT_NAME, spoutRam);
- TopologyAPI.Topology topologyExplicitRamMap =
- getTopology(spoutParallelism, boltParallelism, topologyConfig);
- PackingPlan packingPlan = pack(topologyExplicitRamMap);
- Assert.assertEquals(7, packingPlan.getContainers().size());
- Assert.assertEquals(totalInstances, packingPlan.getInstanceCount());
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+ doPackingTest(topology,
+ instanceDefaultResources.cloneWithRam(boltRam), boltParallelism,
+ instanceDefaultResources.cloneWithRam(spoutRam), spoutParallelism,
+ 7, getDefaultMaxContainerResource().cloneWithRam(containerRam));
}
/**
@@ -264,27 +258,18 @@ public class ResourceCompliantRRPackingTest extends CommonPackingTests {
*/
@Test
public void testEvenPacking() throws Exception {
- int numContainers = 2;
+ int numContainers = 3;
int componentParallelism = 4;
-
+ boltParallelism = componentParallelism;
+ spoutParallelism = componentParallelism;
// Set up the topology and its config
- topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
-
- TopologyAPI.Topology newTopology =
- getTopology(componentParallelism, componentParallelism, topologyConfig);
-
- int numInstance = TopologyUtils.getTotalInstance(newTopology);
- // Two components
- Assert.assertEquals(2 * componentParallelism, numInstance);
- PackingPlan output = pack(newTopology);
- Assert.assertEquals(numContainers, output.getContainers().size());
- Assert.assertEquals((Integer) numInstance, output.getInstanceCount());
-
- for (PackingPlan.ContainerPlan container : output.getContainers()) {
- Assert.assertEquals(numInstance / numContainers, container.getInstances().size());
- Assert.assertEquals(2, countComponent("spout", container.getInstances()));
- Assert.assertEquals(2, countComponent("bolt", container.getInstances()));
- }
+ topologyConfig.setNumStmgrs(numContainers);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ doPackingTest(topology,
+ instanceDefaultResources, boltParallelism,
+ instanceDefaultResources, spoutParallelism,
+ numContainers, getDefaultMaxContainerResource());
}
/**
@@ -296,30 +281,11 @@ public class ResourceCompliantRRPackingTest extends CommonPackingTests {
int numScalingInstances = 5;
Map<String, Integer> componentChanges = new HashMap<>();
componentChanges.put(BOLT_NAME, numScalingInstances);
- int numContainersBeforeRepack = 2;
- PackingPlan newPackingPlan = doDefaultScalingTest(componentChanges, numContainersBeforeRepack);
- Assert.assertEquals(4, newPackingPlan.getContainers().size());
- Assert.assertEquals((Integer) (totalInstances + numScalingInstances),
- newPackingPlan.getInstanceCount());
- AssertPacking.assertContainers(newPackingPlan.getContainers(),
- BOLT_NAME, SPOUT_NAME, instanceDefaultResources.getRam(),
- instanceDefaultResources.getRam(), null);
- for (PackingPlan.ContainerPlan containerPlan
- : newPackingPlan.getContainers()) {
- Assert.assertEquals(Math.round(PackingUtils.increaseBy(
- containerPlan.getInstances().size() * instanceDefaultResources.getCpu(),
- DEFAULT_CONTAINER_PADDING)), (long) containerPlan.getRequiredResource().getCpu());
-
- Assert.assertEquals(instanceDefaultResources.getRam()
- .multiply(containerPlan.getInstances().size())
- .increaseBy(DEFAULT_CONTAINER_PADDING),
- containerPlan.getRequiredResource().getRam());
+ int numContainersBeforeRepack = 3;
+ int numContainersAfterRepack = 5;
- Assert.assertEquals(instanceDefaultResources.getDisk()
- .multiply(containerPlan.getInstances().size())
- .increaseBy(DEFAULT_CONTAINER_PADDING),
- containerPlan.getRequiredResource().getDisk());
- }
+ doDefaultScalingTest(componentChanges, numContainersBeforeRepack, numContainersAfterRepack,
+ getDefaultMaxContainerResource());
}
/**
@@ -334,42 +300,20 @@ public class ResourceCompliantRRPackingTest extends CommonPackingTests {
ByteAmount maxContainerRam = ByteAmount.fromGigabytes(10);
topologyConfig.setComponentRam(BOLT_NAME, boltRam);
topologyConfig.setContainerRamRequested(maxContainerRam);
-
- TopologyAPI.Topology topologyExplicitRamMap =
- getTopology(spoutParallelism, boltParallelism, topologyConfig);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
int numScalingInstances = 3;
Map<String, Integer> componentChanges = new HashMap<>();
componentChanges.put(BOLT_NAME, numScalingInstances);
- int numContainersBeforeRepack = 3;
- PackingPlan newPackingPlan =
- doScalingTest(topologyExplicitRamMap, componentChanges, boltRam,
- boltParallelism, instanceDefaultResources.getRam(), spoutParallelism,
- numContainersBeforeRepack, totalInstances);
- Assert.assertEquals(6, newPackingPlan.getContainers().size());
- Assert.assertEquals((Integer) (totalInstances + numScalingInstances),
- newPackingPlan.getInstanceCount());
- AssertPacking.assertContainers(newPackingPlan.getContainers(),
- BOLT_NAME, SPOUT_NAME, boltRam, instanceDefaultResources.getRam(), null);
-
- for (PackingPlan.ContainerPlan containerPlan : newPackingPlan.getContainers()) {
- //Each container either contains a single bolt or 1 bolt and 2 spouts or 1 bolt and 1 spout
- if (containerPlan.getInstances().size() == 1) {
- Assert.assertEquals(boltRam.increaseBy(paddingPercentage),
- containerPlan.getRequiredResource().getRam());
- }
- if (containerPlan.getInstances().size() == 2) {
- Assert.assertEquals(boltRam.plus(instanceDefaultResources.getRam())
- .increaseBy(paddingPercentage),
- containerPlan.getRequiredResource().getRam());
- }
- if (containerPlan.getInstances().size() == 3) {
- Assert.assertEquals(boltRam.plus(instanceDefaultResources.getRam().multiply(2))
- .increaseBy(paddingPercentage),
- containerPlan.getRequiredResource().getRam());
- }
- }
+ int numContainersBeforeRepack = 4;
+ int numContainersAfterRepack = 6;
+
+ doPackingAndScalingTest(topology, componentChanges,
+ instanceDefaultResources.cloneWithRam(boltRam), boltParallelism,
+ instanceDefaultResources, spoutParallelism,
+ numContainersBeforeRepack, numContainersAfterRepack,
+ getDefaultMaxContainerResource().cloneWithRam(maxContainerRam));
}
/**
@@ -377,7 +321,6 @@ public class ResourceCompliantRRPackingTest extends CommonPackingTests {
*/
@Test
public void testPartialRamMapScaling() throws Exception {
-
// Explicit set resources for container
ByteAmount maxContainerRam = ByteAmount.fromGigabytes(10);
// Explicit set component RAM map
@@ -385,23 +328,19 @@ public class ResourceCompliantRRPackingTest extends CommonPackingTests {
topologyConfig.setContainerRamRequested(maxContainerRam);
topologyConfig.setComponentRam(BOLT_NAME, boltRam);
- TopologyAPI.Topology topologyExplicitRamMap =
- getTopology(spoutParallelism, boltParallelism, topologyConfig);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
int numScalingInstances = 3;
Map<String, Integer> componentChanges = new HashMap<>();
componentChanges.put(BOLT_NAME, numScalingInstances);
int numContainersBeforeRepack = 3;
- PackingPlan newPackingPlan = doScalingTest(topologyExplicitRamMap, componentChanges, boltRam,
- boltParallelism, instanceDefaultResources.getRam(), spoutParallelism,
- numContainersBeforeRepack, totalInstances);
-
- Assert.assertEquals(6, newPackingPlan.getContainers().size());
- Assert.assertEquals((Integer) (totalInstances + numScalingInstances),
- newPackingPlan.getInstanceCount());
- AssertPacking.assertContainers(newPackingPlan.getContainers(),
- BOLT_NAME, SPOUT_NAME, boltRam, instanceDefaultResources.getRam(), null);
+ int numContainersAfterRepack = 4;
+ doPackingAndScalingTest(topology, componentChanges,
+ instanceDefaultResources.cloneWithRam(boltRam), boltParallelism,
+ instanceDefaultResources, spoutParallelism,
+ numContainersBeforeRepack, numContainersAfterRepack,
+ getDefaultMaxContainerResource().cloneWithRam(maxContainerRam));
}
/**
@@ -415,15 +354,10 @@ public class ResourceCompliantRRPackingTest extends CommonPackingTests {
Map<String, Integer> componentChanges = new HashMap<>();
componentChanges.put(SPOUT_NAME, spoutScalingDown); //leave 2 spouts
componentChanges.put(BOLT_NAME, boltScalingDown); //leave 2 bolts
- int numContainersBeforeRepack = 2;
- PackingPlan newPackingPlan = doDefaultScalingTest(componentChanges, numContainersBeforeRepack);
- Assert.assertEquals(1, newPackingPlan.getContainers().size());
- Assert.assertEquals((Integer) (totalInstances + spoutScalingDown + boltScalingDown),
- newPackingPlan.getInstanceCount());
- AssertPacking.assertNumInstances(newPackingPlan.getContainers(),
- BOLT_NAME, 2);
- AssertPacking.assertNumInstances(newPackingPlan.getContainers(),
- SPOUT_NAME, 2);
+ int numContainersBeforeRepack = 3;
+ int numContainersAfterRepack = 2;
+ doDefaultScalingTest(componentChanges, numContainersBeforeRepack, numContainersAfterRepack,
+ getDefaultMaxContainerResource());
}
/**
@@ -441,15 +375,10 @@ public class ResourceCompliantRRPackingTest extends CommonPackingTests {
Map<String, Integer> componentChanges = new HashMap<>();
componentChanges.put(SPOUT_NAME, spoutScalingDown); //leave 1 spout
componentChanges.put(BOLT_NAME, boltScalingDown); //leave 1 bolt
- int numContainersBeforeRepack = 2;
- PackingPlan newPackingPlan = doDefaultScalingTest(componentChanges, numContainersBeforeRepack);
- Assert.assertEquals(1, newPackingPlan.getContainers().size());
- Assert.assertEquals((Integer) (totalInstances + spoutScalingDown + boltScalingDown),
- newPackingPlan.getInstanceCount());
- AssertPacking.assertNumInstances(newPackingPlan.getContainers(),
- BOLT_NAME, 0);
- AssertPacking.assertNumInstances(newPackingPlan.getContainers(),
- SPOUT_NAME, 1);
+ int numContainersBeforeRepack = 3;
+ int numContainersAfterRepack = 1;
+ doDefaultScalingTest(componentChanges, numContainersBeforeRepack, numContainersAfterRepack,
+ getDefaultMaxContainerResource());
}
/**
@@ -459,7 +388,7 @@ public class ResourceCompliantRRPackingTest extends CommonPackingTests {
@Test
public void scaleDownAndUpWithExtraPadding() throws Exception {
int paddingPercentage = 50;
- int numContainers = 1;
+ int numContainers = 2;
topologyConfig.setContainerPaddingPercentage(paddingPercentage);
// Explicit set resources for container
ByteAmount maxContainerRam = ByteAmount.fromGigabytes(12);
@@ -469,11 +398,10 @@ public class ResourceCompliantRRPackingTest extends CommonPackingTests {
topologyConfig.setComponentRam(SPOUT_NAME, spoutRam);
topologyConfig.setNumStmgrs(numContainers);
- int noBolts = 2;
- int noSpouts = 1;
+ boltParallelism = 2;
+ spoutParallelism = 1;
- TopologyAPI.Topology topologyExplicitRamMap =
- getTopology(noSpouts, noBolts, topologyConfig);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
int spoutScalingUp = 1;
int boltScalingDown = -2;
@@ -481,18 +409,14 @@ public class ResourceCompliantRRPackingTest extends CommonPackingTests {
Map<String, Integer> componentChanges = new HashMap<>();
componentChanges.put(SPOUT_NAME, spoutScalingUp); // 2 spouts
componentChanges.put(BOLT_NAME, boltScalingDown); // 0 bolts
- int numContainersBeforeRepack = 1;
- PackingPlan newPackingPlan = doScalingTest(topologyExplicitRamMap, componentChanges,
- instanceDefaultResources.getRam(), noBolts, spoutRam, noSpouts,
- numContainersBeforeRepack, noSpouts + noBolts);
-
- Assert.assertEquals(1, newPackingPlan.getContainers().size());
- Assert.assertEquals((Integer) (noSpouts + noBolts + spoutScalingUp + boltScalingDown),
- newPackingPlan.getInstanceCount());
- AssertPacking.assertNumInstances(newPackingPlan.getContainers(),
- BOLT_NAME, noBolts + boltScalingDown);
- AssertPacking.assertNumInstances(newPackingPlan.getContainers(),
- SPOUT_NAME, noSpouts + spoutScalingUp);
+ int numContainersBeforeRepack = 2;
+ int numContainersAfterRepack = 1;
+
+ doPackingAndScalingTest(topology, componentChanges,
+ instanceDefaultResources, boltParallelism,
+ instanceDefaultResources.cloneWithRam(spoutRam), spoutParallelism,
+ numContainersBeforeRepack, numContainersAfterRepack,
+ getDefaultMaxContainerResource().cloneWithRam(maxContainerRam));
}
/**
@@ -513,11 +437,10 @@ public class ResourceCompliantRRPackingTest extends CommonPackingTests {
topologyConfig.setComponentRam(SPOUT_NAME, spoutRam);
topologyConfig.setNumStmgrs(numContainers);
- int noBolts = 3;
- int noSpouts = 1;
+ boltParallelism = 3;
+ spoutParallelism = 1;
- TopologyAPI.Topology topologyExplicitRamMap =
- getTopology(noSpouts, noBolts, topologyConfig);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
int spoutScalingUp = 1;
int boltScalingDown = -1;
@@ -525,18 +448,14 @@ public class ResourceCompliantRRPackingTest extends CommonPackingTests {
Map<String, Integer> componentChanges = new HashMap<>();
componentChanges.put(SPOUT_NAME, spoutScalingUp); // 2 spouts
componentChanges.put(BOLT_NAME, boltScalingDown); // 2 bolts
- int numContainersBeforeRepack = 1;
- PackingPlan newPackingPlan = doScalingTest(topologyExplicitRamMap, componentChanges,
- instanceDefaultResources.getRam(), noBolts, spoutRam, noSpouts,
- numContainersBeforeRepack, noSpouts + noBolts);
-
- Assert.assertEquals(2, newPackingPlan.getContainers().size());
- Assert.assertEquals((Integer) (noSpouts + noBolts + spoutScalingUp + boltScalingDown),
- newPackingPlan.getInstanceCount());
- AssertPacking.assertNumInstances(newPackingPlan.getContainers(),
- BOLT_NAME, noBolts + boltScalingDown);
- AssertPacking.assertNumInstances(newPackingPlan.getContainers(),
- SPOUT_NAME, noSpouts + spoutScalingUp);
+ int numContainersBeforeRepack = 2;
+ int numContainersAfterRepack = 2;
+
+ doPackingAndScalingTest(topology, componentChanges,
+ instanceDefaultResources, boltParallelism,
+ instanceDefaultResources.cloneWithRam(spoutRam), spoutParallelism,
+ numContainersBeforeRepack, numContainersAfterRepack,
+ getDefaultMaxContainerResource().cloneWithRam(maxContainerRam));
}
@Test
@@ -547,104 +466,10 @@ public class ResourceCompliantRRPackingTest extends CommonPackingTests {
Map<String, Integer> componentChanges = new HashMap<>();
componentChanges.put(SPOUT_NAME, spoutScalingUp); // 8 spouts
componentChanges.put(BOLT_NAME, boltScalingUp); // 8 bolts
- int numContainersBeforeRepack = 2;
- PackingPlan newPackingPlan = doDefaultScalingTest(componentChanges, numContainersBeforeRepack);
- Assert.assertEquals(4, newPackingPlan.getContainers().size());
- Assert.assertEquals((Integer) (totalInstances + spoutScalingUp + boltScalingUp),
- newPackingPlan.getInstanceCount());
- AssertPacking.assertNumInstances(newPackingPlan.getContainers(),
- BOLT_NAME, boltParallelism + boltScalingUp);
- AssertPacking.assertNumInstances(newPackingPlan.getContainers(),
- SPOUT_NAME, spoutParallelism + spoutScalingUp);
- }
-
- /**
- * Test the scenario where scaling down removes instances from containers that are most imbalanced
- * (i.e., tending towards homogeneity) first. If there is a tie (e.g. AABB, AB), chooses from the
- * container with the fewest instances, to favor ultimately removing containers. If there is
- * still a tie, favor removing from higher numbered containers
- */
- @Test
- public void testScaleDownOneComponentRemoveContainer() throws Exception {
- @SuppressWarnings({"unchecked", "rawtypes"})
- Pair<Integer, InstanceId>[] initialComponentInstances = new Pair[] {
- new Pair<>(1, new InstanceId(A, 1, 0)),
- new Pair<>(1, new InstanceId(A, 2, 1)),
- new Pair<>(1, new InstanceId(B, 3, 0)),
- new Pair<>(3, new InstanceId(B, 4, 1)),
- new Pair<>(3, new InstanceId(B, 5, 2)),
- new Pair<>(4, new InstanceId(B, 6, 3)),
- new Pair<>(4, new InstanceId(B, 7, 4))
- };
-
- Map<String, Integer> componentChanges = new HashMap<>();
- componentChanges.put(B, -2);
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- Pair<Integer, InstanceId>[] expectedComponentInstances = new Pair[] {
- new Pair<>(1, new InstanceId(A, 1, 0)),
- new Pair<>(1, new InstanceId(A, 2, 1)),
- new Pair<>(1, new InstanceId(B, 3, 0)),
- new Pair<>(3, new InstanceId(B, 4, 1)),
- new Pair<>(3, new InstanceId(B, 5, 2)),
- };
-
- doScaleDownTest(initialComponentInstances, componentChanges, expectedComponentInstances);
- }
-
- @Test
- public void testScaleDownTwoComponentsRemoveContainer() throws Exception {
- @SuppressWarnings({"unchecked", "rawtypes"})
- Pair<Integer, InstanceId>[] initialComponentInstances = new Pair[] {
- new Pair<>(1, new InstanceId(A, 1, 0)),
- new Pair<>(1, new InstanceId(A, 2, 1)),
- new Pair<>(1, new InstanceId(B, 3, 0)),
- new Pair<>(1, new InstanceId(B, 4, 1)),
- new Pair<>(3, new InstanceId(A, 5, 2)),
- new Pair<>(3, new InstanceId(A, 6, 3)),
- new Pair<>(3, new InstanceId(B, 7, 2)),
- new Pair<>(3, new InstanceId(B, 8, 3))
- };
-
- Map<String, Integer> componentChanges = new HashMap<>();
- componentChanges.put(A, -2);
- componentChanges.put(B, -2);
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- Pair<Integer, InstanceId>[] expectedComponentInstances = new Pair[] {
- new Pair<>(1, new InstanceId(A, 1, 0)),
- new Pair<>(1, new InstanceId(A, 2, 1)),
- new Pair<>(1, new InstanceId(B, 3, 0)),
- new Pair<>(1, new InstanceId(B, 4, 1)),
- };
-
- doScaleDownTest(initialComponentInstances, componentChanges, expectedComponentInstances);
- }
-
- @Test
- public void testScaleDownHomogenousFirst() throws Exception {
- @SuppressWarnings({"unchecked", "rawtypes"})
- Pair<Integer, InstanceId>[] initialComponentInstances = new Pair[] {
- new Pair<>(1, new InstanceId(A, 1, 0)),
- new Pair<>(1, new InstanceId(A, 2, 1)),
- new Pair<>(1, new InstanceId(B, 3, 0)),
- new Pair<>(3, new InstanceId(B, 4, 1)),
- new Pair<>(3, new InstanceId(B, 5, 2)),
- new Pair<>(3, new InstanceId(B, 6, 3)),
- new Pair<>(3, new InstanceId(B, 7, 4))
- };
-
- Map<String, Integer> componentChanges = new HashMap<>();
- componentChanges.put(B, -4);
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- Pair<Integer, InstanceId>[] expectedComponentInstances = new Pair[] {
- new Pair<>(1, new InstanceId(A, 1, 0)),
- new Pair<>(1, new InstanceId(A, 2, 1)),
- new Pair<>(1, new InstanceId(B, 3, 0))
- };
-
- doScaleDownTest(initialComponentInstances, componentChanges, expectedComponentInstances);
+ int numContainersBeforeRepack = 3;
+ int numContainersAfterRepack = 5;
+ doDefaultScalingTest(componentChanges, numContainersBeforeRepack, numContainersAfterRepack,
+ getDefaultMaxContainerResource());
}
/**
@@ -658,36 +483,32 @@ public class ResourceCompliantRRPackingTest extends CommonPackingTests {
Map<String, Integer> componentChanges = new HashMap<>();
componentChanges.put(SPOUT_NAME, spoutScalingDown); // 0 spouts
componentChanges.put(BOLT_NAME, boltScalingUp); // 9 bolts
- int numContainersBeforeRepack = 2;
- PackingPlan newPackingPlan = doDefaultScalingTest(componentChanges, numContainersBeforeRepack);
-
- Assert.assertEquals(3, newPackingPlan.getContainers().size());
- Assert.assertEquals((Integer) (totalInstances + spoutScalingDown + boltScalingUp),
- newPackingPlan.getInstanceCount());
- AssertPacking.assertNumInstances(newPackingPlan.getContainers(),
- BOLT_NAME, 9);
- AssertPacking.assertNumInstances(newPackingPlan.getContainers(),
- SPOUT_NAME, 0);
+ int numContainersBeforeRepack = 3;
+ int numContainersAfterRepack = 4;
+ doDefaultScalingTest(componentChanges, numContainersBeforeRepack, numContainersAfterRepack,
+ getDefaultMaxContainerResource());
}
@Test(expected = PackingException.class)
public void testScaleDownInvalidScaleFactor() throws Exception {
-
//try to remove more spout instances than possible
int spoutScalingDown = -5;
Map<String, Integer> componentChanges = new HashMap<>();
componentChanges.put(SPOUT_NAME, spoutScalingDown);
- int numContainersBeforeRepack = 2;
- doDefaultScalingTest(componentChanges, numContainersBeforeRepack);
+ int numContainersBeforeRepack = 3;
+ doDefaultScalingTest(componentChanges, numContainersBeforeRepack, numContainersBeforeRepack,
+ getDefaultMaxContainerResource());
}
@Test(expected = PackingException.class)
public void testScaleDownInvalidComponent() throws Exception {
+ //try to remove a component that does not exist
Map<String, Integer> componentChanges = new HashMap<>();
- componentChanges.put("SPOUT_FAKE", -10); //try to remove a component that does not exist
- int numContainersBeforeRepack = 2;
- doDefaultScalingTest(componentChanges, numContainersBeforeRepack);
+ componentChanges.put("SPOUT_FAKE", -10);
+ int numContainersBeforeRepack = 3;
+ doDefaultScalingTest(componentChanges, numContainersBeforeRepack, numContainersBeforeRepack,
+ getDefaultMaxContainerResource());
}
/**
@@ -696,22 +517,15 @@ public class ResourceCompliantRRPackingTest extends CommonPackingTests {
@Test(expected = PackingException.class)
public void testInvalidRamInstance() throws Exception {
ByteAmount maxContainerRam = ByteAmount.fromGigabytes(10);
- int defaultNumInstancesperContainer = 4;
-
- // Explicit set component RAM map
ByteAmount boltRam = ByteAmount.ZERO;
-
- topologyConfig.setContainerMaxRamHint(maxContainerRam);
+ topologyConfig.setContainerRamRequested(maxContainerRam);
topologyConfig.setComponentRam(BOLT_NAME, boltRam);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
- TopologyAPI.Topology topologyExplicitRamMap =
- getTopology(spoutParallelism, boltParallelism, topologyConfig);
- PackingPlan packingPlanExplicitRamMap = pack(topologyExplicitRamMap);
- Assert.assertEquals(totalInstances, packingPlanExplicitRamMap.getInstanceCount());
- AssertPacking.assertNumInstances(packingPlanExplicitRamMap.getContainers(), BOLT_NAME, 3);
- AssertPacking.assertNumInstances(packingPlanExplicitRamMap.getContainers(), SPOUT_NAME, 4);
- AssertPacking.assertContainerRam(packingPlanExplicitRamMap.getContainers(),
- instanceDefaultResources.getRam().multiply(defaultNumInstancesperContainer));
+ doPackingTest(topology,
+ instanceDefaultResources.cloneWithRam(boltRam), boltParallelism,
+ instanceDefaultResources, spoutParallelism,
+ 0, getDefaultMaxContainerResource().cloneWithRam(maxContainerRam));
}
@Test
@@ -729,7 +543,7 @@ public class ResourceCompliantRRPackingTest extends CommonPackingTests {
topologyConfig.setContainerRamRequested(ByteAmount.fromGigabytes(10));
topologyConfig.setContainerDiskRequested(ByteAmount.fromGigabytes(20));
topologyConfig.setContainerCpuRequested(30);
- topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, requestedContainers);
+ topologyConfig.setNumStmgrs(requestedContainers);
TopologyAPI.Topology topologyExplicitResourcesConfig =
getTopology(spoutParallelism, boltParallelism, topologyConfig);
@@ -747,4 +561,93 @@ public class ResourceCompliantRRPackingTest extends CommonPackingTests {
}
}
}
+
+ /**
+ * Test the scenario where scaling down removes instances from containers that are most imbalanced
+ * (i.e., tending towards homogeneity) first. If there is a tie (e.g. AABB, AB), chooses from the
+ * container with the fewest instances, to favor ultimately removing containers. If there is
+ * still a tie, favor removing from higher numbered containers
+ */
+ @Test
+ public void testScaleDownOneComponentRemoveContainer() throws Exception {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Pair<Integer, InstanceId>[] initialComponentInstances = new Pair[] {
+ new Pair<>(1, new InstanceId(SPOUT_NAME, 1, 0)),
+ new Pair<>(1, new InstanceId(SPOUT_NAME, 2, 1)),
+ new Pair<>(1, new InstanceId(BOLT_NAME, 3, 0)),
+ new Pair<>(3, new InstanceId(BOLT_NAME, 4, 1)),
+ new Pair<>(3, new InstanceId(BOLT_NAME, 5, 2)),
+ new Pair<>(4, new InstanceId(BOLT_NAME, 6, 3)),
+ new Pair<>(4, new InstanceId(BOLT_NAME, 7, 4))
+ };
+
+ Map<String, Integer> componentChanges = new HashMap<>();
+ componentChanges.put(BOLT_NAME, -2);
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Pair<Integer, InstanceId>[] expectedComponentInstances = new Pair[] {
+ new Pair<>(1, new InstanceId(SPOUT_NAME, 1, 0)),
+ new Pair<>(1, new InstanceId(SPOUT_NAME, 2, 1)),
+ new Pair<>(1, new InstanceId(BOLT_NAME, 3, 0)),
+ new Pair<>(3, new InstanceId(BOLT_NAME, 4, 1)),
+ new Pair<>(3, new InstanceId(BOLT_NAME, 5, 2)),
+ };
+
+ doScaleDownTest(initialComponentInstances, componentChanges, expectedComponentInstances);
+ }
+
+ @Test
+ public void testScaleDownTwoComponentsRemoveContainer() throws Exception {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Pair<Integer, InstanceId>[] initialComponentInstances = new Pair[] {
+ new Pair<>(1, new InstanceId(SPOUT_NAME, 1, 0)),
+ new Pair<>(1, new InstanceId(SPOUT_NAME, 2, 1)),
+ new Pair<>(1, new InstanceId(BOLT_NAME, 3, 0)),
+ new Pair<>(1, new InstanceId(BOLT_NAME, 4, 1)),
+ new Pair<>(3, new InstanceId(SPOUT_NAME, 5, 2)),
+ new Pair<>(3, new InstanceId(SPOUT_NAME, 6, 3)),
+ new Pair<>(3, new InstanceId(BOLT_NAME, 7, 2)),
+ new Pair<>(3, new InstanceId(BOLT_NAME, 8, 3))
+ };
+
+ Map<String, Integer> componentChanges = new HashMap<>();
+ componentChanges.put(SPOUT_NAME, -2);
+ componentChanges.put(BOLT_NAME, -2);
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Pair<Integer, InstanceId>[] expectedComponentInstances = new Pair[] {
+ new Pair<>(1, new InstanceId(SPOUT_NAME, 1, 0)),
+ new Pair<>(1, new InstanceId(SPOUT_NAME, 2, 1)),
+ new Pair<>(1, new InstanceId(BOLT_NAME, 3, 0)),
+ new Pair<>(1, new InstanceId(BOLT_NAME, 4, 1)),
+ };
+
+ doScaleDownTest(initialComponentInstances, componentChanges, expectedComponentInstances);
+ }
+
+ @Test
+ public void testScaleDownHomogenousFirst() throws Exception {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Pair<Integer, InstanceId>[] initialComponentInstances = new Pair[] {
+ new Pair<>(1, new InstanceId(SPOUT_NAME, 1, 0)),
+ new Pair<>(1, new InstanceId(SPOUT_NAME, 2, 1)),
+ new Pair<>(1, new InstanceId(BOLT_NAME, 3, 0)),
+ new Pair<>(3, new InstanceId(BOLT_NAME, 4, 1)),
+ new Pair<>(3, new InstanceId(BOLT_NAME, 5, 2)),
+ new Pair<>(3, new InstanceId(BOLT_NAME, 6, 3)),
+ new Pair<>(3, new InstanceId(BOLT_NAME, 7, 4))
+ };
+
+ Map<String, Integer> componentChanges = new HashMap<>();
+ componentChanges.put(BOLT_NAME, -4);
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Pair<Integer, InstanceId>[] expectedComponentInstances = new Pair[] {
+ new Pair<>(1, new InstanceId(SPOUT_NAME, 1, 0)),
+ new Pair<>(1, new InstanceId(SPOUT_NAME, 2, 1)),
+ new Pair<>(1, new InstanceId(BOLT_NAME, 3, 0))
+ };
+
+ doScaleDownTest(initialComponentInstances, componentChanges, expectedComponentInstances);
+ }
}
diff --git a/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
index e56b79d..854eef6 100644
--- a/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
+++ b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
@@ -133,7 +133,7 @@ public class RoundRobinPackingTest extends CommonPackingTests {
Assert.assertEquals(1, differentResources.size());
int instancesCount = containerPlan.getInstances().size();
Assert.assertEquals(containerRam
- .minus(RoundRobinPacking.DEFAULT_RAM_PADDING_PER_CONTAINER).divide(instancesCount),
+ .minus(RoundRobinPacking.DEFAULT_RAM_PADDING_PER_CONTAINER).divide(instancesCount),
differentResources.iterator().next().getRam());
Assert.assertEquals(
diff --git a/heron/scheduler-core/tests/java/org/apache/heron/scheduler/RuntimeManagerMainTest.java b/heron/scheduler-core/tests/java/org/apache/heron/scheduler/RuntimeManagerMainTest.java
index e032ffa..086e4fc 100644
--- a/heron/scheduler-core/tests/java/org/apache/heron/scheduler/RuntimeManagerMainTest.java
+++ b/heron/scheduler-core/tests/java/org/apache/heron/scheduler/RuntimeManagerMainTest.java
@@ -31,7 +31,6 @@ import org.powermock.modules.junit4.PowerMockRunner;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.packing.roundrobin.ResourceCompliantRRPacking;
-import org.apache.heron.packing.roundrobin.RoundRobinPacking;
import org.apache.heron.proto.system.ExecutionEnvironment;
import org.apache.heron.proto.system.PackingPlans;
import org.apache.heron.scheduler.client.ISchedulerClient;
@@ -307,7 +306,7 @@ public class RuntimeManagerMainTest {
SchedulerStateManagerAdaptor manager = mock(SchedulerStateManagerAdaptor.class);
PowerMockito.when(Runtime.schedulerStateManagerAdaptor(any(Config.class))).thenReturn(manager);
- RoundRobinPacking packing = new RoundRobinPacking();
+ ResourceCompliantRRPacking packing = new ResourceCompliantRRPacking();
PackingPlans.PackingPlan currentPlan =
PackingTestUtils.testProtoPackingPlan(TOPOLOGY_NAME, packing);
diff --git a/heron/spi/src/java/org/apache/heron/spi/packing/Resource.java b/heron/spi/src/java/org/apache/heron/spi/packing/Resource.java
index e1b8c1e..b30b2b0 100644
--- a/heron/spi/src/java/org/apache/heron/spi/packing/Resource.java
+++ b/heron/spi/src/java/org/apache/heron/spi/packing/Resource.java
@@ -29,6 +29,9 @@ public class Resource {
private ByteAmount ram;
private ByteAmount disk;
+ public static final Resource EMPTY_RESOURCE
+ = new Resource(0.0, ByteAmount.ZERO, ByteAmount.ZERO);
+
public Resource(double cpu, ByteAmount ram, ByteAmount disk) {
this.cpu = cpu;
this.ram = ram;