You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by GitBox <gi...@apache.org> on 2019/01/10 22:38:09 UTC

[GitHub] huijunwu closed pull request #3142: Validate resource constraint (RAM and CPU) in RoundRobinPacking

huijunwu closed pull request #3142: Validate resource constraint (RAM and CPU) in RoundRobinPacking
URL: https://github.com/apache/incubator-heron/pull/3142
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff of a pull request that comes from
a fork once it has been merged, the diff is reproduced below for the
sake of provenance:

diff --git a/examples/src/java/org/apache/heron/examples/api/ExampleResources.java b/examples/src/java/org/apache/heron/examples/api/ExampleResources.java
index be92dea165..6c08d492a6 100644
--- a/examples/src/java/org/apache/heron/examples/api/ExampleResources.java
+++ b/examples/src/java/org/apache/heron/examples/api/ExampleResources.java
@@ -23,7 +23,9 @@
 
 public final class ExampleResources {
 
+  static final long DEFAULT_RAM_PADDING_PER_CONTAINER = 2 * 1024;
   static final long COMPONENT_RAM_MB = 1024;
+  static final double DEFAULT_CPU_PADDING_PER_CONTAINER = 1.0;
 
   static ByteAmount getComponentRam() {
     return ByteAmount.fromMegabytes(COMPONENT_RAM_MB);
@@ -35,7 +37,13 @@ static ByteAmount getContainerDisk(int components, int containers) {
 
   static ByteAmount getContainerRam(int components, int containers) {
     final int componentsPerContainer = Math.max(components / containers, 1);
-    return ByteAmount.fromMegabytes(COMPONENT_RAM_MB * componentsPerContainer);
+    return ByteAmount.fromMegabytes(COMPONENT_RAM_MB * componentsPerContainer
+        + DEFAULT_RAM_PADDING_PER_CONTAINER);
+  }
+
+  static double getContainerCpu(int components, int containers) {
+    final int componentsPerContainer = Math.max(components / containers, 1);
+    return componentsPerContainer + DEFAULT_CPU_PADDING_PER_CONTAINER;
   }
 
   private ExampleResources() {
diff --git a/examples/src/java/org/apache/heron/examples/api/WordCountTopology.java b/examples/src/java/org/apache/heron/examples/api/WordCountTopology.java
index 5991cfd035..3b1feb40e8 100644
--- a/examples/src/java/org/apache/heron/examples/api/WordCountTopology.java
+++ b/examples/src/java/org/apache/heron/examples/api/WordCountTopology.java
@@ -188,12 +188,16 @@ public static void main(String[] args) throws AlreadyAliveException, InvalidTopo
     conf.setComponentRam("consumer",
         ByteAmount.fromMegabytes(ExampleResources.COMPONENT_RAM_MB));
 
+    conf.setComponentCpu("word", 1.0);
+    conf.setComponentCpu("consumer", 1.0);
+
     // configure container resources
     conf.setContainerDiskRequested(
         ExampleResources.getContainerDisk(2 * parallelism, parallelism));
     conf.setContainerRamRequested(
         ExampleResources.getContainerRam(2 * parallelism, parallelism));
-    conf.setContainerCpuRequested(2);
+    conf.setContainerCpuRequested(
+        ExampleResources.getContainerCpu(2 * parallelism, parallelism));
 
     HeronSubmitter.submitTopology(args[0], conf, builder.createTopology());
   }
diff --git a/heron/common/src/java/org/apache/heron/common/basics/ByteAmount.java b/heron/common/src/java/org/apache/heron/common/basics/ByteAmount.java
index 11c57459c5..5b576af92c 100644
--- a/heron/common/src/java/org/apache/heron/common/basics/ByteAmount.java
+++ b/heron/common/src/java/org/apache/heron/common/basics/ByteAmount.java
@@ -22,7 +22,7 @@
 /**
  * Class that encapsulates number of bytes, with helpers to handle units properly.
  */
-public final class ByteAmount implements Comparable<ByteAmount> {
+public final class ByteAmount extends ResourceMeasure<Long> {
   private static final long KB = 1024L;
   private static final long MB = KB * 1024;
   private static final long GB = MB * 1024;
@@ -34,10 +34,9 @@
   private static final long MAX_KB = Math.round(Long.MAX_VALUE / KB);
 
   public static final ByteAmount ZERO = ByteAmount.fromBytes(0);
-  private final long bytes;
 
-  private ByteAmount(long bytes) {
-    this.bytes = bytes;
+  private ByteAmount(Long value) {
+    super(value);
   }
 
   /**
@@ -67,7 +66,8 @@ public static ByteAmount fromMegabytes(long megabytes) {
 
   /**
    * Creates a ByteAmount value in gigabytes. If the gigabytes value
-   * is &gt;= Long.MAX_VALUE / 1024 / 1024 / 1024, the byte representation is capped at Long.MAX_VALUE.
+   * is &gt;= Long.MAX_VALUE / 1024 / 1024 / 1024,
+   * the byte representation is capped at Long.MAX_VALUE.
    *
    * @param gigabytes value in gigabytes to represent
    * @return a ByteAmount object repressing the number of GBs passed
@@ -85,7 +85,7 @@ public static ByteAmount fromGigabytes(long gigabytes) {
    * @return number of bytes
    */
   public long asBytes() {
-    return bytes;
+    return super.getValue();
   }
 
   /**
@@ -97,7 +97,7 @@ public long asBytes() {
    * @return returns the ByteValue in MBs or 0 if the value is &lt; (1024 * 1024) / 2
    */
   public long asMegabytes() {
-    return Math.round((double) bytes / MB);
+    return Math.round(value.doubleValue() / MB);
   }
 
   /**
@@ -109,7 +109,7 @@ public long asMegabytes() {
    * @return returns the ByteValue in KBs or 0 if the value is &lt; (1024) / 2
    */
   public long asKilobytes() {
-    return Math.round((double) bytes / KB);
+    return Math.round(value.doubleValue() / KB);
   }
 
   /**
@@ -121,15 +121,7 @@ public long asKilobytes() {
    * @return returns the ByteValue in GBs or 0 if the value is &lt; (1024 * 1024 * 1024) / 2
    */
   public long asGigabytes() {
-    return Math.round((double) bytes / GB);
-  }
-
-  /**
-   * Convenience methdod to determine if byte value is zero
-   * @return true if the byte value is 0
-   */
-  public boolean isZero() {
-    return ZERO.equals(this);
+    return Math.round(value.doubleValue() / GB);
   }
 
   /**
@@ -138,10 +130,11 @@ public boolean isZero() {
    * @return a new ByteValue of this minus other ByteValue
    * @throws IllegalArgumentException if subtraction would overshoot Long.MIN_VALUE
    */
-  public ByteAmount minus(ByteAmount other) {
-    checkArgument(Long.MIN_VALUE + other.asBytes() <= asBytes(), String.format(
-        "Subtracting %s from %s would overshoot Long.MIN_LONG", other, this));
-    return ByteAmount.fromBytes(asBytes() - other.asBytes());
+  @Override
+  public ByteAmount minus(ResourceMeasure<Long> other) {
+    checkArgument(Long.MIN_VALUE + other.value <= value,
+        String.format("Subtracting %s from %s would overshoot Long.MIN_LONG", other, this));
+    return ByteAmount.fromBytes(value - other.value);
   }
 
   /**
@@ -150,10 +143,11 @@ public ByteAmount minus(ByteAmount other) {
    * @return a new ByteValue of this plus other ByteValue
    * @throws IllegalArgumentException if addition would exceed Long.MAX_VALUE
    */
-  public ByteAmount plus(ByteAmount other) {
-    checkArgument(Long.MAX_VALUE - asBytes() >= other.asBytes(), String.format(
-        "Adding %s to %s would exceed Long.MAX_LONG", other, this));
-    return ByteAmount.fromBytes(asBytes() + other.asBytes());
+  @Override
+  public ByteAmount plus(ResourceMeasure<Long> other) {
+    checkArgument(Long.MAX_VALUE - value >= other.value,
+        String.format("Adding %s to %s would exceed Long.MAX_LONG", other, this));
+    return ByteAmount.fromBytes(value + other.value);
   }
 
   /**
@@ -162,10 +156,11 @@ public ByteAmount plus(ByteAmount other) {
    * @return a new ByteValue of this ByteValue multiplied by factor
    * @throws IllegalArgumentException if multiplication would exceed Long.MAX_VALUE
    */
+  @Override
   public ByteAmount multiply(int factor) {
-    checkArgument(asBytes() <= Long.MAX_VALUE / factor, String.format(
-        "Multiplying %s by %d would exceed Long.MAX_LONG", this, factor));
-    return ByteAmount.fromBytes(asBytes() * factor);
+    checkArgument(value <= Long.MAX_VALUE / factor,
+        String.format("Multiplying %s by %d would exceed Long.MAX_LONG", this, factor));
+    return ByteAmount.fromBytes(value * factor);
   }
 
   /**
@@ -175,9 +170,10 @@ public ByteAmount multiply(int factor) {
    * @param factor value to divide by
    * @return a new ByteValue of this ByteValue divided by factor
    */
+  @Override
   public ByteAmount divide(int factor) {
     checkArgument(factor != 0, String.format("Can not divide %s by 0", this));
-    return ByteAmount.fromBytes(Math.round((double) this.asBytes() / (double) factor));
+    return ByteAmount.fromBytes(Math.round(value.doubleValue() / factor));
   }
 
   /**
@@ -187,30 +183,15 @@ public ByteAmount divide(int factor) {
    * @return a new ByteValue of this ByteValue increased by percentage
    * @throws IllegalArgumentException if increase would exceed Long.MAX_VALUE
    */
+  @Override
   public ByteAmount increaseBy(int percentage) {
-    checkArgument(percentage >= 0, String.format(
-        "Increasing by negative percent (%d) not supported", percentage));
+    checkArgument(percentage >= 0,
+        String.format("Increasing by negative percent (%d) not supported", percentage));
     double factor = 1.0 + ((double) percentage / 100);
     long max = Math.round(Long.MAX_VALUE / factor);
-    checkArgument(asBytes() <= max, String.format(
-        "Increasing %s by %d percent would exceed Long.MAX_LONG", this, percentage));
-    return ByteAmount.fromBytes(Math.round((double) asBytes() * factor));
-  }
-
-  public boolean greaterThan(ByteAmount other) {
-    return this.asBytes() > other.asBytes();
-  }
-
-  public boolean greaterOrEqual(ByteAmount other) {
-    return this.asBytes() >= other.asBytes();
-  }
-
-  public boolean lessThan(ByteAmount other) {
-    return this.asBytes() < other.asBytes();
-  }
-
-  public boolean lessOrEqual(ByteAmount other) {
-    return this.asBytes() <= other.asBytes();
+    checkArgument(value <= max,
+        String.format("Increasing %s by %d percent would exceed Long.MAX_LONG", this, percentage));
+    return ByteAmount.fromBytes(Math.round(value.doubleValue() * factor));
   }
 
   public ByteAmount max(ByteAmount other) {
@@ -221,42 +202,19 @@ public ByteAmount max(ByteAmount other) {
     }
   }
 
-  @Override
-  public int compareTo(ByteAmount other) {
-    return Long.compare(asBytes(), other.asBytes());
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (this == other) {
-      return true;
-    }
-    if (other == null || getClass() != other.getClass()) {
-      return false;
-    }
-
-    ByteAmount that = (ByteAmount) other;
-    return bytes == that.bytes;
-  }
-
-  @Override
-  public int hashCode() {
-    return (int) (bytes ^ (bytes >>> 32));
-  }
-
   @Override
   public String toString() {
-    String value;
+    String str;
     if (asGigabytes() > 0) {
-      value = String.format("%.1f GB (%d bytes)", (double) asBytes() / GB, asBytes());
+      str = String.format("%.1f GB (%d bytes)", value.doubleValue() / GB, value);
     } else if (asMegabytes() > 0) {
-      value = String.format("%.1f MB (%d bytes)", (double) asBytes() / MB, asBytes());
+      str = String.format("%.1f MB (%d bytes)", value.doubleValue() / MB, value);
     } else if (asKilobytes() > 0) {
-      value = String.format("%.1f KB (%d bytes)", (double) asBytes() / KB, asBytes());
+      str = String.format("%.1f KB (%d bytes)", value.doubleValue() / KB, value);
     } else {
-      value = bytes + " bytes";
+      str = value + " bytes";
     }
-    return String.format("ByteAmount{%s}", value);
+    return String.format("ByteAmount{%s}", str);
   }
 
   private void checkArgument(boolean condition, String errorMessage) {
diff --git a/heron/common/src/java/org/apache/heron/common/basics/CPUShare.java b/heron/common/src/java/org/apache/heron/common/basics/CPUShare.java
new file mode 100644
index 0000000000..b400c0a4a2
--- /dev/null
+++ b/heron/common/src/java/org/apache/heron/common/basics/CPUShare.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.common.basics;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public final class CPUShare extends ResourceMeasure<Double> {
+
+  private CPUShare(Double value) {
+    super(value);
+  }
+
+  public static CPUShare fromDouble(double value) {
+    return new CPUShare(value);
+  }
+
+  @Override
+  public CPUShare minus(ResourceMeasure<Double> other) {
+    return new CPUShare(value - other.value);
+  }
+
+  @Override
+  public CPUShare plus(ResourceMeasure<Double> other) {
+    return new CPUShare(value + other.value);
+  }
+
+  @Override
+  public CPUShare multiply(int factor) {
+    return new CPUShare(value * factor);
+  }
+
+  @Override
+  public CPUShare divide(int factor) {
+    return new CPUShare(value / factor);
+  }
+
+  @Override
+  public CPUShare increaseBy(int percentage) {
+    return new CPUShare(value * (1.0 + percentage / 100.0));
+  }
+
+  public static Map<String, CPUShare> convertDoubleMapToCpuShareMap(Map<String, Double> doubleMap) {
+    Map<String, CPUShare> retval = new HashMap<>();
+    for (Map.Entry<String, Double> entry : doubleMap.entrySet()) {
+      retval.put(entry.getKey(), new CPUShare(entry.getValue()));
+    }
+    return retval;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("CPUShare{%.3f}", value);
+  }
+}
diff --git a/heron/common/src/java/org/apache/heron/common/basics/ResourceMeasure.java b/heron/common/src/java/org/apache/heron/common/basics/ResourceMeasure.java
new file mode 100644
index 0000000000..d0c50e60b0
--- /dev/null
+++ b/heron/common/src/java/org/apache/heron/common/basics/ResourceMeasure.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.common.basics;
+
+public abstract class ResourceMeasure<V extends Number & Comparable>
+    implements Comparable<ResourceMeasure<V>> {
+
+  protected final V value;
+
+  protected ResourceMeasure(V value) {
+    if (value == null) {
+      throw new IllegalArgumentException();
+    }
+    this.value = value;
+  }
+
+  public V getValue() {
+    return value;
+  }
+
+  public boolean isZero() {
+    return value.doubleValue() == 0.0;
+  }
+
+  public abstract ResourceMeasure<V> minus(ResourceMeasure<V> other);
+
+  public abstract ResourceMeasure<V> plus(ResourceMeasure<V> other);
+
+  public abstract ResourceMeasure<V> multiply(int factor);
+
+  public abstract ResourceMeasure<V> divide(int factor);
+
+  public abstract ResourceMeasure<V> increaseBy(int percentage);
+
+  @SuppressWarnings("unchecked")
+  public boolean greaterThan(ResourceMeasure<V> other) {
+    return value.compareTo(other.value) > 0;
+  }
+
+  @SuppressWarnings("unchecked")
+  public boolean greaterOrEqual(ResourceMeasure<V> other) {
+    return value.compareTo(other.value) >= 0;
+  }
+
+  @SuppressWarnings("unchecked")
+  public boolean lessThan(ResourceMeasure<V> other) {
+    return value.compareTo(other.value) < 0;
+  }
+
+  @SuppressWarnings("unchecked")
+  public boolean lessOrEqual(ResourceMeasure<V> other) {
+    return value.compareTo(other.value) <= 0;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public int compareTo(ResourceMeasure<V> o) {
+    return value.compareTo(o.value);
+  }
+
+  @Override
+  public int hashCode() {
+    return value.hashCode();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+
+    if (other == null || getClass() != other.getClass()) {
+      return false;
+    }
+
+    ResourceMeasure<V> that = (ResourceMeasure<V>) other;
+    return value.equals(that.value);
+  }
+}
diff --git a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
index 0a80440de8..e4d2cf395f 100644
--- a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
+++ b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
@@ -32,6 +32,8 @@
 import org.apache.heron.api.generated.TopologyAPI;
 import org.apache.heron.api.utils.TopologyUtils;
 import org.apache.heron.common.basics.ByteAmount;
+import org.apache.heron.common.basics.CPUShare;
+import org.apache.heron.common.basics.ResourceMeasure;
 import org.apache.heron.spi.common.Config;
 import org.apache.heron.spi.common.Context;
 import org.apache.heron.spi.packing.IPacking;
@@ -84,17 +86,23 @@
 public class RoundRobinPacking implements IPacking, IRepacking {
   private static final Logger LOG = Logger.getLogger(RoundRobinPacking.class.getName());
 
-  // TODO(mfu): Read these values from Config
-  private static final ByteAmount DEFAULT_DISK_PADDING_PER_CONTAINER = ByteAmount.fromGigabytes(12);
-  private static final double DEFAULT_CPU_PADDING_PER_CONTAINER = 1;
-  private static final ByteAmount MIN_RAM_PER_INSTANCE = ByteAmount.fromMegabytes(192);
   @VisibleForTesting
   static final ByteAmount DEFAULT_RAM_PADDING_PER_CONTAINER = ByteAmount.fromGigabytes(2);
+  @VisibleForTesting
+  static final double DEFAULT_CPU_PADDING_PER_CONTAINER = 1.0;
+  private static final ByteAmount DEFAULT_DISK_PADDING_PER_CONTAINER = ByteAmount.fromGigabytes(12);
+
   @VisibleForTesting
   static final ByteAmount DEFAULT_DAEMON_PROCESS_RAM_PADDING = ByteAmount.fromGigabytes(1);
+  private static final ByteAmount MIN_RAM_PER_INSTANCE = ByteAmount.fromMegabytes(192);
 
   // Use as a stub as default number value when getting config value
-  private static final ByteAmount NOT_SPECIFIED_NUMBER_VALUE = ByteAmount.fromBytes(-1);
+  private static final ByteAmount NOT_SPECIFIED_BYTE_AMOUNT = ByteAmount.fromBytes(-1);
+  private static final double NOT_SPECIFIED_CPU_SHARE = -1.0;
+
+  private static final String RAM = "RAM";
+  private static final String CPU = "CPU";
+  private static final String DISK = "DISK";
 
   private TopologyAPI.Topology topology;
 
@@ -102,6 +110,7 @@
   private double instanceCpuDefault;
   private ByteAmount instanceDiskDefault;
   private ByteAmount containerRamPadding = DEFAULT_RAM_PADDING_PER_CONTAINER;
+  private double containerCpuPadding = DEFAULT_CPU_PADDING_PER_CONTAINER;
 
   @Override
   public void initialize(Config config, TopologyAPI.Topology inputTopology) {
@@ -133,16 +142,36 @@ private PackingPlan packInternal(int numContainer, Map<String, Integer> parallel
 
     // Get the RAM map for every instance
     Map<Integer, Map<InstanceId, ByteAmount>> instancesRamMap =
-        getInstancesRamMapInContainer(roundRobinAllocation);
+        calculateInstancesResourceMapInContainer(
+        roundRobinAllocation,
+        TopologyUtils.getComponentRamMapConfig(topology),
+        getContainerRamHint(roundRobinAllocation),
+        instanceRamDefault,
+        containerRamPadding,
+        ByteAmount.ZERO,
+        NOT_SPECIFIED_BYTE_AMOUNT,
+        RAM);
+
+    // Get the CPU map for every instance
+    Map<Integer, Map<InstanceId, CPUShare>> instancesCpuMap =
+        calculateInstancesResourceMapInContainer(
+        roundRobinAllocation,
+        CPUShare.convertDoubleMapToCpuShareMap(TopologyUtils.getComponentCpuMapConfig(topology)),
+        CPUShare.fromDouble(getContainerCpuHint(roundRobinAllocation)),
+        CPUShare.fromDouble(instanceCpuDefault),
+        CPUShare.fromDouble(containerCpuPadding),
+        CPUShare.fromDouble(0.0),
+        CPUShare.fromDouble(NOT_SPECIFIED_CPU_SHARE),
+        CPU);
 
     ByteAmount containerDiskInBytes = getContainerDiskHint(roundRobinAllocation);
-    double containerCpu = getContainerCpuHint(roundRobinAllocation);
+    double containerCpuHint = getContainerCpuHint(roundRobinAllocation);
     ByteAmount containerRamHint = getContainerRamHint(roundRobinAllocation);
 
-    LOG.info(String.format("Pack internal: container CPU hint: %f, RAM hint: %s, disk hint: %s.",
-        containerCpu,
-        containerDiskInBytes.toString(),
-        containerRamHint.toString()));
+    LOG.info(String.format("Pack internal: container CPU hint: %.3f, RAM hint: %s, disk hint: %s.",
+        containerCpuHint,
+        containerRamHint.toString(),
+        containerDiskInBytes.toString()));
 
     // Construct the PackingPlan
     Set<PackingPlan.ContainerPlan> containerPlans = new HashSet<>();
@@ -152,31 +181,39 @@ private PackingPlan packInternal(int numContainer, Map<String, Integer> parallel
       // Calculate the resource required for single instance
       Map<InstanceId, PackingPlan.InstancePlan> instancePlanMap = new HashMap<>();
       ByteAmount containerRam = containerRamPadding;
+      double containerCpu = containerCpuPadding;
+
       for (InstanceId instanceId : instanceList) {
         ByteAmount instanceRam = instancesRamMap.get(containerId).get(instanceId);
+        Double instanceCpu = instancesCpuMap.get(containerId).get(instanceId).getValue();
 
-        // Currently not yet support disk or CPU config for different components,
-        // so just use the default value.
+        // Currently not yet support disk config for different components, just use the default.
         ByteAmount instanceDisk = instanceDiskDefault;
-        double instanceCpu = instanceCpuDefault;
 
         Resource resource = new Resource(instanceCpu, instanceRam, instanceDisk);
 
         // Insert it into the map
         instancePlanMap.put(instanceId, new PackingPlan.InstancePlan(instanceId, resource));
         containerRam = containerRam.plus(instanceRam);
+        containerCpu += instanceCpu;
       }
 
-      Resource resource = new Resource(containerCpu, containerRam, containerDiskInBytes);
+      Resource resource = new Resource(Math.max(containerCpu, containerCpuHint),
+          containerRam, containerDiskInBytes);
       PackingPlan.ContainerPlan containerPlan = new PackingPlan.ContainerPlan(
           containerId, new HashSet<>(instancePlanMap.values()), resource);
 
       containerPlans.add(containerPlan);
+
+      LOG.info(String.format("Pack internal finalized: container#%d CPU: %f, RAM: %s, disk: %s.",
+          containerId,
+          resource.getCpu(),
+          resource.getRam().toString(),
+          resource.getDisk().toString()));
     }
 
     PackingPlan plan = new PackingPlan(topology.getId(), containerPlans);
 
-    // Check whether it is a valid PackingPlan
     validatePackingPlan(plan);
     return plan;
   }
@@ -211,69 +248,75 @@ private ByteAmount getContainerRamPadding(List<TopologyAPI.Config.KeyValue> topo
         daemonProcessPadding);
   }
 
-  /**
-   * Calculate the RAM required by any instance in the container
-   *
-   * @param allocation the allocation of instances in different container
-   * @return A map: (containerId -&gt; (instanceId -&gt; instanceRequiredRam))
-   */
-  private Map<Integer, Map<InstanceId, ByteAmount>> getInstancesRamMapInContainer(
-      Map<Integer, List<InstanceId>> allocation) {
-    Map<String, ByteAmount> ramMap = TopologyUtils.getComponentRamMapConfig(topology);
-
-    Map<Integer, Map<InstanceId, ByteAmount>> instancesRamMapInContainer = new HashMap<>();
-    ByteAmount containerRamHint = getContainerRamHint(allocation);
+  @SuppressWarnings("unchecked")
+  private <T extends ResourceMeasure> Map<Integer, Map<InstanceId, T>>
+            calculateInstancesResourceMapInContainer(
+                Map<Integer, List<InstanceId>> allocation,
+                Map<String, T> resMap,
+                T containerResHint,
+                T instanceResDefault,
+                T containerResPadding,
+                T zero,
+                T notSpecified,
+                String resourceType) {
+    Map<Integer, Map<InstanceId, T>> instancesResMapInContainer = new HashMap<>();
 
     for (int containerId : allocation.keySet()) {
       List<InstanceId> instanceIds = allocation.get(containerId);
-      Map<InstanceId, ByteAmount> ramInsideContainer = new HashMap<>();
-      instancesRamMapInContainer.put(containerId, ramInsideContainer);
-      List<InstanceId> instancesToBeAccounted = new ArrayList<>();
+      Map<InstanceId, T> resInsideContainer = new HashMap<>();
+      instancesResMapInContainer.put(containerId, resInsideContainer);
+      List<InstanceId> unspecifiedInstances = new ArrayList<>();
 
-      // Calculate the actual value
-      ByteAmount usedRam = ByteAmount.ZERO;
+      // Register the instance resource allocation and calculate the used resource so far
+      T usedRes = zero;
       for (InstanceId instanceId : instanceIds) {
         String componentName = instanceId.getComponentName();
-        if (ramMap.containsKey(componentName)) {
-          ByteAmount ram = ramMap.get(componentName);
-          ramInsideContainer.put(instanceId, ram);
-          usedRam = usedRam.plus(ram);
+        if (resMap.containsKey(componentName)) {
+          T res = resMap.get(componentName);
+          resInsideContainer.put(instanceId, res);
+          usedRes = (T) usedRes.plus(res);
         } else {
-          instancesToBeAccounted.add(instanceId);
+          unspecifiedInstances.add(instanceId);
         }
       }
 
-      // Now we have calculated RAM for instances specified in ComponentRamMap
-      // Then to calculate RAM for the rest instances
-      int instancesToAllocate = instancesToBeAccounted.size();
+      // Validate instance resources specified so far don't violate container-level constraint
+      if (!containerResHint.equals(notSpecified)
+          && usedRes.greaterThan(containerResHint.minus(containerResPadding))) {
+        throw new PackingException(String.format("Invalid packing plan generated. "
+                + "Total instance %s in a container (%s) have exceeded "
+                + "the container-level constraint of %s.",
+            resourceType, usedRes.toString(), containerResHint));
+      }
 
-      if (instancesToAllocate != 0) {
-        ByteAmount individualInstanceRam = instanceRamDefault;
+      // calculate resource for the remaining unspecified instances if any
+      if (!unspecifiedInstances.isEmpty()) {
+        T individualInstanceRes = instanceResDefault;
 
-        // The RAM map is partially set. We need to calculate RAM for the rest
+        // If container resource is specified
+        if (!containerResHint.equals(notSpecified)) {
+          // discount resource for heron internal process (padding) and used (usedRes)
+          T remainingRes = (T) containerResHint.minus(containerResPadding).minus(usedRes);
 
-        // We have different strategy depending on whether container RAM is specified
-        // If container RAM is specified
-        if (!containerRamHint.equals(NOT_SPECIFIED_NUMBER_VALUE)) {
-          // remove RAM for heron internal process
-          ByteAmount remainingRam =
-              containerRamHint.minus(containerRamPadding).minus(usedRam);
+          if (remainingRes.lessOrEqual(zero)) {
+            throw new PackingException(String.format("Invalid packing plan generated. "
+                + "No enough %s to allocate for unspecified instances", resourceType));
+          }
 
-          // Split remaining RAM evenly
-          individualInstanceRam = remainingRam.divide(instancesToAllocate);
+          // Split remaining resource evenly
+          individualInstanceRes = (T) remainingRes.divide(unspecifiedInstances.size());
         }
 
-        // Put the results in instancesRam
-        for (InstanceId instanceId : instancesToBeAccounted) {
-          ramInsideContainer.put(instanceId, individualInstanceRam);
+        // Put the results in resInsideContainer
+        for (InstanceId instanceId : unspecifiedInstances) {
+          resInsideContainer.put(instanceId, individualInstanceRes);
         }
       }
     }
 
-    return instancesRamMapInContainer;
+    return instancesResMapInContainer;
   }
 
-
   /**
    * Get the instances' allocation basing on round robin algorithm
    *
@@ -288,7 +331,7 @@ private ByteAmount getContainerRamPadding(List<TopologyAPI.Config.KeyValue> topo
     }
 
     for (int i = 1; i <= numContainer; ++i) {
-      allocation.put(i, new ArrayList<InstanceId>());
+      allocation.put(i, new ArrayList<>());
     }
 
     int index = 1;
@@ -367,7 +410,7 @@ private ByteAmount getContainerRamHint(Map<Integer, List<InstanceId>> allocation
 
     return TopologyUtils.getConfigWithDefault(
         topologyConfig, org.apache.heron.api.Config.TOPOLOGY_CONTAINER_RAM_REQUESTED,
-        NOT_SPECIFIED_NUMBER_VALUE);
+        NOT_SPECIFIED_BYTE_AMOUNT);
   }
 
   /**
diff --git a/heron/packing/tests/java/org/apache/heron/packing/AssertPacking.java b/heron/packing/tests/java/org/apache/heron/packing/AssertPacking.java
index dc54ad6680..356ff1a7e7 100644
--- a/heron/packing/tests/java/org/apache/heron/packing/AssertPacking.java
+++ b/heron/packing/tests/java/org/apache/heron/packing/AssertPacking.java
@@ -42,6 +42,8 @@
  */
 public final class AssertPacking {
 
+  private static final double DELTA = 0.1;
+
   private AssertPacking() { }
 
   /**
@@ -55,18 +57,17 @@ public static void assertContainers(Set<PackingPlan.ContainerPlan> containerPlan
                                       ByteAmount notExpectedContainerRam) {
     boolean boltFound = false;
     boolean spoutFound = false;
-    List<Integer> expectedInstanceIndecies = new ArrayList<>();
-    List<Integer> foundInstanceIndecies = new ArrayList<>();
+    List<Integer> expectedInstanceIndices = new ArrayList<>();
+    List<Integer> foundInstanceIndices = new ArrayList<>();
     int expectedInstanceIndex = 1;
     // RAM for bolt should be the value in component RAM map
     for (PackingPlan.ContainerPlan containerPlan : containerPlans) {
       if (notExpectedContainerRam != null) {
-        assertNotEquals(
-            notExpectedContainerRam, containerPlan.getRequiredResource().getRam());
+        assertNotEquals(notExpectedContainerRam, containerPlan.getRequiredResource().getRam());
       }
       for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
-        expectedInstanceIndecies.add(expectedInstanceIndex++);
-        foundInstanceIndecies.add(instancePlan.getTaskId());
+        expectedInstanceIndices.add(expectedInstanceIndex++);
+        foundInstanceIndices.add(instancePlan.getTaskId());
         if (instancePlan.getComponentName().equals(boltName)) {
           assertEquals("Unexpected bolt RAM", expectedBoltRam, instancePlan.getResource().getRam());
           boltFound = true;
@@ -81,9 +82,52 @@ public static void assertContainers(Set<PackingPlan.ContainerPlan> containerPlan
     assertTrue("Bolt not found in any of the container plans: " + boltName, boltFound);
     assertTrue("Spout not found in any of the container plans: " + spoutName, spoutFound);
 
-    Collections.sort(foundInstanceIndecies);
+    Collections.sort(foundInstanceIndices);
+    assertEquals("Unexpected instance global id set found.",
+        expectedInstanceIndices, foundInstanceIndices);
+  }
+
+  /**
+   * Verifies that the container plans contain at least one bolt named boltName with CPU equal to
+   * expectedBoltCpu, and likewise for spouts. If notExpectedContainerCpu is not null, verifies
+   * that no container's required CPU equals it.
+   */
+  public static void assertContainers(Set<PackingPlan.ContainerPlan> containerPlans,
+                                      String boltName, String spoutName,
+                                      Double expectedBoltCpu, Double expectedSpoutCpu,
+                                      Double notExpectedContainerCpu) {
+    boolean boltFound = false;
+    boolean spoutFound = false;
+    List<Integer> expectedInstanceIndices = new ArrayList<>();
+    List<Integer> foundInstanceIndices = new ArrayList<>();
+    int expectedInstanceIndex = 1;
+    // CPU for bolt should be the value in component CPU map
+    for (PackingPlan.ContainerPlan containerPlan : containerPlans) {
+      if (notExpectedContainerCpu != null) {
+        assertNotEquals(notExpectedContainerCpu, containerPlan.getRequiredResource().getCpu());
+      }
+      for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
+        expectedInstanceIndices.add(expectedInstanceIndex++);
+        foundInstanceIndices.add(instancePlan.getTaskId());
+        if (instancePlan.getComponentName().equals(boltName)) {
+          assertEquals("Unexpected bolt CPU",
+              expectedBoltCpu.doubleValue(), instancePlan.getResource().getCpu(), DELTA);
+          boltFound = true;
+        }
+        if (instancePlan.getComponentName().equals(spoutName)) {
+          assertEquals(
+              "Unexpected spout CPU",
+              expectedSpoutCpu.doubleValue(), instancePlan.getResource().getCpu(), DELTA);
+          spoutFound = true;
+        }
+      }
+    }
+    assertTrue("Bolt not found in any of the container plans: " + boltName, boltFound);
+    assertTrue("Spout not found in any of the container plans: " + spoutName, spoutFound);
+
+    Collections.sort(foundInstanceIndices);
     assertEquals("Unexpected instance global id set found.",
-        expectedInstanceIndecies, foundInstanceIndecies);
+        expectedInstanceIndices, foundInstanceIndices);
   }
 
   /**
diff --git a/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
index 742c6a4987..826feaa06b 100644
--- a/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
+++ b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
@@ -69,7 +69,7 @@ private PackingPlan getRoundRobinRePackingPlan(
   }
 
   @Test(expected = PackingException.class)
-  public void testCheckFailure() throws Exception {
+  public void testCheckInsufficientRamFailure() throws Exception {
     int numContainers = 2;
     int spoutParallelism = 4;
     int boltParallelism = 3;
@@ -87,6 +87,25 @@ public void testCheckFailure() throws Exception {
     getRoundRobinPackingPlan(topology);
   }
 
+  @Test(expected = PackingException.class)
+  public void testCheckInsufficientCpuFailure() throws Exception {
+    int numContainers = 2;
+    int spoutParallelism = 4;
+    int boltParallelism = 3;
+
+    // Set up the topology and its config
+    org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
+    topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
+
+    // Explicit set insufficient CPU for container
+    double containerCpu = 1.0;
+
+    topologyConfig.setContainerCpuRequested(containerCpu);
+
+    TopologyAPI.Topology topology =  getTopology(spoutParallelism, boltParallelism, topologyConfig);
+    getRoundRobinPackingPlan(topology);
+  }
+
   @Test
   public void testDefaultResources() throws Exception {
     int numContainers = 2;
@@ -160,6 +179,10 @@ public void testContainerRequestedResources() throws Exception {
       Assert.assertEquals(containerRam
           .minus(RoundRobinPacking.DEFAULT_RAM_PADDING_PER_CONTAINER).divide(instancesCount),
           resources.iterator().next().getRam());
+
+      Assert.assertEquals(
+          (containerCpu - RoundRobinPacking.DEFAULT_CPU_PADDING_PER_CONTAINER) / instancesCount,
+          resources.iterator().next().getCpu(), DELTA);
     }
   }
 
@@ -216,11 +239,15 @@ public void testContainerRequestedResourcesWhenRamPaddingSet() throws Exception
       Assert.assertEquals(containerRam
               .minus(containerRamPadding).divide(instancesCount),
           resources.iterator().next().getRam());
+
+      Assert.assertEquals(
+          (containerCpu - RoundRobinPacking.DEFAULT_CPU_PADDING_PER_CONTAINER) / instancesCount,
+          resources.iterator().next().getCpu(), DELTA);
     }
   }
 
   /**
-   * Test the scenario RAM map config is partially set
+   * Test the scenario where the RAM map config is completely set
    */
   @Test
   public void testCompleteRamMapRequested() throws Exception {
@@ -255,6 +282,135 @@ public void testCompleteRamMapRequested() throws Exception {
     Assert.assertEquals(totalInstances, packingPlanExplicitRamMap.getInstanceCount());
   }
 
+  /**
+   * Test the scenario where the CPU map config is completely set with exactly enough resources
+   */
+  @Test
+  public void testCompleteCpuMapRequestedWithExactlyEnoughResource() throws Exception {
+    int numContainers = 2;
+    int spoutParallelism = 4;
+    int boltParallelism = 3;
+    Integer totalInstances = spoutParallelism + boltParallelism;
+
+    // Set up the topology and its config
+    org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
+    topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
+
+    // Explicit set resources for container
+    double containerCpu = 17;
+
+    // Explicit set component CPU map
+    double boltCpu = 4;
+    double spoutCpu = 4;
+
+    topologyConfig.setContainerCpuRequested(containerCpu);
+    topologyConfig.setComponentCpu(BOLT_NAME, boltCpu);
+    topologyConfig.setComponentCpu(SPOUT_NAME, spoutCpu);
+
+    TopologyAPI.Topology topologyExplicitCpuMap =
+        getTopology(spoutParallelism, boltParallelism, topologyConfig);
+    PackingPlan packingPlanExplicitCpuMap =
+        getRoundRobinPackingPlan(topologyExplicitCpuMap);
+
+    AssertPacking.assertContainers(packingPlanExplicitCpuMap.getContainers(),
+        BOLT_NAME, SPOUT_NAME, boltCpu, spoutCpu, null);
+    Assert.assertEquals(totalInstances, packingPlanExplicitCpuMap.getInstanceCount());
+  }
+
+  /**
+   * Test the scenario where the CPU map config is completely set with more than enough resources
+   */
+  @Test
+  public void testCompleteCpuMapRequestedWithMoreThanEnoughResource() throws Exception {
+    int numContainers = 2;
+    int spoutParallelism = 4;
+    int boltParallelism = 3;
+    Integer totalInstances = spoutParallelism + boltParallelism;
+
+    // Set up the topology and its config
+    org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
+    topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
+
+    // Explicit set resources for container
+    double containerCpu = 30;
+
+    // Explicit set component CPU map
+    double boltCpu = 4;
+    double spoutCpu = 4;
+
+    topologyConfig.setContainerCpuRequested(containerCpu);
+    topologyConfig.setComponentCpu(BOLT_NAME, boltCpu);
+    topologyConfig.setComponentCpu(SPOUT_NAME, spoutCpu);
+
+    TopologyAPI.Topology topologyExplicitCpuMap =
+        getTopology(spoutParallelism, boltParallelism, topologyConfig);
+    PackingPlan packingPlanExplicitCpuMap =
+        getRoundRobinPackingPlan(topologyExplicitCpuMap);
+
+    AssertPacking.assertContainers(packingPlanExplicitCpuMap.getContainers(),
+        BOLT_NAME, SPOUT_NAME, boltCpu, spoutCpu, null);
+    Assert.assertEquals(totalInstances, packingPlanExplicitCpuMap.getInstanceCount());
+  }
+
+  /**
+   * Test the scenario where the CPU map config is completely set with less than enough resources
+   */
+  @Test(expected = PackingException.class)
+  public void testCompleteCpuMapRequestedWithLessThanEnoughResource() throws Exception {
+    int numContainers = 2;
+    int spoutParallelism = 4;
+    int boltParallelism = 3;
+
+    // Set up the topology and its config
+    org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
+    topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
+
+    // Explicit set resources for container
+    double containerCpu = 10;
+
+    // Explicit set component CPU map
+    double boltCpu = 4;
+    double spoutCpu = 4;
+
+    topologyConfig.setContainerCpuRequested(containerCpu);
+    topologyConfig.setComponentCpu(BOLT_NAME, boltCpu);
+    topologyConfig.setComponentCpu(SPOUT_NAME, spoutCpu);
+
+    TopologyAPI.Topology topologyExplicitCpuMap =
+        getTopology(spoutParallelism, boltParallelism, topologyConfig);
+    getRoundRobinPackingPlan(topologyExplicitCpuMap);
+  }
+
+  /**
+   * Test the scenario where the CPU map config is completely set
+   * and there are exactly enough resources for instances, but not enough for padding
+   */
+  @Test(expected = PackingException.class)
+  public void testCompleteCpuMapRequestedWithoutPaddingResource() throws Exception {
+    int numContainers = 2;
+    int spoutParallelism = 4;
+    int boltParallelism = 3;
+
+    // Set up the topology and its config
+    org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
+    topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
+
+    // Explicit set resources for container
+    double containerCpu = 16;
+
+    // Explicit set component CPU map
+    double boltCpu = 4;
+    double spoutCpu = 4;
+
+    topologyConfig.setContainerCpuRequested(containerCpu);
+    topologyConfig.setComponentCpu(BOLT_NAME, boltCpu);
+    topologyConfig.setComponentCpu(SPOUT_NAME, spoutCpu);
+
+    TopologyAPI.Topology topologyExplicitCpuMap =
+        getTopology(spoutParallelism, boltParallelism, topologyConfig);
+    getRoundRobinPackingPlan(topologyExplicitCpuMap);
+  }
+
   /**
    * Test the scenario RAM map config is partially set
    */
@@ -285,8 +441,7 @@ public void testPartialRamMap() throws Exception {
     Assert.assertEquals(totalInstances, packingPlanExplicitRamMap.getInstanceCount());
 
     // RAM for bolt should be the value in component RAM map
-    for (PackingPlan.ContainerPlan containerPlan
-        : packingPlanExplicitRamMap.getContainers()) {
+    for (PackingPlan.ContainerPlan containerPlan : packingPlanExplicitRamMap.getContainers()) {
       Assert.assertEquals(containerRam, containerPlan.getRequiredResource().getRam());
       int boltCount = 0;
       int instancesCount = containerPlan.getInstances().size();
@@ -313,6 +468,63 @@ public void testPartialRamMap() throws Exception {
     }
   }
 
+  /**
+   * Test the scenario where the CPU map config is partially set
+   */
+  @Test
+  public void testPartialCpuMap() throws Exception {
+    int numContainers = 2;
+    int spoutParallelism = 4;
+    int boltParallelism = 3;
+    Integer totalInstances = spoutParallelism + boltParallelism;
+
+    // Set up the topology and its config
+    org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
+    topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
+
+    // Explicit set resources for container
+    double containerCpu = 17;
+
+    // Explicit set component CPU map
+    double boltCpu = 4;
+
+    topologyConfig.setContainerCpuRequested(containerCpu);
+    topologyConfig.setComponentCpu(BOLT_NAME, boltCpu);
+
+    TopologyAPI.Topology topologyExplicitCpuMap =
+        getTopology(spoutParallelism, boltParallelism, topologyConfig);
+    PackingPlan packingPlanExplicitCpuMap =
+        getRoundRobinPackingPlan(topologyExplicitCpuMap);
+    Assert.assertEquals(totalInstances, packingPlanExplicitCpuMap.getInstanceCount());
+
+    // CPU for bolt should be the value in component CPU map
+    for (PackingPlan.ContainerPlan containerPlan : packingPlanExplicitCpuMap.getContainers()) {
+      Assert.assertEquals(containerCpu, containerPlan.getRequiredResource().getCpu(), DELTA);
+      int boltCount = 0;
+      int instancesCount = containerPlan.getInstances().size();
+      for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
+        if (instancePlan.getComponentName().equals(BOLT_NAME)) {
+          Assert.assertEquals(boltCpu, instancePlan.getResource().getCpu(), DELTA);
+          boltCount++;
+        }
+      }
+
+      // CPU for spout should be:
+      // (containerCpu - all CPU for bolt - CPU for padding) / (# of spouts)
+      int spoutCount = instancesCount - boltCount;
+      for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
+        if (instancePlan.getComponentName().equals(SPOUT_NAME)) {
+          Assert.assertEquals(
+              (containerCpu
+                  - boltCpu * boltCount
+                  - RoundRobinPacking.DEFAULT_CPU_PADDING_PER_CONTAINER)
+                  / spoutCount,
+              instancePlan.getResource().getCpu(), DELTA);
+        }
+      }
+    }
+  }
+
   /**
    * test even packing of instances
    */


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services