You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by ni...@apache.org on 2023/01/09 03:19:11 UTC
[incubator-heron] branch master updated: Prototype RoundRobinPackingV2 with better default container size determination (#3227)
This is an automated email from the ASF dual-hosted git repository.
nicknezis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 384d1774e03 Prototype RoundRobinPackingV2 with better default container size determination (#3227)
384d1774e03 is described below
commit 384d1774e033af30e0d5b0d2d32fd614529ddc9e
Author: Xiaoyao Qian <xq...@twitter.com>
AuthorDate: Sun Jan 8 19:19:06 2023 -0800
Prototype RoundRobinPackingV2 with better default container size determination (#3227)
* Revamp RoundRobinPacking default container size determination
---
.../packing/roundrobin/RoundRobinPackingV2.java | 496 ++++++++++++++++++++
heron/packing/tests/java/BUILD | 9 +
.../roundrobin/RoundRobinPackingV2Test.java | 508 +++++++++++++++++++++
3 files changed, 1013 insertions(+)
diff --git a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPackingV2.java b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPackingV2.java
new file mode 100644
index 00000000000..21e183c5a86
--- /dev/null
+++ b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPackingV2.java
@@ -0,0 +1,496 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.packing.roundrobin;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.heron.api.generated.TopologyAPI;
+import org.apache.heron.api.utils.TopologyUtils;
+import org.apache.heron.common.basics.ByteAmount;
+import org.apache.heron.common.basics.CPUShare;
+import org.apache.heron.common.basics.ResourceMeasure;
+import org.apache.heron.packing.builder.ResourceRequirement;
+import org.apache.heron.packing.builder.SortingStrategy;
+import org.apache.heron.spi.common.Config;
+import org.apache.heron.spi.common.Context;
+import org.apache.heron.spi.packing.IPacking;
+import org.apache.heron.spi.packing.IRepacking;
+import org.apache.heron.spi.packing.InstanceId;
+import org.apache.heron.spi.packing.PackingException;
+import org.apache.heron.spi.packing.PackingPlan;
+import org.apache.heron.spi.packing.Resource;
+
+import static org.apache.heron.api.Config.TOPOLOGY_CONTAINER_CPU_REQUESTED;
+import static org.apache.heron.api.Config.TOPOLOGY_CONTAINER_DISK_REQUESTED;
+import static org.apache.heron.api.Config.TOPOLOGY_CONTAINER_RAM_REQUESTED;
+
+/**
+ * Round-robin packing algorithm
+ * <p>
+ * This IPacking implementation generates PackingPlan: instances of the component are assigned
+ * to each container one by one in circular order, without any priority. Each container is expected
+ * to take equal number of instances if # of instances is multiple of # of containers.
+ * <p>
+ * Following semantics are guaranteed:
+ * 1. Every container requires same size of resource, i.e. same CPU, RAM and disk.
+ * Consider that instances in different containers can be different, the value of size
+ * will be aligned to the max one.
+ * <p>
+ * 2. The size of resource required by the whole topology is equal to
+ * ((# of container specified in config) + 1) * (size of resource required for a single container).
+ * The extra 1 is considered for Heron internal container,
+ * i.e. the one containing Scheduler and TMaster.
+ * <p>
+ * 3. The disk required for a container is calculated as:
+ * value for org.apache.heron.api.Config.TOPOLOGY_CONTAINER_DISK_REQUESTED if exists, otherwise,
+ * (disk for instances in container) + (disk padding for heron internal process)
+ * <p>
+ * 4. The CPU required for a container is calculated as:
+ * value for org.apache.heron.api.Config.TOPOLOGY_CONTAINER_CPU_REQUESTED if exists, otherwise,
+ * (CPU for instances in container) + (CPU padding for heron internal process)
+ * <p>
+ * 5. The RAM required for a container is calculated as:
+ * value for org.apache.heron.api.Config.TOPOLOGY_CONTAINER_RAM_REQUESTED if exists, otherwise,
+ * (RAM for instances in container) + (RAM padding for heron internal process)
+ * <p>
+ * 6. The RAM required for one instance is calculated as:
+ * value in org.apache.heron.api.Config.TOPOLOGY_COMPONENT_RAMMAP if exists, otherwise,
+ * the default RAM value for one instance.
+ * org.apache.heron.api.Config.TOPOLOGY_CONTAINER_RAM_REQUESTED is not divided among the
+ * instances; if it exists it only acts as a container-level cap:
+ * - packing fails if the instances of a container need more RAM than the requested value
+ * - a warning is logged if the instances leave less room than
+ * (RAM padding for heron internal process) inside the requested value
+ * 7. The pack() throws a PackingException if the PackingPlan fails to pass the safe check,
+ * for instance, the size of RAM for an instance is less than the minimal required value.
+ */
+public class RoundRobinPackingV2 implements IPacking, IRepacking {
+ private static final Logger LOG = Logger.getLogger(RoundRobinPackingV2.class.getName());
+
+ @VisibleForTesting
+ static final ByteAmount DEFAULT_RAM_PADDING_PER_CONTAINER = ByteAmount.fromGigabytes(2); // initial containerRamPadding; replaced in initialize()
+ @VisibleForTesting
+ static final double DEFAULT_CPU_PADDING_PER_CONTAINER = 1.0; // containerCpuPadding is never reassigned in this class
+ private static final ByteAmount DEFAULT_DISK_PADDING_PER_CONTAINER = ByteAmount.fromGigabytes(12); // added to the default container disk hint
+
+ @VisibleForTesting
+ static final ByteAmount DEFAULT_DAEMON_PROCESS_RAM_PADDING = ByteAmount.fromGigabytes(1); // fallback RAM per daemon in getContainerRamPadding()
+ private static final ByteAmount MIN_RAM_PER_INSTANCE = ByteAmount.fromMegabytes(192); // lower bound enforced by validatePackingPlan()
+
+ // Sentinel defaults passed when reading container-level config: they mark "no value requested"
+ private static final ByteAmount NOT_SPECIFIED_BYTE_AMOUNT = ByteAmount.fromBytes(-1);
+ private static final double NOT_SPECIFIED_CPU_SHARE = -1.0;
+
+ private static final String RAM = "RAM"; // resource label used in log and exception messages
+ private static final String CPU = "CPU"; // resource label used in log and exception messages
+ private static final String DISK = "DISK"; // not referenced elsewhere in this class
+
+ private TopologyAPI.Topology topology; // set in initialize()
+
+ private ByteAmount instanceRamDefault; // set in initialize() from Context.instanceRam
+ private double instanceCpuDefault; // set in initialize() from Context.instanceCpu
+ private ByteAmount instanceDiskDefault; // set in initialize() from Context.instanceDisk
+ private ByteAmount containerRamPadding = DEFAULT_RAM_PADDING_PER_CONTAINER;
+ private double containerCpuPadding = DEFAULT_CPU_PADDING_PER_CONTAINER;
+
+ @Override
+ public void initialize(Config config, TopologyAPI.Topology inputTopology) {
+ this.topology = inputTopology;
+ this.instanceCpuDefault = Context.instanceCpu(config);
+ this.instanceRamDefault = Context.instanceRam(config);
+ this.instanceDiskDefault = Context.instanceDisk(config);
+ this.containerRamPadding = getContainerRamPadding(topology.getTopologyConfig().getKvsList());
+ LOG.info(String.format("Initalizing RoundRobinPackingV2. "
+ + "CPU default: %f, RAM default: %s, DISK default: %s, RAM padding: %s.",
+ this.instanceCpuDefault,
+ this.instanceRamDefault.toString(),
+ this.instanceDiskDefault.toString(),
+ this.containerRamPadding.toString()));
+ }
+
+ /**
+  * Packs the topology using the container count and component parallelism it declares.
+  *
+  * @return the packing plan produced by packInternal()
+  */
+ @Override
+ public PackingPlan pack() {
+   return packInternal(
+       TopologyUtils.getNumContainers(topology),
+       TopologyUtils.getComponentParallelism(topology));
+ }
+
+ /**
+  * Shared packing routine behind pack() and both repack() variants.
+  * <p>
+  * Instances are distributed round-robin, each instance gets its RAM and CPU from the
+  * component maps (or the instance defaults), and every container is then sized as
+  * (sum of its instances) + padding, capped by the container-level request when one is set.
+  *
+  * @param numContainer number of containers to distribute the instances over
+  * @param parallelismMap component name -> number of instances
+  * @return the packing plan, after it passed validatePackingPlan()
+  * @throws PackingException if the instances of a container exceed a container-level request
+  *     or an instance ends up with less than the minimum RAM
+  */
+ private PackingPlan packInternal(int numContainer, Map<String, Integer> parallelismMap) {
+   // Decide which instance lives in which container
+   Map<Integer, List<InstanceId>> allocation =
+       getRoundRobinAllocation(numContainer, parallelismMap);
+
+   Resource hint = getContainerResourceHint(allocation);
+
+   // RAM of every instance, grouped by container
+   Map<Integer, Map<InstanceId, ByteAmount>> ramByContainer =
+       calculateInstancesResourceMapInContainer(
+           allocation,
+           TopologyUtils.getComponentRamMapConfig(topology),
+           hint.getRam(),
+           instanceRamDefault,
+           containerRamPadding,
+           ByteAmount.ZERO,
+           NOT_SPECIFIED_BYTE_AMOUNT,
+           RAM);
+
+   // CPU of every instance, grouped by container
+   Map<Integer, Map<InstanceId, CPUShare>> cpuByContainer =
+       calculateInstancesResourceMapInContainer(
+           allocation,
+           CPUShare.convertDoubleMapToCpuShareMap(TopologyUtils.getComponentCpuMapConfig(topology)),
+           CPUShare.fromDouble(hint.getCpu()),
+           CPUShare.fromDouble(instanceCpuDefault),
+           CPUShare.fromDouble(containerCpuPadding),
+           CPUShare.fromDouble(0.0),
+           CPUShare.fromDouble(NOT_SPECIFIED_CPU_SHARE),
+           CPU);
+
+   LOG.info(String.format("Pack internal: container CPU hint: %.3f, RAM hint: %s, disk hint: %s.",
+       hint.getCpu(),
+       hint.getRam().toString(),
+       hint.getDisk().toString()));
+
+   // Whether the user asked for a specific container size; the sentinels mean "not requested"
+   boolean ramRequested = !hint.getRam().equals(NOT_SPECIFIED_BYTE_AMOUNT);
+   boolean cpuRequested = hint.getCpu() != NOT_SPECIFIED_CPU_SHARE;
+
+   // Assemble one ContainerPlan per container
+   Set<PackingPlan.ContainerPlan> containerPlans = new HashSet<>();
+   for (Map.Entry<Integer, List<InstanceId>> entry : allocation.entrySet()) {
+     int containerId = entry.getKey();
+     Map<InstanceId, ByteAmount> ramOfInstance = ramByContainer.get(containerId);
+     Map<InstanceId, CPUShare> cpuOfInstance = cpuByContainer.get(containerId);
+
+     Map<InstanceId, PackingPlan.InstancePlan> instancePlans = new HashMap<>();
+     ByteAmount ramTotal = ByteAmount.ZERO;
+     double cpuTotal = 0.0;
+
+     for (InstanceId instanceId : entry.getValue()) {
+       ByteAmount ram = ramOfInstance.get(instanceId);
+       Double cpu = cpuOfInstance.get(instanceId).getValue();
+
+       // Disk cannot be configured per component yet, so every instance gets the default.
+       Resource instanceResource = new Resource(cpu, ram, instanceDiskDefault);
+
+       instancePlans.put(instanceId, new PackingPlan.InstancePlan(instanceId, instanceResource));
+       ramTotal = ramTotal.plus(ram);
+       cpuTotal += cpu;
+     }
+
+     // Add the padding, then cap at the requested container size when there is one
+     ramTotal = ramTotal.plus(containerRamPadding);
+     if (ramRequested) {
+       ramTotal = ByteAmount.fromBytes(Math.min(ramTotal.asBytes(), hint.getRam().asBytes()));
+     }
+
+     cpuTotal += containerCpuPadding;
+     if (cpuRequested) {
+       cpuTotal = Math.min(cpuTotal, hint.getCpu());
+     }
+
+     Resource containerResource =
+         new Resource(Math.max(cpuTotal, hint.getCpu()), ramTotal, hint.getDisk());
+     containerPlans.add(new PackingPlan.ContainerPlan(
+         containerId, new HashSet<>(instancePlans.values()), containerResource));
+
+     LOG.info(String.format("Pack internal finalized: container#%d CPU: %f, RAM: %s, disk: %s.",
+         containerId,
+         containerResource.getCpu(),
+         containerResource.getRam().toString(),
+         containerResource.getDisk().toString()));
+   }
+
+   PackingPlan plan = new PackingPlan(topology.getId(), containerPlans);
+   validatePackingPlan(plan);
+   return plan;
+ }
+
+ /**
+  * Intentionally does nothing.
+  */
+ @Override
+ public void close() {
+   // No-op.
+ }
+
+ /**
+  * Determines the RAM to reserve in each container for Heron's own processes.
+  * <p>
+  * An explicit TOPOLOGY_CONTAINER_RAM_PADDING wins. Otherwise the padding is the sum of the
+  * stream manager, metrics manager and checkpoint manager RAM settings, where the checkpoint
+  * manager defaults to zero unless the topology runs in EFFECTIVELY_ONCE mode.
+  *
+  * @param topologyConfig key/value list of the topology-level config
+  * @return RAM padding per container
+  */
+ private ByteAmount getContainerRamPadding(List<TopologyAPI.Config.KeyValue> topologyConfig) {
+   ByteAmount streamManagerRam = TopologyUtils.getConfigWithDefault(topologyConfig,
+       org.apache.heron.api.Config.TOPOLOGY_STMGR_RAM,
+       DEFAULT_DAEMON_PROCESS_RAM_PADDING);
+   ByteAmount metricsManagerRam = TopologyUtils.getConfigWithDefault(topologyConfig,
+       org.apache.heron.api.Config.TOPOLOGY_METRICSMGR_RAM,
+       DEFAULT_DAEMON_PROCESS_RAM_PADDING);
+
+   String mode = TopologyUtils.getConfigWithDefault(topologyConfig,
+       org.apache.heron.api.Config.TOPOLOGY_RELIABILITY_MODE,
+       org.apache.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE.name());
+   boolean effectivelyOnce =
+       org.apache.heron.api.Config.TopologyReliabilityMode
+           .EFFECTIVELY_ONCE.name().equals(mode);
+   // The checkpoint manager only gets a default reservation in EFFECTIVELY_ONCE mode
+   ByteAmount checkpointManagerDefault =
+       effectivelyOnce ? DEFAULT_DAEMON_PROCESS_RAM_PADDING : ByteAmount.ZERO;
+   ByteAmount checkpointManagerRam = TopologyUtils.getConfigWithDefault(topologyConfig,
+       org.apache.heron.api.Config.TOPOLOGY_STATEFUL_CKPTMGR_RAM,
+       checkpointManagerDefault);
+
+   ByteAmount daemonTotal =
+       streamManagerRam.plus(metricsManagerRam).plus(checkpointManagerRam);
+
+   // An explicitly configured container padding overrides the summed daemon RAM
+   return TopologyUtils.getConfigWithDefault(topologyConfig,
+       org.apache.heron.api.Config.TOPOLOGY_CONTAINER_RAM_PADDING,
+       daemonTotal);
+ }
+
+ /**
+  * Assigns one resource amount (RAM or CPU) to every instance and checks each container's
+  * total against the container-level request.
+  * <p>
+  * An instance gets the value configured for its component in resMap, or instanceResDefault
+  * when the component has no entry. If a container-level request is given and the instances
+  * alone exceed it, packing fails; if they fit but leave less than the padding, a warning is
+  * logged and packing continues.
+  *
+  * @param allocation containerId -> instances placed in that container
+  * @param resMap component name -> explicitly configured amount
+  * @param containerResHint requested container amount, or notSpecified
+  * @param instanceResDefault amount for instances whose component is not in resMap
+  * @param containerResPadding amount reserved for internal processes
+  * @param zero the zero value of the resource type, used to start the running total
+  * @param notSpecified sentinel meaning no container-level request was made
+  * @param resourceType label used in log and exception messages
+  * @return containerId -> (instance -> amount)
+  * @throws PackingException if the instances of a container exceed containerResHint
+  */
+ @SuppressWarnings("unchecked")
+ private <T extends ResourceMeasure> Map<Integer, Map<InstanceId, T>>
+     calculateInstancesResourceMapInContainer(
+     Map<Integer, List<InstanceId>> allocation,
+     Map<String, T> resMap,
+     T containerResHint,
+     T instanceResDefault,
+     T containerResPadding,
+     T zero,
+     T notSpecified,
+     String resourceType) {
+   Map<Integer, Map<InstanceId, T>> result = new HashMap<>();
+
+   for (Map.Entry<Integer, List<InstanceId>> entry : allocation.entrySet()) {
+     int containerId = entry.getKey();
+     Map<InstanceId, T> perInstance = new HashMap<>();
+     result.put(containerId, perInstance);
+
+     // Record each instance's amount while summing up the container's usage
+     T total = zero;
+     for (InstanceId instanceId : entry.getValue()) {
+       T amount = resMap.getOrDefault(instanceId.getComponentName(), instanceResDefault);
+       perInstance.put(instanceId, amount);
+       total = (T) total.plus(amount);
+     }
+
+     // Nothing to check without a container-level request, or when the padding still fits
+     boolean paddingFits = containerResHint.equals(notSpecified)
+         || !total.greaterThan(containerResHint.minus(containerResPadding));
+     if (paddingFits) {
+       continue;
+     }
+
+     // Hard constraint: the instances alone must not exceed the requested container size
+     if (total.greaterThan(containerResHint)) {
+       throw new PackingException(String.format("Invalid packing plan generated. "
+           + "Total instance %s (%s) in container#%d have exceeded "
+           + "the container-level constraint of %s.",
+           resourceType, total.toString(), containerId, containerResHint.toString()));
+     }
+
+     // Soft constraint: the instances fit, but the padding does not
+     LOG.warning(String.format("Container#%d (max %s: %s) is now hosting instances that "
+         + "take up to %s %s. The container may not have enough resource to accommodate "
+         + "internal processes which take up to %s %s.",
+         containerId, resourceType, containerResHint.toString(),
+         total.toString(), resourceType,
+         containerResPadding.toString(), resourceType));
+   }
+
+   return result;
+ }
+
+ /**
+  * Distributes all instances over the containers in round-robin order.
+  * <p>
+  * Components are visited in the order returned by getSortedRAMComponents(), so instances of
+  * the components with the largest configured RAM are spread out first. Container ids and
+  * global task ids both start at 1; the per-component index starts at 0.
+  *
+  * @param numContainer number of containers, must be between 1 and the total instance count
+  * @param parallelismMap component name -> number of instances
+  * @return containerId -> list of InstanceId belonging to this container
+  * @throws RuntimeException if numContainer is below 1 or above the total instance count
+  */
+ private Map<Integer, List<InstanceId>> getRoundRobinAllocation(
+     int numContainer, Map<String, Integer> parallelismMap) {
+   Map<Integer, List<InstanceId>> allocation = new HashMap<>();
+   int totalInstance = TopologyUtils.getTotalInstance(parallelismMap);
+   if (numContainer < 1) {
+     throw new RuntimeException(String.format("Invlaid number of container: %d", numContainer));
+   }
+   if (numContainer > totalInstance) {
+     throw new RuntimeException(
+         String.format("More containers (%d) allocated than instances (%d).",
+             numContainer, totalInstance));
+   }
+
+   for (int containerId = 1; containerId <= numContainer; containerId++) {
+     allocation.put(containerId, new ArrayList<>());
+   }
+
+   int containerCursor = 1;
+   int taskId = 1;
+
+   // Only RAM drives the ordering because only RAM can be explicitly capped by JVM processes
+   for (ResourceRequirement requirement : getSortedRAMComponents(parallelismMap.keySet())) {
+     String component = requirement.getComponentName();
+     int numInstance = parallelismMap.get(component);
+     for (int componentIndex = 0; componentIndex < numInstance; componentIndex++) {
+       allocation.get(containerCursor).add(new InstanceId(component, taskId, componentIndex));
+       // Advance to the next container, wrapping from numContainer back to 1
+       containerCursor = containerCursor % numContainer + 1;
+       taskId++;
+     }
+   }
+   return allocation;
+ }
+
+
+ /**
+  * Pairs every component with its configured RAM and orders the result with the reverse
+  * of SortingStrategy.RAM_FIRST. Components without an entry in the topology's component
+  * RAM map are treated as requiring zero RAM for the purpose of this ordering.
+  *
+  * @param componentNames names of the components to sort
+  * @return the sorted list of components and their RAM requirements
+  */
+ private ArrayList<ResourceRequirement> getSortedRAMComponents(Set<String> componentNames) {
+   Map<String, ByteAmount> ramMap = TopologyUtils.getComponentRamMapConfig(topology);
+
+   ArrayList<ResourceRequirement> requirements = componentNames.stream()
+       .map(name -> new ResourceRequirement(name, ramMap.getOrDefault(name, ByteAmount.ZERO)))
+       .collect(Collectors.toCollection(ArrayList::new));
+   requirements.sort(SortingStrategy.RAM_FIRST.reversed());
+   return requirements;
+ }
+
+ /**
+  * Finds how many instances the fullest container holds.
+  *
+  * @param allocation the instances' allocation, containerId -> instances
+  * @return # of instances in the largest container, or 0 when there are no containers
+  */
+ private int getLargestContainerSize(Map<Integer, List<InstanceId>> allocation) {
+   return allocation.values().stream()
+       .mapToInt(List::size)
+       .max()
+       .orElse(0);
+ }
+
+ /**
+  * Reads the container-level resource requests from the topology config.
+  * <p>
+  * CPU and RAM fall back to the "not specified" sentinels. Disk falls back to
+  * (default instance disk * size of the largest container) + default disk padding.
+  *
+  * @param allocation containerId -> instances, used to size the default disk
+  * @return the requested container resources, with sentinels for unspecified CPU and RAM
+  */
+ private Resource getContainerResourceHint(Map<Integer, List<InstanceId>> allocation) {
+   List<TopologyAPI.Config.KeyValue> topologyConfig = topology.getTopologyConfig().getKvsList();
+   int largestContainerSize = getLargestContainerSize(allocation);
+
+   double cpuHint = TopologyUtils.getConfigWithDefault(
+       topologyConfig, TOPOLOGY_CONTAINER_CPU_REQUESTED, NOT_SPECIFIED_CPU_SHARE);
+   ByteAmount ramHint = TopologyUtils.getConfigWithDefault(
+       topologyConfig, TOPOLOGY_CONTAINER_RAM_REQUESTED, NOT_SPECIFIED_BYTE_AMOUNT);
+
+   ByteAmount defaultDisk = instanceDiskDefault
+       .multiply(largestContainerSize)
+       .plus(DEFAULT_DISK_PADDING_PER_CONTAINER);
+   ByteAmount diskHint = TopologyUtils.getConfigWithDefault(
+       topologyConfig, TOPOLOGY_CONTAINER_DISK_REQUESTED, defaultDisk);
+
+   return new Resource(cpuHint, ramHint, diskHint);
+ }
+
+ /**
+  * Verifies that every instance in the plan was given at least MIN_RAM_PER_INSTANCE of RAM.
+  *
+  * @param plan The PackingPlan to check
+  * @throws PackingException on the first instance found with less than the minimum RAM
+  */
+ private void validatePackingPlan(PackingPlan plan) throws PackingException {
+   for (PackingPlan.ContainerPlan containerPlan : plan.getContainers()) {
+     for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
+       ByteAmount instanceRam = instancePlan.getResource().getRam();
+       if (!instanceRam.lessThan(MIN_RAM_PER_INSTANCE)) {
+         continue;
+       }
+       throw new PackingException(String.format("Invalid packing plan generated. A minimum of "
+           + "%s RAM is required, but InstancePlan for component '%s' has %s",
+           MIN_RAM_PER_INSTANCE, instancePlan.getComponentName(), instanceRam));
+     }
+   }
+ }
+
+ /**
+  * Computes a new packing plan after component parallelism changed, choosing the number of
+  * containers automatically.
+  * <p>
+  * The container count is picked so that the average number of instances per container does
+  * not exceed that of the topology as initially declared:
+  * ceil(new instance count / (initial instance count / initial container count)).
+  * The actual packing is delegated to packInternal(), which is shared with pack().
+  *
+  * @param currentPackingPlan Existing packing plan
+  * @param componentChanges Map &lt; componentName, parallelism change &gt; for each component
+  *     whose parallelism has changed; the change is added to the component's current count
+  *     by getNewComponentParallelism()
+  * @return new packing plan
+  * @throws PackingException if the new plan cannot be generated or validated
+  */
+ @Override
+ public PackingPlan repack(PackingPlan currentPackingPlan, Map<String, Integer> componentChanges)
+     throws PackingException {
+   int declaredContainers = TopologyUtils.getNumContainers(topology);
+   int declaredInstances = TopologyUtils.getTotalInstance(topology);
+   double instancesPerContainer = (double) declaredInstances / declaredContainers;
+
+   Map<String, Integer> updatedParallelism =
+       getNewComponentParallelism(currentPackingPlan, componentChanges);
+
+   int updatedInstances = TopologyUtils.getTotalInstance(updatedParallelism);
+   // Round up so the per-container density never exceeds the declared one
+   int updatedContainers = (int) Math.ceil(updatedInstances / instancesPerContainer);
+   return packInternal(updatedContainers, updatedParallelism);
+ }
+
+ /**
+  * Applies parallelism changes to the component counts of the current packing plan.
+  * <p>
+  * The result is built on a copy, so the map handed out by the packing plan is not modified.
+  *
+  * @param currentPackingPlan existing packing plan providing the current count per component
+  * @param componentChanges component name -> amount to add to that component's current count
+  * @return a new map of component name -> updated parallelism, including unchanged components
+  * @throws PackingException if a change names a component that is not part of the current
+  *     packing plan, or carries no value
+  */
+ public Map<String, Integer> getNewComponentParallelism(PackingPlan currentPackingPlan,
+     Map<String, Integer> componentChanges) {
+   Map<String, Integer> newParallelism = new HashMap<>(currentPackingPlan.getComponentCounts());
+   for (Map.Entry<String, Integer> change : componentChanges.entrySet()) {
+     String component = change.getKey();
+     Integer current = newParallelism.get(component);
+     Integer delta = change.getValue();
+     if (current == null || delta == null) {
+       // Fail with context instead of a bare NullPointerException from unboxing
+       throw new PackingException(String.format(
+           "Cannot update parallelism of component '%s': current parallelism is %s, "
+           + "requested change is %s",
+           component, current, delta));
+     }
+     newParallelism.put(component, current + delta);
+   }
+   return newParallelism;
+ }
+
+ /**
+  * Computes a new packing plan after component parallelism changed, for a container count
+  * chosen by the user.
+  * <p>
+  * When the requested count equals the number of containers in the current plan, this
+  * delegates to repack(currentPackingPlan, componentChanges), which derives the container
+  * count itself. Otherwise packInternal(), shared with pack(), is run with the requested count.
+  *
+  * @param currentPackingPlan Existing packing plan
+  * @param containers the new number of containers for the topology specified by the user
+  * @param componentChanges Map &lt; componentName, parallelism change &gt; for each component
+  *     whose parallelism has changed; the change is added to the component's current count
+  *     by getNewComponentParallelism()
+  * @return new packing plan
+  * @throws PackingException if the new plan cannot be generated or validated
+  */
+ @Override
+ public PackingPlan repack(PackingPlan currentPackingPlan, int containers, Map<String, Integer>
+     componentChanges) throws PackingException {
+   if (containers != currentPackingPlan.getContainers().size()) {
+     Map<String, Integer> updatedParallelism =
+         getNewComponentParallelism(currentPackingPlan, componentChanges);
+     return packInternal(containers, updatedParallelism);
+   }
+   return repack(currentPackingPlan, componentChanges);
+ }
+}
diff --git a/heron/packing/tests/java/BUILD b/heron/packing/tests/java/BUILD
index 7ce0107e8c5..b870e04e4ab 100644
--- a/heron/packing/tests/java/BUILD
+++ b/heron/packing/tests/java/BUILD
@@ -69,6 +69,15 @@ java_test(
deps = roundrobin_deps_files,
)
+java_test(
+    name = "RoundRobinPackingV2Test",
+    size = "small",
+    srcs = glob(
+        ["**/roundrobin/RoundRobinPackingV2Test.java"],
+    ),
+    deps = roundrobin_deps_files,
+)
+
java_test(
name = "ResourceCompliantRRPackingTest",
size = "small",
diff --git a/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingV2Test.java b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingV2Test.java
new file mode 100644
index 00000000000..274c47ca705
--- /dev/null
+++ b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingV2Test.java
@@ -0,0 +1,508 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.packing.roundrobin;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.heron.common.basics.ByteAmount;
+import org.apache.heron.packing.CommonPackingTests;
+import org.apache.heron.spi.packing.IPacking;
+import org.apache.heron.spi.packing.IRepacking;
+import org.apache.heron.spi.packing.PackingException;
+import org.apache.heron.spi.packing.PackingPlan;
+import org.apache.heron.spi.packing.Resource;
+
+public class RoundRobinPackingV2Test extends CommonPackingTests {
+ @Override
+ protected IPacking getPackingImpl() {
+ return new RoundRobinPackingV2();
+ }
+
+ @Override
+ protected IRepacking getRepackingImpl() {
+ return new RoundRobinPackingV2();
+ }
+
+ private Resource getDefaultPadding() {
+ return new Resource(RoundRobinPackingV2.DEFAULT_CPU_PADDING_PER_CONTAINER,
+ RoundRobinPackingV2.DEFAULT_RAM_PADDING_PER_CONTAINER, ByteAmount.ZERO);
+ }
+
+ @Before
+ public void setUp() {
+ super.setUp();
+ numContainers = 2;
+ topologyConfig.setNumStmgrs(numContainers);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+ }
+
+ @Test(expected = PackingException.class)
+ public void testCheckInsufficientRamFailure() throws Exception {
+ // Explicit set insufficient RAM for container
+ ByteAmount containerRam = ByteAmount.ZERO;
+ topologyConfig.setContainerRamRequested(containerRam);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ doPackingTest(topology, instanceDefaultResources, boltParallelism,
+ instanceDefaultResources, spoutParallelism,
+ numContainers,
+ getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()).cloneWithRam(containerRam));
+ }
+
+ @Test(expected = PackingException.class)
+ public void testCheckInsufficientCpuFailure() throws Exception {
+ // Explicit set insufficient CPU for container
+ double containerCpu = 1.0;
+ topologyConfig.setContainerCpuRequested(containerCpu);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ doPackingTest(topology, instanceDefaultResources, boltParallelism,
+ instanceDefaultResources, spoutParallelism,
+ numContainers,
+ getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()).cloneWithCpu(containerCpu));
+ }
+
+ @Test
+ public void testDefaultResources() throws Exception {
+ doPackingTest(topology,
+ instanceDefaultResources, boltParallelism,
+ instanceDefaultResources, spoutParallelism,
+ numContainers, getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()));
+ }
+
+ /**
+ * Test the scenario container level resource config are set
+ */
+ @Test
+ public void testContainerRequestedResources() throws Exception {
+ // Explicit set resources for container
+ ByteAmount containerRam = ByteAmount.fromGigabytes(10);
+ ByteAmount containerDisk = ByteAmount.fromGigabytes(20);
+ double containerCpu = 30;
+ Resource containerResource = new Resource(containerCpu, containerRam, containerDisk);
+
+ topologyConfig.setContainerRamRequested(containerRam);
+ topologyConfig.setContainerDiskRequested(containerDisk);
+ topologyConfig.setContainerCpuRequested(containerCpu);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ doPackingTest(topology,
+ instanceDefaultResources, boltParallelism,
+ instanceDefaultResources, spoutParallelism,
+ numContainers, containerResource);
+ }
+
+ /**
+ * Test the scenario where container-level resource configs and the container RAM padding are set
+ */
+ @Test
+ public void testContainerRequestedResourcesWhenRamPaddingSet() throws Exception {
+ // Explicit set resources for container
+ ByteAmount containerRam = ByteAmount.fromGigabytes(10);
+ ByteAmount containerDisk = ByteAmount.fromGigabytes(20);
+ double containerCpu = 30;
+ ByteAmount containerRamPadding = ByteAmount.fromMegabytes(512);
+ Resource containerResource = new Resource(containerCpu, containerRam, containerDisk);
+
+ topologyConfig.setContainerRamRequested(containerRam);
+ topologyConfig.setContainerDiskRequested(containerDisk);
+ topologyConfig.setContainerCpuRequested(containerCpu);
+ topologyConfig.setContainerRamPadding(containerRamPadding);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ doPackingTest(topology,
+ instanceDefaultResources, boltParallelism,
+ instanceDefaultResources, spoutParallelism,
+ numContainers, containerResource);
+ }
+
+ /**
+ * Test the scenario RAM map config is completely set
+ */
+ @Test
+ public void testCompleteRamMapRequestedWithExactlyEnoughResource() throws Exception {
+ // Explicit set resources for container
+ // the value should be ignored, since we set the complete component RAM map
+ ByteAmount containerRam = ByteAmount.fromGigabytes(8);
+
+ // Explicit set component RAM map
+ ByteAmount boltRam = ByteAmount.fromGigabytes(1);
+ ByteAmount spoutRam = ByteAmount.fromGigabytes(2);
+
+ topologyConfig.setContainerRamRequested(containerRam);
+ topologyConfig.setComponentRam(BOLT_NAME, boltRam);
+ topologyConfig.setComponentRam(SPOUT_NAME, spoutRam);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ doPackingTest(topology,
+ instanceDefaultResources.cloneWithRam(boltRam), boltParallelism,
+ instanceDefaultResources.cloneWithRam(spoutRam), spoutParallelism,
+ numContainers, getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()).cloneWithRam(containerRam));
+ }
+
+ /**
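+ * Test the scenario where the RAM map config is completely set but the requested container
+ * RAM is less than enough, so packing is expected to fail with a PackingException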
+ */
+ @Test(expected = PackingException.class)
+ public void testCompleteRamMapRequestedWithLessThanEnoughResource() throws Exception {
+ // Explicit set resources for container
+ // the value should be ignored, since we set the complete component RAM map
+ ByteAmount containerRam = ByteAmount.fromGigabytes(2);
+
+ // Explicit set component RAM map
+ ByteAmount boltRam = ByteAmount.fromGigabytes(1);
+ ByteAmount spoutRam = ByteAmount.fromGigabytes(2);
+
+ topologyConfig.setContainerRamRequested(containerRam);
+ topologyConfig.setComponentRam(BOLT_NAME, boltRam);
+ topologyConfig.setComponentRam(SPOUT_NAME, spoutRam);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ doPackingTest(topology,
+ instanceDefaultResources.cloneWithRam(boltRam), boltParallelism,
+ instanceDefaultResources.cloneWithRam(spoutRam), spoutParallelism,
+ numContainers, getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()).cloneWithRam(containerRam));
+ }
+
+ /**
+ * Test the scenario RAM map config is completely set and the container RAM is more than enough
+ */
+ @Test
+ public void testCompleteRamMapRequestedWithMoreThanEnoughResource() throws Exception {
+ // Explicitly request the largest representable container RAM (Long.MAX_VALUE bytes);
+ // the expected container resource below carries this value via cloneWithRam(containerRam)
+ ByteAmount containerRam = ByteAmount.fromBytes(Long.MAX_VALUE);
+
+ // Explicitly set the component RAM map (both components are covered)
+ ByteAmount boltRam = ByteAmount.fromGigabytes(1);
+ ByteAmount spoutRam = ByteAmount.fromGigabytes(2);
+
+ topologyConfig.setContainerRamRequested(containerRam);
+ topologyConfig.setComponentRam(BOLT_NAME, boltRam);
+ topologyConfig.setComponentRam(SPOUT_NAME, spoutRam);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ doPackingTest(topology,
+ instanceDefaultResources.cloneWithRam(boltRam), boltParallelism,
+ instanceDefaultResources.cloneWithRam(spoutRam), spoutParallelism,
+ numContainers, getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()).cloneWithRam(containerRam));
+ }
+
+ /**
+ * Test the scenario RAM map config is completely set and 6 GB of container RAM is requested
+ */
+ @Test
+ public void testCompleteRamMapRequestedWithoutPaddingResource() throws Exception {
+ // Explicitly request 6 GB of RAM per container; presumably this covers the instances
+ // but not the padding (inferred from the test name -- TODO confirm against the packing rules)
+ ByteAmount containerRam = ByteAmount.fromGigabytes(6);
+
+ // Explicitly set the component RAM map (both components are covered)
+ ByteAmount boltRam = ByteAmount.fromGigabytes(1);
+ ByteAmount spoutRam = ByteAmount.fromGigabytes(2);
+
+ topologyConfig.setContainerRamRequested(containerRam);
+ topologyConfig.setComponentRam(BOLT_NAME, boltRam);
+ topologyConfig.setComponentRam(SPOUT_NAME, spoutRam);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ doPackingTest(topology,
+ instanceDefaultResources.cloneWithRam(boltRam), boltParallelism,
+ instanceDefaultResources.cloneWithRam(spoutRam), spoutParallelism,
+ numContainers, getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()).cloneWithRam(containerRam));
+ }
+
+ /**
+ * Test the scenario CPU map config is completely set and there is exactly enough container CPU
+ */
+ @Test
+ public void testCompleteCpuMapRequestedWithExactlyEnoughResource() throws Exception {
+ // Explicitly set container CPU; presumably 16 for instances + 1 for padding -- TODO confirm
+ double containerCpu = 17;
+
+ // Explicitly set the component CPU map (both components are covered)
+ double boltCpu = 4;
+ double spoutCpu = 4;
+
+ topologyConfig.setContainerCpuRequested(containerCpu);
+ topologyConfig.setComponentCpu(BOLT_NAME, boltCpu);
+ topologyConfig.setComponentCpu(SPOUT_NAME, spoutCpu);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ doPackingTest(topology,
+ instanceDefaultResources.cloneWithCpu(boltCpu), boltParallelism,
+ instanceDefaultResources.cloneWithCpu(spoutCpu), spoutParallelism,
+ numContainers, getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()).cloneWithCpu(containerCpu));
+ }
+
+ /**
+ * Test the scenario CPU map config is completely set and there is more than enough container CPU
+ */
+ @Test
+ public void testCompleteCpuMapRequestedWithMoreThanEnoughResource() throws Exception {
+ // Explicitly set the CPU requested per container; the expected container keeps this value
+ double containerCpu = 30;
+
+ // Explicitly set the component CPU map (both components are covered)
+ double boltCpu = 4;
+ double spoutCpu = 4;
+
+ topologyConfig.setContainerCpuRequested(containerCpu);
+ topologyConfig.setComponentCpu(BOLT_NAME, boltCpu);
+ topologyConfig.setComponentCpu(SPOUT_NAME, spoutCpu);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ doPackingTest(topology,
+ instanceDefaultResources.cloneWithCpu(boltCpu), boltParallelism,
+ instanceDefaultResources.cloneWithCpu(spoutCpu), spoutParallelism,
+ numContainers, getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()).cloneWithCpu(containerCpu));
+ }
+
+ /**
+ * Test the scenario CPU map config is completely set and there is less than enough container CPU
+ */
+ @Test(expected = PackingException.class)
+ public void testCompleteCpuMapRequestedWithLessThanEnoughResource() throws Exception {
+ // Explicitly set the CPU requested per container; a PackingException is expected (see @Test)
+ double containerCpu = 10;
+
+ // Explicitly set the component CPU map (both components are covered)
+ double boltCpu = 4;
+ double spoutCpu = 4;
+
+ topologyConfig.setContainerCpuRequested(containerCpu);
+ topologyConfig.setComponentCpu(BOLT_NAME, boltCpu);
+ topologyConfig.setComponentCpu(SPOUT_NAME, spoutCpu);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ doPackingTest(topology,
+ instanceDefaultResources.cloneWithCpu(boltCpu), boltParallelism,
+ instanceDefaultResources.cloneWithCpu(spoutCpu), spoutParallelism,
+ numContainers, getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()).cloneWithCpu(containerCpu));
+ }
+
+ /**
+ * Test the scenario CPU map config is completely set
+ * and there is exactly enough CPU for the instances, but not enough for padding
+ */
+ @Test
+ public void testCompleteCpuMapRequestedWithoutPaddingResource() throws Exception {
+ // Explicitly set the CPU requested per container; the test expects packing to succeed
+ double containerCpu = 16;
+
+ // Explicitly set the component CPU map (both components are covered)
+ double boltCpu = 4;
+ double spoutCpu = 4;
+
+ topologyConfig.setContainerCpuRequested(containerCpu);
+ topologyConfig.setComponentCpu(BOLT_NAME, boltCpu);
+ topologyConfig.setComponentCpu(SPOUT_NAME, spoutCpu);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ doPackingTest(topology,
+ instanceDefaultResources.cloneWithCpu(boltCpu), boltParallelism,
+ instanceDefaultResources.cloneWithCpu(spoutCpu), spoutParallelism,
+ numContainers, getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()).cloneWithCpu(containerCpu));
+ }
+
+ /**
+ * Test the scenario RAM map config is partially set: only the bolt gets an explicit RAM value
+ */
+ @Test
+ public void testPartialRamMap() throws Exception {
+ // Explicitly set the RAM requested per container
+ ByteAmount containerRam = ByteAmount.fromGigabytes(10);
+
+ // Explicitly set RAM for the bolt only; the spout is expected to keep instanceDefaultResources
+ ByteAmount boltRam = ByteAmount.fromGigabytes(1);
+
+ topologyConfig.setContainerRamRequested(containerRam);
+ topologyConfig.setComponentRam(BOLT_NAME, boltRam);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ doPackingTest(topology,
+ instanceDefaultResources.cloneWithRam(boltRam), boltParallelism,
+ instanceDefaultResources, spoutParallelism,
+ numContainers, getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()).cloneWithRam(containerRam));
+ }
+
+ /**
+ * Test the scenario CPU map config is partially set: only the bolt gets an explicit CPU value
+ */
+ @Test
+ public void testPartialCpuMap() throws Exception {
+ // Explicitly set the CPU requested per container
+ double containerCpu = 17;
+
+ // Explicitly set CPU for the bolt only; the spout is expected to keep instanceDefaultResources
+ double boltCpu = 4;
+
+ topologyConfig.setContainerCpuRequested(containerCpu);
+ topologyConfig.setComponentCpu(BOLT_NAME, boltCpu);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ doPackingTest(topology,
+ instanceDefaultResources.cloneWithCpu(boltCpu), boltParallelism,
+ instanceDefaultResources, spoutParallelism,
+ numContainers, getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()).cloneWithCpu(containerCpu));
+
+ }
+
+ /**
+ * Test even packing of instances: 4 bolts and 4 spouts, all with instanceDefaultResources
+ */
+ @Test
+ public void testEvenPacking() throws Exception {
+ int componentParallelism = 4;
+ boltParallelism = componentParallelism;
+ spoutParallelism = componentParallelism;
+
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ doPackingTest(topology,
+ instanceDefaultResources, boltParallelism,
+ instanceDefaultResources, spoutParallelism,
+ numContainers, getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()));
+ }
+
+
+ /**
+ * Test re-packing with the same total instances: one spout removed and one bolt added
+ */
+ @Test
+ public void testRepackingWithSameTotalInstances() throws Exception {
+ int componentParallelism = 4;
+ boltParallelism = componentParallelism;
+ spoutParallelism = componentParallelism;
+
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ PackingPlan packingPlan = doPackingTestWithPartialResource(topology,
+ Optional.empty(), Optional.empty(), boltParallelism,
+ Optional.empty(), Optional.empty(), spoutParallelism,
+ numContainers, getDefaultPadding(),
+ getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()));
+
+ Map<String, Integer> componentChanges = new HashMap<>();
+ componentChanges.put(SPOUT_NAME, -1);
+ componentChanges.put(BOLT_NAME, +1);
+
+ doScalingTest(topology, packingPlan, componentChanges,
+ instanceDefaultResources, boltParallelism,
+ instanceDefaultResources, spoutParallelism,
+ numContainers, getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()));
+ }
+
+ /**
+ * Test re-packing with more total instances: one spout and one bolt added, one more container
+ */
+ @Test
+ public void testRepackingWithMoreTotalInstances() throws Exception {
+ int componentParallelism = 4;
+ boltParallelism = componentParallelism;
+ spoutParallelism = componentParallelism;
+
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ PackingPlan packingPlan = doPackingTestWithPartialResource(topology,
+ Optional.empty(), Optional.empty(), boltParallelism,
+ Optional.empty(), Optional.empty(), spoutParallelism,
+ numContainers, getDefaultPadding(),
+ getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()));
+
+ Map<String, Integer> componentChanges = new HashMap<>();
+ componentChanges.put(SPOUT_NAME, +1);
+ componentChanges.put(BOLT_NAME, +1);
+
+ doScalingTest(topology, packingPlan, componentChanges,
+ instanceDefaultResources, boltParallelism,
+ instanceDefaultResources, spoutParallelism,
+ numContainers + 1,
+ getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()));
+ }
+
+ /**
+ * Test re-packing with fewer total instances: two spouts and two bolts removed, one less container
+ */
+ @Test
+ public void testRepackingWithFewerTotalInstances() throws Exception {
+ int componentParallelism = 4;
+ boltParallelism = componentParallelism;
+ spoutParallelism = componentParallelism;
+
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ PackingPlan packingPlan = doPackingTestWithPartialResource(topology,
+ Optional.empty(), Optional.empty(), boltParallelism,
+ Optional.empty(), Optional.empty(), spoutParallelism,
+ numContainers, getDefaultPadding(),
+ getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()));
+
+ Map<String, Integer> componentChanges = new HashMap<>();
+ componentChanges.put(SPOUT_NAME, -2);
+ componentChanges.put(BOLT_NAME, -2);
+
+ doScalingTest(topology, packingPlan, componentChanges,
+ instanceDefaultResources, boltParallelism,
+ instanceDefaultResources, spoutParallelism,
+ numContainers - 1,
+ getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()));
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testZeroContainersFailure() throws Exception {
+ // Explicitly set container RAM (10 GB); presumably the zero stmgr count below causes the failure
+ ByteAmount containerRam = ByteAmount.fromGigabytes(10);
+ topologyConfig.setContainerRamRequested(containerRam);
+ topologyConfig.setNumStmgrs(0);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ doPackingTest(topology, instanceDefaultResources, boltParallelism,
+ instanceDefaultResources, spoutParallelism,
+ 0, // 0 containers
+ getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+ numContainers, getDefaultPadding()).cloneWithRam(containerRam));
+ }
+}