You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/03/11 23:10:44 UTC
[storm] branch master updated: [STORM-3585] New compact Constraint
config including maxNodeCoLocationCnt and incompatibleComponents (#3215)
This is an automated email from the ASF dual-hosted git repository.
ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 1137f61 [STORM-3585] New compact Constraint config including maxNodeCoLocationCnt and incompatibleComponents (#3215)
1137f61 is described below
commit 1137f61aad3e277f102fb8baf6ee24a5d183da32
Author: Bipin Prasad <bi...@yahoo.com>
AuthorDate: Wed Mar 11 18:10:30 2020 -0500
[STORM-3585] New compact Constraint config including maxNodeCoLocationCnt and incompatibleComponents (#3215)
---
storm-client/src/jvm/org/apache/storm/Config.java | 35 +-
.../apache/storm/validation/ConfigValidation.java | 133 ++++++-
.../validation/ConfigValidationAnnotations.java | 12 +
.../jvm/org/apache/storm/TestConfigValidate.java | 143 ++++++-
.../apache/storm/scheduler/resource/RasNode.java | 7 +-
.../scheduling/ConstraintSolverStrategy.java | 411 ++++++++++++++-------
.../scheduling/TestConstraintSolverStrategy.java | 284 +++++++++++---
7 files changed, 829 insertions(+), 196 deletions(-)
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index 7a1f689..2c95521 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -29,6 +29,7 @@ import org.apache.storm.metric.IEventLogger;
import org.apache.storm.policy.IWaitStrategy;
import org.apache.storm.serialization.IKryoDecorator;
import org.apache.storm.serialization.IKryoFactory;
+import org.apache.storm.utils.ShellLogHandler;
import org.apache.storm.utils.Utils;
import org.apache.storm.validation.ConfigValidation;
import org.apache.storm.validation.ConfigValidation.EventLoggerRegistryValidator;
@@ -36,9 +37,11 @@ import org.apache.storm.validation.ConfigValidation.ListOfListOfStringValidator;
import org.apache.storm.validation.ConfigValidation.MapOfStringToMapOfStringToObjectValidator;
import org.apache.storm.validation.ConfigValidation.MetricRegistryValidator;
import org.apache.storm.validation.ConfigValidation.MetricReportersValidator;
+import org.apache.storm.validation.ConfigValidation.RasConstraintsTypeValidator;
import org.apache.storm.validation.ConfigValidationAnnotations;
import org.apache.storm.validation.ConfigValidationAnnotations.CustomValidator;
import org.apache.storm.validation.ConfigValidationAnnotations.IsBoolean;
+import org.apache.storm.validation.ConfigValidationAnnotations.IsExactlyOneOf;
import org.apache.storm.validation.ConfigValidationAnnotations.IsImplementationOfClass;
import org.apache.storm.validation.ConfigValidationAnnotations.IsInteger;
import org.apache.storm.validation.ConfigValidationAnnotations.IsKryoReg;
@@ -304,16 +307,40 @@ public class Config extends HashMap<String, Object> {
// an error will be thrown by nimbus on topology submission and not by the client prior to submitting
// the topology.
public static final String TOPOLOGY_SCHEDULER_STRATEGY = "topology.scheduler.strategy";
+
/**
- * Declare scheduling constraints for a topology used by the constraint solver strategy. A List of pairs (also a list) of components
- * that cannot coexist in the same worker.
+ * Declare scheduling constraints for a topology used by the constraint solver strategy. The format can be either
+ * the old style (validated by ListOfListOfStringValidator.class) or the newer style, which is a Map of component
+ * names to constraint Maps (validated by RasConstraintsTypeValidator.class). The value must match exactly one format.
+ *
+ * <p>
+ * Old style Config.TOPOLOGY_RAS_CONSTRAINTS (ListOfListOfString) specifies a list of component pairs that cannot
+ * co-exist on the same Worker.
+ * </p>
+ *
+ * <p>
+ * New style Config.TOPOLOGY_RAS_CONSTRAINTS is a map where each component has a list of other incompatible components
+ * (which serves the same function as the old style configuration) and an optional number that specifies
+ * the maximum co-location count for the component on a node.
+ * </p>
+ *
+ * <p>Example: comp-1 cannot exist on the same worker as comp-2 or comp-3, and at most 2 comp-1 may be on the same node.</p>
+ * <p>comp-2 and comp-4 cannot be on the same worker (comp-2 need not list comp-1; that is implied by the comp-1 constraint).</p>
+ *
+ * <p>
+ * { "comp-1": { "maxNodeCoLocationCnt": 2, "incompatibleComponents": ["comp-2", "comp-3" ] },
+ * "comp-2": { "incompatibleComponents": [ "comp-4" ] }
+ * }
+ * </p>
*/
- @CustomValidator(validatorClass = ListOfListOfStringValidator.class)
+ @IsExactlyOneOf(valueValidatorClasses = { ListOfListOfStringValidator.class, RasConstraintsTypeValidator.class })
public static final String TOPOLOGY_RAS_CONSTRAINTS = "topology.ras.constraints";
/**
* Array of components that scheduler should try to place on separate hosts when using the constraint solver strategy or the
- * multi-tenant scheduler.
+ * multi-tenant scheduler. Deprecated: the same effect can be achieved in TOPOLOGY_RAS_CONSTRAINTS by setting the
+ * "maxNodeCoLocationCnt" entry to 1 for the component.
*/
+ @Deprecated
@IsStringList
public static final String TOPOLOGY_SPREAD_COMPONENTS = "topology.spread.components";
/**
diff --git a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
index eb4ce7d..4bf7432 100644
--- a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
+++ b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
@@ -27,6 +27,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
+
import org.apache.storm.Config;
import org.apache.storm.utils.Utils;
import org.apache.storm.validation.ConfigValidationAnnotations.ValidatorParams;
@@ -586,8 +588,8 @@ public class ConfigValidation {
((Validator) v).validateField(name + " list entry", entry);
} else {
LOG.warn(
- "validator: {} cannot be used in ListEntryCustomValidator. Individual entry validators must a instance of "
- + "Validator class",
+ "validator: {} cannot be used in ListEntryCustomValidator. "
+ + "Individual entry validators must be an instance of Validator class",
validator.getName());
}
}
@@ -656,8 +658,8 @@ public class ConfigValidation {
((Validator) keyValidator).validateField(name + " Map key", entry.getKey());
} else {
LOG.warn(
- "validator: {} cannot be used in MapEntryCustomValidator to validate keys. Individual entry validators must "
- + "a instance of Validator class",
+ "validator: {} cannot be used in MapEntryCustomValidator to validate keys. "
+ + "Individual entry validators must be an instance of Validator class",
kv.getName());
}
}
@@ -667,8 +669,8 @@ public class ConfigValidation {
((Validator) valueValidator).validateField(name + " Map value", entry.getValue());
} else {
LOG.warn(
- "validator: {} cannot be used in MapEntryCustomValidator to validate values. Individual entry validators "
- + "must a instance of Validator class",
+ "validator: {} cannot be used in MapEntryCustomValidator to validate values. "
+ + "Individual entry validators must be an instance of Validator class",
vv.getName());
}
}
@@ -850,6 +852,125 @@ public class ConfigValidation {
}
}
+ public static class CustomIsExactlyOneOfValidators extends Validator {
+ private Class<?>[] subValidators;
+ private List<String> validatorClassNames;
+
+ public CustomIsExactlyOneOfValidators(Map<String, Object> params) {
+ this.subValidators = (Class<?>[]) params.get(ConfigValidationAnnotations.ValidatorParams.VALUE_VALIDATOR_CLASSES);
+ this.validatorClassNames = Arrays.asList(subValidators).stream().map(x -> x.getName()).collect(Collectors.toList());
+ }
+
+ @Override
+ public void validateField(String name, Object o) {
+ if (o == null) {
+ return;
+ }
+
+ HashMap<String, Exception> validatorExceptions = new HashMap<>();
+ Set<String> selectedValidators = new HashSet<>();
+ for (Class<?> vv : subValidators) {
+ Object valueValidator;
+ try {
+ valueValidator = vv.getConstructor().newInstance();
+ } catch (Exception ex) {
+ throw new IllegalArgumentException(vv.getName() + " instantiation failure", ex);
+ }
+ if (valueValidator instanceof Validator) {
+ try {
+ ((Validator) valueValidator).validateField(name + " " + vv.getSimpleName() + " value", o);
+ selectedValidators.add(vv.getName());
+ } catch (Exception ex) {
+ // expected for all but one validator; keep the error for the failure message built below
+ validatorExceptions.put(vv.getName(), ex);
+ }
+ } else {
+ String err = String.format("validator: %s cannot be used in CustomExactlyOneOfValidators to validate values. "
+ + "Individual entry validators must a instance of Validator class", vv.getName());
+ LOG.warn(err);
+ }
+ }
+ // check if one and only one validation succeeded
+ if (selectedValidators.isEmpty()) {
+ String parseErrs = String.join(";\n\t", validatorExceptions.entrySet().stream()
+ .map(e -> String.format("%s:%s", e.getKey(), e.getValue())).collect(Collectors.toList()));
+ String err = String.format("Field %s must be one of %s; parse errors are \n\t%s", name,
+ String.join(", ", validatorClassNames), parseErrs);
+ throw new IllegalArgumentException(err);
+ }
+ if (selectedValidators.size() > 1) {
+ throw new IllegalArgumentException("Field " + name + " must match exactly one of " + String.join(", ", selectedValidators));
+ }
+ }
+ }
+
+ public static class RasConstraintsTypeValidator extends Validator {
+ public static final String CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT = "maxNodeCoLocationCnt";
+ public static final String CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS = "incompatibleComponents";
+
+ @Override
+ public void validateField(String name, Object o) {
+ if (o == null) {
+ return;
+ }
+ if (!(o instanceof Map)) {
+ throw new IllegalArgumentException(
+ "Field " + name + " must be an Iterable containing only Map of Maps");
+ }
+ Map<String, Object> map1 = (Map<String, Object>) o;
+ for (Map.Entry<String, Object> entry1: map1.entrySet()) {
+ String comp1 = entry1.getKey();
+ Object o2 = entry1.getValue();
+ if (!(o2 instanceof Map)) {
+ String err = String.format("Field %s, component %s, expecting constraints Map with keys [\"%s\", \"%s\"], in \"%s\"",
+ name, comp1, CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT, CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS, o);
+ throw new IllegalArgumentException(err);
+ }
+ Map<String, Object> map2 = (Map<String, Object>) o2;
+ for (Map.Entry<String, Object> entry2: map2.entrySet()) {
+ String constraintType = entry2.getKey();
+ Object o3 = entry2.getValue();
+ switch (constraintType) {
+ case CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT:
+ try {
+ Integer.parseInt("" + o3);
+ } catch (Exception ex) {
+ String err = String.format("Field %s, component %s, constraint %s should be a number, not \"%s\"",
+ name, comp1, constraintType, o3);
+ throw new IllegalArgumentException(err);
+ }
+ break;
+
+ case CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS:
+ if (o3 instanceof String) {
+ break;
+ } else if (o3 instanceof List) {
+ for (Object otherComp : (List) o3) {
+ if (otherComp instanceof String) {
+ continue;
+ }
+ String err = String.format(
+ "Field %s, component %s, constraintType \"%s\", expecting incompatible component-name, "
+ + "found instance of class \"%s\" value \"%s\"",
+ name, comp1, constraintType, o3.getClass().getName(), otherComp);
+ throw new IllegalArgumentException(err);
+ }
+ }
+ break;
+
+ default:
+ String err = String.format(
+ "Field %s, component %s, has unsupported constraintType \"%s\", expecting one of [\"%s\", \"%s\"], "
+ + "in \"%s\"",
+ name, comp1, constraintType, CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT,
+ CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS, o);
+ throw new IllegalArgumentException(err);
+ }
+ }
+ }
+ }
+ }
+
public static class UserResourcePoolEntryValidator extends Validator {
@Override
diff --git a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidationAnnotations.java b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidationAnnotations.java
index ac8ba9d..ce22c3c 100644
--- a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidationAnnotations.java
+++ b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidationAnnotations.java
@@ -197,6 +197,18 @@ public class ConfigValidationAnnotations {
Class<?> validatorClass();
}
+ /**
+ * Custom validator where exactly one of the validations must be successful.
+ * Used for overloaded configuration, where value must match one (and exactly one)
+ * format of supplied values.
+ */
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.FIELD)
+ public @interface IsExactlyOneOf {
+ Class<?> validatorClass() default ConfigValidation.CustomIsExactlyOneOfValidators.class;
+ Class<?>[] valueValidatorClasses();
+ }
+
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface Password {
diff --git a/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java b/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java
index 2ece764..dedf937 100644
--- a/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java
+++ b/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java
@@ -26,7 +26,10 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import javax.security.auth.Subject;
+
import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.blobstore.NimbusBlobStore;
import org.apache.storm.generated.AuthorizationException;
@@ -41,13 +44,15 @@ import org.apache.storm.validation.ConfigValidation.ImpersonationAclUserEntryVal
import org.apache.storm.validation.ConfigValidation.IntegerValidator;
import org.apache.storm.validation.ConfigValidation.KryoRegValidator;
import org.apache.storm.validation.ConfigValidation.ListEntryTypeValidator;
+import org.apache.storm.validation.ConfigValidation.ListOfListOfStringValidator;
import org.apache.storm.validation.ConfigValidation.NoDuplicateInListValidator;
import org.apache.storm.validation.ConfigValidation.NotNullValidator;
import org.apache.storm.validation.ConfigValidation.PositiveNumberValidator;
import org.apache.storm.validation.ConfigValidation.PowerOf2Validator;
+import org.apache.storm.validation.ConfigValidation.RasConstraintsTypeValidator;
import org.apache.storm.validation.ConfigValidation.StringValidator;
import org.apache.storm.validation.ConfigValidation.UserResourcePoolEntryValidator;
-import org.apache.storm.validation.ConfigValidationAnnotations.NotNull;
+import org.apache.storm.validation.ConfigValidationAnnotations.IsExactlyOneOf;
import org.apache.storm.validation.ConfigValidationAnnotations.IsImplementationOfClass;
import org.apache.storm.validation.ConfigValidationAnnotations.IsListEntryCustom;
import org.apache.storm.validation.ConfigValidationAnnotations.IsListEntryType;
@@ -55,6 +60,8 @@ import org.apache.storm.validation.ConfigValidationAnnotations.IsMapEntryCustom;
import org.apache.storm.validation.ConfigValidationAnnotations.IsMapEntryType;
import org.apache.storm.validation.ConfigValidationAnnotations.IsNoDuplicateInList;
import org.apache.storm.validation.ConfigValidationAnnotations.IsString;
+import org.apache.storm.validation.ConfigValidationAnnotations.NotNull;
+
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
@@ -178,7 +185,6 @@ public class TestConfigValidate {
ConfigValidation.validateFields(conf);
}
-
@Test
public void testWorkerChildoptsIsStringOrStringList() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException,
InstantiationException, IllegalAccessException {
@@ -542,6 +548,135 @@ public class TestConfigValidate {
}
@Test
+ public void testExactlyOneOfCustomAnnotation() {
+ TestConfig config = new TestConfig();
+ Collection<Object> passCases = new LinkedList<Object>();
+ Collection<Object> failCases = new LinkedList<Object>();
+
+ List<Object> passCaseListOfList = new ArrayList<>();
+ passCaseListOfList.add(Arrays.asList("comp1", "comp2"));
+ passCaseListOfList.add(Arrays.asList("comp1", "comp3"));
+ passCaseListOfList.add(Arrays.asList("comp2", "comp4"));
+ passCaseListOfList.add(Arrays.asList("comp2", "comp5"));
+
+ Map<Object, Object> passCaseMapOfMap = new HashMap<>();
+ passCaseMapOfMap.put("comp1",
+ Stream.of(new Object[][] {
+ { RasConstraintsTypeValidator.CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT, 10 },
+ { RasConstraintsTypeValidator.CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS, Arrays.asList("comp2", "comp3")},
+ }).collect(Collectors.toMap(data -> data[0], data -> data[1]))
+ );
+ passCaseMapOfMap.put("comp2",
+ Stream.of(new Object[][] {
+ { RasConstraintsTypeValidator.CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT, 2 },
+ { RasConstraintsTypeValidator.CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS, Arrays.asList("comp4", "comp5")},
+ }).collect(Collectors.toMap(data -> data[0], data -> data[1]))
+ );
+ passCases.add(passCaseMapOfMap);
+
+ passCaseMapOfMap = new HashMap<>();
+ passCaseMapOfMap.put("comp1",
+ Stream.of(new Object[][] {
+ { RasConstraintsTypeValidator.CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS, Arrays.asList("comp2", "comp3")},
+ }).collect(Collectors.toMap(data -> data[0], data -> data[1]))
+ );
+ passCaseMapOfMap.put("comp2",
+ Stream.of(new Object[][] {
+ { RasConstraintsTypeValidator.CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT, 2 },
+ { RasConstraintsTypeValidator.CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS, Arrays.asList("comp4", "comp5")},
+ }).collect(Collectors.toMap(data -> data[0], data -> data[1]))
+ );
+ passCases.add(passCaseMapOfMap);
+
+ passCaseMapOfMap = new HashMap<>();
+ passCaseMapOfMap.put("comp1",
+ Stream.of(new Object[][] {
+ { RasConstraintsTypeValidator.CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS, "comp2"},
+ }).collect(Collectors.toMap(data -> data[0], data -> data[1]))
+ );
+ passCaseMapOfMap.put("comp2",
+ Stream.of(new Object[][] {
+ { RasConstraintsTypeValidator.CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT, 2 },
+ { RasConstraintsTypeValidator.CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS, "comp4"},
+ }).collect(Collectors.toMap(data -> data[0], data -> data[1]))
+ );
+ passCases.add(passCaseMapOfMap);
+
+ for (Object value : passCases) {
+ config.put(TestConfig.TEST_MAP_CONFIG_9, value);
+ ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
+ }
+
+ List<Object> failCaseList = new ArrayList<>();
+ failCaseList.add(Arrays.asList("comp1", Arrays.asList("comp2", "comp3")));
+ failCaseList.add(Arrays.asList("comp3", Arrays.asList("comp4", "comp5")));
+ failCases.add(failCaseList);
+
+ Map<String, Object> failCaseMapOfMap = new HashMap<>();
+ failCaseMapOfMap.put("comp1",
+ Stream.of(new Object[][] {
+ { RasConstraintsTypeValidator.CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT, 10 },
+ { RasConstraintsTypeValidator.CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS, Arrays.asList(1, 2, 3)},
+ }).collect(Collectors.toMap(data -> data[0], data -> data[1]))
+ );
+ failCaseMapOfMap.put("comp2",
+ Stream.of(new Object[][] {
+ { RasConstraintsTypeValidator.CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT, 2 },
+ { RasConstraintsTypeValidator.CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS, Arrays.asList("comp4", "comp5")},
+ }).collect(Collectors.toMap(data -> data[0], data -> data[1]))
+ );
+ failCases.add(failCaseMapOfMap);
+
+ failCaseMapOfMap = new HashMap<>();
+ failCaseMapOfMap.put("comp1",
+ Stream.of(new Object[][] {
+ { RasConstraintsTypeValidator.CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT, 10 },
+ { RasConstraintsTypeValidator.CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS, Arrays.asList("comp1", 3)},
+ }).collect(Collectors.toMap(data -> data[0], data -> data[1]))
+ );
+ failCases.add(failCaseMapOfMap);
+
+ failCaseMapOfMap = new HashMap<>();
+ failCaseMapOfMap.put("comp1", Arrays.asList("comp2", "comp3"));
+ failCaseMapOfMap.put("comp2", Arrays.asList("comp4", "comp5"));
+ failCases.add(failCaseMapOfMap);
+
+ failCaseMapOfMap = new HashMap<>();
+ failCaseMapOfMap.put("aaa", "str");
+ failCaseMapOfMap.put("bbb", 6);
+ failCaseMapOfMap.put("ccc", 7);
+ failCases.add(failCaseMapOfMap);
+
+ failCaseMapOfMap = new HashMap<>();
+ failCaseMapOfMap.put("aaa", -1);
+ failCaseMapOfMap.put("bbb", 6);
+ failCaseMapOfMap.put("ccc", 7);
+ failCases.add(failCaseMapOfMap);
+
+ failCaseMapOfMap = new HashMap<>();
+ failCaseMapOfMap.put("aaa", 1);
+ failCaseMapOfMap.put("bbb", 6);
+ failCaseMapOfMap.put("ccc", 7.4);
+ failCases.add(failCaseMapOfMap);
+
+ failCaseMapOfMap = new HashMap<>();
+ failCaseMapOfMap.put("comp1", "comp2");
+ failCaseMapOfMap.put("comp2", "comp4");
+ failCases.add(failCaseMapOfMap);
+
+ failCases.add(null);
+
+ for (Object value : failCases) {
+ try {
+ config.put(TestConfig.TEST_MAP_CONFIG_9, value);
+ ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
+ Assert.fail("Expected Exception not Thrown for value: " + value);
+ } catch (IllegalArgumentException Ex) {
+ }
+ }
+ }
+
+ @Test
public void testListEntryTypeAnnotation() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException,
InstantiationException, IllegalAccessException {
TestConfig config = new TestConfig();
@@ -796,5 +931,9 @@ public class TestConfigValidate {
@IsImplementationOfClass(implementsClass = org.apache.storm.networktopography.DNSToSwitchMapping.class)
@NotNull
public static final String TEST_MAP_CONFIG_8 = "test.map.config.8";
+
+ @IsExactlyOneOf(valueValidatorClasses = {ListOfListOfStringValidator.class, RasConstraintsTypeValidator.class})
+ @NotNull
+ public static final String TEST_MAP_CONFIG_9 = "test.map.config.9";
}
}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RasNode.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RasNode.java
index 2050bc3..63c9417 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RasNode.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RasNode.java
@@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory;
/**
* Represents a single node in the cluster.
*/
-public class RasNode {
+public class RasNode implements Comparable<RasNode> {
private static final Logger LOG = LoggerFactory.getLogger(RasNode.class);
private final String nodeId;
private final Cluster cluster;
@@ -523,4 +523,9 @@ public class RasNode {
return 0.0;
}
}
+
+ @Override
+ public int compareTo(RasNode o) {
+ return this.nodeId.compareTo(o.nodeId);
+ }
}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
index 81994ba..66d2b15 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
@@ -12,7 +12,10 @@
package org.apache.storm.scheduler.resource.strategies.scheduling;
+import com.google.common.collect.Sets;
+
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
@@ -24,6 +27,7 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;
+
import org.apache.storm.Config;
import org.apache.storm.DaemonConfig;
import org.apache.storm.scheduler.Cluster;
@@ -38,71 +42,176 @@ import org.apache.storm.scheduler.resource.SchedulingStatus;
import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Time;
+import org.apache.storm.validation.ConfigValidation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
- //hard coded max number of states to search
private static final Logger LOG = LoggerFactory.getLogger(ConstraintSolverStrategy.class);
- //constraints and spreads
- private Map<String, Map<String, Integer>> constraintMatrix;
- private HashSet<String> spreadComps = new HashSet<>();
+ public static final String CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT = "maxNodeCoLocationCnt";
+ public static final String CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS = "incompatibleComponents";
- private Map<String, RasNode> nodes;
- private Map<ExecutorDetails, String> execToComp;
- private Map<String, Set<ExecutorDetails>> compToExecs;
- private List<String> favoredNodeIds;
- private List<String> unFavoredNodeIds;
+ /**
+ * Component constraint as derived from configuration.
+ * This is backward compatible and can parse old style Config.TOPOLOGY_RAS_CONSTRAINTS and Config.TOPOLOGY_SPREAD_COMPONENTS.
+ * New style Config.TOPOLOGY_RAS_CONSTRAINTS is a map where each component has a list of other incompatible components
+ * and an optional number that specifies the maximum co-location count for the component on a node.
+ *
+ * <p>Example: comp-1 cannot exist on the same worker as comp-2 or comp-3, and at most 2 comp-1 may be on the same node.</p>
+ * <p>comp-2 and comp-4 cannot be on the same worker (comp-2 need not list comp-1; that is implied by the comp-1 constraint).</p>
+ *
+ * <p>
+ * { "comp-1": { "maxNodeCoLocationCnt": 2, "incompatibleComponents": ["comp-2", "comp-3" ] },
+ * "comp-2": { "incompatibleComponents": [ "comp-4" ] }
+ * }
+ * </p>
+ */
+ public static final class ConstraintConfig {
+ private Map<String, Set<String>> incompatibleComponents = new HashMap<>();
+ private Map<String, Integer> maxCoLocationCnts = new HashMap<>(); // maximum node CoLocationCnt for restricted components
- static Map<String, Map<String, Integer>> getConstraintMap(TopologyDetails topo, Set<String> comps) {
- Map<String, Map<String, Integer>> matrix = new HashMap<>();
- for (String comp : comps) {
- matrix.put(comp, new HashMap<>());
- for (String comp2 : comps) {
- matrix.get(comp).put(comp2, 0);
- }
+ ConstraintConfig(TopologyDetails topo) {
+ // getExecutorToComponent().values() also contains system components
+ this(topo.getConf(), Sets.union(topo.getComponents().keySet(), new HashSet(topo.getExecutorToComponent().values())));
}
- List<List<String>> constraints = (List<List<String>>) topo.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINTS);
- if (constraints != null) {
- for (List<String> constraintPair : constraints) {
- String comp1 = constraintPair.get(0);
- String comp2 = constraintPair.get(1);
- if (!matrix.containsKey(comp1)) {
- LOG.warn("Comp: {} declared in constraints is not valid!", comp1);
- continue;
+
+ ConstraintConfig(Map<String, Object> conf, Set<String> comps) {
+ Object rasConstraints = conf.get(Config.TOPOLOGY_RAS_CONSTRAINTS);
+ comps.forEach(k -> incompatibleComponents.computeIfAbsent(k, x -> new HashSet<>()));
+ if (rasConstraints instanceof List) {
+ // old style
+ List<List<String>> constraints = (List<List<String>>) rasConstraints;
+ for (List<String> constraintPair : constraints) {
+ String comp1 = constraintPair.get(0);
+ String comp2 = constraintPair.get(1);
+ if (!comps.contains(comp1)) {
+ LOG.warn("Comp: {} declared in constraints is not valid!", comp1);
+ continue;
+ }
+ if (!comps.contains(comp2)) {
+ LOG.warn("Comp: {} declared in constraints is not valid!", comp2);
+ continue;
+ }
+ incompatibleComponents.get(comp1).add(comp2);
+ incompatibleComponents.get(comp2).add(comp1);
}
- if (!matrix.containsKey(comp2)) {
- LOG.warn("Comp: {} declared in constraints is not valid!", comp2);
- continue;
+ } else {
+ Map<String, Map<String, ?>> constraintMap = (Map<String, Map<String, ?>>) rasConstraints;
+ constraintMap.forEach((comp1, v) -> {
+ if (comps.contains(comp1)) {
+ v.forEach((ctype, constraint) -> {
+ switch (ctype) {
+ case CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT:
+ try {
+ int numValue = Integer.parseInt("" + constraint);
+ if (numValue < 1) {
+ LOG.warn("{} {} declared for Comp {} is not valid, expected >= 1", ctype, numValue, comp1);
+ } else {
+ maxCoLocationCnts.put(comp1, numValue);
+ }
+ } catch (Exception ex) {
+ LOG.warn("{} {} declared for Comp {} is not valid, expected >= 1", ctype, constraint, comp1);
+ }
+ break;
+
+ case CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS:
+ if (!(constraint instanceof List || constraint instanceof String)) {
+ LOG.warn("{} {} declared for Comp {} is not valid, expecting a list of components or 1 component",
+ ctype, constraint, comp1);
+ break;
+ }
+ List<String> list;
+ list = (constraint instanceof String) ? Arrays.asList((String) constraint) : (List<String>) constraint;
+ for (String comp2: list) {
+ if (!comps.contains(comp2)) {
+ LOG.warn("{} {} declared for Comp {} is not a valid component", ctype, comp2, comp1);
+ continue;
+ }
+ incompatibleComponents.get(comp1).add(comp2);
+ incompatibleComponents.get(comp2).add(comp1);
+ }
+ break;
+
+ default:
+ LOG.warn("ConstraintType={} invalid for component={}, valid values are {} and {}, ignoring value={}",
+ ctype, comp1, CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT,
+ CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS, constraint);
+ break;
+ }
+ });
+ } else {
+ LOG.warn("Component {} is not a valid component", comp1);
+ }
+ });
+ }
+
+ // process Config.TOPOLOGY_SPREAD_COMPONENTS - old style
+ // applied only to components whose maxNodeCoLocationCnt was not already set via Config.TOPOLOGY_RAS_CONSTRAINTS above
+ Object obj = conf.get(Config.TOPOLOGY_SPREAD_COMPONENTS);
+ if (obj instanceof List) {
+ List<String> spread = (List<String>) obj;
+ if (spread != null) {
+ for (String comp : spread) {
+ if (!comps.contains(comp)) {
+ LOG.warn("Comp {} declared for spread not valid", comp);
+ continue;
+ }
+ if (maxCoLocationCnts.containsKey(comp)) {
+ LOG.warn("Comp {} maxNodeCoLocationCnt={} already defined in {}, ignoring spread config in {}", comp,
+ maxCoLocationCnts.get(comp), Config.TOPOLOGY_RAS_CONSTRAINTS, Config.TOPOLOGY_SPREAD_COMPONENTS);
+ continue;
+ }
+ maxCoLocationCnts.put(comp, 1);
+ }
}
- matrix.get(comp1).put(comp2, 1);
- matrix.get(comp2).put(comp1, 1);
+ } else {
+ LOG.warn("Ignoring invalid {} config={}", Config.TOPOLOGY_SPREAD_COMPONENTS, obj);
}
}
- return matrix;
+
+ public Map<String, Set<String>> getIncompatibleComponents() {
+ return incompatibleComponents;
+ }
+
+ public Map<String, Integer> getMaxCoLocationCnts() {
+ return maxCoLocationCnts;
+ }
}
+ private Map<String, RasNode> nodes;
+ private Map<ExecutorDetails, String> execToComp;
+ private Map<String, Set<ExecutorDetails>> compToExecs;
+ private List<String> favoredNodeIds;
+ private List<String> unFavoredNodeIds;
+ private ConstraintConfig constraintConfig;
+
/**
* Determines if a scheduling is valid and all constraints are satisfied.
*/
@VisibleForTesting
- public static boolean validateSolution(Cluster cluster, TopologyDetails td) {
- return checkSpreadSchedulingValid(cluster, td)
- && checkConstraintsSatisfied(cluster, td)
+ public static boolean validateSolution(Cluster cluster, TopologyDetails td, ConstraintConfig constraintConfig) {
+ if (constraintConfig == null) {
+ constraintConfig = new ConstraintConfig(td);
+ }
+ return checkSpreadSchedulingValid(cluster, td, constraintConfig)
+ && checkConstraintsSatisfied(cluster, td, constraintConfig)
&& checkResourcesCorrect(cluster, td);
}
/**
* Check if constraints are satisfied.
*/
- private static boolean checkConstraintsSatisfied(Cluster cluster, TopologyDetails topo) {
+ private static boolean checkConstraintsSatisfied(Cluster cluster, TopologyDetails topo, ConstraintConfig constraintConfig) {
LOG.info("Checking constraints...");
assert (cluster.getAssignmentById(topo.getId()) != null);
+ if (constraintConfig == null) {
+ constraintConfig = new ConstraintConfig(topo);
+ }
Map<ExecutorDetails, WorkerSlot> result = cluster.getAssignmentById(topo.getId()).getExecutorToSlot();
Map<ExecutorDetails, String> execToComp = topo.getExecutorToComponent();
//get topology constraints
- Map<String, Map<String, Integer>> constraintMatrix = getConstraintMap(topo, new HashSet<>(topo.getExecutorToComponent().values()));
+ Map<String, Set<String>> constraintMatrix = constraintConfig.incompatibleComponents;
Map<WorkerSlot, Set<String>> workerCompMap = new HashMap<>();
result.forEach((exec, worker) -> {
@@ -113,7 +222,7 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
Set<String> comps = entry.getValue();
for (String comp1 : comps) {
for (String comp2 : comps) {
- if (!comp1.equals(comp2) && constraintMatrix.get(comp1).get(comp2) != 0) {
+ if (!comp1.equals(comp2) && constraintMatrix.get(comp1).contains(comp2)) {
LOG.error("Incorrect Scheduling: worker exclusion for Component {} and {} not satisfied on WorkerSlot: {}",
comp1, comp2, entry.getKey());
return false;
@@ -134,38 +243,38 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
return workerToNodes;
}
- private static boolean checkSpreadSchedulingValid(Cluster cluster, TopologyDetails topo) {
+ private static boolean checkSpreadSchedulingValid(Cluster cluster, TopologyDetails topo, ConstraintConfig constraintConfig) {
LOG.info("Checking for a valid scheduling...");
assert (cluster.getAssignmentById(topo.getId()) != null);
- Map<ExecutorDetails, WorkerSlot> result = cluster.getAssignmentById(topo.getId()).getExecutorToSlot();
+ if (constraintConfig == null) {
+ constraintConfig = new ConstraintConfig(topo);
+ }
Map<ExecutorDetails, String> execToComp = topo.getExecutorToComponent();
- Map<WorkerSlot, HashSet<ExecutorDetails>> workerExecMap = new HashMap<>();
- Map<WorkerSlot, HashSet<String>> workerCompMap = new HashMap<>();
- Map<RasNode, HashSet<String>> nodeCompMap = new HashMap<>();
+ Map<String, Map<String, Integer>> nodeCompMap = new HashMap<>(); // this is the critical count
Map<WorkerSlot, RasNode> workerToNodes = workerToNodes(cluster);
boolean ret = true;
- HashSet<String> spreadComps = getSpreadComps(topo);
- for (Map.Entry<ExecutorDetails, WorkerSlot> entry : result.entrySet()) {
+ Map<String, Integer> spreadCompCnts = constraintConfig.maxCoLocationCnts;
+ for (Map.Entry<ExecutorDetails, WorkerSlot> entry : cluster.getAssignmentById(topo.getId()).getExecutorToSlot().entrySet()) {
ExecutorDetails exec = entry.getKey();
+ String comp = execToComp.get(exec);
WorkerSlot worker = entry.getValue();
RasNode node = workerToNodes.get(worker);
-
- if (workerExecMap.computeIfAbsent(worker, (k) -> new HashSet<>()).contains(exec)) {
- LOG.error("Incorrect Scheduling: Found duplicate in scheduling");
- return false;
- }
- workerExecMap.get(worker).add(exec);
- String comp = execToComp.get(exec);
- workerCompMap.computeIfAbsent(worker, (k) -> new HashSet<>()).add(comp);
- if (spreadComps.contains(comp)) {
- if (nodeCompMap.computeIfAbsent(node, (k) -> new HashSet<>()).contains(comp)) {
- LOG.error("Incorrect Scheduling: Spread for Component: {} {} on node {} not satisfied {}",
- comp, exec, node.getId(), nodeCompMap.get(node));
+ String nodeId = node.getId();
+
+ if (spreadCompCnts.containsKey(comp)) {
+ int allowedColocationMaxCnt = spreadCompCnts.get(comp);
+ Map<String, Integer> oneNodeCompMap = nodeCompMap.computeIfAbsent(nodeId, (k) -> new HashMap<>());
+ oneNodeCompMap.put(comp, oneNodeCompMap.getOrDefault(comp, 0) + 1);
+ if (allowedColocationMaxCnt < oneNodeCompMap.get(comp)) {
+ LOG.error("Incorrect Scheduling: MaxCoLocationCnt for Component: {} {} on node {} not satisfied, cnt {} > allowed {}",
+ comp, exec, nodeId, oneNodeCompMap.get(comp), allowedColocationMaxCnt);
ret = false;
}
}
- nodeCompMap.computeIfAbsent(node, (k) -> new HashSet<>()).add(comp);
+ }
+ if (!ret) {
+ LOG.error("Incorrect MaxCoLocationCnts: Node-Component-Cnt {}", nodeCompMap);
}
return ret;
}
@@ -224,29 +333,13 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
return true;
}
- private static HashSet<String> getSpreadComps(TopologyDetails topo) {
- HashSet<String> retSet = new HashSet<>();
- List<String> spread = (List<String>) topo.getConf().get(Config.TOPOLOGY_SPREAD_COMPONENTS);
- if (spread != null) {
- Set<String> comps = topo.getComponents().keySet();
- for (String comp : spread) {
- if (comps.contains(comp)) {
- retSet.add(comp);
- } else {
- LOG.warn("Comp {} declared for spread not valid", comp);
- }
- }
- }
- return retSet;
- }
-
@Override
public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
prepare(cluster);
LOG.debug("Scheduling {}", td.getId());
nodes = RasNodes.getAllNodesFrom(cluster);
- Map<WorkerSlot, Set<String>> workerCompAssignment = new HashMap<>();
- Map<RasNode, Set<String>> nodeCompAssignment = new HashMap<>();
+ Map<WorkerSlot, Map<String, Integer>> workerCompAssignment = new HashMap<>();
+ Map<RasNode, Map<String, Integer>> nodeCompAssignment = new HashMap<>();
int confMaxStateSearch = ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH));
int daemonMaxStateSearch = ObjectReader.getInt(cluster.getConf().get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH));
@@ -262,17 +355,15 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
//get mapping of components to executors
compToExecs = getCompToExecs(execToComp);
- //get topology constraints
- constraintMatrix = getConstraintMap(td, compToExecs.keySet());
-
- //get spread components
- spreadComps = getSpreadComps(td);
+ // get constraint configuration
+ constraintConfig = new ConstraintConfig(td);
//get a sorted list of unassigned executors based on number of constraints
Set<ExecutorDetails> unassignedExecutors = new HashSet<>(cluster.getUnassignedExecutors(td));
- List<ExecutorDetails> sortedExecs = getSortedExecs(spreadComps, constraintMatrix, compToExecs).stream()
- .filter(unassignedExecutors::contains)
- .collect(Collectors.toList());
+ List<ExecutorDetails> sortedExecs;
+ sortedExecs = getSortedExecs(constraintConfig.maxCoLocationCnts, constraintConfig.incompatibleComponents, compToExecs).stream()
+ .filter(unassignedExecutors::contains)
+ .collect(Collectors.toList());
//populate with existing assignments
SchedulerAssignment existingAssignment = cluster.getAssignmentById(td.getId());
@@ -280,10 +371,11 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
existingAssignment.getExecutorToSlot().forEach((exec, ws) -> {
String compId = execToComp.get(exec);
RasNode node = nodes.get(ws.getNodeId());
- //populate node to component Assignments
- nodeCompAssignment.computeIfAbsent(node, (k) -> new HashSet<>()).add(compId);
+ Map<String, Integer> oneMap = nodeCompAssignment.computeIfAbsent(node, (k) -> new HashMap<>());
+ oneMap.put(compId, oneMap.getOrDefault(compId, 0) + 1); // increment
//populate worker to comp assignments
- workerCompAssignment.computeIfAbsent(ws, (k) -> new HashSet<>()).add(compId);
+ oneMap = workerCompAssignment.computeIfAbsent(ws, (k) -> new HashMap<>());
+ oneMap.put(compId, oneMap.getOrDefault(compId, 0) + 1); // increment
});
}
@@ -297,11 +389,13 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
}
private boolean checkSchedulingFeasibility(int maxStateSearch) {
- for (String comp : spreadComps) {
+ for (Map.Entry<String, Integer> entry : constraintConfig.maxCoLocationCnts.entrySet()) {
+ String comp = entry.getKey();
+ int maxCoLocationCnt = entry.getValue();
int numExecs = compToExecs.get(comp).size();
- if (numExecs > nodes.size()) {
+ if (numExecs > nodes.size() * maxCoLocationCnt) {
LOG.error("Unsatisfiable constraint: Component: {} marked as spread has {} executors which is larger "
- + "than number of nodes: {}", comp, numExecs, nodes.size());
+ + "than number of nodes * maxCoLocationCnt: {} * {} ", comp, numExecs, nodes.size(), maxCoLocationCnt);
return false;
}
}
@@ -345,7 +439,7 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
for (int loopCnt = 0 ; true ; loopCnt++) {
LOG.debug("backtrackSearch: loopCnt = {}, state.execIndex = {}", loopCnt, state.execIndex);
if (state.areSearchLimitsExceeded()) {
- LOG.warn("backtrackSearch: Search limits exceeded");
+ LOG.warn("backtrackSearch: Search limits exceeded, backtracked {} times, looped {} times", state.numBacktrack, loopCnt);
return new SolverResult(state, false);
}
@@ -356,6 +450,7 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
int execIndex = state.execIndex;
ExecutorDetails exec = state.currentExec();
+ String comp = execToComp.get(exec);
Iterable<String> sortedNodesIter = sortAllNodes(state.td, exec, favoredNodeIds, unFavoredNodeIds);
int progressIdx = -1;
@@ -367,8 +462,8 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
continue;
}
progressIdxForExec[execIndex]++;
- LOG.debug("backtrackSearch: loopCnt = {}, state.execIndex = {}, node/slot-ordinal = {}, nodeId = {}",
- loopCnt, execIndex, progressIdx, nodeId);
+ LOG.debug("backtrackSearch: loopCnt = {}, state.execIndex = {}, comp = {}, node/slot-ordinal = {}, nodeId = {}",
+ loopCnt, execIndex, comp, progressIdx, nodeId);
if (!isExecAssignmentToWorkerValid(workerSlot, state)) {
continue;
@@ -378,20 +473,21 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
state.tryToSchedule(execToComp, node, workerSlot);
if (state.areAllExecsScheduled()) {
//Everything is scheduled correctly, so no need to search any more.
- LOG.info("backtrackSearch: AllExecsScheduled at loopCnt = {} in {} milliseconds, elapsedtime in state={}",
- loopCnt, System.currentTimeMillis() - startTimeMilli, Time.currentTimeMillis() - state.startTimeMillis);
+ LOG.info("backtrackSearch: AllExecsScheduled at loopCnt={} in {} ms, elapsedtime in state={}, backtrackCnt={}",
+ loopCnt, System.currentTimeMillis() - startTimeMilli, Time.currentTimeMillis() - state.startTimeMillis,
+ state.numBacktrack);
return new SolverResult(state, true);
}
state = state.nextExecutor();
nodeForExec[execIndex] = node;
workerSlotForExec[execIndex] = workerSlot;
- LOG.debug("backtrackSearch: Assigned execId={} to node={}, node/slot-ordinal={} at loopCnt={}",
- execIndex, nodeId, progressIdx, loopCnt);
+ LOG.debug("backtrackSearch: Assigned execId={}, comp={} to node={}, node/slot-ordinal={} at loopCnt={}",
+ execIndex, comp, nodeId, progressIdx, loopCnt);
continue OUTERMOST_LOOP;
}
}
// if here, then the executor was not assigned, backtrack;
- LOG.debug("backtrackSearch: Failed to schedule execId = {} at loopCnt = {}", execIndex, loopCnt);
+ LOG.debug("backtrackSearch: Failed to schedule execId={}, comp={} at loopCnt={}", execIndex, comp, loopCnt);
if (execIndex == 0) {
break;
} else {
@@ -400,8 +496,8 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
}
}
boolean success = state.areAllExecsScheduled();
- LOG.info("backtrackSearch: Scheduled={} in {} milliseconds, elapsedtime in state={}",
- success, System.currentTimeMillis() - startTimeMilli, Time.currentTimeMillis() - state.startTimeMillis);
+ LOG.info("backtrackSearch: Scheduled={} in {} milliseconds, elapsedtime in state={}, backtrackCnt={}",
+ success, System.currentTimeMillis() - startTimeMilli, Time.currentTimeMillis() - state.startTimeMillis, state.numBacktrack);
return new SolverResult(state, success);
}
@@ -420,11 +516,11 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
//check if exec can be on worker based on user defined component exclusions
String execComp = execToComp.get(exec);
- Set<String> components = state.workerCompAssignment.get(worker);
- if (components != null) {
- Map<String, Integer> subMatrix = constraintMatrix.get(execComp);
- for (String comp : components) {
- if (subMatrix.get(comp) != 0) {
+ Map<String, Integer> compAssignmentCnts = state.workerCompAssignmentCnts.get(worker);
+ if (compAssignmentCnts != null && constraintConfig.incompatibleComponents.containsKey(execComp)) {
+ Set<String> subMatrix = constraintConfig.incompatibleComponents.get(execComp);
+ for (String comp : compAssignmentCnts.keySet()) {
+ if (subMatrix.contains(comp)) {
LOG.trace("{} found {} constraint violation {} on {}", exec, execComp, comp, worker);
return false;
}
@@ -432,9 +528,12 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
}
//check if exec satisfy spread
- if (spreadComps.contains(execComp)) {
- if (state.nodeCompAssignment.computeIfAbsent(node, (k) -> new HashSet<>()).contains(execComp)) {
- LOG.trace("{} Found spread violation {} on node {}", exec, execComp, node.getId());
+ if (constraintConfig.maxCoLocationCnts.containsKey(execComp)) {
+ int coLocationMaxCnt = constraintConfig.maxCoLocationCnts.get(execComp);
+ if (state.nodeCompAssignmentCnts.containsKey(node)
+ && state.nodeCompAssignmentCnts.get(node).getOrDefault(execComp, 0) >= coLocationMaxCnt) {
+ LOG.trace("{} Found MaxCoLocationCnt violation {} on node {}, count {} >= colocation count {}",
+ exec, execComp, node.getId(), state.nodeCompAssignmentCnts.get(node).get(execComp), coLocationMaxCnt);
return false;
}
}
@@ -447,22 +546,24 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
return retMap;
}
- private ArrayList<ExecutorDetails> getSortedExecs(HashSet<String> spreadComps, Map<String, Map<String, Integer>> constraintMatrix,
+ private ArrayList<ExecutorDetails> getSortedExecs(Map<String, Integer> spreadCompCnts,
+ Map<String, Set<String>> constraintMatrix,
Map<String, Set<ExecutorDetails>> compToExecs) {
ArrayList<ExecutorDetails> retList = new ArrayList<>();
//find number of constraints per component
//Key->Comp Value-># of constraints
- Map<String, Integer> compConstraintCountMap = new HashMap<>();
+ Map<String, Double> compConstraintCountMap = new HashMap<>();
constraintMatrix.forEach((comp, subMatrix) -> {
- int count = subMatrix.values().stream().mapToInt(Number::intValue).sum();
- //check component is declared for spreading
- if (spreadComps.contains(comp)) {
- count++;
+ double count = subMatrix.size();
+ // check if component is declared for spreading
+ if (spreadCompCnts.containsKey(comp)) {
+ // lower (1 and above only) value is most constrained should have higher count
+ count += (compToExecs.size() / spreadCompCnts.get(comp));
}
- compConstraintCountMap.put(comp, count);
+ compConstraintCountMap.put(comp, count); // higher count sorts to the front
});
//Sort comps by number of constraints
- NavigableMap<String, Integer> sortedCompConstraintCountMap = sortByValues(compConstraintCountMap);
+ NavigableMap<String, Double> sortedCompConstraintCountMap = sortByValues(compConstraintCountMap);
//sort executors based on component constraints
for (String comp : sortedCompConstraintCountMap.keySet()) {
retList.addAll(compToExecs.get(comp));
@@ -471,7 +572,7 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
}
/**
- * Used to sort a Map by the values.
+ * Used to sort a Map by the values - higher values up front.
*/
@VisibleForTesting
public <K extends Comparable<K>, V extends Comparable<V>> NavigableMap<K, V> sortByValues(final Map<K, V> map) {
@@ -488,13 +589,15 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
return sortedByValues;
}
- protected static class SolverResult {
+ protected static final class SolverResult {
+ private final SearcherState state;
private final int statesSearched;
private final boolean success;
private final long timeTakenMillis;
private final int backtracked;
public SolverResult(SearcherState state, boolean success) {
+ this.state = state;
this.statesSearched = state.getStatesSearched();
this.success = success;
timeTakenMillis = Time.currentTimeMillis() - state.startTimeMillis;
@@ -506,20 +609,21 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
return SchedulingResult.success("Fully Scheduled by ConstraintSolverStrategy (" + statesSearched
+ " states traversed in " + timeTakenMillis + "ms, backtracked " + backtracked + " times)");
}
+ state.logNodeCompAssignments();
return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES,
"Cannot find scheduling that satisfies all constraints (" + statesSearched
+ " states traversed in " + timeTakenMillis + "ms, backtracked " + backtracked + " times)");
}
}
- protected static class SearcherState {
+ protected static final class SearcherState {
final long startTimeMillis;
private final long maxEndTimeMs;
// A map of the worker to the components in the worker to be able to enforce constraints.
- private final Map<WorkerSlot, Set<String>> workerCompAssignment;
+ private final Map<WorkerSlot, Map<String, Integer>> workerCompAssignmentCnts;
private final boolean[] okToRemoveFromWorker;
// for the currently tested assignment a Map of the node to the components on it to be able to enforce constraints
- private final Map<RasNode, Set<String>> nodeCompAssignment;
+ private final Map<RasNode, Map<String, Integer>> nodeCompAssignmentCnts;
private final boolean[] okToRemoveFromNode;
// Static State
// The list of all executors (preferably sorted to make assignments simpler).
@@ -537,13 +641,14 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
// The current executor we are trying to schedule
private int execIndex = 0;
- private SearcherState(Map<WorkerSlot, Set<String>> workerCompAssignment, Map<RasNode, Set<String>> nodeCompAssignment,
- int maxStatesSearched, long maxTimeMs, List<ExecutorDetails> execs, TopologyDetails td) {
+ private SearcherState(Map<WorkerSlot, Map<String, Integer>> workerCompAssignmentCnts,
+ Map<RasNode, Map<String, Integer>> nodeCompAssignmentCnts, int maxStatesSearched, long maxTimeMs,
+ List<ExecutorDetails> execs, TopologyDetails td) {
assert !execs.isEmpty();
assert execs != null;
- this.workerCompAssignment = workerCompAssignment;
- this.nodeCompAssignment = nodeCompAssignment;
+ this.workerCompAssignmentCnts = workerCompAssignmentCnts;
+ this.nodeCompAssignmentCnts = nodeCompAssignmentCnts;
this.maxStatesSearched = maxStatesSearched;
this.execs = execs;
okToRemoveFromWorker = new boolean[execs.size()];
@@ -593,13 +698,26 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
return execs.get(execIndex);
}
+ /**
+ * Assign executor to worker and node.
+ * TODO: tryToSchedule is a misnomer, since it always schedules.
+ * Assignment validity check is done before the call to tryToSchedule().
+ *
+ * @param execToComp Mapping from executor to component name.
+ * @param node RasNode on which to schedule.
+ * @param workerSlot WorkerSlot on which to schedule.
+ */
public void tryToSchedule(Map<ExecutorDetails, String> execToComp, RasNode node, WorkerSlot workerSlot) {
ExecutorDetails exec = currentExec();
String comp = execToComp.get(exec);
LOG.trace("Trying assignment of {} {} to {}", exec, comp, workerSlot);
- //It is possible that this component is already scheduled on this node or worker. If so when we backtrack we cannot remove it
- okToRemoveFromWorker[execIndex] = workerCompAssignment.computeIfAbsent(workerSlot, (k) -> new HashSet<>()).add(comp);
- okToRemoveFromNode[execIndex] = nodeCompAssignment.computeIfAbsent(node, (k) -> new HashSet<>()).add(comp);
+ // It is possible that this component is already scheduled on this node or worker. If so when we backtrack we cannot remove it
+ Map<String, Integer> oneMap = workerCompAssignmentCnts.computeIfAbsent(workerSlot, (k) -> new HashMap<>());
+ oneMap.put(comp, oneMap.getOrDefault(comp, 0) + 1); // increment assignment count
+ okToRemoveFromWorker[execIndex] = true;
+ oneMap = nodeCompAssignmentCnts.computeIfAbsent(node, (k) -> new HashMap<>());
+ oneMap.put(comp, oneMap.getOrDefault(comp, 0) + 1); // increment assignment count
+ okToRemoveFromNode[execIndex] = true;
node.assignSingleExecutor(workerSlot, exec, td);
}
@@ -613,14 +731,51 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
String comp = execToComp.get(exec);
LOG.trace("Backtracking {} {} from {}", exec, comp, workerSlot);
if (okToRemoveFromWorker[execIndex]) {
- workerCompAssignment.get(workerSlot).remove(comp);
+ Map<String, Integer> oneMap = workerCompAssignmentCnts.get(workerSlot);
+ oneMap.put(comp, oneMap.getOrDefault(comp, 0) - 1); // decrement assignment count
okToRemoveFromWorker[execIndex] = false;
}
if (okToRemoveFromNode[execIndex]) {
- nodeCompAssignment.get(node).remove(comp);
+ Map<String, Integer> oneMap = nodeCompAssignmentCnts.get(node);
+ oneMap.put(comp, oneMap.getOrDefault(comp, 0) - 1); // decrement assignment count
okToRemoveFromNode[execIndex] = false;
}
node.freeSingleExecutor(exec, td);
}
+
+ /**
+ * Use this method to log the current component assignments on the Node.
+ * Useful for debugging and tests.
+ */
+ public void logNodeCompAssignments() {
+ if (nodeCompAssignmentCnts == null || nodeCompAssignmentCnts.isEmpty()) {
+ LOG.info("NodeCompAssignment is empty");
+ return;
+ }
+ StringBuffer sb = new StringBuffer();
+ int cntAllNodes = 0;
+ int cntFilledNodes = 0;
+ for (RasNode node: new TreeSet<>(nodeCompAssignmentCnts.keySet())) {
+ cntAllNodes++;
+ Map<String, Integer> oneMap = nodeCompAssignmentCnts.get(node);
+ if (oneMap.isEmpty()) {
+ continue;
+ }
+ cntFilledNodes++;
+ String oneMapJoined = String.join(
+ ",",
+ oneMap.entrySet()
+ .stream().map(e -> String.format("%s: %s", e.getKey(), e.getValue()))
+ .collect(Collectors.toList())
+ );
+ sb.append(String.format("\n\t(%d) Node %s: %s", cntFilledNodes, node.getId(), oneMapJoined));
+ }
+ LOG.info("NodeCompAssignments available for {} of {} nodes {}", cntFilledNodes, cntAllNodes, sb);
+ LOG.info("Executors assignments attempted (cnt={}) are: \n\t{}",
+ execs.size(), execs.stream().map(x -> x.toString()).collect(Collectors.joining(","))
+ );
+ }
}
}
+
+
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
index f245d90..65d230d 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
@@ -18,6 +18,8 @@
package org.apache.storm.scheduler.resource.strategies.scheduling;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.NavigableMap;
import java.util.Set;
@@ -33,13 +35,15 @@ import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
import org.apache.storm.scheduler.resource.SchedulingResult;
-import org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
import org.junit.Assert;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,27 +58,70 @@ import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareSched
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
+@RunWith(Parameterized.class)
public class TestConstraintSolverStrategy {
+ @Parameters
+ public static Object[] data() {
+ return new Object[] { false, true };
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(TestConstraintSolverStrategy.class);
private static final int MAX_TRAVERSAL_DEPTH = 2000;
private static final int NORMAL_BOLT_PARALLEL = 11;
//Dropping the parallelism of the bolts to 3 instead of 11 so we can find a solution in a reasonable amount of work when backtracking.
private static final int BACKTRACK_BOLT_PARALLEL = 3;
+ private static final int CO_LOCATION_CNT = 2;
- public Map<String, Object> makeTestTopoConf() {
+ // class members
+ public Boolean consolidatedConfigFlag = Boolean.TRUE;
+
+ public TestConstraintSolverStrategy(boolean consolidatedConfigFlag) {
+ this.consolidatedConfigFlag = consolidatedConfigFlag;
+ LOG.info("Running tests with consolidatedConfigFlag={}", consolidatedConfigFlag);
+ }
+
+ /**
+ * Helper function to add a constraint specifying two components that cannot co-exist.
+ * Note that it is redundant to specify the converse.
+ *
+ * @param comp1 first component name
+ * @param comp2 second component name
+ * @param constraints the resulting constraint list of lists which is updated
+ */
+ public static void addConstraints(String comp1, String comp2, List<List<String>> constraints) {
+ LinkedList<String> constraintPair = new LinkedList<>();
+ constraintPair.add(comp1);
+ constraintPair.add(comp2);
+ constraints.add(constraintPair);
+ }
+
+ /**
+ * Make test Topology configuration, but with the newer spread constraints that allow associating a number
+ * with the spread. This number represents the maximum co-located component count. Default under the old
+ * configuration is assumed to be 1.
+ *
+ * @param maxCoLocationCnt Maximum co-located component (spout-0), minimum value is 1.
+ * @return
+ */
+ public Map<String, Object> makeTestTopoConf(int maxCoLocationCnt) {
+ if (maxCoLocationCnt < 1) {
+ maxCoLocationCnt = 1;
+ }
List<List<String>> constraints = new LinkedList<>();
- addContraints("spout-0", "bolt-0", constraints);
- addContraints("bolt-2", "spout-0", constraints);
- addContraints("bolt-1", "bolt-2", constraints);
- addContraints("bolt-1", "bolt-0", constraints);
- addContraints("bolt-1", "spout-0", constraints);
- List<String> spread = new LinkedList<>();
- spread.add("spout-0");
+ addConstraints("spout-0", "bolt-0", constraints);
+ addConstraints("bolt-2", "spout-0", constraints);
+ addConstraints("bolt-1", "bolt-2", constraints);
+ addConstraints("bolt-1", "bolt-0", constraints);
+ addConstraints("bolt-1", "spout-0", constraints);
+
+ Map<String, Integer> spreads = new HashMap<>();
+ spreads.put("spout-0", maxCoLocationCnt);
Map<String, Object> config = Utils.readDefaultConfig();
+
+ setConstraintConfig(constraints, spreads, config);
+
config.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH, MAX_TRAVERSAL_DEPTH);
- config.put(Config.TOPOLOGY_SPREAD_COMPONENTS, spread);
- config.put(Config.TOPOLOGY_RAS_CONSTRAINTS, constraints);
config.put(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH, MAX_TRAVERSAL_DEPTH);
config.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 100_000);
config.put(Config.TOPOLOGY_PRIORITY, 1);
@@ -85,6 +132,54 @@ public class TestConstraintSolverStrategy {
return config;
}
+ /**
+ * Set Config.TOPOLOGY_RAS_CONSTRAINTS (when consolidatedConfigFlag is true) or both
+ * Config.TOPOLOGY_RAS_CONSTRAINTS/Config.TOPOLOGY_SPREAD_COMPONENTS (when consolidatedConfigFlag is false).
+ *
+ * When consolidatedConfigFlag is true, use the new more consolidated format to set Config.TOPOLOGY_RAS_CONSTRAINTS.
+ * When false, use the old configuration format for Config.TOPOLOGY_RAS_CONSTRAINTS/TOPOLOGY_SPREAD_COMPONENTS.
+ *
+ * @param constraints List of components, where the first one cannot co-exist with the others in the list
+ * @param spreads Map of component and its maxCoLocationCnt
+ * @param config Configuration to be updated
+ */
+ private void setConstraintConfig(List<List<String>> constraints, Map<String, Integer> spreads, Map<String, Object> config) {
+ if (consolidatedConfigFlag) {
+ // single configuration for each component
+ Map<String, Map<String,Object>> modifiedConstraints = new HashMap<>();
+ for (List<String> constraint: constraints) {
+ if (constraint.size() < 2) {
+ continue;
+ }
+ String comp = constraint.get(0);
+ List<String> others = constraint.subList(1, constraint.size());
+ List<Object> incompatibleComponents = (List<Object>) modifiedConstraints.computeIfAbsent(comp, k -> new HashMap<>())
+ .computeIfAbsent(ConstraintSolverStrategy.CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS, k -> new ArrayList<>());
+ incompatibleComponents.addAll(others);
+ }
+ for (String comp: spreads.keySet()) {
+ modifiedConstraints.computeIfAbsent(comp, k -> new HashMap<>()).put(ConstraintSolverStrategy.CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT, "" + spreads.get(comp));
+ }
+ config.put(Config.TOPOLOGY_RAS_CONSTRAINTS, modifiedConstraints);
+ } else {
+ // constraint and MaxCoLocationCnts are separate - no maxCoLocationCnt implied as 1
+ config.put(Config.TOPOLOGY_RAS_CONSTRAINTS, constraints);
+ for (Map.Entry<String, Integer> e: spreads.entrySet()) {
+ if (e.getValue() > 1) {
+ Assert.fail(String.format("Invalid %s=%d for component=%s, expecting 1 for old-style configuration",
+ ConstraintSolverStrategy.CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT,
+ e.getValue(),
+ e.getKey()));
+ }
+ }
+ config.put(Config.TOPOLOGY_SPREAD_COMPONENTS, new ArrayList(spreads.keySet()));
+ }
+ }
+
+ public Map<String, Object> makeTestTopoConf() {
+ return makeTestTopoConf(1);
+ }
+
public TopologyDetails makeTopology(Map<String, Object> config, int boltParallel) {
return genTopology("testTopo", config, 1, 4, 4, boltParallel, 0, 0, "user");
}
@@ -101,8 +196,8 @@ public class TestConstraintSolverStrategy {
return new Cluster(new INimbusTest(), new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
}
- public void basicUnitTestWithKillAndRecover(ConstraintSolverStrategy cs, int boltParallel) {
- Map<String, Object> config = makeTestTopoConf();
+ public void basicUnitTestWithKillAndRecover(ConstraintSolverStrategy cs, int boltParallel, int coLocationCnt) {
+ Map<String, Object> config = makeTestTopoConf(coLocationCnt);
cs.prepare(config);
TopologyDetails topo = makeTopology(config, boltParallel);
@@ -114,9 +209,9 @@ public class TestConstraintSolverStrategy {
LOG.info("Done scheduling {}...", result);
Assert.assertTrue("Assert scheduling topology success " + result, result.isSuccess());
- Assert.assertEquals("topo all executors scheduled? " + cluster.getUnassignedExecutors(topo),
+ Assert.assertEquals("Assert no unassigned executors, found unassigned: " + cluster.getUnassignedExecutors(topo),
0, cluster.getUnassignedExecutors(topo).size());
- Assert.assertTrue("Valid Scheduling?", ConstraintSolverStrategy.validateSolution(cluster, topo));
+ Assert.assertTrue("Valid Scheduling?", ConstraintSolverStrategy.validateSolution(cluster, topo, null));
LOG.info("Slots Used {}", cluster.getAssignmentById(topo.getId()).getSlots());
LOG.info("Assignment {}", cluster.getAssignmentById(topo.getId()).getSlotToExecutors());
@@ -146,25 +241,84 @@ public class TestConstraintSolverStrategy {
Assert.assertTrue("Assert scheduling topology success " + result, result.isSuccess());
Assert.assertEquals("topo all executors scheduled?", 0, cluster.getUnassignedExecutors(topo).size());
- Assert.assertTrue("Valid Scheduling?", ConstraintSolverStrategy.validateSolution(cluster, topo));
+ Assert.assertTrue("Valid Scheduling?", ConstraintSolverStrategy.validateSolution(cluster, topo, null));
+ }
+
+ @Test
+ public void testNewConstraintFormat() {
+ String s = String.format(
+ "{ \"comp-1\": "
+ + " { \"%s\": 2, "
+ + " \"%s\": [\"comp-2\", \"comp-3\" ] }, "
+ + " \"comp-2\": "
+ + " { \"%s\": [ \"comp-4\" ] }, "
+ + " \"comp-3\": "
+ + " { \"%s\": \"comp-5\" } "
+ + "}",
+ ConstraintSolverStrategy.CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT,
+ ConstraintSolverStrategy.CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS,
+ ConstraintSolverStrategy.CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS,
+ ConstraintSolverStrategy.CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS
+ );
+ Object jsonValue = JSONValue.parse(s);
+ Map<String, Object> config = Utils.readDefaultConfig();
+ config.put(Config.TOPOLOGY_RAS_CONSTRAINTS, jsonValue);
+ Set<String> allComps = new HashSet<>();
+ allComps.addAll(Arrays.asList("comp-1", "comp-2", "comp-3", "comp-4", "comp-5"));
+ ConstraintSolverStrategy.ConstraintConfig constraintConfig = new ConstraintSolverStrategy.ConstraintConfig(config, allComps);
+
+ Set<String> expectedSetComp1 = new HashSet<>();
+ expectedSetComp1.addAll(Arrays.asList("comp-2", "comp-3"));
+ Set<String> expectedSetComp2 = new HashSet<>();
+ expectedSetComp2.addAll(Arrays.asList("comp-1", "comp-4"));
+ Set<String> expectedSetComp3 = new HashSet<>();
+ expectedSetComp3.addAll(Arrays.asList("comp-1", "comp-5"));
+ Assert.assertEquals("comp-1 incompatible components", expectedSetComp1, constraintConfig.getIncompatibleComponents().get("comp-1"));
+ Assert.assertEquals("comp-2 incompatible components", expectedSetComp2, constraintConfig.getIncompatibleComponents().get("comp-2"));
+ Assert.assertEquals("comp-3 incompatible components", expectedSetComp3, constraintConfig.getIncompatibleComponents().get("comp-3"));
+ Assert.assertEquals("comp-1 maxNodeCoLocationCnt", 2, (int) constraintConfig.getMaxCoLocationCnts().getOrDefault("comp-1", -1));
+ Assert.assertNull("comp-2 maxNodeCoLocationCnt", constraintConfig.getMaxCoLocationCnts().get("comp-2"));
}
@Test
- public void testConstraintSolverForceBacktrack() {
+ public void testConstraintSolverForceBacktrackWithSpreadCoLocation() {
//The best way to force backtracking is to change the heuristic so the components are reversed, so it is hard
// to find an answer.
+ if (skipSpreadCoLocationTest("testConstraintSolverForceBacktrackWithSpreadCoLocation")) {
+ return;
+ }
+
ConstraintSolverStrategy cs = new ConstraintSolverStrategy() {
@Override
public <K extends Comparable<K>, V extends Comparable<V>> NavigableMap<K, V> sortByValues(final Map<K, V> map) {
return super.sortByValues(map).descendingMap();
}
};
- basicUnitTestWithKillAndRecover(cs, BACKTRACK_BOLT_PARALLEL);
+ basicUnitTestWithKillAndRecover(cs, BACKTRACK_BOLT_PARALLEL, CO_LOCATION_CNT);
}
+
+ /**
+ * Decides whether a spread co-location test must be skipped, logging the reason if so.
+ * A CO_LOCATION_CNT greater than 1 can only be run when consolidatedConfigFlag is true.
+ *
+ * @param testName name of the calling test; used only in the log message
+ * @return true if the caller should return without running, false otherwise
+ */
+ private boolean skipSpreadCoLocationTest(String testName) {
+ if (CO_LOCATION_CNT > 1 && !consolidatedConfigFlag) {
+ // The skip happens when the flag is false, so running with CO_LOCATION_CNT > 1 requires it to be true.
+ LOG.info("INFO: Skipping Test {}: {}={} requires consolidatedConfigFlag=true, but consolidatedConfigFlag={}",
+ testName,
+ ConstraintSolverStrategy.CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT,
+ CO_LOCATION_CNT,
+ consolidatedConfigFlag);
+ return true;
+ }
+ return false;
+ }
+
@Test
public void testConstraintSolver() {
- basicUnitTestWithKillAndRecover(new ConstraintSolverStrategy(), NORMAL_BOLT_PARALLEL);
+ basicUnitTestWithKillAndRecover(new ConstraintSolverStrategy(), NORMAL_BOLT_PARALLEL, 1);
+ }
+
+ @Test
+ public void testConstraintSolverWithSpreadCoLocation() {
+ if (skipSpreadCoLocationTest("testConstraintSolverWithSpreadCoLocation")) {
+ return;
+ }
+
+ basicUnitTestWithKillAndRecover(new ConstraintSolverStrategy(), NORMAL_BOLT_PARALLEL, CO_LOCATION_CNT);
}
public void basicFailureTest(String confKey, Object confValue, ConstraintSolverStrategy cs) {
@@ -204,14 +358,30 @@ public class TestConstraintSolverStrategy {
}
}
+ /**
+ * Runs the large-executor scheduling scenario at the base multiplier of 1, which runs regardless of
+ * consolidatedConfigFlag.
+ */
+ @Test
+ public void testScheduleLargeExecutorConstraintCountSmall() {
+ final int multiplier = 1;
+ testScheduleLargeExecutorConstraintCount(multiplier);
+ }
+
/*
* Test scheduling large number of executors and constraints.
+ * With a multiplier above 1, this test can only pass using the new consolidated config format, which allows
+ * maxCoLocationCnt = parallelismMultiplier. The prior code let it pass only because it did not properly
+ * enforce the SPREAD constraint.
*
* Cluster has sufficient resources for scheduling to succeed but can fail due to StackOverflowError.
*/
- @ParameterizedTest
- @ValueSource(ints = {1, 20})
- public void testScheduleLargeExecutorConstraintCount(int parallelismMultiplier) {
+ @Test
+ public void testScheduleLargeExecutorConstraintCountLarge() {
+ final int multiplier = 20;
+ testScheduleLargeExecutorConstraintCount(multiplier);
+ }
+
+ /**
+ * Schedules one topology whose executor count and spread limits scale with parallelismMultiplier,
+ * and asserts that scheduling succeeds.
+ *
+ * @param parallelismMultiplier scales the spout/bolt parallelism and the maxCoLocationCnt of spout-0 and bolt-1
+ */
+ private void testScheduleLargeExecutorConstraintCount(int parallelismMultiplier) {
+ if (parallelismMultiplier > 1 && !consolidatedConfigFlag) {
+ // Old-format configs cannot express maxCoLocationCnt > 1, so skip explicitly instead of asserting on a value that is always false here.
+ LOG.info("Skipping large parallelism test: maxCoLocationCnt={} requires consolidatedConfigFlag=true, current={}", parallelismMultiplier, consolidatedConfigFlag);
+ return;
+ }
+
// Add 1 topology with large number of executors and constraints. Too many can cause a java.lang.StackOverflowError
Config config = createCSSClusterConfig(10, 10, 0, null);
config.put(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH, 50000);
@@ -219,15 +389,20 @@ public class TestConstraintSolverStrategy {
config.put(DaemonConfig.SCHEDULING_TIMEOUT_SECONDS_PER_TOPOLOGY, 120);
List<List<String>> constraints = new LinkedList<>();
- addContraints("spout-0", "spout-0", constraints);
- addContraints("bolt-1", "bolt-1", constraints);
- addContraints("spout-0", "bolt-0", constraints);
- addContraints("bolt-2", "spout-0", constraints);
- addContraints("bolt-1", "bolt-2", constraints);
- addContraints("bolt-1", "bolt-0", constraints);
- addContraints("bolt-1", "spout-0", constraints);
-
- config.put(Config.TOPOLOGY_RAS_CONSTRAINTS, constraints);
+ addConstraints("spout-0", "spout-0", constraints);
+ addConstraints("bolt-1", "bolt-1", constraints);
+ addConstraints("spout-0", "bolt-0", constraints);
+ addConstraints("bolt-2", "spout-0", constraints);
+ addConstraints("bolt-1", "bolt-2", constraints);
+ addConstraints("bolt-1", "bolt-0", constraints);
+ addConstraints("bolt-1", "spout-0", constraints);
+
+ Map<String, Integer> spreads = new HashMap<>();
+ spreads.put("spout-0", parallelismMultiplier);
+ spreads.put("bolt-1", parallelismMultiplier);
+
+ setConstraintConfig(constraints, spreads, config);
+
TopologyDetails topo = genTopology("testTopo-" + parallelismMultiplier, config, 10, 10, 30 * parallelismMultiplier, 30 * parallelismMultiplier, 31414, 0, "user");
Topologies topologies = new Topologies(topo);
@@ -239,24 +414,20 @@ public class TestConstraintSolverStrategy {
scheduler.schedule(topologies, cluster);
boolean scheduleSuccess = isStatusSuccess(cluster.getStatus(topo.getId()));
- LOG.info("testScheduleLargeExecutorCount scheduling {} with {}x executor multiplier", scheduleSuccess ? "succeeds" : "fails",
- parallelismMultiplier);
+ LOG.info("testScheduleLargeExecutorCount scheduling {} with {}x executor multiplier, consolidatedConfigFlag={}",
+ scheduleSuccess ? "succeeds" : "fails", parallelismMultiplier, consolidatedConfigFlag);
Assert.assertTrue(scheduleSuccess);
}
+ /**
+ * Runs ConstraintSolverStrategy through ResourceAwareScheduler and checks that all executors are scheduled.
+ * Only runs with consolidatedConfigFlag=true, because bolt-1 uses maxCoLocationCnt=10.
+ */
@Test
public void testIntegrationWithRAS() {
- List<List<String>> constraints = new LinkedList<>();
- addContraints("spout-0", "bolt-0", constraints);
- addContraints("bolt-1", "bolt-1", constraints);
- addContraints("bolt-1", "bolt-2", constraints);
- List<String> spread = new LinkedList<>();
- spread.add("spout-0");
+ if (!consolidatedConfigFlag) {
+ LOG.info("Skipping test since bolt-1 maxCoLocationCnt=10 requires consolidatedConfigFlag=true, current={}", consolidatedConfigFlag);
+ return;
+ }
Map<String, Object> config = Utils.readDefaultConfig();
config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, ConstraintSolverStrategy.class.getName());
- config.put(Config.TOPOLOGY_SPREAD_COMPONENTS, spread);
- config.put(Config.TOPOLOGY_RAS_CONSTRAINTS, constraints);
config.put(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH, MAX_TRAVERSAL_DEPTH);
config.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 100_000);
config.put(Config.TOPOLOGY_PRIORITY, 1);
@@ -264,17 +435,28 @@ public class TestConstraintSolverStrategy {
config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 100);
config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 0.0);
+ List<List<String>> constraints = new LinkedList<>();
+ addConstraints("spout-0", "bolt-0", constraints);
+ addConstraints("bolt-1", "bolt-1", constraints);
+ addConstraints("bolt-1", "bolt-2", constraints);
+
+ // Per-component maxCoLocationCnt: spout-0 -> 1, bolt-1 -> 10.
+ Map<String, Integer> spreads = new HashMap<>();
+ spreads.put("spout-0", 1);
+ spreads.put("bolt-1", 10);
+
+ setConstraintConfig(constraints, spreads, config);
+
TopologyDetails topo = genTopology("testTopo", config, 2, 3, 30, 300, 0, 0, "user");
Map<String, TopologyDetails> topoMap = new HashMap<>();
topoMap.put(topo.getId(), topo);
Topologies topologies = new Topologies(topoMap);
- Map<String, SupervisorDetails> supMap = genSupervisors(30, 16, 400, 1024 * 4);
+ // Smallest cluster observed to work for these constraints: fails with 36 supervisors, works with 37.
+ Map<String, SupervisorDetails> supMap = genSupervisors(37, 16, 400, 1024 * 4);
Cluster cluster = makeCluster(topologies, supMap);
ResourceAwareScheduler rs = new ResourceAwareScheduler();
rs.prepare(config);
try {
rs.schedule(topologies, cluster);
-
assertStatusSuccess(cluster, topo.getId());
Assert.assertEquals("topo all executors scheduled?", 0, cluster.getUnassignedExecutors(topo).size());
} finally {
@@ -293,22 +475,14 @@ public class TestConstraintSolverStrategy {
Map<String, SchedulerAssignment> newAssignments = new HashMap<>();
newAssignments.put(topo.getId(), new SchedulerAssignmentImpl(topo.getId(), newExecToSlot, null, null));
cluster.setAssignments(newAssignments, false);
-
+
rs.prepare(config);
try {
rs.schedule(topologies, cluster);
-
assertStatusSuccess(cluster, topo.getId());
Assert.assertEquals("topo all executors scheduled?", 0, cluster.getUnassignedExecutors(topo).size());
} finally {
rs.cleanup();
}
}
-
- public static void addContraints(String comp1, String comp2, List<List<String>> constraints) {
- LinkedList<String> constraintPair = new LinkedList<>();
- constraintPair.add(comp1);
- constraintPair.add(comp2);
- constraints.add(constraintPair);
- }
}