You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nw...@apache.org on 2019/02/08 23:06:10 UTC
[incubator-heron] branch master updated: init (#3191)
This is an automated email from the ASF dual-hosted git repository.
nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new c066c76 init (#3191)
c066c76 is described below
commit c066c7626d38dd31ec984a95be7dabcee32ec422
Author: Xiaoyao Qian <qi...@illinois.edu>
AuthorDate: Fri Feb 8 15:06:05 2019 -0800
init (#3191)
---
.../binpacking/FirstFitDecreasingPacking.java | 19 +++++----
.../heron/packing/builder/PackingPlanBuilder.java | 45 +++++++++++++++-------
.../roundrobin/ResourceCompliantRRPacking.java | 23 +++++++----
.../apache/heron/packing/CommonPackingTests.java | 4 +-
.../apache/heron/packing/PackingTestHelper.java | 10 ++---
.../packing/builder/PackingPlanBuilderTest.java | 15 ++++----
6 files changed, 74 insertions(+), 42 deletions(-)
diff --git a/heron/packing/src/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPacking.java b/heron/packing/src/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPacking.java
index d889abe..ab9b650 100644
--- a/heron/packing/src/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPacking.java
+++ b/heron/packing/src/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPacking.java
@@ -36,6 +36,9 @@ import org.apache.heron.packing.builder.HomogeneityScorer;
import org.apache.heron.packing.builder.InstanceCountScorer;
import org.apache.heron.packing.builder.PackingPlanBuilder;
import org.apache.heron.packing.builder.Scorer;
+import org.apache.heron.packing.constraints.MinRamConstraint;
+import org.apache.heron.packing.constraints.ResourceConstraint;
+import org.apache.heron.packing.exceptions.ConstraintViolationException;
import org.apache.heron.packing.exceptions.ResourceExceededException;
import org.apache.heron.packing.utils.PackingUtils;
import org.apache.heron.spi.common.Config;
@@ -161,7 +164,9 @@ public class FirstFitDecreasingPacking implements IPacking, IRepacking {
.setMaxContainerResource(maxContainerResources)
.setDefaultInstanceResource(defaultInstanceResources)
.setRequestedContainerPadding(paddingPercentage)
- .setRequestedComponentRam(TopologyUtils.getComponentRamMapConfig(topology));
+ .setRequestedComponentRam(TopologyUtils.getComponentRamMapConfig(topology))
+ .setInstanceConstraints(Collections.singletonList(new MinRamConstraint()))
+ .setPackingConstraints(Collections.singletonList(new ResourceConstraint()));
}
/**
@@ -176,7 +181,7 @@ public class FirstFitDecreasingPacking implements IPacking, IRepacking {
// Get the instances using FFD allocation
try {
planBuilder = getFFDAllocation(planBuilder);
- } catch (ResourceExceededException e) {
+ } catch (ConstraintViolationException e) {
throw new PackingException("Could not allocate all instances to packing plan", e);
}
@@ -193,7 +198,7 @@ public class FirstFitDecreasingPacking implements IPacking, IRepacking {
// Get the instances using FFD allocation
try {
planBuilder = getFFDAllocation(planBuilder, currentPackingPlan, componentChanges);
- } catch (ResourceExceededException e) {
+ } catch (ConstraintViolationException e) {
throw new PackingException("Could not repack instances into existing packing plan", e);
}
@@ -239,7 +244,7 @@ public class FirstFitDecreasingPacking implements IPacking, IRepacking {
* @return Map < containerId, list of InstanceId belonging to this container >
*/
private PackingPlanBuilder getFFDAllocation(PackingPlanBuilder planBuilder)
- throws ResourceExceededException {
+ throws ConstraintViolationException {
Map<String, Integer> parallelismMap = TopologyUtils.getComponentParallelism(topology);
assignInstancesToContainers(planBuilder, parallelismMap);
return planBuilder;
@@ -253,7 +258,7 @@ public class FirstFitDecreasingPacking implements IPacking, IRepacking {
private PackingPlanBuilder getFFDAllocation(PackingPlanBuilder packingPlanBuilder,
PackingPlan currentPackingPlan,
Map<String, Integer> componentChanges)
- throws ResourceExceededException {
+ throws ConstraintViolationException {
this.numContainers = currentPackingPlan.getContainers().size();
Map<String, Integer> componentsToScaleDown =
@@ -279,7 +284,7 @@ public class FirstFitDecreasingPacking implements IPacking, IRepacking {
* @param parallelismMap component parallelism
*/
private void assignInstancesToContainers(PackingPlanBuilder planBuilder,
- Map<String, Integer> parallelismMap) throws ResourceExceededException {
+ Map<String, Integer> parallelismMap) throws ConstraintViolationException {
ArrayList<RamRequirement> ramRequirements = getSortedRAMInstances(parallelismMap.keySet());
for (RamRequirement ramRequirement : ramRequirements) {
String componentName = ramRequirement.getComponentName();
@@ -326,7 +331,7 @@ public class FirstFitDecreasingPacking implements IPacking, IRepacking {
*
*/
private void placeFFDInstance(PackingPlanBuilder planBuilder,
- String componentName) throws ResourceExceededException {
+ String componentName) throws ConstraintViolationException {
if (this.numContainers == 0) {
planBuilder.updateNumContainers(++numContainers);
}
diff --git a/heron/packing/src/java/org/apache/heron/packing/builder/PackingPlanBuilder.java b/heron/packing/src/java/org/apache/heron/packing/builder/PackingPlanBuilder.java
index 633a0f4..64f4b8e 100644
--- a/heron/packing/src/java/org/apache/heron/packing/builder/PackingPlanBuilder.java
+++ b/heron/packing/src/java/org/apache/heron/packing/builder/PackingPlanBuilder.java
@@ -38,6 +38,9 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import org.apache.heron.common.basics.ByteAmount;
+import org.apache.heron.packing.constraints.InstanceConstraint;
+import org.apache.heron.packing.constraints.PackingConstraint;
+import org.apache.heron.packing.exceptions.ConstraintViolationException;
import org.apache.heron.packing.exceptions.ResourceExceededException;
import org.apache.heron.packing.utils.PackingUtils;
import org.apache.heron.spi.packing.InstanceId;
@@ -58,6 +61,8 @@ public class PackingPlanBuilder {
private Map<String, ByteAmount> componentRamMap;
private int requestedContainerPadding;
private int numContainers;
+ private List<PackingConstraint> packingConstraints;
+ private List<InstanceConstraint> instanceConstraints;
private Map<Integer, Container> containers;
private TreeSet<Integer> taskIds; // globally unique ids assigned to instances
@@ -73,6 +78,8 @@ public class PackingPlanBuilder {
this.numContainers = 0;
this.requestedContainerPadding = 0;
this.componentRamMap = new HashMap<>();
+ this.packingConstraints = new ArrayList<>();
+ this.instanceConstraints = new ArrayList<>();
}
// set resource settings
@@ -96,6 +103,16 @@ public class PackingPlanBuilder {
return this;
}
+ public PackingPlanBuilder setPackingConstraints(List<PackingConstraint> packingConstraintLst) {
+ this.packingConstraints = packingConstraintLst;
+ return this;
+ }
+
+ public PackingPlanBuilder setInstanceConstraints(List<InstanceConstraint> instanceConstraintLst) {
+ this.instanceConstraints = instanceConstraintLst;
+ return this;
+ }
+
// Calling updateNumContainers will produce that many containers, starting with id 1. The build()
// method will prune out empty containers from the plan.
public PackingPlanBuilder updateNumContainers(int count) {
@@ -107,7 +124,7 @@ public class PackingPlanBuilder {
// be lazily initialized, which could result in more containers than those requested using the
// updateNumContainers method
public PackingPlanBuilder addInstance(Integer containerId,
- String componentName) throws ResourceExceededException {
+ String componentName) throws ConstraintViolationException {
initContainer(containerId);
Integer taskId = taskIds.isEmpty() ? 1 : taskIds.last() + 1;
@@ -120,15 +137,18 @@ public class PackingPlanBuilder {
componentName, this.componentRamMap, this.defaultInstanceResource,
this.maxContainerResource, this.requestedContainerPadding);
- try {
- addToContainer(containers.get(containerId),
- new PackingPlan.InstancePlan(instanceId, instanceResource),
- this.componentIndexes, this.taskIds);
- } catch (ResourceExceededException e) {
- throw new ResourceExceededException(String.format(
- "Insufficient container resources to add instance %s with resources %s to container %d.",
- instanceId, instanceResource, containerId), e);
+ Container container = containers.get(containerId);
+ PackingPlan.InstancePlan instancePlan
+ = new PackingPlan.InstancePlan(instanceId, instanceResource);
+
+ // Check constraints
+ for (InstanceConstraint constraint : instanceConstraints) {
+ constraint.test(instancePlan);
}
+ for (PackingConstraint constraint : packingConstraints) {
+ constraint.test(container, instancePlan);
+ }
+ addToContainer(container, instancePlan, this.componentIndexes, this.taskIds);
LOG.finest(String.format("Added to container %d instance %s", containerId, instanceId));
return this;
@@ -161,7 +181,7 @@ public class PackingPlanBuilder {
try {
addInstance(container.getContainerId(), componentName);
return container.getContainerId();
- } catch (ResourceExceededException e) {
+ } catch (ConstraintViolationException e) {
// ignore since we'll continue trying
}
}
@@ -423,9 +443,8 @@ public class PackingPlanBuilder {
container.add(instancePlan);
String componentName = instancePlan.getComponentName();
- if (componentIndexes.get(componentName) == null) {
- componentIndexes.put(componentName, new TreeSet<Integer>());
- }
+ // update componentIndex and taskIds
+ componentIndexes.computeIfAbsent(componentName, k -> new TreeSet<Integer>());
componentIndexes.get(componentName).add(instancePlan.getComponentIndex());
taskIds.add(instancePlan.getTaskId());
}
diff --git a/heron/packing/src/java/org/apache/heron/packing/roundrobin/ResourceCompliantRRPacking.java b/heron/packing/src/java/org/apache/heron/packing/roundrobin/ResourceCompliantRRPacking.java
index 1dcb900..df9718e 100644
--- a/heron/packing/src/java/org/apache/heron/packing/roundrobin/ResourceCompliantRRPacking.java
+++ b/heron/packing/src/java/org/apache/heron/packing/roundrobin/ResourceCompliantRRPacking.java
@@ -20,6 +20,7 @@
package org.apache.heron.packing.roundrobin;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;
@@ -33,6 +34,9 @@ import org.apache.heron.packing.builder.HomogeneityScorer;
import org.apache.heron.packing.builder.InstanceCountScorer;
import org.apache.heron.packing.builder.PackingPlanBuilder;
import org.apache.heron.packing.builder.Scorer;
+import org.apache.heron.packing.constraints.MinRamConstraint;
+import org.apache.heron.packing.constraints.ResourceConstraint;
+import org.apache.heron.packing.exceptions.ConstraintViolationException;
import org.apache.heron.packing.exceptions.ResourceExceededException;
import org.apache.heron.packing.utils.PackingUtils;
import org.apache.heron.spi.common.Config;
@@ -165,7 +169,9 @@ public class ResourceCompliantRRPacking implements IPacking, IRepacking {
.setMaxContainerResource(maxContainerResources)
.setDefaultInstanceResource(defaultInstanceResources)
.setRequestedContainerPadding(paddingPercentage)
- .setRequestedComponentRam(TopologyUtils.getComponentRamMapConfig(topology));
+ .setRequestedComponentRam(TopologyUtils.getComponentRamMapConfig(topology))
+ .setInstanceConstraints(Collections.singletonList(new MinRamConstraint()))
+ .setPackingConstraints(Collections.singletonList(new ResourceConstraint()));
}
@Override
@@ -179,7 +185,7 @@ public class ResourceCompliantRRPacking implements IPacking, IRepacking {
return planBuilder.build();
- } catch (ResourceExceededException e) {
+ } catch (ConstraintViolationException e) {
//Not enough containers. Adjust the number of containers.
LOG.finest(String.format(
"%s Increasing the number of containers to %s and attempting to place again.",
@@ -216,7 +222,7 @@ public class ResourceCompliantRRPacking implements IPacking, IRepacking {
return planBuilder.build();
- } catch (ResourceExceededException e) {
+ } catch (ConstraintViolationException e) {
//Not enough containers. Adjust the number of containers.
increaseNumContainers(1);
resetToFirstContainer();
@@ -258,7 +264,7 @@ public class ResourceCompliantRRPacking implements IPacking, IRepacking {
}
private PackingPlanBuilder getResourceCompliantRRAllocation(
- PackingPlanBuilder planBuilder) throws ResourceExceededException {
+ PackingPlanBuilder planBuilder) throws ConstraintViolationException {
Map<String, Integer> parallelismMap = TopologyUtils.getComponentParallelism(topology);
int totalInstances = TopologyUtils.getTotalInstance(topology);
@@ -282,7 +288,7 @@ public class ResourceCompliantRRPacking implements IPacking, IRepacking {
*/
private PackingPlanBuilder getResourceCompliantRRAllocation(
PackingPlanBuilder planBuilder, Map<String, Integer> componentChanges)
- throws ResourceExceededException {
+ throws ConstraintViolationException {
Map<String, Integer> componentsToScaleDown =
PackingUtils.getComponentsToScale(componentChanges, PackingUtils.ScalingDirection.DOWN);
@@ -309,7 +315,8 @@ public class ResourceCompliantRRPacking implements IPacking, IRepacking {
*/
private void assignInstancesToContainers(PackingPlanBuilder planBuilder,
Map<String, Integer> parallelismMap,
- PolicyType policyType) throws ResourceExceededException {
+ PolicyType policyType)
+ throws ConstraintViolationException {
for (String componentName : parallelismMap.keySet()) {
int numInstance = parallelismMap.get(componentName);
for (int i = 0; i < numInstance; ++i) {
@@ -326,7 +333,7 @@ public class ResourceCompliantRRPacking implements IPacking, IRepacking {
* @throws ResourceExceededException if there is no room on the current container for the instance
*/
private void strictRRpolicy(PackingPlanBuilder planBuilder,
- String componentName) throws ResourceExceededException {
+ String componentName) throws ConstraintViolationException {
planBuilder.addInstance(this.containerId, componentName);
this.containerId = nextContainerId(this.containerId);
}
@@ -383,7 +390,7 @@ public class ResourceCompliantRRPacking implements IPacking, IRepacking {
private void assignInstance(PackingPlanBuilder planBuilder,
String componentName,
ResourceCompliantRRPacking packing)
- throws ResourceExceededException, RuntimeException {
+ throws ConstraintViolationException, RuntimeException {
switch (this) {
case STRICT:
packing.strictRRpolicy(planBuilder, componentName);
diff --git a/heron/packing/tests/java/org/apache/heron/packing/CommonPackingTests.java b/heron/packing/tests/java/org/apache/heron/packing/CommonPackingTests.java
index 98e16f9..97aaeec 100644
--- a/heron/packing/tests/java/org/apache/heron/packing/CommonPackingTests.java
+++ b/heron/packing/tests/java/org/apache/heron/packing/CommonPackingTests.java
@@ -29,7 +29,7 @@ import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.common.basics.Pair;
import org.apache.heron.common.utils.topology.TopologyTests;
-import org.apache.heron.packing.exceptions.ResourceExceededException;
+import org.apache.heron.packing.exceptions.ConstraintViolationException;
import org.apache.heron.spi.common.Config;
import org.apache.heron.spi.common.Context;
import org.apache.heron.spi.packing.IPacking;
@@ -365,7 +365,7 @@ public abstract class CommonPackingTests {
protected void doScaleDownTest(Pair<Integer, InstanceId>[] initialComponentInstances,
Map<String, Integer> componentChanges,
Pair<Integer, InstanceId>[] expectedComponentInstances)
- throws ResourceExceededException {
+ throws ConstraintViolationException {
String topologyId = this.topology.getId();
// The padding percentage used in repack() must be <= one as used in pack(), otherwise we can't
diff --git a/heron/packing/tests/java/org/apache/heron/packing/PackingTestHelper.java b/heron/packing/tests/java/org/apache/heron/packing/PackingTestHelper.java
index 530b5d2..d4dc6fe 100644
--- a/heron/packing/tests/java/org/apache/heron/packing/PackingTestHelper.java
+++ b/heron/packing/tests/java/org/apache/heron/packing/PackingTestHelper.java
@@ -22,7 +22,7 @@ package org.apache.heron.packing;
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.common.basics.Pair;
import org.apache.heron.packing.builder.PackingPlanBuilder;
-import org.apache.heron.packing.exceptions.ResourceExceededException;
+import org.apache.heron.packing.exceptions.ConstraintViolationException;
import org.apache.heron.spi.packing.InstanceId;
import org.apache.heron.spi.packing.PackingPlan;
import org.apache.heron.spi.packing.Resource;
@@ -37,7 +37,7 @@ public final class PackingTestHelper {
public static PackingPlan createTestPackingPlan(String topologyName,
Pair<Integer, String>[] instances,
int containerPadding)
- throws ResourceExceededException {
+ throws ConstraintViolationException {
return generateTestPackingPlan(topologyName, null, instances, null, containerPadding);
}
@@ -45,7 +45,7 @@ public final class PackingTestHelper {
PackingPlan previousPackingPlan,
Pair<Integer, String>[] instances,
int containerPadding)
- throws ResourceExceededException {
+ throws ConstraintViolationException {
return generateTestPackingPlan(
topologyName, previousPackingPlan, instances, null, containerPadding);
}
@@ -54,7 +54,7 @@ public final class PackingTestHelper {
PackingPlan previousPackingPlan,
Pair<Integer, String>[] instances,
int containerPadding)
- throws ResourceExceededException {
+ throws ConstraintViolationException {
return generateTestPackingPlan(
topologyName, previousPackingPlan, null, instances, containerPadding);
}
@@ -68,7 +68,7 @@ public final class PackingTestHelper {
Pair<Integer, String>[] addInstances,
Pair<Integer, String>[] removeInstances,
int containerPadding)
- throws ResourceExceededException {
+ throws ConstraintViolationException {
PackingPlanBuilder builder = new PackingPlanBuilder(topologyName, previousPackingPlan);
int instanceCount = 0;
diff --git a/heron/packing/tests/java/org/apache/heron/packing/builder/PackingPlanBuilderTest.java b/heron/packing/tests/java/org/apache/heron/packing/builder/PackingPlanBuilderTest.java
index 24123ce..dfd0c85 100644
--- a/heron/packing/tests/java/org/apache/heron/packing/builder/PackingPlanBuilderTest.java
+++ b/heron/packing/tests/java/org/apache/heron/packing/builder/PackingPlanBuilderTest.java
@@ -35,6 +35,7 @@ import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.common.basics.Pair;
import org.apache.heron.packing.AssertPacking;
import org.apache.heron.packing.PackingTestHelper;
+import org.apache.heron.packing.exceptions.ConstraintViolationException;
import org.apache.heron.packing.exceptions.ResourceExceededException;
import org.apache.heron.spi.packing.InstanceId;
import org.apache.heron.spi.packing.PackingException;
@@ -190,12 +191,12 @@ public class PackingPlanBuilderTest {
PackingTestHelper.toContainerIdComponentNames(testContainerInstances);
@Test
- public void testBuildPackingPlan() throws ResourceExceededException {
+ public void testBuildPackingPlan() throws ConstraintViolationException {
doCreatePackingPlanTest(testContainerInstances);
}
@Test
- public void testAddToPackingPlan() throws ResourceExceededException {
+ public void testAddToPackingPlan() throws ConstraintViolationException {
PackingPlan plan = doCreatePackingPlanTest(testContainerInstances);
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -211,7 +212,7 @@ public class PackingPlanBuilderTest {
}
@Test(expected = ResourceExceededException.class)
- public void testExceededCapacityAddingToPackingPlan() throws ResourceExceededException {
+ public void testExceededCapacityAddingToPackingPlan() throws ConstraintViolationException {
PackingPlan plan = doCreatePackingPlanTest(testContainerInstances);
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -223,7 +224,7 @@ public class PackingPlanBuilderTest {
}
@Test
- public void testRemoveFromPackingPlan() throws ResourceExceededException {
+ public void testRemoveFromPackingPlan() throws ConstraintViolationException {
PackingPlan plan = doCreatePackingPlanTest(testContainerInstances);
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -242,7 +243,7 @@ public class PackingPlanBuilderTest {
}
@Test(expected = PackingException.class)
- public void testInvalidContainerRemoveFromPackingPlan() throws ResourceExceededException {
+ public void testInvalidContainerRemoveFromPackingPlan() throws ConstraintViolationException {
PackingPlan plan = doCreatePackingPlanTest(testContainerInstances);
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -253,7 +254,7 @@ public class PackingPlanBuilderTest {
}
@Test(expected = PackingException.class)
- public void testInvalidComponentRemoveFromPackingPlan() throws ResourceExceededException {
+ public void testInvalidComponentRemoveFromPackingPlan() throws ConstraintViolationException {
PackingPlan plan = doCreatePackingPlanTest(testContainerInstances);
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -264,7 +265,7 @@ public class PackingPlanBuilderTest {
}
private static PackingPlan doCreatePackingPlanTest(
- Pair<Integer, InstanceId>[] instances) throws ResourceExceededException {
+ Pair<Integer, InstanceId>[] instances) throws ConstraintViolationException {
PackingPlan plan = PackingTestHelper.createTestPackingPlan(
TOPOLOGY_ID, PackingTestHelper.toContainerIdComponentNames(instances), 0);
AssertPacking.assertPackingPlan(TOPOLOGY_ID, instances, plan);