You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nw...@apache.org on 2019/02/27 17:28:20 UTC
[incubator-heron] branch master updated: Refactor
FirstFitDecreasingPacking to support CPU intensive topologies (#3195)
This is an automated email from the ASF dual-hosted git repository.
nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 7ea8b8c Refactor FirstFitDecreasingPacking to support CPU intensive topologies (#3195)
7ea8b8c is described below
commit 7ea8b8cc7d99554a6d2bcdbb13073a6857e320dd
Author: Xiaoyao Qian <qi...@illinois.edu>
AuthorDate: Wed Feb 27 09:28:15 2019 -0800
Refactor FirstFitDecreasingPacking to support CPU intensive topologies (#3195)
---
.../api/src/java/org/apache/heron/api/Config.java | 15 ++++++
.../binpacking/FirstFitDecreasingPacking.java | 62 +++++++++++++++-------
.../heron/packing/builder/PackingPlanBuilder.java | 4 +-
...amRequirement.java => ResourceRequirement.java} | 39 ++++++--------
.../SortingStrategy.java} | 24 +++++++--
.../packing/constraints/InstanceConstraint.java | 2 +-
.../constraints/InstanceDensityConstraint.java | 48 +++++++++++++++++
...MinRamConstraint.java => MinCpuConstraint.java} | 16 +++---
.../packing/constraints/MinRamConstraint.java | 3 +-
.../packing/constraints/PackingConstraint.java | 2 +-
.../packing/constraints/ResourceConstraint.java | 2 +-
.../TooManyInstancesException.java} | 16 ++++--
.../packing/roundrobin/RoundRobinPacking.java | 15 +++---
.../binpacking/FirstFitDecreasingPackingTest.java | 11 ++--
14 files changed, 180 insertions(+), 79 deletions(-)
diff --git a/heron/api/src/java/org/apache/heron/api/Config.java b/heron/api/src/java/org/apache/heron/api/Config.java
index 7319537..8da9109 100644
--- a/heron/api/src/java/org/apache/heron/api/Config.java
+++ b/heron/api/src/java/org/apache/heron/api/Config.java
@@ -216,6 +216,12 @@ public class Config extends HashMap<String, Object> {
*/
public static final String TOPOLOGY_COMPONENT_DISKMAP = "topology.component.diskmap";
/**
+ * Sorting strategy for the FirstFitDecreasingPacking algorithm.
+ * Valid values: RAM_FIRST (default) or CPU_FIRST (case-insensitive).
+ */
+ public static final String TOPOLOGY_PACKING_FFD_SORTING_STRATEGY
+ = "topology.packing.ffd.sorting.strategy";
+ /**
* What's the checkpoint interval for stateful topologies in seconds
*/
public static final String TOPOLOGY_STATEFUL_CHECKPOINT_INTERVAL_SECONDS =
@@ -351,6 +357,7 @@ public class Config extends HashMap<String, Object> {
apiVars.add(TOPOLOGY_COMPONENT_CPUMAP);
apiVars.add(TOPOLOGY_COMPONENT_RAMMAP);
apiVars.add(TOPOLOGY_COMPONENT_DISKMAP);
+ apiVars.add(TOPOLOGY_PACKING_FFD_SORTING_STRATEGY);
apiVars.add(TOPOLOGY_STATEFUL_START_CLEAN);
apiVars.add(TOPOLOGY_STATEFUL_CHECKPOINT_INTERVAL_SECONDS);
apiVars.add(TOPOLOGY_STATEFUL_CKPTMGR_RAM);
@@ -518,6 +525,10 @@ public class Config extends HashMap<String, Object> {
conf.put(Config.TOPOLOGY_COMPONENT_DISKMAP, diskMap);
}
+ public static void setFFDSortingStrategy(Map<String, Object> conf, String sortingStrategy) {
+ conf.put(Config.TOPOLOGY_PACKING_FFD_SORTING_STRATEGY, sortingStrategy);
+ }
+
public static void setAutoTaskHooks(Map<String, Object> conf, List<String> hooks) {
conf.put(Config.TOPOLOGY_AUTO_TASK_HOOKS, hooks);
}
@@ -799,6 +810,10 @@ public class Config extends HashMap<String, Object> {
setComponentDisk(this, component, diskInBytes);
}
+ public void setFFDSortingStrategy(String sortingStrategy) {
+ setFFDSortingStrategy(this, sortingStrategy);
+ }
+
public void setUpdateDeactivateWaitDuration(int seconds) {
put(Config.TOPOLOGY_UPDATE_DEACTIVATE_WAIT_SECS, Integer.toString(seconds));
}
diff --git a/heron/packing/src/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPacking.java b/heron/packing/src/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPacking.java
index f8e7523..e293172 100644
--- a/heron/packing/src/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPacking.java
+++ b/heron/packing/src/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPacking.java
@@ -20,12 +20,14 @@
package org.apache.heron.packing.binpacking;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;
+import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.api.utils.TopologyUtils;
import org.apache.heron.packing.AbstractPacking;
import org.apache.heron.packing.builder.Container;
@@ -33,17 +35,23 @@ import org.apache.heron.packing.builder.ContainerIdScorer;
import org.apache.heron.packing.builder.HomogeneityScorer;
import org.apache.heron.packing.builder.InstanceCountScorer;
import org.apache.heron.packing.builder.PackingPlanBuilder;
-import org.apache.heron.packing.builder.RamRequirement;
+import org.apache.heron.packing.builder.ResourceRequirement;
import org.apache.heron.packing.builder.Scorer;
+import org.apache.heron.packing.builder.SortingStrategy;
+import org.apache.heron.packing.constraints.InstanceDensityConstraint;
+import org.apache.heron.packing.constraints.MinCpuConstraint;
import org.apache.heron.packing.constraints.MinRamConstraint;
import org.apache.heron.packing.constraints.ResourceConstraint;
import org.apache.heron.packing.exceptions.ConstraintViolationException;
import org.apache.heron.packing.exceptions.ResourceExceededException;
import org.apache.heron.packing.utils.PackingUtils;
+import org.apache.heron.spi.common.Config;
import org.apache.heron.spi.packing.PackingException;
import org.apache.heron.spi.packing.PackingPlan;
import org.apache.heron.spi.packing.Resource;
+import static org.apache.heron.api.Config.TOPOLOGY_PACKING_FFD_SORTING_STRATEGY;
+
/**
* FirstFitDecreasing packing algorithm
* <p>
@@ -93,7 +101,20 @@ public class FirstFitDecreasingPacking extends AbstractPacking {
private static final Logger LOG = Logger.getLogger(FirstFitDecreasingPacking.class.getName());
+ private static final SortingStrategy DEFAULT_SORTING_STRATEGY = SortingStrategy.RAM_FIRST;
+
private int numContainers = 0;
+ private SortingStrategy sortingStrategy = DEFAULT_SORTING_STRATEGY;
+
+ @Override
+ public void initialize(Config config, TopologyAPI.Topology inputTopology) {
+ super.initialize(config, inputTopology);
+ List<TopologyAPI.Config.KeyValue> topologyConfig = topology.getTopologyConfig().getKvsList();
+ this.sortingStrategy = SortingStrategy.valueOf(
+ TopologyUtils.getConfigWithDefault(topologyConfig,
+ TOPOLOGY_PACKING_FFD_SORTING_STRATEGY, DEFAULT_SORTING_STRATEGY.toString())
+ .toUpperCase());
+ }
private PackingPlanBuilder newPackingPlanBuilder(PackingPlan existingPackingPlan) {
return new PackingPlanBuilder(topology.getId(), existingPackingPlan)
@@ -101,8 +122,9 @@ public class FirstFitDecreasingPacking extends AbstractPacking {
.setMaxContainerResource(maxContainerResources)
.setRequestedContainerPadding(padding)
.setRequestedComponentResource(componentResourceMap)
- .setInstanceConstraints(Collections.singletonList(new MinRamConstraint()))
- .setPackingConstraints(Collections.singletonList(new ResourceConstraint()));
+ .setInstanceConstraints(Arrays.asList(new MinRamConstraint(), new MinCpuConstraint()))
+ .setPackingConstraints(Arrays.asList(new ResourceConstraint(),
+ new InstanceDensityConstraint(maxNumInstancesPerContainer)));
}
/**
@@ -154,16 +176,17 @@ public class FirstFitDecreasingPacking extends AbstractPacking {
*
* @return The sorted list of components and their RAM requirements
*/
- private ArrayList<RamRequirement> getSortedRAMInstances(Set<String> componentNames) {
- ArrayList<RamRequirement> ramRequirements = new ArrayList<>();
+ private List<ResourceRequirement> getSortedInstances(Set<String> componentNames) {
+ List<ResourceRequirement> resourceRequirements = new ArrayList<>();
for (String componentName : componentNames) {
Resource requiredResource = this.componentResourceMap.getOrDefault(componentName,
defaultInstanceResources);
- ramRequirements.add(new RamRequirement(componentName, requiredResource.getRam()));
+ resourceRequirements.add(new ResourceRequirement(componentName,
+ requiredResource.getRam(), requiredResource.getCpu()));
}
- Collections.sort(ramRequirements, Collections.reverseOrder());
+ Collections.sort(resourceRequirements, sortingStrategy.reversed());
- return ramRequirements;
+ return resourceRequirements;
}
/**
@@ -212,11 +235,12 @@ public class FirstFitDecreasingPacking extends AbstractPacking {
* @param parallelismMap component parallelism
*/
private void assignInstancesToContainers(PackingPlanBuilder planBuilder,
- Map<String, Integer> parallelismMap) throws ConstraintViolationException {
- ArrayList<RamRequirement> ramRequirements
- = getSortedRAMInstances(parallelismMap.keySet());
- for (RamRequirement ramRequirement : ramRequirements) {
- String componentName = ramRequirement.getComponentName();
+ Map<String, Integer> parallelismMap)
+ throws ConstraintViolationException {
+ List<ResourceRequirement> resourceRequirements
+ = getSortedInstances(parallelismMap.keySet());
+ for (ResourceRequirement resourceRequirement : resourceRequirements) {
+ String componentName = resourceRequirement.getComponentName();
int numInstance = parallelismMap.get(componentName);
for (int j = 0; j < numInstance; j++) {
placeFFDInstance(planBuilder, componentName);
@@ -233,14 +257,14 @@ public class FirstFitDecreasingPacking extends AbstractPacking {
private void removeInstancesFromContainers(PackingPlanBuilder packingPlanBuilder,
Map<String, Integer> componentsToScaleDown) {
- ArrayList<RamRequirement> ramRequirements =
- getSortedRAMInstances(componentsToScaleDown.keySet());
+ List<ResourceRequirement> resourceRequirements =
+ getSortedInstances(componentsToScaleDown.keySet());
InstanceCountScorer instanceCountScorer = new InstanceCountScorer();
ContainerIdScorer containerIdScorer = new ContainerIdScorer(false);
- for (RamRequirement ramRequirement : ramRequirements) {
- String componentName = ramRequirement.getComponentName();
+ for (ResourceRequirement resourceRequirement : resourceRequirements) {
+ String componentName = resourceRequirement.getComponentName();
int numInstancesToRemove = -componentsToScaleDown.get(componentName);
List<Scorer<Container>> scorers = new ArrayList<>();
@@ -259,8 +283,8 @@ public class FirstFitDecreasingPacking extends AbstractPacking {
* Assign a particular instance to an existing container or to a new container
*
*/
- private void placeFFDInstance(PackingPlanBuilder planBuilder,
- String componentName) throws ConstraintViolationException {
+ private void placeFFDInstance(PackingPlanBuilder planBuilder, String componentName)
+ throws ConstraintViolationException {
if (this.numContainers == 0) {
planBuilder.updateNumContainers(++numContainers);
}
diff --git a/heron/packing/src/java/org/apache/heron/packing/builder/PackingPlanBuilder.java b/heron/packing/src/java/org/apache/heron/packing/builder/PackingPlanBuilder.java
index d9fe4db..18a9c99 100644
--- a/heron/packing/src/java/org/apache/heron/packing/builder/PackingPlanBuilder.java
+++ b/heron/packing/src/java/org/apache/heron/packing/builder/PackingPlanBuilder.java
@@ -144,10 +144,10 @@ public class PackingPlanBuilder {
// Check constraints
for (InstanceConstraint constraint : instanceConstraints) {
- constraint.test(instancePlan);
+ constraint.validate(instancePlan);
}
for (PackingConstraint constraint : packingConstraints) {
- constraint.test(container, instancePlan);
+ constraint.validate(container, instancePlan);
}
addToContainer(container, instancePlan, this.componentIndexes, this.taskIds);
diff --git a/heron/packing/src/java/org/apache/heron/packing/builder/RamRequirement.java b/heron/packing/src/java/org/apache/heron/packing/builder/ResourceRequirement.java
similarity index 61%
rename from heron/packing/src/java/org/apache/heron/packing/builder/RamRequirement.java
rename to heron/packing/src/java/org/apache/heron/packing/builder/ResourceRequirement.java
index 5c1035d..a687a15 100644
--- a/heron/packing/src/java/org/apache/heron/packing/builder/RamRequirement.java
+++ b/heron/packing/src/java/org/apache/heron/packing/builder/ResourceRequirement.java
@@ -22,44 +22,35 @@ package org.apache.heron.packing.builder;
import org.apache.heron.common.basics.ByteAmount;
/**
- * Helper class that captures the RAM requirements of each component
+ * Helper class that captures the RAM and CPU requirements of each component
*/
-public class RamRequirement implements Comparable<RamRequirement> {
+public class ResourceRequirement {
private String componentName;
private ByteAmount ramRequirement;
+ private double cpuRequirement;
- public RamRequirement(String componentName, ByteAmount ram) {
+ public ResourceRequirement(String componentName, ByteAmount ram) {
+ this(componentName, ram, 0.0);
+ }
+
+ public ResourceRequirement(String componentName,
+ ByteAmount ram,
+ double cpu) {
this.componentName = componentName;
this.ramRequirement = ram;
+ this.cpuRequirement = cpu;
}
public String getComponentName() {
return componentName;
}
- @Override
- public int compareTo(RamRequirement other) {
- return this.ramRequirement.compareTo(other.ramRequirement);
- }
-
- @Override
- public boolean equals(Object o) {
-
- if (o == this) {
- return true;
- }
- if (!(o instanceof RamRequirement)) {
- return false;
- }
- RamRequirement c = (RamRequirement) o;
-
- // Compare the ramRequirement values and return accordingly
- return ramRequirement.equals(c.ramRequirement);
+ public ByteAmount getRamRequirement() {
+ return ramRequirement;
}
- @Override
- public int hashCode() {
- return ramRequirement.hashCode();
+ public double getCpuRequirement() {
+ return cpuRequirement;
}
}
diff --git a/heron/packing/src/java/org/apache/heron/packing/constraints/InstanceConstraint.java b/heron/packing/src/java/org/apache/heron/packing/builder/SortingStrategy.java
similarity index 50%
copy from heron/packing/src/java/org/apache/heron/packing/constraints/InstanceConstraint.java
copy to heron/packing/src/java/org/apache/heron/packing/builder/SortingStrategy.java
index d7ddeed..cabe94c 100644
--- a/heron/packing/src/java/org/apache/heron/packing/constraints/InstanceConstraint.java
+++ b/heron/packing/src/java/org/apache/heron/packing/builder/SortingStrategy.java
@@ -17,11 +17,25 @@
* under the License.
*/
-package org.apache.heron.packing.constraints;
+package org.apache.heron.packing.builder;
-import org.apache.heron.packing.exceptions.ConstraintViolationException;
-import org.apache.heron.spi.packing.PackingPlan;
+import java.util.Comparator;
-public interface InstanceConstraint {
- void test(PackingPlan.InstancePlan instancePlan) throws ConstraintViolationException;
+public enum SortingStrategy implements Comparator<ResourceRequirement> {
+ RAM_FIRST {
+ @Override
+ public int compare(ResourceRequirement o1, ResourceRequirement o2) {
+ int ramComparison = o1.getRamRequirement().compareTo(o2.getRamRequirement());
+ return ramComparison == 0
+ ? Double.compare(o1.getCpuRequirement(), o2.getCpuRequirement()) : ramComparison;
+ }
+ },
+ CPU_FIRST {
+ @Override
+ public int compare(ResourceRequirement o1, ResourceRequirement o2) {
+ int cpuComparison = Double.compare(o1.getCpuRequirement(), o2.getCpuRequirement());
+ return cpuComparison == 0
+ ? o1.getRamRequirement().compareTo(o2.getRamRequirement()) : cpuComparison;
+ }
+ }
}
diff --git a/heron/packing/src/java/org/apache/heron/packing/constraints/InstanceConstraint.java b/heron/packing/src/java/org/apache/heron/packing/constraints/InstanceConstraint.java
index d7ddeed..83aff74 100644
--- a/heron/packing/src/java/org/apache/heron/packing/constraints/InstanceConstraint.java
+++ b/heron/packing/src/java/org/apache/heron/packing/constraints/InstanceConstraint.java
@@ -23,5 +23,5 @@ import org.apache.heron.packing.exceptions.ConstraintViolationException;
import org.apache.heron.spi.packing.PackingPlan;
public interface InstanceConstraint {
- void test(PackingPlan.InstancePlan instancePlan) throws ConstraintViolationException;
+ void validate(PackingPlan.InstancePlan instancePlan) throws ConstraintViolationException;
}
diff --git a/heron/packing/src/java/org/apache/heron/packing/constraints/InstanceDensityConstraint.java b/heron/packing/src/java/org/apache/heron/packing/constraints/InstanceDensityConstraint.java
new file mode 100644
index 0000000..704cff8
--- /dev/null
+++ b/heron/packing/src/java/org/apache/heron/packing/constraints/InstanceDensityConstraint.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.packing.constraints;
+
+import org.apache.heron.packing.builder.Container;
+import org.apache.heron.packing.exceptions.TooManyInstancesException;
+import org.apache.heron.spi.packing.PackingPlan;
+
+/**
+ * Constraint on the number of instances in one container
+ */
+public class InstanceDensityConstraint implements PackingConstraint {
+ private final int maxNumInstancesPerContainer;
+
+ public InstanceDensityConstraint(int maxNumInstancesPerContainer) {
+ this.maxNumInstancesPerContainer = maxNumInstancesPerContainer;
+ }
+
+ @Override
+ public void validate(Container container, PackingPlan.InstancePlan instancePlan)
+ throws TooManyInstancesException {
+ if (container.getInstances().size() + 1 > maxNumInstancesPerContainer) {
+ throw new TooManyInstancesException(String.format("Adding instance %s to container %d "
+ + "will cause the container to have more instances "
+ + "than the configured maxNumInstancesPerContainer %d",
+ instancePlan.getComponentName(),
+ container.getContainerId(),
+ maxNumInstancesPerContainer));
+ }
+ }
+}
diff --git a/heron/packing/src/java/org/apache/heron/packing/constraints/MinRamConstraint.java b/heron/packing/src/java/org/apache/heron/packing/constraints/MinCpuConstraint.java
similarity index 68%
copy from heron/packing/src/java/org/apache/heron/packing/constraints/MinRamConstraint.java
copy to heron/packing/src/java/org/apache/heron/packing/constraints/MinCpuConstraint.java
index 39d9d5c..bd1bc4a 100644
--- a/heron/packing/src/java/org/apache/heron/packing/constraints/MinRamConstraint.java
+++ b/heron/packing/src/java/org/apache/heron/packing/constraints/MinCpuConstraint.java
@@ -19,20 +19,20 @@
package org.apache.heron.packing.constraints;
-import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.packing.exceptions.MinResourceNotSatisfiedException;
import org.apache.heron.spi.packing.PackingPlan;
-public class MinRamConstraint implements InstanceConstraint {
- private static final ByteAmount MIN_RAM_PER_INSTANCE = ByteAmount.fromMegabytes(192);
+public class MinCpuConstraint implements InstanceConstraint {
+ private static final double MIN_CPU_PER_INSTANCE = 0.1;
@Override
- public void test(PackingPlan.InstancePlan instancePlan) throws MinResourceNotSatisfiedException {
- if (instancePlan.getResource().getRam().lessThan(MIN_RAM_PER_INSTANCE)) {
+ public void validate(PackingPlan.InstancePlan instancePlan)
+ throws MinResourceNotSatisfiedException {
+ if (instancePlan.getResource().getCpu() < MIN_CPU_PER_INSTANCE) {
throw new MinResourceNotSatisfiedException(String.format("Instance %s is "
- + "configured %s RAM that is less than the minimum RAM per instance %s",
- instancePlan.getComponentName(), instancePlan.getResource().getRam().toString(),
- MIN_RAM_PER_INSTANCE.toString()));
+ + "configured %.3f CPU that is less than the minimum CPU per instance %.3f",
+ instancePlan.getComponentName(), instancePlan.getResource().getCpu(),
+ MIN_CPU_PER_INSTANCE));
}
}
}
diff --git a/heron/packing/src/java/org/apache/heron/packing/constraints/MinRamConstraint.java b/heron/packing/src/java/org/apache/heron/packing/constraints/MinRamConstraint.java
index 39d9d5c..0738c6c 100644
--- a/heron/packing/src/java/org/apache/heron/packing/constraints/MinRamConstraint.java
+++ b/heron/packing/src/java/org/apache/heron/packing/constraints/MinRamConstraint.java
@@ -27,7 +27,8 @@ public class MinRamConstraint implements InstanceConstraint {
private static final ByteAmount MIN_RAM_PER_INSTANCE = ByteAmount.fromMegabytes(192);
@Override
- public void test(PackingPlan.InstancePlan instancePlan) throws MinResourceNotSatisfiedException {
+ public void validate(PackingPlan.InstancePlan instancePlan)
+ throws MinResourceNotSatisfiedException {
if (instancePlan.getResource().getRam().lessThan(MIN_RAM_PER_INSTANCE)) {
throw new MinResourceNotSatisfiedException(String.format("Instance %s is "
+ "configured %s RAM that is less than the minimum RAM per instance %s",
diff --git a/heron/packing/src/java/org/apache/heron/packing/constraints/PackingConstraint.java b/heron/packing/src/java/org/apache/heron/packing/constraints/PackingConstraint.java
index dd21f9d..2b7aeda 100644
--- a/heron/packing/src/java/org/apache/heron/packing/constraints/PackingConstraint.java
+++ b/heron/packing/src/java/org/apache/heron/packing/constraints/PackingConstraint.java
@@ -24,6 +24,6 @@ import org.apache.heron.packing.exceptions.ConstraintViolationException;
import org.apache.heron.spi.packing.PackingPlan;
public interface PackingConstraint {
- void test(Container container, PackingPlan.InstancePlan instancePlan)
+ void validate(Container container, PackingPlan.InstancePlan instancePlan)
throws ConstraintViolationException;
}
diff --git a/heron/packing/src/java/org/apache/heron/packing/constraints/ResourceConstraint.java b/heron/packing/src/java/org/apache/heron/packing/constraints/ResourceConstraint.java
index 5b7700c..87198e3 100644
--- a/heron/packing/src/java/org/apache/heron/packing/constraints/ResourceConstraint.java
+++ b/heron/packing/src/java/org/apache/heron/packing/constraints/ResourceConstraint.java
@@ -26,7 +26,7 @@ import org.apache.heron.spi.packing.Resource;
public class ResourceConstraint implements PackingConstraint {
@Override
- public void test(Container container, PackingPlan.InstancePlan instancePlan)
+ public void validate(Container container, PackingPlan.InstancePlan instancePlan)
throws ResourceExceededException {
Resource usedResource = container.getTotalUsedResources();
Resource newUsedResource = usedResource.plus(instancePlan.getResource());
diff --git a/heron/packing/src/java/org/apache/heron/packing/constraints/InstanceConstraint.java b/heron/packing/src/java/org/apache/heron/packing/exceptions/TooManyInstancesException.java
similarity index 68%
copy from heron/packing/src/java/org/apache/heron/packing/constraints/InstanceConstraint.java
copy to heron/packing/src/java/org/apache/heron/packing/exceptions/TooManyInstancesException.java
index d7ddeed..e688900 100644
--- a/heron/packing/src/java/org/apache/heron/packing/constraints/InstanceConstraint.java
+++ b/heron/packing/src/java/org/apache/heron/packing/exceptions/TooManyInstancesException.java
@@ -17,11 +17,17 @@
* under the License.
*/
-package org.apache.heron.packing.constraints;
+package org.apache.heron.packing.exceptions;
-import org.apache.heron.packing.exceptions.ConstraintViolationException;
-import org.apache.heron.spi.packing.PackingPlan;
+/**
+ * Thrown to indicate that a container would hold more instances than the configured maximum
+ */
+public class TooManyInstancesException extends ConstraintViolationException {
+ public TooManyInstancesException(String message) {
+ super(message);
+ }
-public interface InstanceConstraint {
- void test(PackingPlan.InstancePlan instancePlan) throws ConstraintViolationException;
+ public TooManyInstancesException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
diff --git a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
index 3af3092..b28d71f 100644
--- a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
+++ b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
@@ -36,7 +36,8 @@ import org.apache.heron.api.utils.TopologyUtils;
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.common.basics.CPUShare;
import org.apache.heron.common.basics.ResourceMeasure;
-import org.apache.heron.packing.builder.RamRequirement;
+import org.apache.heron.packing.builder.ResourceRequirement;
+import org.apache.heron.packing.builder.SortingStrategy;
import org.apache.heron.spi.common.Config;
import org.apache.heron.spi.common.Context;
import org.apache.heron.spi.packing.IPacking;
@@ -377,7 +378,7 @@ public class RoundRobinPacking implements IPacking, IRepacking {
// To ensure we spread out the big instances first
// Only sorting by RAM here because only RAM can be explicitly capped by JVM processes
List<String> sortedInstances = getSortedRAMComponents(parallelismMap.keySet()).stream()
- .map(RamRequirement::getComponentName).collect(Collectors.toList());
+ .map(ResourceRequirement::getComponentName).collect(Collectors.toList());
for (String component : sortedInstances) {
int numInstance = parallelismMap.get(component);
for (int i = 0; i < numInstance; ++i) {
@@ -395,16 +396,16 @@ public class RoundRobinPacking implements IPacking, IRepacking {
*
* @return The sorted list of components and their RAM requirements
*/
- private ArrayList<RamRequirement> getSortedRAMComponents(Set<String> componentNames) {
- ArrayList<RamRequirement> ramRequirements = new ArrayList<>();
+ private ArrayList<ResourceRequirement> getSortedRAMComponents(Set<String> componentNames) {
+ ArrayList<ResourceRequirement> resourceRequirements = new ArrayList<>();
Map<String, ByteAmount> ramMap = TopologyUtils.getComponentRamMapConfig(topology);
for (String componentName : componentNames) {
- ramRequirements.add(new RamRequirement(componentName,
+ resourceRequirements.add(new ResourceRequirement(componentName,
ramMap.getOrDefault(componentName, ByteAmount.ZERO)));
}
- Collections.sort(ramRequirements, Collections.reverseOrder());
- return ramRequirements;
+ Collections.sort(resourceRequirements, SortingStrategy.RAM_FIRST.reversed());
+ return resourceRequirements;
}
/**
diff --git a/heron/packing/tests/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPackingTest.java b/heron/packing/tests/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPackingTest.java
index 78245ff..8a975f4 100644
--- a/heron/packing/tests/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPackingTest.java
+++ b/heron/packing/tests/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPackingTest.java
@@ -115,20 +115,21 @@ public class FirstFitDecreasingPackingTest extends CommonPackingTests {
PackingPlan packingPlan = doPackingTest(topology,
instanceDefaultResources, boltParallelism,
instanceDefaultResources, spoutParallelism,
- 1, containerResource);
+ 2, containerResource);
for (PackingPlan.ContainerPlan containerPlan : packingPlan.getContainers()) {
- Assert.assertEquals(Math.round(totalInstances * instanceDefaultResources.getCpu()
+ int instanceCount = containerPlan.getInstances().size();
+ Assert.assertEquals(Math.round(instanceCount * instanceDefaultResources.getCpu()
+ padding.getCpu()),
(long) containerPlan.getRequiredResource().getCpu());
Assert.assertEquals(instanceDefaultResources.getRam()
- .multiply(totalInstances)
+ .multiply(instanceCount)
.plus(padding.getRam()),
containerPlan.getRequiredResource().getRam());
Assert.assertEquals(instanceDefaultResources.getDisk()
- .multiply(totalInstances)
+ .multiply(instanceCount)
.plus(padding.getDisk()),
containerPlan.getRequiredResource().getDisk());
@@ -170,7 +171,7 @@ public class FirstFitDecreasingPackingTest extends CommonPackingTests {
doPackingTest(topology,
instanceDefaultResources.cloneWithRam(boltRam), boltParallelism,
instanceDefaultResources.cloneWithRam(spoutRam), spoutParallelism,
- 1, containerResource);
+ 2, containerResource);
}
/**