You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/03/11 23:10:44 UTC

[storm] branch master updated: [STORM-3585] New compact Constraint config including maxNodeCoLocationCnt and incompatibleComponents (#3215)

This is an automated email from the ASF dual-hosted git repository.

ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 1137f61  [STORM-3585] New compact Constraint config including maxNodeCoLocationCnt and incompatibleComponents (#3215)
1137f61 is described below

commit 1137f61aad3e277f102fb8baf6ee24a5d183da32
Author: Bipin Prasad <bi...@yahoo.com>
AuthorDate: Wed Mar 11 18:10:30 2020 -0500

    [STORM-3585] New compact Constraint config including maxNodeCoLocationCnt and incompatibleComponents (#3215)
---
 storm-client/src/jvm/org/apache/storm/Config.java  |  35 +-
 .../apache/storm/validation/ConfigValidation.java  | 133 ++++++-
 .../validation/ConfigValidationAnnotations.java    |  12 +
 .../jvm/org/apache/storm/TestConfigValidate.java   | 143 ++++++-
 .../apache/storm/scheduler/resource/RasNode.java   |   7 +-
 .../scheduling/ConstraintSolverStrategy.java       | 411 ++++++++++++++-------
 .../scheduling/TestConstraintSolverStrategy.java   | 284 +++++++++++---
 7 files changed, 829 insertions(+), 196 deletions(-)

diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index 7a1f689..2c95521 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -29,6 +29,7 @@ import org.apache.storm.metric.IEventLogger;
 import org.apache.storm.policy.IWaitStrategy;
 import org.apache.storm.serialization.IKryoDecorator;
 import org.apache.storm.serialization.IKryoFactory;
+import org.apache.storm.utils.ShellLogHandler;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.validation.ConfigValidation;
 import org.apache.storm.validation.ConfigValidation.EventLoggerRegistryValidator;
@@ -36,9 +37,11 @@ import org.apache.storm.validation.ConfigValidation.ListOfListOfStringValidator;
 import org.apache.storm.validation.ConfigValidation.MapOfStringToMapOfStringToObjectValidator;
 import org.apache.storm.validation.ConfigValidation.MetricRegistryValidator;
 import org.apache.storm.validation.ConfigValidation.MetricReportersValidator;
+import org.apache.storm.validation.ConfigValidation.RasConstraintsTypeValidator;
 import org.apache.storm.validation.ConfigValidationAnnotations;
 import org.apache.storm.validation.ConfigValidationAnnotations.CustomValidator;
 import org.apache.storm.validation.ConfigValidationAnnotations.IsBoolean;
+import org.apache.storm.validation.ConfigValidationAnnotations.IsExactlyOneOf;
 import org.apache.storm.validation.ConfigValidationAnnotations.IsImplementationOfClass;
 import org.apache.storm.validation.ConfigValidationAnnotations.IsInteger;
 import org.apache.storm.validation.ConfigValidationAnnotations.IsKryoReg;
@@ -304,16 +307,40 @@ public class Config extends HashMap<String, Object> {
     // an error will be thrown by nimbus on topology submission and not by the client prior to submitting
     // the topology.
     public static final String TOPOLOGY_SCHEDULER_STRATEGY = "topology.scheduler.strategy";
+
     /**
-     * Declare scheduling constraints for a topology used by the constraint solver strategy. A List of pairs (also a list) of components
-     * that cannot coexist in the same worker.
+     * Declare scheduling constraints for a topology used by the constraint solver strategy. The format can be either
+     * old style (validated by ListOfListOfStringValidator.class or the newer style, which is a list of specific type of
+     * Maps (validated by RasConstraintsTypeValidator.class). The value must be in one or the other format.
+     *
+     * <p>
+     * Old style Config.TOPOLOGY_RAS_CONSTRAINTS (ListOfListOfString) specified a list of components that cannot
+     * co-exist on the same Worker.
+     * </p>
+     *
+     * <p>
+     * New style Config.TOPOLOGY_RAS_CONSTRAINTS is map where each component has a list of other incompatible components
+     * (which serves the same function as the old style configuration) and optional number that specifies
+     * the maximum co-location count for the component on a node.
+     * </p>
+     *
+     * <p>comp-1 cannot exist on same worker as comp-2 or comp-3, and at most "2" comp-1 on same node</p>
+     * <p>comp-2 and comp-4 cannot be on same worker (missing comp-1 is implied from comp-1 constraint)</p>
+     *
+     *  <p>
+     *      { "comp-1": { "maxNodeCoLocationCnt": 2, "incompatibleComponents": ["comp-2", "comp-3" ] },
+     *        "comp-2": { "incompatibleComponents": [ "comp-4" ] }
+     *      }
+     *  </p>
      */
-    @CustomValidator(validatorClass = ListOfListOfStringValidator.class)
+    @IsExactlyOneOf(valueValidatorClasses = { ListOfListOfStringValidator.class, RasConstraintsTypeValidator.class })
     public static final String TOPOLOGY_RAS_CONSTRAINTS = "topology.ras.constraints";
     /**
      * Array of components that scheduler should try to place on separate hosts when using the constraint solver strategy or the
-     * multi-tenant scheduler.
+     * multi-tenant scheduler. Note that this configuration can be specified in TOPOLOGY_RAS_CONSTRAINTS using the
+     * "maxNodeCoLocationCnt" map entry with value of 1.
      */
+    @Deprecated
     @IsStringList
     public static final String TOPOLOGY_SPREAD_COMPONENTS = "topology.spread.components";
     /**
diff --git a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
index eb4ce7d..4bf7432 100644
--- a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
+++ b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
@@ -27,6 +27,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+
 import org.apache.storm.Config;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.validation.ConfigValidationAnnotations.ValidatorParams;
@@ -586,8 +588,8 @@ public class ConfigValidation {
                         ((Validator) v).validateField(name + " list entry", entry);
                     } else {
                         LOG.warn(
-                            "validator: {} cannot be used in ListEntryCustomValidator.  Individual entry validators must a instance of "
-                                    + "Validator class",
+                            "validator: {} cannot be used in ListEntryCustomValidator. "
+                                    + "Individual entry validators must be an instance of Validator class",
                             validator.getName());
                     }
                 }
@@ -656,8 +658,8 @@ public class ConfigValidation {
                         ((Validator) keyValidator).validateField(name + " Map key", entry.getKey());
                     } else {
                         LOG.warn(
-                            "validator: {} cannot be used in MapEntryCustomValidator to validate keys.  Individual entry validators must "
-                                    + "a instance of Validator class",
+                            "validator: {} cannot be used in MapEntryCustomValidator to validate keys. "
+                                    + "Individual entry validators must be an instance of Validator class",
                             kv.getName());
                     }
                 }
@@ -667,8 +669,8 @@ public class ConfigValidation {
                         ((Validator) valueValidator).validateField(name + " Map value", entry.getValue());
                     } else {
                         LOG.warn(
-                            "validator: {} cannot be used in MapEntryCustomValidator to validate values.  Individual entry validators "
-                                    + "must a instance of Validator class",
+                            "validator: {} cannot be used in MapEntryCustomValidator to validate values. "
+                                    + "Individual entry validators must be an instance of Validator class",
                             vv.getName());
                     }
                 }
@@ -850,6 +852,125 @@ public class ConfigValidation {
         }
     }
 
+    public static class CustomIsExactlyOneOfValidators extends Validator {
+        private Class<?>[] subValidators;
+        private List<String> validatorClassNames;
+
+        public CustomIsExactlyOneOfValidators(Map<String, Object> params) {
+            this.subValidators = (Class<?>[]) params.get(ConfigValidationAnnotations.ValidatorParams.VALUE_VALIDATOR_CLASSES);
+            this.validatorClassNames = Arrays.asList(subValidators).stream().map(x -> x.getName()).collect(Collectors.toList());
+        }
+
+        @Override
+        public void validateField(String name, Object o) {
+            if (o == null) {
+                return;
+            }
+
+            HashMap<String, Exception> validatorExceptions = new HashMap<>();
+            Set<String> selectedValidators = new HashSet<>();
+            for (Class<?> vv : subValidators) {
+                Object valueValidator;
+                try {
+                    valueValidator = vv.getConstructor().newInstance();
+                } catch (Exception ex) {
+                    throw new IllegalArgumentException(vv.getName() + " instantiation failure", ex);
+                }
+                if (valueValidator instanceof Validator) {
+                    try {
+                        ((Validator) valueValidator).validateField(name + " " + vv.getSimpleName() + " value", o);
+                        selectedValidators.add(vv.getName());
+                    } catch (Exception ex) {
+                        // only one will pass, so ignore all validation errors - stored for future use
+                        validatorExceptions.put(vv.getName(), ex);
+                    }
+                } else {
+                    String err = String.format("validator: %s cannot be used in CustomExactlyOneOfValidators to validate values. "
+                            + "Individual entry validators must a instance of Validator class", vv.getName());
+                    LOG.warn(err);
+                }
+            }
+            // check if one and only one validation succeeded
+            if (selectedValidators.isEmpty()) {
+                String parseErrs = String.join(";\n\t", validatorExceptions.entrySet().stream()
+                        .map(e -> String.format("%s:%s", e.getKey(), e.getValue())).collect(Collectors.toList()));
+                String err = String.format("Field %s must be one of %s; parse errors are \n\t%s", name,
+                        String.join(", ", validatorClassNames), parseErrs);
+                throw new IllegalArgumentException(err);
+            }
+            if (selectedValidators.size() > 1) {
+                throw new IllegalArgumentException("Field " + name + " must match exactly one of " + String.join(", ", selectedValidators));
+            }
+        }
+    }
+
+    public static class RasConstraintsTypeValidator extends Validator {
+        public static final String CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT = "maxNodeCoLocationCnt";
+        public static final String CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS = "incompatibleComponents";
+
+        @Override
+        public void validateField(String name, Object o) {
+            if (o == null) {
+                return;
+            }
+            if (!(o instanceof Map)) {
+                throw new IllegalArgumentException(
+                        "Field " + name + " must be an Iterable containing only Map of Maps");
+            }
+            Map<String, Object> map1 = (Map<String, Object>) o;
+            for (Map.Entry<String, Object> entry1: map1.entrySet()) {
+                String comp1 = entry1.getKey();
+                Object o2 = entry1.getValue();
+                if (!(o2 instanceof Map)) {
+                    String err = String.format("Field %s, component %s, expecting constraints Map with keys [\"%s\", \"%s\"], in \"%s\"",
+                            name, comp1, CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT, CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS, o);
+                    throw new IllegalArgumentException(err);
+                }
+                Map<String, Object> map2 = (Map<String, Object>) o2;
+                for (Map.Entry<String, Object> entry2: map2.entrySet()) {
+                    String constraintType = entry2.getKey();
+                    Object o3 = entry2.getValue();
+                    switch (constraintType) {
+                        case CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT:
+                            try {
+                                Integer.parseInt("" + o3);
+                            } catch (Exception ex) {
+                                String err = String.format("Field %s, component %s, constraint %s should be a number, not \"%s\"",
+                                        name, comp1, constraintType, o3);
+                                throw new IllegalArgumentException(err);
+                            }
+                            break;
+
+                        case CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS:
+                            if (o3 instanceof String) {
+                                break;
+                            } else if (o3 instanceof List) {
+                                for (Object otherComp : (List) o3) {
+                                    if (otherComp instanceof String) {
+                                        continue;
+                                    }
+                                    String err = String.format(
+                                            "Field %s, component %s, constraintType \"%s\", expecting incompatible component-name, "
+                                                    + "found instance of class \"%s\" value \"%s\"",
+                                            name, comp1, constraintType, o3.getClass().getName(), otherComp);
+                                    throw new IllegalArgumentException(err);
+                                }
+                            }
+                            break;
+
+                        default:
+                            String err = String.format(
+                                    "Field %s, component %s, has unsupported constraintType \"%s\", expecting one of [\"%s\", \"%s\"], "
+                                            + "in \"%s\"",
+                                    name, comp1, constraintType, CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT,
+                                    CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS, o);
+                            throw new IllegalArgumentException(err);
+                    }
+                }
+            }
+        }
+    }
+
     public static class UserResourcePoolEntryValidator extends Validator {
 
         @Override
diff --git a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidationAnnotations.java b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidationAnnotations.java
index ac8ba9d..ce22c3c 100644
--- a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidationAnnotations.java
+++ b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidationAnnotations.java
@@ -197,6 +197,18 @@ public class ConfigValidationAnnotations {
         Class<?> validatorClass();
     }
 
+    /**
+     * Custom validator where exactly one of the validations must be successful.
+     * Used for overloaded configuration, where value must match one (and exactly one)
+     * format of supplied values.
+     */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface IsExactlyOneOf {
+        Class<?> validatorClass() default ConfigValidation.CustomIsExactlyOneOfValidators.class;
+        Class<?>[] valueValidatorClasses();
+    }
+
     @Retention(RetentionPolicy.RUNTIME)
     @Target(ElementType.FIELD)
     public @interface Password {
diff --git a/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java b/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java
index 2ece764..dedf937 100644
--- a/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java
+++ b/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java
@@ -26,7 +26,10 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import javax.security.auth.Subject;
+
 import org.apache.storm.blobstore.BlobStore;
 import org.apache.storm.blobstore.NimbusBlobStore;
 import org.apache.storm.generated.AuthorizationException;
@@ -41,13 +44,15 @@ import org.apache.storm.validation.ConfigValidation.ImpersonationAclUserEntryVal
 import org.apache.storm.validation.ConfigValidation.IntegerValidator;
 import org.apache.storm.validation.ConfigValidation.KryoRegValidator;
 import org.apache.storm.validation.ConfigValidation.ListEntryTypeValidator;
+import org.apache.storm.validation.ConfigValidation.ListOfListOfStringValidator;
 import org.apache.storm.validation.ConfigValidation.NoDuplicateInListValidator;
 import org.apache.storm.validation.ConfigValidation.NotNullValidator;
 import org.apache.storm.validation.ConfigValidation.PositiveNumberValidator;
 import org.apache.storm.validation.ConfigValidation.PowerOf2Validator;
+import org.apache.storm.validation.ConfigValidation.RasConstraintsTypeValidator;
 import org.apache.storm.validation.ConfigValidation.StringValidator;
 import org.apache.storm.validation.ConfigValidation.UserResourcePoolEntryValidator;
-import org.apache.storm.validation.ConfigValidationAnnotations.NotNull;
+import org.apache.storm.validation.ConfigValidationAnnotations.IsExactlyOneOf;
 import org.apache.storm.validation.ConfigValidationAnnotations.IsImplementationOfClass;
 import org.apache.storm.validation.ConfigValidationAnnotations.IsListEntryCustom;
 import org.apache.storm.validation.ConfigValidationAnnotations.IsListEntryType;
@@ -55,6 +60,8 @@ import org.apache.storm.validation.ConfigValidationAnnotations.IsMapEntryCustom;
 import org.apache.storm.validation.ConfigValidationAnnotations.IsMapEntryType;
 import org.apache.storm.validation.ConfigValidationAnnotations.IsNoDuplicateInList;
 import org.apache.storm.validation.ConfigValidationAnnotations.IsString;
+import org.apache.storm.validation.ConfigValidationAnnotations.NotNull;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -178,7 +185,6 @@ public class TestConfigValidate {
         ConfigValidation.validateFields(conf);
     }
 
-
     @Test
     public void testWorkerChildoptsIsStringOrStringList() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException,
         InstantiationException, IllegalAccessException {
@@ -542,6 +548,135 @@ public class TestConfigValidate {
     }
 
     @Test
+    public void testExactlyOneOfCustomAnnotation() {
+        TestConfig config = new TestConfig();
+        Collection<Object> passCases = new LinkedList<Object>();
+        Collection<Object> failCases = new LinkedList<Object>();
+
+        List<Object> passCaseListOfList = new ArrayList<>();
+        passCaseListOfList.add(Arrays.asList("comp1", "comp2"));
+        passCaseListOfList.add(Arrays.asList("comp1", "comp3"));
+        passCaseListOfList.add(Arrays.asList("comp2", "comp4"));
+        passCaseListOfList.add(Arrays.asList("comp2", "comp5"));
+
+        Map<Object, Object> passCaseMapOfMap = new HashMap<>();
+        passCaseMapOfMap.put("comp1",
+                Stream.of(new Object[][] {
+                        { RasConstraintsTypeValidator.CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT, 10 },
+                        { RasConstraintsTypeValidator.CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS, Arrays.asList("comp2", "comp3")},
+                }).collect(Collectors.toMap(data -> data[0], data -> data[1]))
+        );
+        passCaseMapOfMap.put("comp2",
+                Stream.of(new Object[][] {
+                        { RasConstraintsTypeValidator.CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT, 2 },
+                        { RasConstraintsTypeValidator.CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS, Arrays.asList("comp4", "comp5")},
+                }).collect(Collectors.toMap(data -> data[0], data -> data[1]))
+        );
+        passCases.add(passCaseMapOfMap);
+
+        passCaseMapOfMap = new HashMap<>();
+        passCaseMapOfMap.put("comp1",
+                Stream.of(new Object[][] {
+                        { RasConstraintsTypeValidator.CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS, Arrays.asList("comp2", "comp3")},
+                }).collect(Collectors.toMap(data -> data[0], data -> data[1]))
+        );
+        passCaseMapOfMap.put("comp2",
+                Stream.of(new Object[][] {
+                        { RasConstraintsTypeValidator.CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT, 2 },
+                        { RasConstraintsTypeValidator.CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS, Arrays.asList("comp4", "comp5")},
+                }).collect(Collectors.toMap(data -> data[0], data -> data[1]))
+        );
+        passCases.add(passCaseMapOfMap);
+
+        passCaseMapOfMap = new HashMap<>();
+        passCaseMapOfMap.put("comp1",
+                Stream.of(new Object[][] {
+                        { RasConstraintsTypeValidator.CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS, "comp2"},
+                }).collect(Collectors.toMap(data -> data[0], data -> data[1]))
+        );
+        passCaseMapOfMap.put("comp2",
+                Stream.of(new Object[][] {
+                        { RasConstraintsTypeValidator.CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT, 2 },
+                        { RasConstraintsTypeValidator.CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS, "comp4"},
+                }).collect(Collectors.toMap(data -> data[0], data -> data[1]))
+        );
+        passCases.add(passCaseMapOfMap);
+
+        for (Object value : passCases) {
+            config.put(TestConfig.TEST_MAP_CONFIG_9, value);
+            ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
+        }
+
+        List<Object> failCaseList = new ArrayList<>();
+        failCaseList.add(Arrays.asList("comp1", Arrays.asList("comp2", "comp3")));
+        failCaseList.add(Arrays.asList("comp3", Arrays.asList("comp4", "comp5")));
+        failCases.add(failCaseList);
+
+        Map<String, Object> failCaseMapOfMap = new HashMap<>();
+        failCaseMapOfMap.put("comp1",
+                Stream.of(new Object[][] {
+                        { RasConstraintsTypeValidator.CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT, 10 },
+                        { RasConstraintsTypeValidator.CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS, Arrays.asList(1, 2, 3)},
+                }).collect(Collectors.toMap(data -> data[0], data -> data[1]))
+        );
+        failCaseMapOfMap.put("comp2",
+                Stream.of(new Object[][] {
+                        { RasConstraintsTypeValidator.CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT, 2 },
+                        { RasConstraintsTypeValidator.CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS, Arrays.asList("comp4", "comp5")},
+                }).collect(Collectors.toMap(data -> data[0], data -> data[1]))
+        );
+        failCases.add(failCaseMapOfMap);
+
+        failCaseMapOfMap = new HashMap<>();
+        failCaseMapOfMap.put("comp1",
+                Stream.of(new Object[][] {
+                        { RasConstraintsTypeValidator.CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT, 10 },
+                        { RasConstraintsTypeValidator.CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS, Arrays.asList("comp1", 3)},
+                }).collect(Collectors.toMap(data -> data[0], data -> data[1]))
+        );
+        failCases.add(failCaseMapOfMap);
+
+        failCaseMapOfMap = new HashMap<>();
+        failCaseMapOfMap.put("comp1", Arrays.asList("comp2", "comp3"));
+        failCaseMapOfMap.put("comp2", Arrays.asList("comp4", "comp5"));
+        failCases.add(failCaseMapOfMap);
+
+        failCaseMapOfMap = new HashMap<>();
+        failCaseMapOfMap.put("aaa", "str");
+        failCaseMapOfMap.put("bbb", 6);
+        failCaseMapOfMap.put("ccc", 7);
+        failCases.add(failCaseMapOfMap);
+
+        failCaseMapOfMap = new HashMap<>();
+        failCaseMapOfMap.put("aaa", -1);
+        failCaseMapOfMap.put("bbb", 6);
+        failCaseMapOfMap.put("ccc", 7);
+        failCases.add(failCaseMapOfMap);
+
+        failCaseMapOfMap = new HashMap<>();
+        failCaseMapOfMap.put("aaa", 1);
+        failCaseMapOfMap.put("bbb", 6);
+        failCaseMapOfMap.put("ccc", 7.4);
+        failCases.add(failCaseMapOfMap);
+
+        failCaseMapOfMap = new HashMap<>();
+        failCaseMapOfMap.put("comp1", "comp2");
+        failCaseMapOfMap.put("comp2", "comp4");
+        failCases.add(failCaseMapOfMap);
+
+        failCases.add(null);
+
+        for (Object value : failCases) {
+            try {
+                config.put(TestConfig.TEST_MAP_CONFIG_9, value);
+                ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
+                Assert.fail("Expected Exception not Thrown for value: " + value);
+            } catch (IllegalArgumentException Ex) {
+            }
+        }
+    }
+
+    @Test
     public void testListEntryTypeAnnotation() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException,
         InstantiationException, IllegalAccessException {
         TestConfig config = new TestConfig();
@@ -796,5 +931,9 @@ public class TestConfigValidate {
         @IsImplementationOfClass(implementsClass = org.apache.storm.networktopography.DNSToSwitchMapping.class)
         @NotNull
         public static final String TEST_MAP_CONFIG_8 = "test.map.config.8";
+
+        @IsExactlyOneOf(valueValidatorClasses = {ListOfListOfStringValidator.class, RasConstraintsTypeValidator.class})
+        @NotNull
+        public static final String TEST_MAP_CONFIG_9 = "test.map.config.9";
     }
 }
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RasNode.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RasNode.java
index 2050bc3..63c9417 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RasNode.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RasNode.java
@@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Represents a single node in the cluster.
  */
-public class RasNode {
+public class RasNode implements Comparable<RasNode> {
     private static final Logger LOG = LoggerFactory.getLogger(RasNode.class);
     private final String nodeId;
     private final Cluster cluster;
@@ -523,4 +523,9 @@ public class RasNode {
             return 0.0;
         }
     }
+
+    @Override
+    public int compareTo(RasNode o) {
+        return this.nodeId.compareTo(o.nodeId);
+    }
 }
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
index 81994ba..66d2b15 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
@@ -12,7 +12,10 @@
 
 package org.apache.storm.scheduler.resource.strategies.scheduling;
 
+import com.google.common.collect.Sets;
+
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -24,6 +27,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
+
 import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.scheduler.Cluster;
@@ -38,71 +42,176 @@ import org.apache.storm.scheduler.resource.SchedulingStatus;
 import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.Time;
+import org.apache.storm.validation.ConfigValidation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
-    //hard coded max number of states to search
     private static final Logger LOG = LoggerFactory.getLogger(ConstraintSolverStrategy.class);
 
-    //constraints and spreads
-    private Map<String, Map<String, Integer>> constraintMatrix;
-    private HashSet<String> spreadComps = new HashSet<>();
+    public static final String CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT = "maxNodeCoLocationCnt";
+    public static final String CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS = "incompatibleComponents";
 
-    private Map<String, RasNode> nodes;
-    private Map<ExecutorDetails, String> execToComp;
-    private Map<String, Set<ExecutorDetails>> compToExecs;
-    private List<String> favoredNodeIds;
-    private List<String> unFavoredNodeIds;
+    /**
+     * Component constraint as derived from configuration.
+     * This is backward compatible and can parse old style Config.TOPOLOGY_RAS_CONSTRAINTS and Config.TOPOLOGY_SPREAD_COMPONENTS.
+     * New style Config.TOPOLOGY_RAS_CONSTRAINTS is map where each component has a list of other incompatible components
+     * and an optional number that specifies the maximum co-location count for the component on a node.
+     *
+     * <p>comp-1 cannot exist on same worker as comp-2 or comp-3, and at most "2" comp-1 on same node</p>
+     * <p>comp-2 and comp-4 cannot be on same worker (missing comp-1 is implied from comp-1 constraint)</p>
+     *
+     *  <p>
+     *      { "comp-1": { "maxNodeCoLocationCnt": 2, "incompatibleComponents": ["comp-2", "comp-3" ] },
+     *        "comp-2": { "incompatibleComponents": [ "comp-4" ] }
+     *      }
+     *  </p>
+     */
+    public static final class ConstraintConfig {
+        private Map<String, Set<String>> incompatibleComponents = new HashMap<>();
+        private Map<String, Integer> maxCoLocationCnts = new HashMap<>(); // maximum node CoLocationCnt for restricted components
 
-    static Map<String, Map<String, Integer>> getConstraintMap(TopologyDetails topo, Set<String> comps) {
-        Map<String, Map<String, Integer>> matrix = new HashMap<>();
-        for (String comp : comps) {
-            matrix.put(comp, new HashMap<>());
-            for (String comp2 : comps) {
-                matrix.get(comp).put(comp2, 0);
-            }
+        ConstraintConfig(TopologyDetails topo) {
+            // getExecutorToComponent().values() also contains system components
+            this(topo.getConf(), Sets.union(topo.getComponents().keySet(), new HashSet(topo.getExecutorToComponent().values())));
         }
-        List<List<String>> constraints = (List<List<String>>) topo.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINTS);
-        if (constraints != null) {
-            for (List<String> constraintPair : constraints) {
-                String comp1 = constraintPair.get(0);
-                String comp2 = constraintPair.get(1);
-                if (!matrix.containsKey(comp1)) {
-                    LOG.warn("Comp: {} declared in constraints is not valid!", comp1);
-                    continue;
+
+        ConstraintConfig(Map<String, Object> conf, Set<String> comps) {
+            Object rasConstraints = conf.get(Config.TOPOLOGY_RAS_CONSTRAINTS);
+            comps.forEach(k -> incompatibleComponents.computeIfAbsent(k, x -> new HashSet<>()));
+            if (rasConstraints instanceof List) {
+                // old style
+                List<List<String>> constraints = (List<List<String>>) rasConstraints;
+                for (List<String> constraintPair : constraints) {
+                    String comp1 = constraintPair.get(0);
+                    String comp2 = constraintPair.get(1);
+                    if (!comps.contains(comp1)) {
+                        LOG.warn("Comp: {} declared in constraints is not valid!", comp1);
+                        continue;
+                    }
+                    if (!comps.contains(comp2)) {
+                        LOG.warn("Comp: {} declared in constraints is not valid!", comp2);
+                        continue;
+                    }
+                    incompatibleComponents.get(comp1).add(comp2);
+                    incompatibleComponents.get(comp2).add(comp1);
                 }
-                if (!matrix.containsKey(comp2)) {
-                    LOG.warn("Comp: {} declared in constraints is not valid!", comp2);
-                    continue;
+            } else {
+                Map<String, Map<String, ?>> constraintMap = (Map<String, Map<String, ?>>) rasConstraints;
+                constraintMap.forEach((comp1, v) -> {
+                    if (comps.contains(comp1)) {
+                        v.forEach((ctype, constraint) -> {
+                            switch (ctype) {
+                                case CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT:
+                                    try {
+                                        int numValue = Integer.parseInt("" + constraint);
+                                        if (numValue < 1) {
+                                            LOG.warn("{} {} declared for Comp {} is not valid, expected >= 1", ctype, numValue, comp1);
+                                        } else {
+                                            maxCoLocationCnts.put(comp1, numValue);
+                                        }
+                                    } catch (Exception ex) {
+                                        LOG.warn("{} {} declared for Comp {} is not valid, expected >= 1", ctype, constraint, comp1);
+                                    }
+                                    break;
+
+                                case CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS:
+                                    if (!(constraint instanceof List || constraint instanceof String)) {
+                                        LOG.warn("{} {} declared for Comp {} is not valid, expecting a list of components or 1 component",
+                                                ctype, constraint, comp1);
+                                        break;
+                                    }
+                                    List<String> list;
+                                    list = (constraint instanceof String) ? Arrays.asList((String) constraint) : (List<String>) constraint;
+                                    for (String comp2: list) {
+                                        if (!comps.contains(comp2)) {
+                                            LOG.warn("{} {} declared for Comp {} is not a valid component", ctype, comp2, comp1);
+                                            continue;
+                                        }
+                                        incompatibleComponents.get(comp1).add(comp2);
+                                        incompatibleComponents.get(comp2).add(comp1);
+                                    }
+                                    break;
+
+                                default:
+                                    LOG.warn("ConstraintType={} invalid for component={}, valid values are {} and {}, ignoring value={}",
+                                            ctype, comp1, CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT,
+                                            CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS, constraint);
+                                    break;
+                            }
+                        });
+                    } else {
+                        LOG.warn("Component {} is not a valid component", comp1);
+                    }
+                });
+            }
+
+            // process Config.TOPOLOGY_SPREAD_COMPONENTS - old style
+            // override only if not defined already using Config.TOPOLOGY_RAS_COMPONENTS above
+            Object obj = conf.get(Config.TOPOLOGY_SPREAD_COMPONENTS);
+            if (obj instanceof List) {
+                List<String> spread = (List<String>) obj;
+                if (spread != null) {
+                    for (String comp : spread) {
+                        if (!comps.contains(comp)) {
+                            LOG.warn("Comp {} declared for spread not valid", comp);
+                            continue;
+                        }
+                        if (maxCoLocationCnts.containsKey(comp)) {
+                            LOG.warn("Comp {} maxNodeCoLocationCnt={} already defined in {}, ignoring spread config in {}", comp,
+                                    maxCoLocationCnts.get(comp), Config.TOPOLOGY_RAS_CONSTRAINTS, Config.TOPOLOGY_SPREAD_COMPONENTS);
+                            continue;
+                        }
+                        maxCoLocationCnts.put(comp, 1);
+                    }
                 }
-                matrix.get(comp1).put(comp2, 1);
-                matrix.get(comp2).put(comp1, 1);
+            } else {
+                LOG.warn("Ignoring invalid {} config={}", Config.TOPOLOGY_SPREAD_COMPONENTS, obj);
             }
         }
-        return matrix;
+
+        public Map<String, Set<String>> getIncompatibleComponents() {
+            return incompatibleComponents;
+        }
+
+        public Map<String, Integer> getMaxCoLocationCnts() {
+            return maxCoLocationCnts;
+        }
     }
 
+    private Map<String, RasNode> nodes;
+    private Map<ExecutorDetails, String> execToComp;
+    private Map<String, Set<ExecutorDetails>> compToExecs;
+    private List<String> favoredNodeIds;
+    private List<String> unFavoredNodeIds;
+    private ConstraintConfig constraintConfig;
+
     /**
      * Determines if a scheduling is valid and all constraints are satisfied.
      */
     @VisibleForTesting
-    public static boolean validateSolution(Cluster cluster, TopologyDetails td) {
-        return checkSpreadSchedulingValid(cluster, td)
-               && checkConstraintsSatisfied(cluster, td)
+    public static boolean validateSolution(Cluster cluster, TopologyDetails td, ConstraintConfig constraintConfig) {
+        if (constraintConfig == null) {
+            constraintConfig = new ConstraintConfig(td);
+        }
+        return checkSpreadSchedulingValid(cluster, td, constraintConfig)
+               && checkConstraintsSatisfied(cluster, td, constraintConfig)
                && checkResourcesCorrect(cluster, td);
     }
 
     /**
      * Check if constraints are satisfied.
      */
-    private static boolean checkConstraintsSatisfied(Cluster cluster, TopologyDetails topo) {
+    private static boolean checkConstraintsSatisfied(Cluster cluster, TopologyDetails topo, ConstraintConfig constraintConfig) {
         LOG.info("Checking constraints...");
         assert (cluster.getAssignmentById(topo.getId()) != null);
+        if (constraintConfig == null) {
+            constraintConfig = new ConstraintConfig(topo);
+        }
         Map<ExecutorDetails, WorkerSlot> result = cluster.getAssignmentById(topo.getId()).getExecutorToSlot();
         Map<ExecutorDetails, String> execToComp = topo.getExecutorToComponent();
         //get topology constraints
-        Map<String, Map<String, Integer>> constraintMatrix = getConstraintMap(topo, new HashSet<>(topo.getExecutorToComponent().values()));
+        Map<String, Set<String>> constraintMatrix = constraintConfig.incompatibleComponents;
 
         Map<WorkerSlot, Set<String>> workerCompMap = new HashMap<>();
         result.forEach((exec, worker) -> {
@@ -113,7 +222,7 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
             Set<String> comps = entry.getValue();
             for (String comp1 : comps) {
                 for (String comp2 : comps) {
-                    if (!comp1.equals(comp2) && constraintMatrix.get(comp1).get(comp2) != 0) {
+                    if (!comp1.equals(comp2) && constraintMatrix.get(comp1).contains(comp2)) {
                         LOG.error("Incorrect Scheduling: worker exclusion for Component {} and {} not satisfied on WorkerSlot: {}",
                                   comp1, comp2, entry.getKey());
                         return false;
@@ -134,38 +243,38 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
         return workerToNodes;
     }
 
-    private static boolean checkSpreadSchedulingValid(Cluster cluster, TopologyDetails topo) {
+    private static boolean checkSpreadSchedulingValid(Cluster cluster, TopologyDetails topo, ConstraintConfig constraintConfig) {
         LOG.info("Checking for a valid scheduling...");
         assert (cluster.getAssignmentById(topo.getId()) != null);
-        Map<ExecutorDetails, WorkerSlot> result = cluster.getAssignmentById(topo.getId()).getExecutorToSlot();
+        if (constraintConfig == null) {
+            constraintConfig = new ConstraintConfig(topo);
+        }
         Map<ExecutorDetails, String> execToComp = topo.getExecutorToComponent();
-        Map<WorkerSlot, HashSet<ExecutorDetails>> workerExecMap = new HashMap<>();
-        Map<WorkerSlot, HashSet<String>> workerCompMap = new HashMap<>();
-        Map<RasNode, HashSet<String>> nodeCompMap = new HashMap<>();
+        Map<String, Map<String, Integer>> nodeCompMap = new HashMap<>(); // this is the critical count
         Map<WorkerSlot, RasNode> workerToNodes = workerToNodes(cluster);
         boolean ret = true;
 
-        HashSet<String> spreadComps = getSpreadComps(topo);
-        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : result.entrySet()) {
+        Map<String, Integer> spreadCompCnts = constraintConfig.maxCoLocationCnts;
+        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : cluster.getAssignmentById(topo.getId()).getExecutorToSlot().entrySet()) {
             ExecutorDetails exec = entry.getKey();
+            String comp = execToComp.get(exec);
             WorkerSlot worker = entry.getValue();
             RasNode node = workerToNodes.get(worker);
-
-            if (workerExecMap.computeIfAbsent(worker, (k) -> new HashSet<>()).contains(exec)) {
-                LOG.error("Incorrect Scheduling: Found duplicate in scheduling");
-                return false;
-            }
-            workerExecMap.get(worker).add(exec);
-            String comp = execToComp.get(exec);
-            workerCompMap.computeIfAbsent(worker, (k) -> new HashSet<>()).add(comp);
-            if (spreadComps.contains(comp)) {
-                if (nodeCompMap.computeIfAbsent(node, (k) -> new HashSet<>()).contains(comp)) {
-                    LOG.error("Incorrect Scheduling: Spread for Component: {} {} on node {} not satisfied {}",
-                              comp, exec, node.getId(), nodeCompMap.get(node));
+            String nodeId = node.getId();
+
+            if (spreadCompCnts.containsKey(comp)) {
+                int allowedColocationMaxCnt = spreadCompCnts.get(comp);
+                Map<String, Integer> oneNodeCompMap = nodeCompMap.computeIfAbsent(nodeId, (k) -> new HashMap<>());
+                oneNodeCompMap.put(comp, oneNodeCompMap.getOrDefault(comp, 0) + 1);
+                if (allowedColocationMaxCnt < oneNodeCompMap.get(comp)) {
+                    LOG.error("Incorrect Scheduling: MaxCoLocationCnt for Component: {} {} on node {} not satisfied, cnt {} > allowed {}",
+                            comp, exec, nodeId, oneNodeCompMap.get(comp), allowedColocationMaxCnt);
                     ret = false;
                 }
             }
-            nodeCompMap.computeIfAbsent(node, (k) -> new HashSet<>()).add(comp);
+        }
+        if (!ret) {
+            LOG.error("Incorrect MaxCoLocationCnts: Node-Component-Cnt {}", nodeCompMap);
         }
         return ret;
     }
@@ -224,29 +333,13 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
         return true;
     }
 
-    private static HashSet<String> getSpreadComps(TopologyDetails topo) {
-        HashSet<String> retSet = new HashSet<>();
-        List<String> spread = (List<String>) topo.getConf().get(Config.TOPOLOGY_SPREAD_COMPONENTS);
-        if (spread != null) {
-            Set<String> comps = topo.getComponents().keySet();
-            for (String comp : spread) {
-                if (comps.contains(comp)) {
-                    retSet.add(comp);
-                } else {
-                    LOG.warn("Comp {} declared for spread not valid", comp);
-                }
-            }
-        }
-        return retSet;
-    }
-
     @Override
     public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
         prepare(cluster);
         LOG.debug("Scheduling {}", td.getId());
         nodes = RasNodes.getAllNodesFrom(cluster);
-        Map<WorkerSlot, Set<String>> workerCompAssignment = new HashMap<>();
-        Map<RasNode, Set<String>> nodeCompAssignment = new HashMap<>();
+        Map<WorkerSlot, Map<String, Integer>> workerCompAssignment = new HashMap<>();
+        Map<RasNode, Map<String, Integer>> nodeCompAssignment = new HashMap<>();
 
         int confMaxStateSearch = ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH));
         int daemonMaxStateSearch = ObjectReader.getInt(cluster.getConf().get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH));
@@ -262,17 +355,15 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
         //get mapping of components to executors
         compToExecs = getCompToExecs(execToComp);
 
-        //get topology constraints
-        constraintMatrix = getConstraintMap(td, compToExecs.keySet());
-
-        //get spread components
-        spreadComps = getSpreadComps(td);
+        // get constraint configuration
+        constraintConfig = new ConstraintConfig(td);
 
         //get a sorted list of unassigned executors based on number of constraints
         Set<ExecutorDetails> unassignedExecutors = new HashSet<>(cluster.getUnassignedExecutors(td));
-        List<ExecutorDetails> sortedExecs = getSortedExecs(spreadComps, constraintMatrix, compToExecs).stream()
-                                                                                                      .filter(unassignedExecutors::contains)
-                                                                                                      .collect(Collectors.toList());
+        List<ExecutorDetails> sortedExecs;
+        sortedExecs = getSortedExecs(constraintConfig.maxCoLocationCnts, constraintConfig.incompatibleComponents, compToExecs).stream()
+                .filter(unassignedExecutors::contains)
+                .collect(Collectors.toList());
 
         //populate with existing assignments
         SchedulerAssignment existingAssignment = cluster.getAssignmentById(td.getId());
@@ -280,10 +371,11 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
             existingAssignment.getExecutorToSlot().forEach((exec, ws) -> {
                 String compId = execToComp.get(exec);
                 RasNode node = nodes.get(ws.getNodeId());
-                //populate node to component Assignments
-                nodeCompAssignment.computeIfAbsent(node, (k) -> new HashSet<>()).add(compId);
+                Map<String, Integer> oneMap = nodeCompAssignment.computeIfAbsent(node, (k) -> new HashMap<>());
+                oneMap.put(compId, oneMap.getOrDefault(compId, 0) + 1); // increment
                 //populate worker to comp assignments
-                workerCompAssignment.computeIfAbsent(ws, (k) -> new HashSet<>()).add(compId);
+                oneMap = workerCompAssignment.computeIfAbsent(ws, (k) -> new HashMap<>());
+                oneMap.put(compId, oneMap.getOrDefault(compId, 0) + 1); // increment
             });
         }
 
@@ -297,11 +389,13 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
     }
 
     private boolean checkSchedulingFeasibility(int maxStateSearch) {
-        for (String comp : spreadComps) {
+        for (Map.Entry<String, Integer> entry : constraintConfig.maxCoLocationCnts.entrySet()) {
+            String comp = entry.getKey();
+            int maxCoLocationCnt = entry.getValue();
             int numExecs = compToExecs.get(comp).size();
-            if (numExecs > nodes.size()) {
+            if (numExecs > nodes.size() * maxCoLocationCnt) {
                 LOG.error("Unsatisfiable constraint: Component: {} marked as spread has {} executors which is larger "
-                          + "than number of nodes: {}", comp, numExecs, nodes.size());
+                          + "than number of nodes * maxCoLocationCnt: {} * {} ", comp, numExecs, nodes.size(), maxCoLocationCnt);
                 return false;
             }
         }
@@ -345,7 +439,7 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
         for (int loopCnt = 0 ; true ; loopCnt++) {
             LOG.debug("backtrackSearch: loopCnt = {}, state.execIndex = {}", loopCnt, state.execIndex);
             if (state.areSearchLimitsExceeded()) {
-                LOG.warn("backtrackSearch: Search limits exceeded");
+                LOG.warn("backtrackSearch: Search limits exceeded, backtracked {} times, looped {} times", state.numBacktrack, loopCnt);
                 return new SolverResult(state, false);
             }
 
@@ -356,6 +450,7 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
             int execIndex = state.execIndex;
 
             ExecutorDetails exec = state.currentExec();
+            String comp = execToComp.get(exec);
             Iterable<String> sortedNodesIter = sortAllNodes(state.td, exec, favoredNodeIds, unFavoredNodeIds);
 
             int progressIdx = -1;
@@ -367,8 +462,8 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
                         continue;
                     }
                     progressIdxForExec[execIndex]++;
-                    LOG.debug("backtrackSearch: loopCnt = {}, state.execIndex = {}, node/slot-ordinal = {}, nodeId = {}",
-                        loopCnt, execIndex, progressIdx, nodeId);
+                    LOG.debug("backtrackSearch: loopCnt = {}, state.execIndex = {}, comp = {}, node/slot-ordinal = {}, nodeId = {}",
+                            loopCnt, execIndex, comp, progressIdx, nodeId);
 
                     if (!isExecAssignmentToWorkerValid(workerSlot, state)) {
                         continue;
@@ -378,20 +473,21 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
                     state.tryToSchedule(execToComp, node, workerSlot);
                     if (state.areAllExecsScheduled()) {
                         //Everything is scheduled correctly, so no need to search any more.
-                        LOG.info("backtrackSearch: AllExecsScheduled at loopCnt = {} in {} milliseconds, elapsedtime in state={}",
-                            loopCnt, System.currentTimeMillis() - startTimeMilli, Time.currentTimeMillis() - state.startTimeMillis);
+                        LOG.info("backtrackSearch: AllExecsScheduled at loopCnt={} in {} ms, elapsedtime in state={}, backtrackCnt={}",
+                                loopCnt, System.currentTimeMillis() - startTimeMilli, Time.currentTimeMillis() - state.startTimeMillis,
+                                state.numBacktrack);
                         return new SolverResult(state, true);
                     }
                     state = state.nextExecutor();
                     nodeForExec[execIndex] = node;
                     workerSlotForExec[execIndex] = workerSlot;
-                    LOG.debug("backtrackSearch: Assigned execId={} to node={}, node/slot-ordinal={} at loopCnt={}",
-                        execIndex, nodeId, progressIdx, loopCnt);
+                    LOG.debug("backtrackSearch: Assigned execId={}, comp={} to node={}, node/slot-ordinal={} at loopCnt={}",
+                            execIndex, comp, nodeId, progressIdx, loopCnt);
                     continue OUTERMOST_LOOP;
                 }
             }
             // if here, then the executor was not assigned, backtrack;
-            LOG.debug("backtrackSearch: Failed to schedule execId = {} at loopCnt = {}", execIndex, loopCnt);
+            LOG.debug("backtrackSearch: Failed to schedule execId={}, comp={} at loopCnt={}", execIndex, comp, loopCnt);
             if (execIndex == 0) {
                 break;
             } else {
@@ -400,8 +496,8 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
             }
         }
         boolean success = state.areAllExecsScheduled();
-        LOG.info("backtrackSearch: Scheduled={} in {} milliseconds, elapsedtime in state={}",
-            success, System.currentTimeMillis() - startTimeMilli, Time.currentTimeMillis() - state.startTimeMillis);
+        LOG.info("backtrackSearch: Scheduled={} in {} milliseconds, elapsedtime in state={}, backtrackCnt={}",
+            success, System.currentTimeMillis() - startTimeMilli, Time.currentTimeMillis() - state.startTimeMillis, state.numBacktrack);
         return new SolverResult(state, success);
     }
 
@@ -420,11 +516,11 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
 
         //check if exec can be on worker based on user defined component exclusions
         String execComp = execToComp.get(exec);
-        Set<String> components = state.workerCompAssignment.get(worker);
-        if (components != null) {
-            Map<String, Integer> subMatrix = constraintMatrix.get(execComp);
-            for (String comp : components) {
-                if (subMatrix.get(comp) != 0) {
+        Map<String, Integer> compAssignmentCnts = state.workerCompAssignmentCnts.get(worker);
+        if (compAssignmentCnts != null && constraintConfig.incompatibleComponents.containsKey(execComp)) {
+            Set<String> subMatrix = constraintConfig.incompatibleComponents.get(execComp);
+            for (String comp : compAssignmentCnts.keySet()) {
+                if (subMatrix.contains(comp)) {
                     LOG.trace("{} found {} constraint violation {} on {}", exec, execComp, comp, worker);
                     return false;
                 }
@@ -432,9 +528,12 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
         }
 
         //check if exec satisfy spread
-        if (spreadComps.contains(execComp)) {
-            if (state.nodeCompAssignment.computeIfAbsent(node, (k) -> new HashSet<>()).contains(execComp)) {
-                LOG.trace("{} Found spread violation {} on node {}", exec, execComp, node.getId());
+        if (constraintConfig.maxCoLocationCnts.containsKey(execComp)) {
+            int coLocationMaxCnt = constraintConfig.maxCoLocationCnts.get(execComp);
+            if (state.nodeCompAssignmentCnts.containsKey(node)
+                    && state.nodeCompAssignmentCnts.get(node).getOrDefault(execComp, 0) >= coLocationMaxCnt) {
+                LOG.trace("{} Found MaxCoLocationCnt violation {} on node {}, count {} >= colocation count {}",
+                        exec, execComp, node.getId(), state.nodeCompAssignmentCnts.get(node).get(execComp), coLocationMaxCnt);
                 return false;
             }
         }
@@ -447,22 +546,24 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
         return retMap;
     }
 
-    private ArrayList<ExecutorDetails> getSortedExecs(HashSet<String> spreadComps, Map<String, Map<String, Integer>> constraintMatrix,
+    private ArrayList<ExecutorDetails> getSortedExecs(Map<String, Integer> spreadCompCnts,
+                                                      Map<String, Set<String>> constraintMatrix,
                                                       Map<String, Set<ExecutorDetails>> compToExecs) {
         ArrayList<ExecutorDetails> retList = new ArrayList<>();
         //find number of constraints per component
         //Key->Comp Value-># of constraints
-        Map<String, Integer> compConstraintCountMap = new HashMap<>();
+        Map<String, Double> compConstraintCountMap = new HashMap<>();
         constraintMatrix.forEach((comp, subMatrix) -> {
-            int count = subMatrix.values().stream().mapToInt(Number::intValue).sum();
-            //check component is declared for spreading
-            if (spreadComps.contains(comp)) {
-                count++;
+            double count = subMatrix.size();
+            // check if component is declared for spreading
+            if (spreadCompCnts.containsKey(comp)) {
+                // lower (1 and above only) value is most constrained should have higher count
+                count += (compToExecs.size() / spreadCompCnts.get(comp));
             }
-            compConstraintCountMap.put(comp, count);
+            compConstraintCountMap.put(comp, count); // higher count sorts to the front
         });
         //Sort comps by number of constraints
-        NavigableMap<String, Integer> sortedCompConstraintCountMap = sortByValues(compConstraintCountMap);
+        NavigableMap<String, Double> sortedCompConstraintCountMap = sortByValues(compConstraintCountMap);
         //sort executors based on component constraints
         for (String comp : sortedCompConstraintCountMap.keySet()) {
             retList.addAll(compToExecs.get(comp));
@@ -471,7 +572,7 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
     }
 
     /**
-     * Used to sort a Map by the values.
+     * Used to sort a Map by the values - higher values up front.
      */
     @VisibleForTesting
     public <K extends Comparable<K>, V extends Comparable<V>> NavigableMap<K, V> sortByValues(final Map<K, V> map) {
@@ -488,13 +589,15 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
         return sortedByValues;
     }
 
-    protected static class SolverResult {
+    protected static final class SolverResult {
+        private final SearcherState state;
         private final int statesSearched;
         private final boolean success;
         private final long timeTakenMillis;
         private final int backtracked;
 
         public SolverResult(SearcherState state, boolean success) {
+            this.state = state;
             this.statesSearched = state.getStatesSearched();
             this.success = success;
             timeTakenMillis = Time.currentTimeMillis() - state.startTimeMillis;
@@ -506,20 +609,21 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
                 return SchedulingResult.success("Fully Scheduled by ConstraintSolverStrategy (" + statesSearched
                                                 + " states traversed in " + timeTakenMillis + "ms, backtracked " + backtracked + " times)");
             }
+            state.logNodeCompAssignments();
             return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES,
                                             "Cannot find scheduling that satisfies all constraints (" + statesSearched
                                             + " states traversed in " + timeTakenMillis + "ms, backtracked " + backtracked + " times)");
         }
     }
 
-    protected static class SearcherState {
+    protected static final class SearcherState {
         final long startTimeMillis;
         private final long maxEndTimeMs;
         // A map of the worker to the components in the worker to be able to enforce constraints.
-        private final Map<WorkerSlot, Set<String>> workerCompAssignment;
+        private final Map<WorkerSlot, Map<String, Integer>> workerCompAssignmentCnts;
         private final boolean[] okToRemoveFromWorker;
         // for the currently tested assignment a Map of the node to the components on it to be able to enforce constraints
-        private final Map<RasNode, Set<String>> nodeCompAssignment;
+        private final Map<RasNode, Map<String, Integer>> nodeCompAssignmentCnts;
         private final boolean[] okToRemoveFromNode;
         // Static State
         // The list of all executors (preferably sorted to make assignments simpler).
@@ -537,13 +641,14 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
         // The current executor we are trying to schedule
         private int execIndex = 0;
 
-        private SearcherState(Map<WorkerSlot, Set<String>> workerCompAssignment, Map<RasNode, Set<String>> nodeCompAssignment,
-                              int maxStatesSearched, long maxTimeMs, List<ExecutorDetails> execs, TopologyDetails td) {
+        private SearcherState(Map<WorkerSlot, Map<String, Integer>> workerCompAssignmentCnts,
+                              Map<RasNode, Map<String, Integer>> nodeCompAssignmentCnts, int maxStatesSearched, long maxTimeMs,
+                              List<ExecutorDetails> execs, TopologyDetails td) {
             assert !execs.isEmpty();
             assert execs != null;
 
-            this.workerCompAssignment = workerCompAssignment;
-            this.nodeCompAssignment = nodeCompAssignment;
+            this.workerCompAssignmentCnts = workerCompAssignmentCnts;
+            this.nodeCompAssignmentCnts = nodeCompAssignmentCnts;
             this.maxStatesSearched = maxStatesSearched;
             this.execs = execs;
             okToRemoveFromWorker = new boolean[execs.size()];
@@ -593,13 +698,26 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
             return execs.get(execIndex);
         }
 
+        /**
+          * Assign executor to worker and node.
+          * TODO: tryToSchedule is a misnomer, since it always schedules.
+          * Assignment validity check is done before the call to tryToSchedule().
+          *
+          * @param execToComp Mapping from executor to component name.
+          * @param node RasNode on which to schedule.
+          * @param workerSlot WorkerSlot on which to schedule.
+          */
         public void tryToSchedule(Map<ExecutorDetails, String> execToComp, RasNode node, WorkerSlot workerSlot) {
             ExecutorDetails exec = currentExec();
             String comp = execToComp.get(exec);
             LOG.trace("Trying assignment of {} {} to {}", exec, comp, workerSlot);
-            //It is possible that this component is already scheduled on this node or worker.  If so when we backtrack we cannot remove it
-            okToRemoveFromWorker[execIndex] = workerCompAssignment.computeIfAbsent(workerSlot, (k) -> new HashSet<>()).add(comp);
-            okToRemoveFromNode[execIndex] = nodeCompAssignment.computeIfAbsent(node, (k) -> new HashSet<>()).add(comp);
+            // It is possible that this component is already scheduled on this node or worker.  If so when we backtrack we cannot remove it
+            Map<String, Integer> oneMap = workerCompAssignmentCnts.computeIfAbsent(workerSlot, (k) -> new HashMap<>());
+            oneMap.put(comp, oneMap.getOrDefault(comp, 0) + 1); // increment assignment count
+            okToRemoveFromWorker[execIndex] = true;
+            oneMap = nodeCompAssignmentCnts.computeIfAbsent(node, (k) -> new HashMap<>());
+            oneMap.put(comp, oneMap.getOrDefault(comp, 0) + 1); // increment assignment count
+            okToRemoveFromNode[execIndex] = true;
             node.assignSingleExecutor(workerSlot, exec, td);
         }
 
@@ -613,14 +731,51 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
             String comp = execToComp.get(exec);
             LOG.trace("Backtracking {} {} from {}", exec, comp, workerSlot);
             if (okToRemoveFromWorker[execIndex]) {
-                workerCompAssignment.get(workerSlot).remove(comp);
+                Map<String, Integer> oneMap = workerCompAssignmentCnts.get(workerSlot);
+                oneMap.put(comp, oneMap.getOrDefault(comp, 0) - 1); // decrement assignment count
                 okToRemoveFromWorker[execIndex] = false;
             }
             if (okToRemoveFromNode[execIndex]) {
-                nodeCompAssignment.get(node).remove(comp);
+                Map<String, Integer> oneMap = nodeCompAssignmentCnts.get(node);
+                oneMap.put(comp, oneMap.getOrDefault(comp, 0) - 1); // decrement assignment count
                 okToRemoveFromNode[execIndex] = false;
             }
             node.freeSingleExecutor(exec, td);
         }
+
+        /**
+         * Use this method to log the current component assignments on the Node.
+         * Useful for debugging and tests.
+         */
+        public void logNodeCompAssignments() {
+            if (nodeCompAssignmentCnts == null || nodeCompAssignmentCnts.isEmpty()) {
+                LOG.info("NodeCompAssignment is empty");
+                return;
+            }
+            StringBuffer sb = new StringBuffer();
+            int cntAllNodes = 0;
+            int cntFilledNodes = 0;
+            for (RasNode node: new TreeSet<>(nodeCompAssignmentCnts.keySet())) {
+                cntAllNodes++;
+                Map<String, Integer> oneMap = nodeCompAssignmentCnts.get(node);
+                if (oneMap.isEmpty()) {
+                    continue;
+                }
+                cntFilledNodes++;
+                String oneMapJoined = String.join(
+                        ",",
+                        oneMap.entrySet()
+                                .stream().map(e -> String.format("%s: %s", e.getKey(), e.getValue()))
+                                .collect(Collectors.toList())
+                );
+                sb.append(String.format("\n\t(%d) Node %s: %s", cntFilledNodes, node.getId(), oneMapJoined));
+            }
+            LOG.info("NodeCompAssignments available for {} of {} nodes {}", cntFilledNodes, cntAllNodes, sb);
+            LOG.info("Executors assignments attempted (cnt={}) are: \n\t{}",
+                    execs.size(), execs.stream().map(x -> x.toString()).collect(Collectors.joining(","))
+            );
+        }
     }
 }
+
+
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
index f245d90..65d230d 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
@@ -18,6 +18,8 @@
 
 package org.apache.storm.scheduler.resource.strategies.scheduling;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.NavigableMap;
 import java.util.Set;
@@ -33,13 +35,15 @@ import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
 import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
 import org.apache.storm.scheduler.resource.SchedulingResult;
-import org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Utils;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
 import org.junit.Assert;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,27 +58,70 @@ import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareSched
 import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
 
+@RunWith(Parameterized.class)
 public class TestConstraintSolverStrategy {
+    @Parameters
+    public static Object[] data() {
+        return new Object[] { false, true };
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(TestConstraintSolverStrategy.class);
     private static final int MAX_TRAVERSAL_DEPTH = 2000;
     private static final int NORMAL_BOLT_PARALLEL = 11;
     //Dropping the parallelism of the bolts to 3 instead of 11 so we can find a solution in a reasonable amount of work when backtracking.
     private static final int BACKTRACK_BOLT_PARALLEL = 3;
+    private static final int CO_LOCATION_CNT = 2;
 
-    public Map<String, Object> makeTestTopoConf() {
+    // class members
+    public Boolean consolidatedConfigFlag = Boolean.TRUE;
+
+    public TestConstraintSolverStrategy(boolean consolidatedConfigFlag) {
+        this.consolidatedConfigFlag = consolidatedConfigFlag;
+        LOG.info("Running tests with consolidatedConfigFlag={}", consolidatedConfigFlag);
+    }
+
+    /**
+     * Helper function to add a constraint specifying two components that cannot co-exist.
+     * Note that it is redundant to specify the converse.
+     *
+     * @param comp1 first component name
+     * @param comp2 second component name
+     * @param constraints the resulting constraint list of lists which is updated
+     */
+    public static void addConstraints(String comp1, String comp2, List<List<String>> constraints) {
+        LinkedList<String> constraintPair = new LinkedList<>();
+        constraintPair.add(comp1);
+        constraintPair.add(comp2);
+        constraints.add(constraintPair);
+    }
+
+    /**
+     * Make test Topology configuration, but with the newer spread constraints that allow associating a number
+     * with the spread. This number represents the maximum co-located component count. Default under the old
+     * configuration is assumed to be 1.
+     *
+     * @param maxCoLocationCnt Maximum co-located component (spout-0), minimum value is 1.
+     * @return
+     */
+    public Map<String, Object> makeTestTopoConf(int maxCoLocationCnt) {
+        if (maxCoLocationCnt < 1) {
+            maxCoLocationCnt = 1;
+        }
         List<List<String>> constraints = new LinkedList<>();
-        addContraints("spout-0", "bolt-0", constraints);
-        addContraints("bolt-2", "spout-0", constraints);
-        addContraints("bolt-1", "bolt-2", constraints);
-        addContraints("bolt-1", "bolt-0", constraints);
-        addContraints("bolt-1", "spout-0", constraints);
-        List<String> spread = new LinkedList<>();
-        spread.add("spout-0");
+        addConstraints("spout-0", "bolt-0", constraints);
+        addConstraints("bolt-2", "spout-0", constraints);
+        addConstraints("bolt-1", "bolt-2", constraints);
+        addConstraints("bolt-1", "bolt-0", constraints);
+        addConstraints("bolt-1", "spout-0", constraints);
+
+        Map<String, Integer> spreads = new HashMap<>();
+        spreads.put("spout-0", maxCoLocationCnt);
 
         Map<String, Object> config = Utils.readDefaultConfig();
+
+        setConstraintConfig(constraints, spreads, config);
+
         config.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH, MAX_TRAVERSAL_DEPTH);
-        config.put(Config.TOPOLOGY_SPREAD_COMPONENTS, spread);
-        config.put(Config.TOPOLOGY_RAS_CONSTRAINTS, constraints);
         config.put(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH, MAX_TRAVERSAL_DEPTH);
         config.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 100_000);
         config.put(Config.TOPOLOGY_PRIORITY, 1);
@@ -85,6 +132,54 @@ public class TestConstraintSolverStrategy {
         return config;
     }
 
+    /**
+     * Set Config.TOPOLOGY_RAS_CONSTRAINTS (when consolidatedConfigFlag is true) or both
+     * Config.TOPOLOGY_RAS_CONSTRAINTS/Config.TOPOLOGY_SPREAD_COMPONENTS (when consolidatedConfigFlag is false).
+     *
+     * When consolidatedConfigFlag is true, use the new more consolidated format to set Config.TOPOLOGY_RAS_CONSTRAINTS.
+     * When false, use the old configuration format for Config.TOPOLOGY_RAS_CONSTRAINTS/TOPOLOGY_SPREAD_COMPONENTS.
+     *
+     * @param constraints List of components, where the first one cannot co-exist with the others in the list
+     * @param spreads Map of component and its maxCoLocationCnt
+     * @param config Configuration to be updated
+     */
+    private void setConstraintConfig(List<List<String>> constraints, Map<String, Integer> spreads, Map<String, Object> config) {
+        if (consolidatedConfigFlag) {
+            // single configuration for each component
+            Map<String, Map<String,Object>> modifiedConstraints = new HashMap<>();
+            for (List<String> constraint: constraints) {
+                if (constraint.size() < 2) {
+                    continue;
+                }
+                String comp = constraint.get(0);
+                List<String> others = constraint.subList(1, constraint.size());
+                List<Object> incompatibleComponents = (List<Object>) modifiedConstraints.computeIfAbsent(comp, k -> new HashMap<>())
+                        .computeIfAbsent(ConstraintSolverStrategy.CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS, k -> new ArrayList<>());
+                incompatibleComponents.addAll(others);
+            }
+            for (String comp: spreads.keySet()) {
+                modifiedConstraints.computeIfAbsent(comp, k -> new HashMap<>()).put(ConstraintSolverStrategy.CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT, "" + spreads.get(comp));
+            }
+            config.put(Config.TOPOLOGY_RAS_CONSTRAINTS, modifiedConstraints);
+        } else {
+            // constraint and MaxCoLocationCnts are separate - no maxCoLocationCnt implied as 1
+            config.put(Config.TOPOLOGY_RAS_CONSTRAINTS, constraints);
+            for (Map.Entry<String, Integer> e: spreads.entrySet()) {
+                if (e.getValue() > 1) {
+                    Assert.fail(String.format("Invalid %s=%d for component=%s, expecting 1 for old-style configuration",
+                            ConstraintSolverStrategy.CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT,
+                            e.getValue(),
+                            e.getKey()));
+                }
+            }
+            config.put(Config.TOPOLOGY_SPREAD_COMPONENTS, new ArrayList(spreads.keySet()));
+        }
+    }
+
+    public Map<String, Object> makeTestTopoConf() {
+        return makeTestTopoConf(1);
+    }
+
     public TopologyDetails makeTopology(Map<String, Object> config, int boltParallel) {
         return genTopology("testTopo", config, 1, 4, 4, boltParallel, 0, 0, "user");
     }
@@ -101,8 +196,8 @@ public class TestConstraintSolverStrategy {
         return new Cluster(new INimbusTest(), new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
     }
 
-    public void basicUnitTestWithKillAndRecover(ConstraintSolverStrategy cs, int boltParallel) {
-        Map<String, Object> config = makeTestTopoConf();
+    public void basicUnitTestWithKillAndRecover(ConstraintSolverStrategy cs, int boltParallel, int coLocationCnt) {
+        Map<String, Object> config = makeTestTopoConf(coLocationCnt);
         cs.prepare(config);
 
         TopologyDetails topo = makeTopology(config, boltParallel);
@@ -114,9 +209,9 @@ public class TestConstraintSolverStrategy {
         LOG.info("Done scheduling {}...", result);
 
         Assert.assertTrue("Assert scheduling topology success " + result, result.isSuccess());
-        Assert.assertEquals("topo all executors scheduled? " + cluster.getUnassignedExecutors(topo),
+        Assert.assertEquals("Assert no unassigned executors, found unassigned: " + cluster.getUnassignedExecutors(topo),
             0, cluster.getUnassignedExecutors(topo).size());
-        Assert.assertTrue("Valid Scheduling?", ConstraintSolverStrategy.validateSolution(cluster, topo));
+        Assert.assertTrue("Valid Scheduling?", ConstraintSolverStrategy.validateSolution(cluster, topo, null));
         LOG.info("Slots Used {}", cluster.getAssignmentById(topo.getId()).getSlots());
         LOG.info("Assignment {}", cluster.getAssignmentById(topo.getId()).getSlotToExecutors());
 
@@ -146,25 +241,84 @@ public class TestConstraintSolverStrategy {
 
         Assert.assertTrue("Assert scheduling topology success " + result, result.isSuccess());
         Assert.assertEquals("topo all executors scheduled?", 0, cluster.getUnassignedExecutors(topo).size());
-        Assert.assertTrue("Valid Scheduling?", ConstraintSolverStrategy.validateSolution(cluster, topo));
+        Assert.assertTrue("Valid Scheduling?", ConstraintSolverStrategy.validateSolution(cluster, topo, null));
+    }
+
+    @Test
+    public void testNewConstraintFormat() {
+        String s = String.format(
+                "{ \"comp-1\": "
+                        + "                  { \"%s\": 2, "
+                        + "                    \"%s\": [\"comp-2\", \"comp-3\" ] }, "
+                        + "     \"comp-2\": "
+                        + "                  { \"%s\": [ \"comp-4\" ] }, "
+                        + "     \"comp-3\": "
+                        + "                  { \"%s\": \"comp-5\" } "
+                        + "}",
+                ConstraintSolverStrategy.CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT,
+                ConstraintSolverStrategy.CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS,
+                ConstraintSolverStrategy.CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS,
+                ConstraintSolverStrategy.CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS
+        );
+        Object jsonValue = JSONValue.parse(s);
+        Map<String, Object> config = Utils.readDefaultConfig();
+        config.put(Config.TOPOLOGY_RAS_CONSTRAINTS, jsonValue);
+        Set<String> allComps = new HashSet<>();
+        allComps.addAll(Arrays.asList("comp-1", "comp-2", "comp-3", "comp-4", "comp-5"));
+        ConstraintSolverStrategy.ConstraintConfig constraintConfig = new ConstraintSolverStrategy.ConstraintConfig(config, allComps);
+
+        Set<String> expectedSetComp1 = new HashSet<>();
+        expectedSetComp1.addAll(Arrays.asList("comp-2", "comp-3"));
+        Set<String> expectedSetComp2 = new HashSet<>();
+        expectedSetComp2.addAll(Arrays.asList("comp-1", "comp-4"));
+        Set<String> expectedSetComp3 = new HashSet<>();
+        expectedSetComp3.addAll(Arrays.asList("comp-1", "comp-5"));
+        Assert.assertEquals("comp-1 incompatible components", expectedSetComp1, constraintConfig.getIncompatibleComponents().get("comp-1"));
+        Assert.assertEquals("comp-2 incompatible components", expectedSetComp2, constraintConfig.getIncompatibleComponents().get("comp-2"));
+        Assert.assertEquals("comp-3 incompatible components", expectedSetComp3, constraintConfig.getIncompatibleComponents().get("comp-3"));
+        Assert.assertEquals("comp-1 maxNodeCoLocationCnt", 2, (int) constraintConfig.getMaxCoLocationCnts().getOrDefault("comp-1", -1));
+        Assert.assertNull("comp-2 maxNodeCoLocationCnt", constraintConfig.getMaxCoLocationCnts().get("comp-2"));
     }
 
     @Test
-    public void testConstraintSolverForceBacktrack() {
+    public void testConstraintSolverForceBacktrackWithSpreadCoLocation() {
         //The best way to force backtracking is to change the heuristic so the components are reversed, so it is hard
         // to find an answer.
+        if (CO_LOCATION_CNT > 1 && !consolidatedConfigFlag) {
+            LOG.info("INFO: Skipping Test {} with {}={} (required 1), and consolidatedConfigFlag={} (required false)",
+                    "testConstraintSolverForceBacktrackWithSpreadCoLocation",
+                    ConstraintSolverStrategy.CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT,
+                    CO_LOCATION_CNT,
+                    consolidatedConfigFlag);
+            return;
+        }
+
         ConstraintSolverStrategy cs = new ConstraintSolverStrategy() {
             @Override
             public <K extends Comparable<K>, V extends Comparable<V>> NavigableMap<K, V> sortByValues(final Map<K, V> map) {
                 return super.sortByValues(map).descendingMap();
             }
         };
-        basicUnitTestWithKillAndRecover(cs, BACKTRACK_BOLT_PARALLEL);
+        basicUnitTestWithKillAndRecover(cs, BACKTRACK_BOLT_PARALLEL, CO_LOCATION_CNT);
     }
 
     @Test
     public void testConstraintSolver() {
-        basicUnitTestWithKillAndRecover(new ConstraintSolverStrategy(), NORMAL_BOLT_PARALLEL);
+        basicUnitTestWithKillAndRecover(new ConstraintSolverStrategy(), NORMAL_BOLT_PARALLEL, 1);
+    }
+
+    @Test
+    public void testConstraintSolverWithSpreadCoLocation() {
+        if (CO_LOCATION_CNT > 1 && !consolidatedConfigFlag) {
+            LOG.info("INFO: Skipping Test {} with {}={} (required 1), and consolidatedConfigFlag={} (required false)",
+                    "testConstraintSolverWithSpreadCoLocation",
+                    ConstraintSolverStrategy.CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT,
+                    CO_LOCATION_CNT,
+                    consolidatedConfigFlag);
+            return;
+        }
+
+        basicUnitTestWithKillAndRecover(new ConstraintSolverStrategy(), NORMAL_BOLT_PARALLEL, CO_LOCATION_CNT);
     }
 
     public void basicFailureTest(String confKey, Object confValue, ConstraintSolverStrategy cs) {
@@ -204,14 +358,30 @@ public class TestConstraintSolverStrategy {
         }
     }
 
+    @Test
+    public void testScheduleLargeExecutorConstraintCountSmall() {
+        testScheduleLargeExecutorConstraintCount(1);
+    }
+
     /*
      * Test scheduling large number of executors and constraints.
+     * This test can succeed only with new style config that allows maxCoLocationCnt = parallelismMultiplier.
+     * In prior code, this test would succeed because effectively the old code did not properly enforce the
+     * SPREAD constraint.
      *
      * Cluster has sufficient resources for scheduling to succeed but can fail due to StackOverflowError.
      */
-    @ParameterizedTest
-    @ValueSource(ints = {1, 20})
-    public void testScheduleLargeExecutorConstraintCount(int parallelismMultiplier) {
+    @Test
+    public void testScheduleLargeExecutorConstraintCountLarge() {
+        testScheduleLargeExecutorConstraintCount(20);
+    }
+
+    private void testScheduleLargeExecutorConstraintCount(int parallelismMultiplier) {
+        if (parallelismMultiplier > 1 && !consolidatedConfigFlag) {
+            Assert.assertFalse("Large parallelism test requires new consolidated constraint format with maxCoLocationCnt=" + parallelismMultiplier, consolidatedConfigFlag);
+            return;
+        }
+
         // Add 1 topology with large number of executors and constraints. Too many can cause a java.lang.StackOverflowError
         Config config = createCSSClusterConfig(10, 10, 0, null);
         config.put(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH, 50000);
@@ -219,15 +389,20 @@ public class TestConstraintSolverStrategy {
         config.put(DaemonConfig.SCHEDULING_TIMEOUT_SECONDS_PER_TOPOLOGY, 120);
 
         List<List<String>> constraints = new LinkedList<>();
-        addContraints("spout-0", "spout-0", constraints);
-        addContraints("bolt-1", "bolt-1", constraints);
-        addContraints("spout-0", "bolt-0", constraints);
-        addContraints("bolt-2", "spout-0", constraints);
-        addContraints("bolt-1", "bolt-2", constraints);
-        addContraints("bolt-1", "bolt-0", constraints);
-        addContraints("bolt-1", "spout-0", constraints);
-
-        config.put(Config.TOPOLOGY_RAS_CONSTRAINTS, constraints);
+        addConstraints("spout-0", "spout-0", constraints);
+        addConstraints("bolt-1", "bolt-1", constraints);
+        addConstraints("spout-0", "bolt-0", constraints);
+        addConstraints("bolt-2", "spout-0", constraints);
+        addConstraints("bolt-1", "bolt-2", constraints);
+        addConstraints("bolt-1", "bolt-0", constraints);
+        addConstraints("bolt-1", "spout-0", constraints);
+
+        Map<String, Integer> spreads = new HashMap<>();
+        spreads.put("spout-0", parallelismMultiplier);
+        spreads.put("bolt-1", parallelismMultiplier);
+
+        setConstraintConfig(constraints, spreads, config);
+
         TopologyDetails topo = genTopology("testTopo-" + parallelismMultiplier, config, 10, 10, 30 * parallelismMultiplier, 30 * parallelismMultiplier, 31414, 0, "user");
         Topologies topologies = new Topologies(topo);
 
@@ -239,24 +414,20 @@ public class TestConstraintSolverStrategy {
         scheduler.schedule(topologies, cluster);
 
         boolean scheduleSuccess = isStatusSuccess(cluster.getStatus(topo.getId()));
-        LOG.info("testScheduleLargeExecutorCount scheduling {} with {}x executor multiplier", scheduleSuccess ? "succeeds" : "fails",
-                parallelismMultiplier);
+        LOG.info("testScheduleLargeExecutorCount scheduling {} with {}x executor multiplier, consolidatedConfigFlag={}",
+                scheduleSuccess ? "succeeds" : "fails", parallelismMultiplier, consolidatedConfigFlag);
         Assert.assertTrue(scheduleSuccess);
     }
 
     @Test
     public void testIntegrationWithRAS() {
-        List<List<String>> constraints = new LinkedList<>();
-        addContraints("spout-0", "bolt-0", constraints);
-        addContraints("bolt-1", "bolt-1", constraints);
-        addContraints("bolt-1", "bolt-2", constraints);
-        List<String> spread = new LinkedList<>();
-        spread.add("spout-0");
+        if (!consolidatedConfigFlag) {
+            LOG.info("Skipping test since bolt-1 maxCoLocationCnt=10 requires consolidatedConfigFlag=true, current={}", consolidatedConfigFlag);
+            return;
+        }
 
         Map<String, Object> config = Utils.readDefaultConfig();
         config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, ConstraintSolverStrategy.class.getName());
-        config.put(Config.TOPOLOGY_SPREAD_COMPONENTS, spread);
-        config.put(Config.TOPOLOGY_RAS_CONSTRAINTS, constraints);
         config.put(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH, MAX_TRAVERSAL_DEPTH);
         config.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 100_000);
         config.put(Config.TOPOLOGY_PRIORITY, 1);
@@ -264,17 +435,28 @@ public class TestConstraintSolverStrategy {
         config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 100);
         config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 0.0);
 
+        List<List<String>> constraints = new LinkedList<>();
+        addConstraints("spout-0", "bolt-0", constraints);
+        addConstraints("bolt-1", "bolt-1", constraints);
+        addConstraints("bolt-1", "bolt-2", constraints);
+
+        Map<String, Integer> spreads = new HashMap<String, Integer>();
+        spreads.put("spout-0", 1);
+        spreads.put("bolt-1", 10);
+
+        setConstraintConfig(constraints, spreads, config);
+
         TopologyDetails topo = genTopology("testTopo", config, 2, 3, 30, 300, 0, 0, "user");
         Map<String, TopologyDetails> topoMap = new HashMap<>();
         topoMap.put(topo.getId(), topo);
         Topologies topologies = new Topologies(topoMap);
-        Map<String, SupervisorDetails> supMap = genSupervisors(30, 16, 400, 1024 * 4);
+        // Fails with 36 supervisors, works with 37
+        Map<String, SupervisorDetails> supMap = genSupervisors(37, 16, 400, 1024 * 4);
         Cluster cluster = makeCluster(topologies, supMap);
         ResourceAwareScheduler rs = new ResourceAwareScheduler();
         rs.prepare(config);
         try {
             rs.schedule(topologies, cluster);
-
             assertStatusSuccess(cluster, topo.getId());
             Assert.assertEquals("topo all executors scheduled?", 0, cluster.getUnassignedExecutors(topo).size());
         } finally {
@@ -293,22 +475,14 @@ public class TestConstraintSolverStrategy {
         Map<String, SchedulerAssignment> newAssignments = new HashMap<>();
         newAssignments.put(topo.getId(), new SchedulerAssignmentImpl(topo.getId(), newExecToSlot, null, null));
         cluster.setAssignments(newAssignments, false);
-        
+
         rs.prepare(config);
         try {
             rs.schedule(topologies, cluster);
-
             assertStatusSuccess(cluster, topo.getId());
             Assert.assertEquals("topo all executors scheduled?", 0, cluster.getUnassignedExecutors(topo).size());
         } finally {
             rs.cleanup();
         }
     }
-
-    public static void addContraints(String comp1, String comp2, List<List<String>> constraints) {
-        LinkedList<String> constraintPair = new LinkedList<>();
-        constraintPair.add(comp1);
-        constraintPair.add(comp2);
-        constraints.add(constraintPair);
-    }
 }