You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by hu...@apache.org on 2019/01/10 22:38:12 UTC
[incubator-heron] branch master updated: Validate resource constraint (RAM and CPU) in RoundRobinPacking (#3142)
This is an automated email from the ASF dual-hosted git repository.
huijun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new a1b761f Validate resource constraint (RAM and CPU) in RoundRobinPacking (#3142)
a1b761f is described below
commit a1b761fea282c0d9bd45fe0b6a0f66296f1580ea
Author: Xiaoyao Qian <qi...@illinois.edu>
AuthorDate: Thu Jan 10 14:38:07 2019 -0800
Validate resource constraint (RAM and CPU) in RoundRobinPacking (#3142)
* init
* general resource constraint validation
* pass existing unit tests
* add more tests
* rename
* rename
* generic ResourceMeasure
* fixed wc example
* even more general generics
* address comments
* address comments by putting more tests
* set safe amount of cpu
* meaningful constants in ExampleResource
---
.../heron/examples/api/ExampleResources.java | 10 +-
.../heron/examples/api/WordCountTopology.java | 6 +-
.../org/apache/heron/common/basics/ByteAmount.java | 116 ++++-------
.../org/apache/heron/common/basics/CPUShare.java | 72 +++++++
.../heron/common/basics/ResourceMeasure.java | 97 +++++++++
.../packing/roundrobin/RoundRobinPacking.java | 161 +++++++++------
.../org/apache/heron/packing/AssertPacking.java | 60 +++++-
.../packing/roundrobin/RoundRobinPackingTest.java | 220 ++++++++++++++++++++-
8 files changed, 590 insertions(+), 152 deletions(-)
diff --git a/examples/src/java/org/apache/heron/examples/api/ExampleResources.java b/examples/src/java/org/apache/heron/examples/api/ExampleResources.java
index be92dea..6c08d49 100644
--- a/examples/src/java/org/apache/heron/examples/api/ExampleResources.java
+++ b/examples/src/java/org/apache/heron/examples/api/ExampleResources.java
@@ -23,7 +23,9 @@ import org.apache.heron.common.basics.ByteAmount;
public final class ExampleResources {
+ static final long DEFAULT_RAM_PADDING_PER_CONTAINER = 2 * 1024;
static final long COMPONENT_RAM_MB = 1024;
+ static final double DEFAULT_CPU_PADDING_PER_CONTAINER = 1.0;
static ByteAmount getComponentRam() {
return ByteAmount.fromMegabytes(COMPONENT_RAM_MB);
@@ -35,7 +37,13 @@ public final class ExampleResources {
static ByteAmount getContainerRam(int components, int containers) {
final int componentsPerContainer = Math.max(components / containers, 1);
- return ByteAmount.fromMegabytes(COMPONENT_RAM_MB * componentsPerContainer);
+ return ByteAmount.fromMegabytes(COMPONENT_RAM_MB * componentsPerContainer
+ + DEFAULT_RAM_PADDING_PER_CONTAINER);
+ }
+
+ static double getContainerCpu(int components, int containers) {
+ final int componentsPerContainer = Math.max(components / containers, 1);
+ return componentsPerContainer + DEFAULT_CPU_PADDING_PER_CONTAINER;
}
private ExampleResources() {
diff --git a/examples/src/java/org/apache/heron/examples/api/WordCountTopology.java b/examples/src/java/org/apache/heron/examples/api/WordCountTopology.java
index 5991cfd..3b1feb4 100644
--- a/examples/src/java/org/apache/heron/examples/api/WordCountTopology.java
+++ b/examples/src/java/org/apache/heron/examples/api/WordCountTopology.java
@@ -188,12 +188,16 @@ public final class WordCountTopology {
conf.setComponentRam("consumer",
ByteAmount.fromMegabytes(ExampleResources.COMPONENT_RAM_MB));
+ conf.setComponentCpu("word", 1.0);
+ conf.setComponentCpu("consumer", 1.0);
+
// configure container resources
conf.setContainerDiskRequested(
ExampleResources.getContainerDisk(2 * parallelism, parallelism));
conf.setContainerRamRequested(
ExampleResources.getContainerRam(2 * parallelism, parallelism));
- conf.setContainerCpuRequested(2);
+ conf.setContainerCpuRequested(
+ ExampleResources.getContainerCpu(2 * parallelism, parallelism));
HeronSubmitter.submitTopology(args[0], conf, builder.createTopology());
}
diff --git a/heron/common/src/java/org/apache/heron/common/basics/ByteAmount.java b/heron/common/src/java/org/apache/heron/common/basics/ByteAmount.java
index 11c5745..5b576af 100644
--- a/heron/common/src/java/org/apache/heron/common/basics/ByteAmount.java
+++ b/heron/common/src/java/org/apache/heron/common/basics/ByteAmount.java
@@ -22,7 +22,7 @@ package org.apache.heron.common.basics;
/**
* Class that encapsulates number of bytes, with helpers to handle units properly.
*/
-public final class ByteAmount implements Comparable<ByteAmount> {
+public final class ByteAmount extends ResourceMeasure<Long> {
private static final long KB = 1024L;
private static final long MB = KB * 1024;
private static final long GB = MB * 1024;
@@ -34,10 +34,9 @@ public final class ByteAmount implements Comparable<ByteAmount> {
private static final long MAX_KB = Math.round(Long.MAX_VALUE / KB);
public static final ByteAmount ZERO = ByteAmount.fromBytes(0);
- private final long bytes;
- private ByteAmount(long bytes) {
- this.bytes = bytes;
+ private ByteAmount(Long value) {
+ super(value);
}
/**
@@ -67,7 +66,8 @@ public final class ByteAmount implements Comparable<ByteAmount> {
/**
* Creates a ByteAmount value in gigabytes. If the gigabytes value
- * is >= Long.MAX_VALUE / 1024 / 1024 / 1024, the byte representation is capped at Long.MAX_VALUE.
+ * is >= Long.MAX_VALUE / 1024 / 1024 / 1024,
+ * the byte representation is capped at Long.MAX_VALUE.
*
* @param gigabytes value in gigabytes to represent
* @return a ByteAmount object repressing the number of GBs passed
@@ -85,7 +85,7 @@ public final class ByteAmount implements Comparable<ByteAmount> {
* @return number of bytes
*/
public long asBytes() {
- return bytes;
+ return super.getValue();
}
/**
@@ -97,7 +97,7 @@ public final class ByteAmount implements Comparable<ByteAmount> {
* @return returns the ByteValue in MBs or 0 if the value is < (1024 * 1024) / 2
*/
public long asMegabytes() {
- return Math.round((double) bytes / MB);
+ return Math.round(value.doubleValue() / MB);
}
/**
@@ -109,7 +109,7 @@ public final class ByteAmount implements Comparable<ByteAmount> {
* @return returns the ByteValue in KBs or 0 if the value is < (1024) / 2
*/
public long asKilobytes() {
- return Math.round((double) bytes / KB);
+ return Math.round(value.doubleValue() / KB);
}
/**
@@ -121,15 +121,7 @@ public final class ByteAmount implements Comparable<ByteAmount> {
* @return returns the ByteValue in GBs or 0 if the value is < (1024 * 1024 * 1024) / 2
*/
public long asGigabytes() {
- return Math.round((double) bytes / GB);
- }
-
- /**
- * Convenience methdod to determine if byte value is zero
- * @return true if the byte value is 0
- */
- public boolean isZero() {
- return ZERO.equals(this);
+ return Math.round(value.doubleValue() / GB);
}
/**
@@ -138,10 +130,11 @@ public final class ByteAmount implements Comparable<ByteAmount> {
* @return a new ByteValue of this minus other ByteValue
* @throws IllegalArgumentException if subtraction would overshoot Long.MIN_VALUE
*/
- public ByteAmount minus(ByteAmount other) {
- checkArgument(Long.MIN_VALUE + other.asBytes() <= asBytes(), String.format(
- "Subtracting %s from %s would overshoot Long.MIN_LONG", other, this));
- return ByteAmount.fromBytes(asBytes() - other.asBytes());
+ @Override
+ public ByteAmount minus(ResourceMeasure<Long> other) {
+ checkArgument(Long.MIN_VALUE + other.value <= value,
+ String.format("Subtracting %s from %s would overshoot Long.MIN_LONG", other, this));
+ return ByteAmount.fromBytes(value - other.value);
}
/**
@@ -150,10 +143,11 @@ public final class ByteAmount implements Comparable<ByteAmount> {
* @return a new ByteValue of this plus other ByteValue
* @throws IllegalArgumentException if addition would exceed Long.MAX_VALUE
*/
- public ByteAmount plus(ByteAmount other) {
- checkArgument(Long.MAX_VALUE - asBytes() >= other.asBytes(), String.format(
- "Adding %s to %s would exceed Long.MAX_LONG", other, this));
- return ByteAmount.fromBytes(asBytes() + other.asBytes());
+ @Override
+ public ByteAmount plus(ResourceMeasure<Long> other) {
+ checkArgument(Long.MAX_VALUE - value >= other.value,
+ String.format("Adding %s to %s would exceed Long.MAX_LONG", other, this));
+ return ByteAmount.fromBytes(value + other.value);
}
/**
@@ -162,10 +156,11 @@ public final class ByteAmount implements Comparable<ByteAmount> {
* @return a new ByteValue of this ByteValue multiplied by factor
* @throws IllegalArgumentException if multiplication would exceed Long.MAX_VALUE
*/
+ @Override
public ByteAmount multiply(int factor) {
- checkArgument(asBytes() <= Long.MAX_VALUE / factor, String.format(
- "Multiplying %s by %d would exceed Long.MAX_LONG", this, factor));
- return ByteAmount.fromBytes(asBytes() * factor);
+ checkArgument(value <= Long.MAX_VALUE / factor,
+ String.format("Multiplying %s by %d would exceed Long.MAX_LONG", this, factor));
+ return ByteAmount.fromBytes(value * factor);
}
/**
@@ -175,9 +170,10 @@ public final class ByteAmount implements Comparable<ByteAmount> {
* @param factor value to divide by
* @return a new ByteValue of this ByteValue divided by factor
*/
+ @Override
public ByteAmount divide(int factor) {
checkArgument(factor != 0, String.format("Can not divide %s by 0", this));
- return ByteAmount.fromBytes(Math.round((double) this.asBytes() / (double) factor));
+ return ByteAmount.fromBytes(Math.round(value.doubleValue() / factor));
}
/**
@@ -187,30 +183,15 @@ public final class ByteAmount implements Comparable<ByteAmount> {
* @return a new ByteValue of this ByteValue increased by percentage
* @throws IllegalArgumentException if increase would exceed Long.MAX_VALUE
*/
+ @Override
public ByteAmount increaseBy(int percentage) {
- checkArgument(percentage >= 0, String.format(
- "Increasing by negative percent (%d) not supported", percentage));
+ checkArgument(percentage >= 0,
+ String.format("Increasing by negative percent (%d) not supported", percentage));
double factor = 1.0 + ((double) percentage / 100);
long max = Math.round(Long.MAX_VALUE / factor);
- checkArgument(asBytes() <= max, String.format(
- "Increasing %s by %d percent would exceed Long.MAX_LONG", this, percentage));
- return ByteAmount.fromBytes(Math.round((double) asBytes() * factor));
- }
-
- public boolean greaterThan(ByteAmount other) {
- return this.asBytes() > other.asBytes();
- }
-
- public boolean greaterOrEqual(ByteAmount other) {
- return this.asBytes() >= other.asBytes();
- }
-
- public boolean lessThan(ByteAmount other) {
- return this.asBytes() < other.asBytes();
- }
-
- public boolean lessOrEqual(ByteAmount other) {
- return this.asBytes() <= other.asBytes();
+ checkArgument(value <= max,
+ String.format("Increasing %s by %d percent would exceed Long.MAX_LONG", this, percentage));
+ return ByteAmount.fromBytes(Math.round(value.doubleValue() * factor));
}
public ByteAmount max(ByteAmount other) {
@@ -222,41 +203,18 @@ public final class ByteAmount implements Comparable<ByteAmount> {
}
@Override
- public int compareTo(ByteAmount other) {
- return Long.compare(asBytes(), other.asBytes());
- }
-
- @Override
- public boolean equals(Object other) {
- if (this == other) {
- return true;
- }
- if (other == null || getClass() != other.getClass()) {
- return false;
- }
-
- ByteAmount that = (ByteAmount) other;
- return bytes == that.bytes;
- }
-
- @Override
- public int hashCode() {
- return (int) (bytes ^ (bytes >>> 32));
- }
-
- @Override
public String toString() {
- String value;
+ String str;
if (asGigabytes() > 0) {
- value = String.format("%.1f GB (%d bytes)", (double) asBytes() / GB, asBytes());
+ str = String.format("%.1f GB (%d bytes)", value.doubleValue() / GB, value);
} else if (asMegabytes() > 0) {
- value = String.format("%.1f MB (%d bytes)", (double) asBytes() / MB, asBytes());
+ str = String.format("%.1f MB (%d bytes)", value.doubleValue() / MB, value);
} else if (asKilobytes() > 0) {
- value = String.format("%.1f KB (%d bytes)", (double) asBytes() / KB, asBytes());
+ str = String.format("%.1f KB (%d bytes)", value.doubleValue() / KB, value);
} else {
- value = bytes + " bytes";
+ str = value + " bytes";
}
- return String.format("ByteAmount{%s}", value);
+ return String.format("ByteAmount{%s}", str);
}
private void checkArgument(boolean condition, String errorMessage) {
diff --git a/heron/common/src/java/org/apache/heron/common/basics/CPUShare.java b/heron/common/src/java/org/apache/heron/common/basics/CPUShare.java
new file mode 100644
index 0000000..b400c0a
--- /dev/null
+++ b/heron/common/src/java/org/apache/heron/common/basics/CPUShare.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.common.basics;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public final class CPUShare extends ResourceMeasure<Double> {
+
+ private CPUShare(Double value) {
+ super(value);
+ }
+
+ public static CPUShare fromDouble(double value) {
+ return new CPUShare(value);
+ }
+
+ @Override
+ public CPUShare minus(ResourceMeasure<Double> other) {
+ return new CPUShare(value - other.value);
+ }
+
+ @Override
+ public CPUShare plus(ResourceMeasure<Double> other) {
+ return new CPUShare(value + other.value);
+ }
+
+ @Override
+ public CPUShare multiply(int factor) {
+ return new CPUShare(value * factor);
+ }
+
+ @Override
+ public CPUShare divide(int factor) {
+ return new CPUShare(value / factor);
+ }
+
+ @Override
+ public CPUShare increaseBy(int percentage) {
+ return new CPUShare(value * (1.0 + percentage / 100.0));
+ }
+
+ public static Map<String, CPUShare> convertDoubleMapToCpuShareMap(Map<String, Double> doubleMap) {
+ Map<String, CPUShare> retval = new HashMap<>();
+ for (Map.Entry<String, Double> entry : doubleMap.entrySet()) {
+ retval.put(entry.getKey(), new CPUShare(entry.getValue()));
+ }
+ return retval;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("CPUShare{%.3f}", value);
+ }
+}
diff --git a/heron/common/src/java/org/apache/heron/common/basics/ResourceMeasure.java b/heron/common/src/java/org/apache/heron/common/basics/ResourceMeasure.java
new file mode 100644
index 0000000..d0c50e6
--- /dev/null
+++ b/heron/common/src/java/org/apache/heron/common/basics/ResourceMeasure.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.common.basics;
+
+public abstract class ResourceMeasure<V extends Number & Comparable>
+ implements Comparable<ResourceMeasure<V>> {
+
+ protected final V value;
+
+ protected ResourceMeasure(V value) {
+ if (value == null) {
+ throw new IllegalArgumentException();
+ }
+ this.value = value;
+ }
+
+ public V getValue() {
+ return value;
+ }
+
+ public boolean isZero() {
+ return value.doubleValue() == 0.0;
+ }
+
+ public abstract ResourceMeasure<V> minus(ResourceMeasure<V> other);
+
+ public abstract ResourceMeasure<V> plus(ResourceMeasure<V> other);
+
+ public abstract ResourceMeasure<V> multiply(int factor);
+
+ public abstract ResourceMeasure<V> divide(int factor);
+
+ public abstract ResourceMeasure<V> increaseBy(int percentage);
+
+ @SuppressWarnings("unchecked")
+ public boolean greaterThan(ResourceMeasure<V> other) {
+ return value.compareTo(other.value) > 0;
+ }
+
+ @SuppressWarnings("unchecked")
+ public boolean greaterOrEqual(ResourceMeasure<V> other) {
+ return value.compareTo(other.value) >= 0;
+ }
+
+ @SuppressWarnings("unchecked")
+ public boolean lessThan(ResourceMeasure<V> other) {
+ return value.compareTo(other.value) < 0;
+ }
+
+ @SuppressWarnings("unchecked")
+ public boolean lessOrEqual(ResourceMeasure<V> other) {
+ return value.compareTo(other.value) <= 0;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public int compareTo(ResourceMeasure<V> o) {
+ return value.compareTo(o.value);
+ }
+
+ @Override
+ public int hashCode() {
+ return value.hashCode();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+
+ if (other == null || getClass() != other.getClass()) {
+ return false;
+ }
+
+ ResourceMeasure<V> that = (ResourceMeasure<V>) other;
+ return value.equals(that.value);
+ }
+}
diff --git a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
index 0a80440..e4d2cf3 100644
--- a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
+++ b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
@@ -32,6 +32,8 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.api.utils.TopologyUtils;
import org.apache.heron.common.basics.ByteAmount;
+import org.apache.heron.common.basics.CPUShare;
+import org.apache.heron.common.basics.ResourceMeasure;
import org.apache.heron.spi.common.Config;
import org.apache.heron.spi.common.Context;
import org.apache.heron.spi.packing.IPacking;
@@ -84,17 +86,23 @@ import org.apache.heron.spi.packing.Resource;
public class RoundRobinPacking implements IPacking, IRepacking {
private static final Logger LOG = Logger.getLogger(RoundRobinPacking.class.getName());
- // TODO(mfu): Read these values from Config
- private static final ByteAmount DEFAULT_DISK_PADDING_PER_CONTAINER = ByteAmount.fromGigabytes(12);
- private static final double DEFAULT_CPU_PADDING_PER_CONTAINER = 1;
- private static final ByteAmount MIN_RAM_PER_INSTANCE = ByteAmount.fromMegabytes(192);
@VisibleForTesting
static final ByteAmount DEFAULT_RAM_PADDING_PER_CONTAINER = ByteAmount.fromGigabytes(2);
@VisibleForTesting
+ static final double DEFAULT_CPU_PADDING_PER_CONTAINER = 1.0;
+ private static final ByteAmount DEFAULT_DISK_PADDING_PER_CONTAINER = ByteAmount.fromGigabytes(12);
+
+ @VisibleForTesting
static final ByteAmount DEFAULT_DAEMON_PROCESS_RAM_PADDING = ByteAmount.fromGigabytes(1);
+ private static final ByteAmount MIN_RAM_PER_INSTANCE = ByteAmount.fromMegabytes(192);
// Use as a stub as default number value when getting config value
- private static final ByteAmount NOT_SPECIFIED_NUMBER_VALUE = ByteAmount.fromBytes(-1);
+ private static final ByteAmount NOT_SPECIFIED_BYTE_AMOUNT = ByteAmount.fromBytes(-1);
+ private static final double NOT_SPECIFIED_CPU_SHARE = -1.0;
+
+ private static final String RAM = "RAM";
+ private static final String CPU = "CPU";
+ private static final String DISK = "DISK";
private TopologyAPI.Topology topology;
@@ -102,6 +110,7 @@ public class RoundRobinPacking implements IPacking, IRepacking {
private double instanceCpuDefault;
private ByteAmount instanceDiskDefault;
private ByteAmount containerRamPadding = DEFAULT_RAM_PADDING_PER_CONTAINER;
+ private double containerCpuPadding = DEFAULT_CPU_PADDING_PER_CONTAINER;
@Override
public void initialize(Config config, TopologyAPI.Topology inputTopology) {
@@ -133,16 +142,36 @@ public class RoundRobinPacking implements IPacking, IRepacking {
// Get the RAM map for every instance
Map<Integer, Map<InstanceId, ByteAmount>> instancesRamMap =
- getInstancesRamMapInContainer(roundRobinAllocation);
+ calculateInstancesResourceMapInContainer(
+ roundRobinAllocation,
+ TopologyUtils.getComponentRamMapConfig(topology),
+ getContainerRamHint(roundRobinAllocation),
+ instanceRamDefault,
+ containerRamPadding,
+ ByteAmount.ZERO,
+ NOT_SPECIFIED_BYTE_AMOUNT,
+ RAM);
+
+ // Get the CPU map for every instance
+ Map<Integer, Map<InstanceId, CPUShare>> instancesCpuMap =
+ calculateInstancesResourceMapInContainer(
+ roundRobinAllocation,
+ CPUShare.convertDoubleMapToCpuShareMap(TopologyUtils.getComponentCpuMapConfig(topology)),
+ CPUShare.fromDouble(getContainerCpuHint(roundRobinAllocation)),
+ CPUShare.fromDouble(instanceCpuDefault),
+ CPUShare.fromDouble(containerCpuPadding),
+ CPUShare.fromDouble(0.0),
+ CPUShare.fromDouble(NOT_SPECIFIED_CPU_SHARE),
+ CPU);
ByteAmount containerDiskInBytes = getContainerDiskHint(roundRobinAllocation);
- double containerCpu = getContainerCpuHint(roundRobinAllocation);
+ double containerCpuHint = getContainerCpuHint(roundRobinAllocation);
ByteAmount containerRamHint = getContainerRamHint(roundRobinAllocation);
- LOG.info(String.format("Pack internal: container CPU hint: %f, RAM hint: %s, disk hint: %s.",
- containerCpu,
- containerDiskInBytes.toString(),
- containerRamHint.toString()));
+ LOG.info(String.format("Pack internal: container CPU hint: %.3f, RAM hint: %s, disk hint: %s.",
+ containerCpuHint,
+ containerRamHint.toString(),
+ containerDiskInBytes.toString()));
// Construct the PackingPlan
Set<PackingPlan.ContainerPlan> containerPlans = new HashSet<>();
@@ -152,31 +181,39 @@ public class RoundRobinPacking implements IPacking, IRepacking {
// Calculate the resource required for single instance
Map<InstanceId, PackingPlan.InstancePlan> instancePlanMap = new HashMap<>();
ByteAmount containerRam = containerRamPadding;
+ double containerCpu = containerCpuPadding;
+
for (InstanceId instanceId : instanceList) {
ByteAmount instanceRam = instancesRamMap.get(containerId).get(instanceId);
+ Double instanceCpu = instancesCpuMap.get(containerId).get(instanceId).getValue();
- // Currently not yet support disk or CPU config for different components,
- // so just use the default value.
+ // Currently not yet support disk config for different components, just use the default.
ByteAmount instanceDisk = instanceDiskDefault;
- double instanceCpu = instanceCpuDefault;
Resource resource = new Resource(instanceCpu, instanceRam, instanceDisk);
// Insert it into the map
instancePlanMap.put(instanceId, new PackingPlan.InstancePlan(instanceId, resource));
containerRam = containerRam.plus(instanceRam);
+ containerCpu += instanceCpu;
}
- Resource resource = new Resource(containerCpu, containerRam, containerDiskInBytes);
+ Resource resource = new Resource(Math.max(containerCpu, containerCpuHint),
+ containerRam, containerDiskInBytes);
PackingPlan.ContainerPlan containerPlan = new PackingPlan.ContainerPlan(
containerId, new HashSet<>(instancePlanMap.values()), resource);
containerPlans.add(containerPlan);
+
+ LOG.info(String.format("Pack internal finalized: container#%d CPU: %f, RAM: %s, disk: %s.",
+ containerId,
+ resource.getCpu(),
+ resource.getRam().toString(),
+ resource.getDisk().toString()));
}
PackingPlan plan = new PackingPlan(topology.getId(), containerPlans);
- // Check whether it is a valid PackingPlan
validatePackingPlan(plan);
return plan;
}
@@ -211,69 +248,75 @@ public class RoundRobinPacking implements IPacking, IRepacking {
daemonProcessPadding);
}
- /**
- * Calculate the RAM required by any instance in the container
- *
- * @param allocation the allocation of instances in different container
- * @return A map: (containerId -> (instanceId -> instanceRequiredRam))
- */
- private Map<Integer, Map<InstanceId, ByteAmount>> getInstancesRamMapInContainer(
- Map<Integer, List<InstanceId>> allocation) {
- Map<String, ByteAmount> ramMap = TopologyUtils.getComponentRamMapConfig(topology);
-
- Map<Integer, Map<InstanceId, ByteAmount>> instancesRamMapInContainer = new HashMap<>();
- ByteAmount containerRamHint = getContainerRamHint(allocation);
+ @SuppressWarnings("unchecked")
+ private <T extends ResourceMeasure> Map<Integer, Map<InstanceId, T>>
+ calculateInstancesResourceMapInContainer(
+ Map<Integer, List<InstanceId>> allocation,
+ Map<String, T> resMap,
+ T containerResHint,
+ T instanceResDefault,
+ T containerResPadding,
+ T zero,
+ T notSpecified,
+ String resourceType) {
+ Map<Integer, Map<InstanceId, T>> instancesResMapInContainer = new HashMap<>();
for (int containerId : allocation.keySet()) {
List<InstanceId> instanceIds = allocation.get(containerId);
- Map<InstanceId, ByteAmount> ramInsideContainer = new HashMap<>();
- instancesRamMapInContainer.put(containerId, ramInsideContainer);
- List<InstanceId> instancesToBeAccounted = new ArrayList<>();
+ Map<InstanceId, T> resInsideContainer = new HashMap<>();
+ instancesResMapInContainer.put(containerId, resInsideContainer);
+ List<InstanceId> unspecifiedInstances = new ArrayList<>();
- // Calculate the actual value
- ByteAmount usedRam = ByteAmount.ZERO;
+ // Register the instance resource allocation and calculate the used resource so far
+ T usedRes = zero;
for (InstanceId instanceId : instanceIds) {
String componentName = instanceId.getComponentName();
- if (ramMap.containsKey(componentName)) {
- ByteAmount ram = ramMap.get(componentName);
- ramInsideContainer.put(instanceId, ram);
- usedRam = usedRam.plus(ram);
+ if (resMap.containsKey(componentName)) {
+ T res = resMap.get(componentName);
+ resInsideContainer.put(instanceId, res);
+ usedRes = (T) usedRes.plus(res);
} else {
- instancesToBeAccounted.add(instanceId);
+ unspecifiedInstances.add(instanceId);
}
}
- // Now we have calculated RAM for instances specified in ComponentRamMap
- // Then to calculate RAM for the rest instances
- int instancesToAllocate = instancesToBeAccounted.size();
+ // Validate instance resources specified so far don't violate container-level constraint
+ if (!containerResHint.equals(notSpecified)
+ && usedRes.greaterThan(containerResHint.minus(containerResPadding))) {
+ throw new PackingException(String.format("Invalid packing plan generated. "
+ + "Total instance %s in a container (%s) have exceeded "
+ + "the container-level constraint of %s.",
+ resourceType, usedRes.toString(), containerResHint));
+ }
- if (instancesToAllocate != 0) {
- ByteAmount individualInstanceRam = instanceRamDefault;
+ // calculate resource for the remaining unspecified instances if any
+ if (!unspecifiedInstances.isEmpty()) {
+ T individualInstanceRes = instanceResDefault;
- // The RAM map is partially set. We need to calculate RAM for the rest
+ // If container resource is specified
+ if (!containerResHint.equals(notSpecified)) {
+ // discount resource for heron internal process (padding) and used (usedRes)
+ T remainingRes = (T) containerResHint.minus(containerResPadding).minus(usedRes);
- // We have different strategy depending on whether container RAM is specified
- // If container RAM is specified
- if (!containerRamHint.equals(NOT_SPECIFIED_NUMBER_VALUE)) {
- // remove RAM for heron internal process
- ByteAmount remainingRam =
- containerRamHint.minus(containerRamPadding).minus(usedRam);
+ if (remainingRes.lessOrEqual(zero)) {
+ throw new PackingException(String.format("Invalid packing plan generated. "
+ + "No enough %s to allocate for unspecified instances", resourceType));
+ }
- // Split remaining RAM evenly
- individualInstanceRam = remainingRam.divide(instancesToAllocate);
+ // Split remaining resource evenly
+ individualInstanceRes = (T) remainingRes.divide(unspecifiedInstances.size());
}
- // Put the results in instancesRam
- for (InstanceId instanceId : instancesToBeAccounted) {
- ramInsideContainer.put(instanceId, individualInstanceRam);
+ // Put the results in resInsideContainer
+ for (InstanceId instanceId : unspecifiedInstances) {
+ resInsideContainer.put(instanceId, individualInstanceRes);
}
}
}
- return instancesRamMapInContainer;
+ return instancesResMapInContainer;
}
-
/**
* Get the instances' allocation basing on round robin algorithm
*
@@ -288,7 +331,7 @@ public class RoundRobinPacking implements IPacking, IRepacking {
}
for (int i = 1; i <= numContainer; ++i) {
- allocation.put(i, new ArrayList<InstanceId>());
+ allocation.put(i, new ArrayList<>());
}
int index = 1;
@@ -367,7 +410,7 @@ public class RoundRobinPacking implements IPacking, IRepacking {
return TopologyUtils.getConfigWithDefault(
topologyConfig, org.apache.heron.api.Config.TOPOLOGY_CONTAINER_RAM_REQUESTED,
- NOT_SPECIFIED_NUMBER_VALUE);
+ NOT_SPECIFIED_BYTE_AMOUNT);
}
/**
diff --git a/heron/packing/tests/java/org/apache/heron/packing/AssertPacking.java b/heron/packing/tests/java/org/apache/heron/packing/AssertPacking.java
index dc54ad6..356ff1a 100644
--- a/heron/packing/tests/java/org/apache/heron/packing/AssertPacking.java
+++ b/heron/packing/tests/java/org/apache/heron/packing/AssertPacking.java
@@ -42,6 +42,8 @@ import static org.junit.Assert.assertTrue;
*/
public final class AssertPacking {
+ private static final double DELTA = 0.1;
+
private AssertPacking() { }
/**
@@ -55,18 +57,17 @@ public final class AssertPacking {
ByteAmount notExpectedContainerRam) {
boolean boltFound = false;
boolean spoutFound = false;
- List<Integer> expectedInstanceIndecies = new ArrayList<>();
- List<Integer> foundInstanceIndecies = new ArrayList<>();
+ List<Integer> expectedInstanceIndices = new ArrayList<>();
+ List<Integer> foundInstanceIndices = new ArrayList<>();
int expectedInstanceIndex = 1;
// RAM for bolt should be the value in component RAM map
for (PackingPlan.ContainerPlan containerPlan : containerPlans) {
if (notExpectedContainerRam != null) {
- assertNotEquals(
- notExpectedContainerRam, containerPlan.getRequiredResource().getRam());
+ assertNotEquals(notExpectedContainerRam, containerPlan.getRequiredResource().getRam());
}
for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
- expectedInstanceIndecies.add(expectedInstanceIndex++);
- foundInstanceIndecies.add(instancePlan.getTaskId());
+ expectedInstanceIndices.add(expectedInstanceIndex++);
+ foundInstanceIndices.add(instancePlan.getTaskId());
if (instancePlan.getComponentName().equals(boltName)) {
assertEquals("Unexpected bolt RAM", expectedBoltRam, instancePlan.getResource().getRam());
boltFound = true;
@@ -81,9 +82,52 @@ public final class AssertPacking {
assertTrue("Bolt not found in any of the container plans: " + boltName, boltFound);
assertTrue("Spout not found in any of the container plans: " + spoutName, spoutFound);
- Collections.sort(foundInstanceIndecies);
+ Collections.sort(foundInstanceIndices);
+ assertEquals("Unexpected instance global id set found.",
+ expectedInstanceIndices, foundInstanceIndices);
+ }
+
+ /**
+ * Verifies that the containerPlans have at least one bolt named boltName with CPU equal to
+ * expectedBoltCpu and likewise for spouts. If notExpectedContainerCpu is not null, verifies that
+ * no container's CPU equals that value.
+ */
+ public static void assertContainers(Set<PackingPlan.ContainerPlan> containerPlans,
+ String boltName, String spoutName,
+ Double expectedBoltCpu, Double expectedSpoutCpu,
+ Double notExpectedContainerCpu) {
+ boolean boltFound = false;
+ boolean spoutFound = false;
+ List<Integer> expectedInstanceIndices = new ArrayList<>();
+ List<Integer> foundInstanceIndices = new ArrayList<>();
+ int expectedInstanceIndex = 1;
+ // CPU for bolt should be the value in component CPU map
+ for (PackingPlan.ContainerPlan containerPlan : containerPlans) {
+ if (notExpectedContainerCpu != null) {
+ assertNotEquals(notExpectedContainerCpu, containerPlan.getRequiredResource().getCpu());
+ }
+ for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
+ expectedInstanceIndices.add(expectedInstanceIndex++);
+ foundInstanceIndices.add(instancePlan.getTaskId());
+ if (instancePlan.getComponentName().equals(boltName)) {
+ assertEquals("Unexpected bolt CPU",
+ expectedBoltCpu.doubleValue(), instancePlan.getResource().getCpu(), DELTA);
+ boltFound = true;
+ }
+ if (instancePlan.getComponentName().equals(spoutName)) {
+ assertEquals(
+ "Unexpected spout CPU",
+ expectedSpoutCpu.doubleValue(), instancePlan.getResource().getCpu(), DELTA);
+ spoutFound = true;
+ }
+ }
+ }
+ assertTrue("Bolt not found in any of the container plans: " + boltName, boltFound);
+ assertTrue("Spout not found in any of the container plans: " + spoutName, spoutFound);
+
+ Collections.sort(foundInstanceIndices);
assertEquals("Unexpected instance global id set found.",
- expectedInstanceIndecies, foundInstanceIndecies);
+ expectedInstanceIndices, foundInstanceIndices);
}
/**
diff --git a/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
index 742c6a4..826feaa 100644
--- a/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
+++ b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
@@ -69,7 +69,7 @@ public class RoundRobinPackingTest {
}
@Test(expected = PackingException.class)
- public void testCheckFailure() throws Exception {
+ public void testCheckInsufficientRamFailure() throws Exception {
int numContainers = 2;
int spoutParallelism = 4;
int boltParallelism = 3;
@@ -87,6 +87,25 @@ public class RoundRobinPackingTest {
getRoundRobinPackingPlan(topology);
}
+ @Test(expected = PackingException.class)
+ public void testCheckInsufficientCpuFailure() throws Exception {
+ int numContainers = 2;
+ int spoutParallelism = 4;
+ int boltParallelism = 3;
+
+ // Set up the topology and its config
+ org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
+ topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
+
+ // Explicitly set insufficient CPU for the container
+ double containerCpu = 1.0;
+
+ topologyConfig.setContainerCpuRequested(containerCpu);
+
+ TopologyAPI.Topology topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+ getRoundRobinPackingPlan(topology);
+ }
+
@Test
public void testDefaultResources() throws Exception {
int numContainers = 2;
@@ -160,6 +179,10 @@ public class RoundRobinPackingTest {
Assert.assertEquals(containerRam
.minus(RoundRobinPacking.DEFAULT_RAM_PADDING_PER_CONTAINER).divide(instancesCount),
resources.iterator().next().getRam());
+
+ Assert.assertEquals(
+ (containerCpu - RoundRobinPacking.DEFAULT_CPU_PADDING_PER_CONTAINER) / instancesCount,
+ resources.iterator().next().getCpu(), DELTA);
}
}
@@ -216,11 +239,15 @@ public class RoundRobinPackingTest {
Assert.assertEquals(containerRam
.minus(containerRamPadding).divide(instancesCount),
resources.iterator().next().getRam());
+
+ Assert.assertEquals(
+ (containerCpu - RoundRobinPacking.DEFAULT_CPU_PADDING_PER_CONTAINER) / instancesCount,
+ resources.iterator().next().getCpu(), DELTA);
}
}
/**
- * Test the scenario RAM map config is partially set
+ * Test the scenario RAM map config is completely set
*/
@Test
public void testCompleteRamMapRequested() throws Exception {
@@ -256,6 +283,135 @@ public class RoundRobinPackingTest {
}
/**
+ * Test the scenario CPU map config is completely set and there are exactly enough resources
+ */
+ @Test
+ public void testCompleteCpuMapRequestedWithExactlyEnoughResource() throws Exception {
+ int numContainers = 2;
+ int spoutParallelism = 4;
+ int boltParallelism = 3;
+ Integer totalInstances = spoutParallelism + boltParallelism;
+
+ // Set up the topology and its config
+ org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
+ topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
+
+ // Explicitly set resources for the container
+ double containerCpu = 17;
+
+ // Explicitly set the component CPU map
+ double boltCpu = 4;
+ double spoutCpu = 4;
+
+ topologyConfig.setContainerCpuRequested(containerCpu);
+ topologyConfig.setComponentCpu(BOLT_NAME, boltCpu);
+ topologyConfig.setComponentCpu(SPOUT_NAME, spoutCpu);
+
+ TopologyAPI.Topology topologyExplicitCpuMap =
+ getTopology(spoutParallelism, boltParallelism, topologyConfig);
+ PackingPlan packingPlanExplicitCpuMap =
+ getRoundRobinPackingPlan(topologyExplicitCpuMap);
+
+ AssertPacking.assertContainers(packingPlanExplicitCpuMap.getContainers(),
+ BOLT_NAME, SPOUT_NAME, boltCpu, spoutCpu, null);
+ Assert.assertEquals(totalInstances, packingPlanExplicitCpuMap.getInstanceCount());
+ }
+
+ /**
+ * Test the scenario CPU map config is completely set and there are more than enough resources
+ */
+ @Test
+ public void testCompleteCpuMapRequestedWithMoreThanEnoughResource() throws Exception {
+ int numContainers = 2;
+ int spoutParallelism = 4;
+ int boltParallelism = 3;
+ Integer totalInstances = spoutParallelism + boltParallelism;
+
+ // Set up the topology and its config
+ org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
+ topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
+
+ // Explicitly set resources for the container
+ double containerCpu = 30;
+
+ // Explicitly set the component CPU map
+ double boltCpu = 4;
+ double spoutCpu = 4;
+
+ topologyConfig.setContainerCpuRequested(containerCpu);
+ topologyConfig.setComponentCpu(BOLT_NAME, boltCpu);
+ topologyConfig.setComponentCpu(SPOUT_NAME, spoutCpu);
+
+ TopologyAPI.Topology topologyExplicitCpuMap =
+ getTopology(spoutParallelism, boltParallelism, topologyConfig);
+ PackingPlan packingPlanExplicitCpuMap =
+ getRoundRobinPackingPlan(topologyExplicitCpuMap);
+
+ AssertPacking.assertContainers(packingPlanExplicitCpuMap.getContainers(),
+ BOLT_NAME, SPOUT_NAME, boltCpu, spoutCpu, null);
+ Assert.assertEquals(totalInstances, packingPlanExplicitCpuMap.getInstanceCount());
+ }
+
+ /**
+ * Test the scenario CPU map config is completely set and there are less than enough resources
+ */
+ @Test(expected = PackingException.class)
+ public void testCompleteCpuMapRequestedWithLessThanEnoughResource() throws Exception {
+ int numContainers = 2;
+ int spoutParallelism = 4;
+ int boltParallelism = 3;
+
+ // Set up the topology and its config
+ org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
+ topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
+
+ // Explicitly set resources for the container
+ double containerCpu = 10;
+
+ // Explicitly set the component CPU map
+ double boltCpu = 4;
+ double spoutCpu = 4;
+
+ topologyConfig.setContainerCpuRequested(containerCpu);
+ topologyConfig.setComponentCpu(BOLT_NAME, boltCpu);
+ topologyConfig.setComponentCpu(SPOUT_NAME, spoutCpu);
+
+ TopologyAPI.Topology topologyExplicitCpuMap =
+ getTopology(spoutParallelism, boltParallelism, topologyConfig);
+ getRoundRobinPackingPlan(topologyExplicitCpuMap);
+ }
+
+ /**
+ * Test the scenario CPU map config is completely set
+ * and there are exactly enough resources for instances, but not enough for padding
+ */
+ @Test(expected = PackingException.class)
+ public void testCompleteCpuMapRequestedWithoutPaddingResource() throws Exception {
+ int numContainers = 2;
+ int spoutParallelism = 4;
+ int boltParallelism = 3;
+
+ // Set up the topology and its config
+ org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
+ topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
+
+ // Explicitly set resources for the container
+ double containerCpu = 16;
+
+ // Explicitly set the component CPU map
+ double boltCpu = 4;
+ double spoutCpu = 4;
+
+ topologyConfig.setContainerCpuRequested(containerCpu);
+ topologyConfig.setComponentCpu(BOLT_NAME, boltCpu);
+ topologyConfig.setComponentCpu(SPOUT_NAME, spoutCpu);
+
+ TopologyAPI.Topology topologyExplicitCpuMap =
+ getTopology(spoutParallelism, boltParallelism, topologyConfig);
+ getRoundRobinPackingPlan(topologyExplicitCpuMap);
+ }
+
+ /**
* Test the scenario RAM map config is partially set
*/
@Test
@@ -285,8 +441,7 @@ public class RoundRobinPackingTest {
Assert.assertEquals(totalInstances, packingPlanExplicitRamMap.getInstanceCount());
// RAM for bolt should be the value in component RAM map
- for (PackingPlan.ContainerPlan containerPlan
- : packingPlanExplicitRamMap.getContainers()) {
+ for (PackingPlan.ContainerPlan containerPlan : packingPlanExplicitRamMap.getContainers()) {
Assert.assertEquals(containerRam, containerPlan.getRequiredResource().getRam());
int boltCount = 0;
int instancesCount = containerPlan.getInstances().size();
@@ -314,6 +469,63 @@ public class RoundRobinPackingTest {
}
/**
+ * Test the scenario CPU map config is partially set
+ */
+ @Test
+ public void testPartialCpuMap() throws Exception {
+ int numContainers = 2;
+ int spoutParallelism = 4;
+ int boltParallelism = 3;
+ Integer totalInstances = spoutParallelism + boltParallelism;
+
+ // Set up the topology and its config
+ org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
+ topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
+
+ // Explicitly set resources for the container
+ double containerCpu = 17;
+
+ // Explicitly set the component CPU map
+ double boltCpu = 4;
+
+ topologyConfig.setContainerCpuRequested(containerCpu);
+ topologyConfig.setComponentCpu(BOLT_NAME, boltCpu);
+
+ TopologyAPI.Topology topologyExplicitCpuMap =
+ getTopology(spoutParallelism, boltParallelism, topologyConfig);
+ PackingPlan packingPlanExplicitCpuMap =
+ getRoundRobinPackingPlan(topologyExplicitCpuMap);
+ Assert.assertEquals(totalInstances, packingPlanExplicitCpuMap.getInstanceCount());
+
+ // CPU for bolt should be the value in component CPU map
+ for (PackingPlan.ContainerPlan containerPlan : packingPlanExplicitCpuMap.getContainers()) {
+ Assert.assertEquals(containerCpu, containerPlan.getRequiredResource().getCpu(), DELTA);
+ int boltCount = 0;
+ int instancesCount = containerPlan.getInstances().size();
+ for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
+ if (instancePlan.getComponentName().equals(BOLT_NAME)) {
+ Assert.assertEquals(boltCpu, instancePlan.getResource().getCpu(), DELTA);
+ boltCount++;
+ }
+ }
+
+ // CPU for spout should be:
+ // (containerCpu - all CPU for bolt - CPU for padding) / (# of spouts)
+ int spoutCount = instancesCount - boltCount;
+ for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
+ if (instancePlan.getComponentName().equals(SPOUT_NAME)) {
+ Assert.assertEquals(
+ (containerCpu
+ - boltCpu * boltCount
+ - RoundRobinPacking.DEFAULT_CPU_PADDING_PER_CONTAINER)
+ / spoutCount,
+ instancePlan.getResource().getCpu(), DELTA);
+ }
+ }
+ }
+ }
+
+ /**
* test even packing of instances
*/
@Test