You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by ni...@apache.org on 2023/01/09 03:19:11 UTC

[incubator-heron] branch master updated: Prototype RoundRobinPackingV2 with better default container size determination (#3227)

This is an automated email from the ASF dual-hosted git repository.

nicknezis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 384d1774e03 Prototype RoundRobinPackingV2 with better default container size determination (#3227)
384d1774e03 is described below

commit 384d1774e033af30e0d5b0d2d32fd614529ddc9e
Author: Xiaoyao Qian <xq...@twitter.com>
AuthorDate: Sun Jan 8 19:19:06 2023 -0800

    Prototype RoundRobinPackingV2 with better default container size determination (#3227)
    
    * Revamp RoundRobinPacking default container size determination
---
 .../packing/roundrobin/RoundRobinPackingV2.java    | 496 ++++++++++++++++++++
 heron/packing/tests/java/BUILD                     |   9 +
 .../roundrobin/RoundRobinPackingV2Test.java        | 508 +++++++++++++++++++++
 3 files changed, 1013 insertions(+)

diff --git a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPackingV2.java b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPackingV2.java
new file mode 100644
index 00000000000..21e183c5a86
--- /dev/null
+++ b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPackingV2.java
@@ -0,0 +1,496 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.packing.roundrobin;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.heron.api.generated.TopologyAPI;
+import org.apache.heron.api.utils.TopologyUtils;
+import org.apache.heron.common.basics.ByteAmount;
+import org.apache.heron.common.basics.CPUShare;
+import org.apache.heron.common.basics.ResourceMeasure;
+import org.apache.heron.packing.builder.ResourceRequirement;
+import org.apache.heron.packing.builder.SortingStrategy;
+import org.apache.heron.spi.common.Config;
+import org.apache.heron.spi.common.Context;
+import org.apache.heron.spi.packing.IPacking;
+import org.apache.heron.spi.packing.IRepacking;
+import org.apache.heron.spi.packing.InstanceId;
+import org.apache.heron.spi.packing.PackingException;
+import org.apache.heron.spi.packing.PackingPlan;
+import org.apache.heron.spi.packing.Resource;
+
+import static org.apache.heron.api.Config.TOPOLOGY_CONTAINER_CPU_REQUESTED;
+import static org.apache.heron.api.Config.TOPOLOGY_CONTAINER_DISK_REQUESTED;
+import static org.apache.heron.api.Config.TOPOLOGY_CONTAINER_RAM_REQUESTED;
+
+/**
+ * Round-robin packing algorithm
+ * <p>
+ * This IPacking implementation generates PackingPlan: instances of the component are assigned
+ * to each container one by one in circular order, without any priority. Each container is expected
+ * to take equal number of instances if # of instances is multiple of # of containers.
+ * <p>
+ * Following semantics are guaranteed:
+ * 1. Every container requires same size of resource, i.e. same CPU, RAM and disk.
+ * Consider that instances in different containers can be different, the value of size
+ * will be aligned to the max one.
+ * <p>
+ * 2. The size of resource required by the whole topology is equal to
+ * ((# of container specified in config) + 1) * (size of resource required for a single container).
+ * The extra 1 is considered for Heron internal container,
+ * i.e. the one containing Scheduler and TMaster.
+ * <p>
+ * 3. The disk required for a container is calculated as:
+ * value for org.apache.heron.api.Config.TOPOLOGY_CONTAINER_DISK_REQUESTED if exists, otherwise,
+ * (disk for instances in container) + (disk padding for heron internal process)
+ * <p>
+ * 4. The CPU required for a container is calculated as:
+ * value for org.apache.heron.api.Config.TOPOLOGY_CONTAINER_CPU_REQUESTED if exists, otherwise,
+ * (CPU for instances in container) + (CPU padding for heron internal process)
+ * <p>
+ * 5. The RAM required for a container is calculated as:
+ * value for org.apache.heron.api.Config.TOPOLOGY_CONTAINER_RAM_REQUESTED if exists, otherwise,
+ * (RAM for instances in container) + (RAM padding for heron internal process)
+ * <p>
+ * 6. The RAM required for one instance is calculated as:
+ * value in org.apache.heron.api.Config.TOPOLOGY_COMPONENT_RAMMAP if exists, otherwise,
+ * - if org.apache.heron.api.Config.TOPOLOGY_CONTAINER_RAM_REQUESTED not exists:
+ * the default RAM value for one instance
+ * - if org.apache.heron.api.Config.TOPOLOGY_CONTAINER_RAM_REQUESTED exists:
+ * ((TOPOLOGY_CONTAINER_RAM_REQUESTED) - (RAM padding for heron internal process)
+ * - (RAM used by instances within TOPOLOGY_COMPONENT_RAMMAP config))) /
+ * (the # of instances in container not specified in TOPOLOGY_COMPONENT_RAMMAP config)
+ * 7. pack() throws a PackingException if the PackingPlan fails the safety check, for instance,
+ * when the RAM of an instance is less than the minimal required value.
+ */
+public class RoundRobinPackingV2 implements IPacking, IRepacking {
+  private static final Logger LOG = Logger.getLogger(RoundRobinPackingV2.class.getName());
+
+  @VisibleForTesting
+  static final ByteAmount DEFAULT_RAM_PADDING_PER_CONTAINER = ByteAmount.fromGigabytes(2);
+  @VisibleForTesting
+  static final double DEFAULT_CPU_PADDING_PER_CONTAINER = 1.0;
+  private static final ByteAmount DEFAULT_DISK_PADDING_PER_CONTAINER = ByteAmount.fromGigabytes(12);
+
+  @VisibleForTesting
+  static final ByteAmount DEFAULT_DAEMON_PROCESS_RAM_PADDING = ByteAmount.fromGigabytes(1);
+  private static final ByteAmount MIN_RAM_PER_INSTANCE = ByteAmount.fromMegabytes(192);
+
+  // Sentinel defaults meaning "not specified" when reading config values
+  private static final ByteAmount NOT_SPECIFIED_BYTE_AMOUNT = ByteAmount.fromBytes(-1);
+  private static final double NOT_SPECIFIED_CPU_SHARE = -1.0;
+
+  private static final String RAM = "RAM";
+  private static final String CPU = "CPU";
+  private static final String DISK = "DISK";
+
+  private TopologyAPI.Topology topology;
+
+  private ByteAmount instanceRamDefault;
+  private double instanceCpuDefault;
+  private ByteAmount instanceDiskDefault;
+  private ByteAmount containerRamPadding = DEFAULT_RAM_PADDING_PER_CONTAINER;
+  private double containerCpuPadding = DEFAULT_CPU_PADDING_PER_CONTAINER;
+
+  // Caches the topology, the per-instance CPU/RAM/disk defaults and the container RAM padding.
+  @Override
+  public void initialize(Config config, TopologyAPI.Topology inputTopology) {
+    this.topology = inputTopology;
+    this.instanceCpuDefault = Context.instanceCpu(config);
+    this.instanceRamDefault = Context.instanceRam(config);
+    this.instanceDiskDefault = Context.instanceDisk(config);
+    this.containerRamPadding = getContainerRamPadding(topology.getTopologyConfig().getKvsList());
+    LOG.info(String.format("Initializing RoundRobinPackingV2. "
+        + "CPU default: %f, RAM default: %s, DISK default: %s, RAM padding: %s.",
+        this.instanceCpuDefault, this.instanceRamDefault.toString(),
+        this.instanceDiskDefault.toString(),
+        this.containerRamPadding.toString()));
+  }
+
+  @Override
+  public PackingPlan pack() {
+    int numContainer = TopologyUtils.getNumContainers(topology);
+    Map<String, Integer> parallelismMap = TopologyUtils.getComponentParallelism(topology);
+
+    return packInternal(numContainer, parallelismMap);
+  }
+
+  private PackingPlan packInternal(int numContainer, Map<String, Integer> parallelismMap) {
+    // Get the instances' round-robin allocation
+    Map<Integer, List<InstanceId>> roundRobinAllocation =
+        getRoundRobinAllocation(numContainer, parallelismMap);
+
+    Resource containerResourceHint = getContainerResourceHint(roundRobinAllocation);
+
+    // Get the RAM map for every instance
+    Map<Integer, Map<InstanceId, ByteAmount>> instancesRamMap =
+        calculateInstancesResourceMapInContainer(
+        roundRobinAllocation,
+        TopologyUtils.getComponentRamMapConfig(topology),
+        containerResourceHint.getRam(),
+        instanceRamDefault,
+        containerRamPadding,
+        ByteAmount.ZERO,
+        NOT_SPECIFIED_BYTE_AMOUNT,
+        RAM);
+
+    // Get the CPU map for every instance
+    Map<Integer, Map<InstanceId, CPUShare>> instancesCpuMap =
+        calculateInstancesResourceMapInContainer(
+        roundRobinAllocation,
+        CPUShare.convertDoubleMapToCpuShareMap(TopologyUtils.getComponentCpuMapConfig(topology)),
+        CPUShare.fromDouble(containerResourceHint.getCpu()),
+        CPUShare.fromDouble(instanceCpuDefault),
+        CPUShare.fromDouble(containerCpuPadding),
+        CPUShare.fromDouble(0.0),
+        CPUShare.fromDouble(NOT_SPECIFIED_CPU_SHARE),
+        CPU);
+
+    LOG.info(String.format("Pack internal: container CPU hint: %.3f, RAM hint: %s, disk hint: %s.",
+        containerResourceHint.getCpu(),
+        containerResourceHint.getRam().toString(),
+        containerResourceHint.getDisk().toString()));
+
+    // Construct the PackingPlan
+    Set<PackingPlan.ContainerPlan> containerPlans = new HashSet<>();
+    for (int containerId : roundRobinAllocation.keySet()) {
+      List<InstanceId> instanceList = roundRobinAllocation.get(containerId);
+
+      // Calculate the resource required for single instance
+      Map<InstanceId, PackingPlan.InstancePlan> instancePlanMap = new HashMap<>();
+      ByteAmount containerRam = ByteAmount.ZERO;
+      double containerCpu = 0.0;
+
+      for (InstanceId instanceId : instanceList) {
+        ByteAmount instanceRam = instancesRamMap.get(containerId).get(instanceId);
+        Double instanceCpu = instancesCpuMap.get(containerId).get(instanceId).getValue();
+
+        // Per-component disk config is not supported yet, so just use the default.
+        ByteAmount instanceDisk = instanceDiskDefault;
+
+        Resource resource = new Resource(instanceCpu, instanceRam, instanceDisk);
+
+        // Insert it into the map
+        instancePlanMap.put(instanceId, new PackingPlan.InstancePlan(instanceId, resource));
+        containerRam = containerRam.plus(instanceRam);
+        containerCpu += instanceCpu;
+      }
+
+      // finalize container resource
+      if (!containerResourceHint.getRam().equals(NOT_SPECIFIED_BYTE_AMOUNT)) {
+        containerRam = ByteAmount.fromBytes(
+            Math.min(containerRam.plus(containerRamPadding).asBytes(),
+                containerResourceHint.getRam().asBytes()));
+      } else {
+        containerRam = containerRam.plus(containerRamPadding);
+      }
+
+      if (containerResourceHint.getCpu() != NOT_SPECIFIED_CPU_SHARE) {
+        containerCpu = Math.min(containerCpu + containerCpuPadding, containerResourceHint.getCpu());
+      } else {
+        containerCpu += containerCpuPadding;
+      }
+
+      Resource resource = new Resource(Math.max(containerCpu, containerResourceHint.getCpu()),
+          containerRam, containerResourceHint.getDisk());
+      PackingPlan.ContainerPlan containerPlan = new PackingPlan.ContainerPlan(
+          containerId, new HashSet<>(instancePlanMap.values()), resource);
+
+      containerPlans.add(containerPlan);
+
+      LOG.info(String.format("Pack internal finalized: container#%d CPU: %f, RAM: %s, disk: %s.",
+          containerId,
+          resource.getCpu(),
+          resource.getRam().toString(),
+          resource.getDisk().toString()));
+    }
+
+    PackingPlan plan = new PackingPlan(topology.getId(), containerPlans);
+
+    validatePackingPlan(plan);
+    return plan;
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  private ByteAmount getContainerRamPadding(List<TopologyAPI.Config.KeyValue> topologyConfig) {
+    ByteAmount stmgrRam = TopologyUtils.getConfigWithDefault(topologyConfig,
+        org.apache.heron.api.Config.TOPOLOGY_STMGR_RAM,
+        DEFAULT_DAEMON_PROCESS_RAM_PADDING);
+    ByteAmount metricsmgrRam = TopologyUtils.getConfigWithDefault(topologyConfig,
+        org.apache.heron.api.Config.TOPOLOGY_METRICSMGR_RAM,
+        DEFAULT_DAEMON_PROCESS_RAM_PADDING);
+    String reliabilityMode = TopologyUtils.getConfigWithDefault(topologyConfig,
+        org.apache.heron.api.Config.TOPOLOGY_RELIABILITY_MODE,
+        org.apache.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE.name());
+    boolean isStateful =
+        org.apache.heron.api.Config.TopologyReliabilityMode
+            .EFFECTIVELY_ONCE.name().equals(reliabilityMode);
+    ByteAmount ckptmgrRam = TopologyUtils.getConfigWithDefault(topologyConfig,
+        org.apache.heron.api.Config.TOPOLOGY_STATEFUL_CKPTMGR_RAM,
+        isStateful ? DEFAULT_DAEMON_PROCESS_RAM_PADDING : ByteAmount.ZERO);
+
+    ByteAmount daemonProcessPadding = stmgrRam.plus(metricsmgrRam).plus(ckptmgrRam);
+
+    // return the container padding if it's set, otherwise return the total daemon request ram
+    return TopologyUtils.getConfigWithDefault(topologyConfig,
+        org.apache.heron.api.Config.TOPOLOGY_CONTAINER_RAM_PADDING,
+        daemonProcessPadding);
+  }
+
+  @SuppressWarnings("unchecked")
+  private <T extends ResourceMeasure> Map<Integer, Map<InstanceId, T>>
+            calculateInstancesResourceMapInContainer(
+                Map<Integer, List<InstanceId>> allocation,
+                Map<String, T> resMap,
+                T containerResHint,
+                T instanceResDefault,
+                T containerResPadding,
+                T zero,
+                T notSpecified,
+                String resourceType) {
+    Map<Integer, Map<InstanceId, T>> instancesResMapInContainer = new HashMap<>();
+
+    for (int containerId : allocation.keySet()) {
+      List<InstanceId> instanceIds = allocation.get(containerId);
+      Map<InstanceId, T> resInsideContainer = new HashMap<>();
+      instancesResMapInContainer.put(containerId, resInsideContainer);
+
+      // Register the instance resource allocation and calculate the used resource so far
+      T usedRes = zero;
+      for (InstanceId instanceId : instanceIds) {
+        String componentName = instanceId.getComponentName();
+        if (resMap.containsKey(componentName)) {
+          T res = resMap.get(componentName);
+          resInsideContainer.put(instanceId, res);
+          usedRes = (T) usedRes.plus(res);
+        } else {
+          resInsideContainer.put(instanceId, instanceResDefault);
+          usedRes = (T) usedRes.plus(instanceResDefault);
+        }
+      }
+
+      // Soft padding constraint validation: warn if padding amount cannot be accommodated
+      if (!containerResHint.equals(notSpecified)
+          && usedRes.greaterThan(containerResHint.minus(containerResPadding))) {
+        // Validate instance resources specified so far don't violate container-level constraint
+        if (usedRes.greaterThan(containerResHint)) {
+          throw new PackingException(String.format("Invalid packing plan generated. "
+                  + "Total instance %s (%s) in container#%d have exceeded "
+                  + "the container-level constraint of %s.",
+              resourceType, usedRes.toString(), containerId, containerResHint.toString()));
+        }
+
+        LOG.warning(String.format("Container#%d (max %s: %s) is now hosting instances that "
+                + "take up to %s %s. The container may not have enough resource to accommodate "
+                + "internal processes which take up to %s %s.",
+            containerId, resourceType, containerResHint.toString(),
+            usedRes.toString(), resourceType,
+            containerResPadding.toString(), resourceType));
+      }
+    }
+
+    return instancesResMapInContainer;
+  }
+
+  /**
+   * Allocate instances to containers round-robin, components with the biggest RAM first.
+   * @return containerId (1..numContainer) -&gt; list of InstanceId belonging to this container
+   * @throws RuntimeException if numContainer is &lt; 1 or greater than the total instance count
+   */
+  private Map<Integer, List<InstanceId>> getRoundRobinAllocation(
+      int numContainer, Map<String, Integer> parallelismMap) {
+    Map<Integer, List<InstanceId>> allocation = new HashMap<>();
+    int totalInstance = TopologyUtils.getTotalInstance(parallelismMap);
+    if (numContainer < 1) {
+      throw new RuntimeException(String.format("Invalid number of container: %d", numContainer));
+    } else if (numContainer > totalInstance) {
+      throw new RuntimeException(
+          String.format("More containers (%d) allocated than instances (%d).",
+              numContainer, totalInstance));
+    }
+
+    for (int i = 1; i <= numContainer; ++i) {
+      allocation.put(i, new ArrayList<>());
+    }
+
+    int index = 1;
+    int globalTaskIndex = 1;
+
+    // Walk the components in decreasing RAM order so the big instances are spread out first.
+    // Only sorting by RAM here because only RAM can be explicitly capped by JVM processes
+    List<String> sortedInstances = getSortedRAMComponents(parallelismMap.keySet()).stream()
+        .map(ResourceRequirement::getComponentName).collect(Collectors.toList());
+    for (String component : sortedInstances) {
+      int numInstance = parallelismMap.get(component);
+      for (int i = 0; i < numInstance; ++i) {
+        allocation.get(index).add(new InstanceId(component, globalTaskIndex, i));
+        index = (index == numContainer) ? 1 : index + 1;
+        globalTaskIndex++;
+      }
+    }
+    return allocation;
+  }
+
+
+  /**
+   * Sort the components in decreasing order based on their RAM requirements
+   *
+   * @return The sorted list of components and their RAM requirements
+   */
+  private ArrayList<ResourceRequirement> getSortedRAMComponents(Set<String> componentNames) {
+    ArrayList<ResourceRequirement> resourceRequirements = new ArrayList<>();
+    Map<String, ByteAmount> ramMap = TopologyUtils.getComponentRamMapConfig(topology);
+
+    for (String componentName : componentNames) {
+      resourceRequirements.add(new ResourceRequirement(componentName,
+          ramMap.getOrDefault(componentName, ByteAmount.ZERO)));
+    }
+    Collections.sort(resourceRequirements, SortingStrategy.RAM_FIRST.reversed());
+    return resourceRequirements;
+  }
+
+  /**
+   * Get # of instances in the largest container
+   *
+   * @param allocation the instances' allocation
+   * @return # of instances in the largest container
+   */
+  private int getLargestContainerSize(Map<Integer, List<InstanceId>> allocation) {
+    int max = 0;
+    for (List<InstanceId> instances : allocation.values()) {
+      if (instances.size() > max) {
+        max = instances.size();
+      }
+    }
+    return max;
+  }
+
+  private Resource getContainerResourceHint(Map<Integer, List<InstanceId>> allocation) {
+    List<TopologyAPI.Config.KeyValue> topologyConfig = topology.getTopologyConfig().getKvsList();
+    int largestContainerSize = getLargestContainerSize(allocation);
+
+    return new Resource(
+        TopologyUtils.getConfigWithDefault(topologyConfig, TOPOLOGY_CONTAINER_CPU_REQUESTED,
+            NOT_SPECIFIED_CPU_SHARE),
+        TopologyUtils.getConfigWithDefault(topologyConfig, TOPOLOGY_CONTAINER_RAM_REQUESTED,
+            NOT_SPECIFIED_BYTE_AMOUNT),
+        TopologyUtils.getConfigWithDefault(topologyConfig, TOPOLOGY_CONTAINER_DISK_REQUESTED,
+            instanceDiskDefault.multiply(largestContainerSize)
+                .plus(DEFAULT_DISK_PADDING_PER_CONTAINER)));
+  }
+
+  /**
+   * Check whether the PackingPlan generated is valid
+   *
+   * @param plan The PackingPlan to check
+   * @throws PackingException if it's not a valid plan
+   */
+  private void validatePackingPlan(PackingPlan plan) throws PackingException {
+    for (PackingPlan.ContainerPlan containerPlan : plan.getContainers()) {
+      for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
+        // Safe check
+        if (instancePlan.getResource().getRam().lessThan(MIN_RAM_PER_INSTANCE)) {
+          throw new PackingException(String.format("Invalid packing plan generated. A minimum of "
+                  + "%s RAM is required, but InstancePlan for component '%s' has %s",
+              MIN_RAM_PER_INSTANCE, instancePlan.getComponentName(),
+              instancePlan.getResource().getRam()));
+        }
+      }
+    }
+  }
+
+  /**
+   * Read the current packing plan with updated parallelism to calculate a new packing plan.
+   * This method determines a new number of containers based on the updated parallelism
+   * while keeping the number of instances per container &lt;= that of the initial topology.
+   * The packing algorithm packInternal() is shared with pack(): this method
+   * delegates to packInternal() with the new container count and component parallelism.
+   *
+   * @param currentPackingPlan Existing packing plan
+   * @param componentChanges Map &lt; componentName, parallelism delta &gt;
+   * that contains the change (added to the current count) for each component being scaled.
+   * @return new packing plan
+   * @throws PackingException
+   */
+  @Override
+  public PackingPlan repack(PackingPlan currentPackingPlan, Map<String, Integer> componentChanges)
+      throws PackingException {
+    int initialNumContainer = TopologyUtils.getNumContainers(topology);
+    int initialNumInstance = TopologyUtils.getTotalInstance(topology);
+    double initialNumInstancePerContainer = (double) initialNumInstance / initialNumContainer;
+
+    Map<String, Integer> newComponentParallelism =
+        getNewComponentParallelism(currentPackingPlan, componentChanges);
+
+    int newNumInstance = TopologyUtils.getTotalInstance(newComponentParallelism);
+    int newNumContainer = (int) Math.ceil(newNumInstance / initialNumInstancePerContainer);
+    return packInternal(newNumContainer, newComponentParallelism);
+  }
+
+  public Map<String, Integer> getNewComponentParallelism(PackingPlan currentPackingPlan,
+                                                         Map<String, Integer> componentChanges) {
+    Map<String, Integer> currentComponentParallelism = currentPackingPlan.getComponentCounts();
+    for (Map.Entry<String, Integer> e : componentChanges.entrySet()) {
+      // Each change is a delta; merge() treats a component absent from the plan as 0 (no NPE).
+      currentComponentParallelism.merge(e.getKey(), e.getValue(), Integer::sum);
+    }
+    return currentComponentParallelism;
+  }
+
+  /**
+   * Read the current packing plan with updated parallelism and number of containers
+   * to calculate a new packing plan.
+   * The packing algorithm packInternal() is shared with pack(): this method
+   * delegates to packInternal() with the new container count and component parallelism.
+   *
+   * @param currentPackingPlan Existing packing plan
+   * @param containers the new number of containers for the topology
+   * specified by the user
+   * @param componentChanges Map &lt; componentName, parallelism delta &gt;
+   * that contains the change (added to the current count) for each component being scaled.
+   * @return new packing plan
+   * @throws PackingException
+   */
+  @Override
+  public PackingPlan repack(PackingPlan currentPackingPlan, int containers, Map<String, Integer>
+      componentChanges) throws PackingException {
+    if (containers == currentPackingPlan.getContainers().size()) {
+      return repack(currentPackingPlan, componentChanges);
+    }
+    Map<String, Integer> newComponentParallelism = getNewComponentParallelism(currentPackingPlan,
+        componentChanges);
+    return packInternal(containers, newComponentParallelism);
+  }
+}
diff --git a/heron/packing/tests/java/BUILD b/heron/packing/tests/java/BUILD
index 7ce0107e8c5..b870e04e4ab 100644
--- a/heron/packing/tests/java/BUILD
+++ b/heron/packing/tests/java/BUILD
@@ -69,6 +69,15 @@ java_test(
     deps = roundrobin_deps_files,
 )
 
+java_test(
+    name = "RoundRobinPackingV2Test",
+    srcs = glob(
+        ["**/roundrobin/RoundRobinPackingV2Test.java"]
+    ),
+    deps = roundrobin_deps_files,
+    size = "small",
+)
+
 java_test(
     name = "ResourceCompliantRRPackingTest",
     size = "small",
diff --git a/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingV2Test.java b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingV2Test.java
new file mode 100644
index 00000000000..274c47ca705
--- /dev/null
+++ b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingV2Test.java
@@ -0,0 +1,508 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.packing.roundrobin;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.heron.common.basics.ByteAmount;
+import org.apache.heron.packing.CommonPackingTests;
+import org.apache.heron.spi.packing.IPacking;
+import org.apache.heron.spi.packing.IRepacking;
+import org.apache.heron.spi.packing.PackingException;
+import org.apache.heron.spi.packing.PackingPlan;
+import org.apache.heron.spi.packing.Resource;
+
+public class RoundRobinPackingV2Test extends CommonPackingTests {
+  @Override
+  protected IPacking getPackingImpl() {
+    return new RoundRobinPackingV2();
+  }
+
+  @Override
+  protected IRepacking getRepackingImpl() {
+    return new RoundRobinPackingV2();
+  }
+
+  private Resource getDefaultPadding() {
+    return new Resource(RoundRobinPackingV2.DEFAULT_CPU_PADDING_PER_CONTAINER,
+        RoundRobinPackingV2.DEFAULT_RAM_PADDING_PER_CONTAINER, ByteAmount.ZERO);
+  }
+
+  @Before
+  public void setUp() {
+    super.setUp();
+    numContainers = 2;
+    topologyConfig.setNumStmgrs(numContainers);
+    topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+  }
+
+  @Test(expected = PackingException.class)
+  public void testCheckInsufficientRamFailure() throws Exception {
+    // Explicit set insufficient RAM for container
+    ByteAmount containerRam = ByteAmount.ZERO;
+    topologyConfig.setContainerRamRequested(containerRam);
+    topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+    doPackingTest(topology, instanceDefaultResources, boltParallelism,
+        instanceDefaultResources, spoutParallelism,
+        numContainers,
+        getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+            numContainers, getDefaultPadding()).cloneWithRam(containerRam));
+  }
+
+  @Test(expected = PackingException.class)
+  public void testCheckInsufficientCpuFailure() throws Exception {
+    // Explicit set insufficient CPU for container
+    double containerCpu = 1.0;
+    topologyConfig.setContainerCpuRequested(containerCpu);
+    topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+    doPackingTest(topology, instanceDefaultResources, boltParallelism,
+        instanceDefaultResources, spoutParallelism,
+        numContainers,
+        getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+            numContainers, getDefaultPadding()).cloneWithCpu(containerCpu));
+  }
+
+  @Test
+  public void testDefaultResources() throws Exception {
+    doPackingTest(topology,
+        instanceDefaultResources, boltParallelism,
+        instanceDefaultResources, spoutParallelism,
+        numContainers, getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+            numContainers, getDefaultPadding()));
+  }
+
+  /**
+   * Test the scenario where container-level resource configs are set
+   */
+  @Test
+  public void testContainerRequestedResources() throws Exception {
+    // Explicit set resources for container
+    ByteAmount containerRam = ByteAmount.fromGigabytes(10);
+    ByteAmount containerDisk = ByteAmount.fromGigabytes(20);
+    double containerCpu = 30;
+    Resource containerResource = new Resource(containerCpu, containerRam, containerDisk);
+
+    topologyConfig.setContainerRamRequested(containerRam);
+    topologyConfig.setContainerDiskRequested(containerDisk);
+    topologyConfig.setContainerCpuRequested(containerCpu);
+    topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+    doPackingTest(topology,
+        instanceDefaultResources, boltParallelism,
+        instanceDefaultResources, spoutParallelism,
+        numContainers, containerResource);
+  }
+
+  /**
+   * Test the scenario where container-level resource configs and the RAM padding are set
+   */
+  @Test
+  public void testContainerRequestedResourcesWhenRamPaddingSet() throws Exception {
+    // Explicit set resources for container
+    ByteAmount containerRam = ByteAmount.fromGigabytes(10);
+    ByteAmount containerDisk = ByteAmount.fromGigabytes(20);
+    double containerCpu = 30;
+    ByteAmount containerRamPadding = ByteAmount.fromMegabytes(512);
+    Resource containerResource = new Resource(containerCpu, containerRam, containerDisk);
+
+    topologyConfig.setContainerRamRequested(containerRam);
+    topologyConfig.setContainerDiskRequested(containerDisk);
+    topologyConfig.setContainerCpuRequested(containerCpu);
+    topologyConfig.setContainerRamPadding(containerRamPadding);
+    topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+    doPackingTest(topology,
+        instanceDefaultResources, boltParallelism,
+        instanceDefaultResources, spoutParallelism,
+        numContainers, containerResource);
+  }
+
+  /**
+   * Test the scenario RAM map config is completely set
+   */
+  @Test
+  public void testCompleteRamMapRequestedWithExactlyEnoughResource() throws Exception {
+    // Explicitly set the container RAM. Instance RAM still comes from the complete component
+    // RAM map below; the expected container resource uses the requested RAM (cloneWithRam).
+    ByteAmount containerRam = ByteAmount.fromGigabytes(8);
+
+    // Explicit set component RAM map
+    ByteAmount boltRam = ByteAmount.fromGigabytes(1);
+    ByteAmount spoutRam = ByteAmount.fromGigabytes(2);
+
+    topologyConfig.setContainerRamRequested(containerRam);
+    topologyConfig.setComponentRam(BOLT_NAME, boltRam);
+    topologyConfig.setComponentRam(SPOUT_NAME, spoutRam);
+    topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+    doPackingTest(topology,
+        instanceDefaultResources.cloneWithRam(boltRam), boltParallelism,
+        instanceDefaultResources.cloneWithRam(spoutRam), spoutParallelism,
+        numContainers, getDefaultUnspecifiedContainerResource(boltParallelism + spoutParallelism,
+            numContainers, getDefaultPadding()).cloneWithRam(containerRam));
+  }
+
+  /**
+   * Scenario: the RAM map covers every component but the container RAM request is meant to
+   * be less than enough, so a PackingException is expected.
+   */
+  @Test(expected = PackingException.class)
+  public void testCompleteRamMapRequestedWithLessThanEnoughResource() throws Exception {
+    // Container RAM request, followed by a RAM map entry for each component
+    ByteAmount requestedRam = ByteAmount.fromGigabytes(2);
+    ByteAmount ramPerBolt = ByteAmount.fromGigabytes(1);
+    ByteAmount ramPerSpout = ByteAmount.fromGigabytes(2);
+
+    topologyConfig.setContainerRamRequested(requestedRam);
+    topologyConfig.setComponentRam(BOLT_NAME, ramPerBolt);
+    topologyConfig.setComponentRam(SPOUT_NAME, ramPerSpout);
+    topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+    Resource expectedContainer = getDefaultUnspecifiedContainerResource(
+        boltParallelism + spoutParallelism, numContainers, getDefaultPadding())
+        .cloneWithRam(requestedRam);
+    doPackingTest(topology,
+        instanceDefaultResources.cloneWithRam(ramPerBolt), boltParallelism,
+        instanceDefaultResources.cloneWithRam(ramPerSpout), spoutParallelism,
+        numContainers, expectedContainer);
+  }
+
+  /**
+   * Scenario: the RAM map covers every component and the container RAM request is
+   * Long.MAX_VALUE bytes. The expected container resource carries that requested RAM.
+   */
+  @Test
+  public void testCompleteRamMapRequestedWithMoreThanEnoughResource() throws Exception {
+    // Container RAM request, followed by a RAM map entry for each component
+    ByteAmount requestedRam = ByteAmount.fromBytes(Long.MAX_VALUE);
+    ByteAmount ramPerBolt = ByteAmount.fromGigabytes(1);
+    ByteAmount ramPerSpout = ByteAmount.fromGigabytes(2);
+
+    topologyConfig.setContainerRamRequested(requestedRam);
+    topologyConfig.setComponentRam(BOLT_NAME, ramPerBolt);
+    topologyConfig.setComponentRam(SPOUT_NAME, ramPerSpout);
+    topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+    Resource expectedContainer = getDefaultUnspecifiedContainerResource(
+        boltParallelism + spoutParallelism, numContainers, getDefaultPadding())
+        .cloneWithRam(requestedRam);
+    doPackingTest(topology,
+        instanceDefaultResources.cloneWithRam(ramPerBolt), boltParallelism,
+        instanceDefaultResources.cloneWithRam(ramPerSpout), spoutParallelism,
+        numContainers, expectedContainer);
+  }
+
+  /**
+   * Scenario: the RAM map covers every component and the container RAM request is meant to
+   * leave no room for padding. The expected container resource carries the requested RAM.
+   */
+  @Test
+  public void testCompleteRamMapRequestedWithoutPaddingResource() throws Exception {
+    // Container RAM request, followed by a RAM map entry for each component
+    ByteAmount requestedRam = ByteAmount.fromGigabytes(6);
+    ByteAmount ramPerBolt = ByteAmount.fromGigabytes(1);
+    ByteAmount ramPerSpout = ByteAmount.fromGigabytes(2);
+
+    topologyConfig.setContainerRamRequested(requestedRam);
+    topologyConfig.setComponentRam(BOLT_NAME, ramPerBolt);
+    topologyConfig.setComponentRam(SPOUT_NAME, ramPerSpout);
+    topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+    Resource expectedContainer = getDefaultUnspecifiedContainerResource(
+        boltParallelism + spoutParallelism, numContainers, getDefaultPadding())
+        .cloneWithRam(requestedRam);
+    doPackingTest(topology,
+        instanceDefaultResources.cloneWithRam(ramPerBolt), boltParallelism,
+        instanceDefaultResources.cloneWithRam(ramPerSpout), spoutParallelism,
+        numContainers, expectedContainer);
+  }
+
+  /**
+   * Scenario: complete CPU map with a container CPU request meant to be exactly enough.
+   */
+  @Test
+  public void testCompleteCpuMapRequestedWithExactlyEnoughResource() throws Exception {
+    // Container CPU request, followed by a CPU map entry for each component
+    double requestedCpu = 17;
+    double cpuPerBolt = 4;
+    double cpuPerSpout = 4;
+
+    topologyConfig.setContainerCpuRequested(requestedCpu);
+    topologyConfig.setComponentCpu(BOLT_NAME, cpuPerBolt);
+    topologyConfig.setComponentCpu(SPOUT_NAME, cpuPerSpout);
+    topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+    Resource expectedContainer = getDefaultUnspecifiedContainerResource(
+        boltParallelism + spoutParallelism, numContainers, getDefaultPadding())
+        .cloneWithCpu(requestedCpu);
+    doPackingTest(topology,
+        instanceDefaultResources.cloneWithCpu(cpuPerBolt), boltParallelism,
+        instanceDefaultResources.cloneWithCpu(cpuPerSpout), spoutParallelism,
+        numContainers, expectedContainer);
+  }
+
+  /**
+   * Scenario: complete CPU map with a container CPU request meant to be more than enough.
+   */
+  @Test
+  public void testCompleteCpuMapRequestedWithMoreThanEnoughResource() throws Exception {
+    // Container CPU request, followed by a CPU map entry for each component
+    double requestedCpu = 30;
+    double cpuPerBolt = 4;
+    double cpuPerSpout = 4;
+
+    topologyConfig.setContainerCpuRequested(requestedCpu);
+    topologyConfig.setComponentCpu(BOLT_NAME, cpuPerBolt);
+    topologyConfig.setComponentCpu(SPOUT_NAME, cpuPerSpout);
+    topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+    Resource expectedContainer = getDefaultUnspecifiedContainerResource(
+        boltParallelism + spoutParallelism, numContainers, getDefaultPadding())
+        .cloneWithCpu(requestedCpu);
+    doPackingTest(topology,
+        instanceDefaultResources.cloneWithCpu(cpuPerBolt), boltParallelism,
+        instanceDefaultResources.cloneWithCpu(cpuPerSpout), spoutParallelism,
+        numContainers, expectedContainer);
+  }
+
+  /**
+   * Scenario: complete CPU map with too small a container CPU request; expects PackingException.
+   */
+  @Test(expected = PackingException.class)
+  public void testCompleteCpuMapRequestedWithLessThanEnoughResource() throws Exception {
+    // Container CPU request, followed by a CPU map entry for each component
+    double requestedCpu = 10;
+    double cpuPerBolt = 4;
+    double cpuPerSpout = 4;
+
+    topologyConfig.setContainerCpuRequested(requestedCpu);
+    topologyConfig.setComponentCpu(BOLT_NAME, cpuPerBolt);
+    topologyConfig.setComponentCpu(SPOUT_NAME, cpuPerSpout);
+    topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+    Resource expectedContainer = getDefaultUnspecifiedContainerResource(
+        boltParallelism + spoutParallelism, numContainers, getDefaultPadding())
+        .cloneWithCpu(requestedCpu);
+    doPackingTest(topology,
+        instanceDefaultResources.cloneWithCpu(cpuPerBolt), boltParallelism,
+        instanceDefaultResources.cloneWithCpu(cpuPerSpout), spoutParallelism,
+        numContainers, expectedContainer);
+  }
+
+  /**
+   * Scenario: complete CPU map with a container CPU request meant to be exactly enough for
+   * the instances but not for padding. The expected container carries the requested CPU.
+   */
+  @Test
+  public void testCompleteCpuMapRequestedWithoutPaddingResource() throws Exception {
+    // Container CPU request, followed by a CPU map entry for each component
+    double requestedCpu = 16;
+    double cpuPerBolt = 4;
+    double cpuPerSpout = 4;
+
+    topologyConfig.setContainerCpuRequested(requestedCpu);
+    topologyConfig.setComponentCpu(BOLT_NAME, cpuPerBolt);
+    topologyConfig.setComponentCpu(SPOUT_NAME, cpuPerSpout);
+    topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+    Resource expectedContainer = getDefaultUnspecifiedContainerResource(
+        boltParallelism + spoutParallelism, numContainers, getDefaultPadding())
+        .cloneWithCpu(requestedCpu);
+    doPackingTest(topology,
+        instanceDefaultResources.cloneWithCpu(cpuPerBolt), boltParallelism,
+        instanceDefaultResources.cloneWithCpu(cpuPerSpout), spoutParallelism,
+        numContainers, expectedContainer);
+  }
+
+  /**
+   * Scenario: only the bolt has a RAM map entry; the spout uses default instance resources.
+   */
+  @Test
+  public void testPartialRamMap() throws Exception {
+    // Container RAM request and the single RAM map entry
+    ByteAmount requestedRam = ByteAmount.fromGigabytes(10);
+    ByteAmount ramPerBolt = ByteAmount.fromGigabytes(1);
+
+    topologyConfig.setContainerRamRequested(requestedRam);
+    topologyConfig.setComponentRam(BOLT_NAME, ramPerBolt);
+    topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+    Resource expectedContainer = getDefaultUnspecifiedContainerResource(
+        boltParallelism + spoutParallelism, numContainers, getDefaultPadding())
+        .cloneWithRam(requestedRam);
+    doPackingTest(topology,
+        instanceDefaultResources.cloneWithRam(ramPerBolt), boltParallelism,
+        instanceDefaultResources, spoutParallelism,
+        numContainers, expectedContainer);
+  }
+
+  /**
+   * Scenario: only the bolt has a CPU map entry; the spout uses default instance resources.
+   * The expected container resource carries the requested container CPU.
+   */
+  @Test
+  public void testPartialCpuMap() throws Exception {
+    // Container CPU request and the single CPU map entry
+    double requestedCpu = 17;
+    double cpuPerBolt = 4;
+
+    topologyConfig.setContainerCpuRequested(requestedCpu);
+    topologyConfig.setComponentCpu(BOLT_NAME, cpuPerBolt);
+    topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+    Resource expectedContainer = getDefaultUnspecifiedContainerResource(
+        boltParallelism + spoutParallelism, numContainers, getDefaultPadding())
+        .cloneWithCpu(requestedCpu);
+    doPackingTest(topology,
+        instanceDefaultResources.cloneWithCpu(cpuPerBolt), boltParallelism,
+        instanceDefaultResources, spoutParallelism,
+        numContainers, expectedContainer);
+  }
+
+  /**
+   * Scenario: spout and bolt both get parallelism 4 and use default instance resources.
+   */
+  @Test
+  public void testEvenPacking() throws Exception {
+    // Same parallelism for both components
+    boltParallelism = 4;
+    spoutParallelism = 4;
+    topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+    Resource expectedContainer = getDefaultUnspecifiedContainerResource(
+        boltParallelism + spoutParallelism, numContainers, getDefaultPadding());
+    doPackingTest(topology,
+        instanceDefaultResources, boltParallelism,
+        instanceDefaultResources, spoutParallelism,
+        numContainers, expectedContainer);
+  }
+
+
+  /**
+   * Scenario: repack after removing one spout instance and adding one bolt instance, so the
+   * total instance count is unchanged. The same container count is passed to doScalingTest.
+   */
+  @Test
+  public void testRepackingWithSameTotalInstances() throws Exception {
+    boltParallelism = 4;
+    spoutParallelism = 4;
+    topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+    Resource initialContainer = getDefaultUnspecifiedContainerResource(
+        boltParallelism + spoutParallelism, numContainers, getDefaultPadding());
+    PackingPlan initialPlan = doPackingTestWithPartialResource(topology,
+        Optional.empty(), Optional.empty(), boltParallelism,
+        Optional.empty(), Optional.empty(), spoutParallelism,
+        numContainers, getDefaultPadding(), initialContainer);
+
+    // One spout instance fewer, one bolt instance more
+    Map<String, Integer> parallelismDelta = new HashMap<>();
+    parallelismDelta.put(SPOUT_NAME, -1);
+    parallelismDelta.put(BOLT_NAME, 1);
+
+    Resource repackedContainer = getDefaultUnspecifiedContainerResource(
+        boltParallelism + spoutParallelism, numContainers, getDefaultPadding());
+    doScalingTest(topology, initialPlan, parallelismDelta,
+        instanceDefaultResources, boltParallelism, instanceDefaultResources, spoutParallelism,
+        numContainers, repackedContainer);
+  }
+
+  /**
+   * Scenario: repack after adding one spout instance and one bolt instance.
+   * One container more than the initial packing is passed to doScalingTest.
+   */
+  @Test
+  public void testRepackingWithMoreTotalInstances() throws Exception {
+    boltParallelism = 4;
+    spoutParallelism = 4;
+    topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+    Resource initialContainer = getDefaultUnspecifiedContainerResource(
+        boltParallelism + spoutParallelism, numContainers, getDefaultPadding());
+    PackingPlan initialPlan = doPackingTestWithPartialResource(topology,
+        Optional.empty(), Optional.empty(), boltParallelism,
+        Optional.empty(), Optional.empty(), spoutParallelism,
+        numContainers, getDefaultPadding(), initialContainer);
+
+    // One more instance for each component
+    Map<String, Integer> parallelismDelta = new HashMap<>();
+    parallelismDelta.put(SPOUT_NAME, 1);
+    parallelismDelta.put(BOLT_NAME, 1);
+
+    Resource repackedContainer = getDefaultUnspecifiedContainerResource(
+        boltParallelism + spoutParallelism, numContainers, getDefaultPadding());
+    doScalingTest(topology, initialPlan, parallelismDelta,
+        instanceDefaultResources, boltParallelism,
+        instanceDefaultResources, spoutParallelism,
+        numContainers + 1, repackedContainer);
+  }
+
+  /**
+   * Scenario: repack after removing two spout instances and two bolt instances.
+   * One container fewer than the initial packing is passed to doScalingTest.
+   */
+  @Test
+  public void testRepackingWithFewerTotalInstances() throws Exception {
+    boltParallelism = 4;
+    spoutParallelism = 4;
+    topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+    Resource initialContainer = getDefaultUnspecifiedContainerResource(
+        boltParallelism + spoutParallelism, numContainers, getDefaultPadding());
+    PackingPlan initialPlan = doPackingTestWithPartialResource(topology,
+        Optional.empty(), Optional.empty(), boltParallelism,
+        Optional.empty(), Optional.empty(), spoutParallelism,
+        numContainers, getDefaultPadding(), initialContainer);
+
+    // Two instances fewer for each component
+    Map<String, Integer> parallelismDelta = new HashMap<>();
+    parallelismDelta.put(SPOUT_NAME, -2);
+    parallelismDelta.put(BOLT_NAME, -2);
+
+    Resource repackedContainer = getDefaultUnspecifiedContainerResource(
+        boltParallelism + spoutParallelism, numContainers, getDefaultPadding());
+    doScalingTest(topology, initialPlan, parallelismDelta,
+        instanceDefaultResources, boltParallelism,
+        instanceDefaultResources, spoutParallelism,
+        numContainers - 1, repackedContainer);
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void testZeroContainersFailure() throws Exception {
+    // Zero stream managers are configured and 0 containers are passed to doPackingTest
+    ByteAmount requestedRam = ByteAmount.fromGigabytes(10);
+    topologyConfig.setContainerRamRequested(requestedRam);
+    topologyConfig.setNumStmgrs(0);
+    topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+    Resource expectedContainer = getDefaultUnspecifiedContainerResource(
+        boltParallelism + spoutParallelism, numContainers, getDefaultPadding())
+        .cloneWithRam(requestedRam);
+    doPackingTest(topology, instanceDefaultResources, boltParallelism,
+        instanceDefaultResources, spoutParallelism, 0, expectedContainer);
+  }
+}