You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ki...@apache.org on 2020/04/15 21:10:47 UTC

[storm] branch master updated: [STORM-3600] Add caching in Cluster.calculateSharedOffHeapNodeMemory (#3244)

This is an automated email from the ASF dual-hosted git repository.

kishorvpatil pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ffc44d  [STORM-3600] Add caching in Cluster.calculateSharedOffHeapNodeMemory (#3244)
7ffc44d is described below

commit 7ffc44d083707bc60749fe6821f662fc730c6466
Author: Bipin Prasad <bi...@yahoo.com>
AuthorDate: Wed Apr 15 16:10:34 2020 -0500

    [STORM-3600] Add caching in Cluster.calculateSharedOffHeapNodeMemory (#3244)
    
    * [STORM-3600] Add caching in Cluster.calculateSharedOffHeapNodeMemory to yield 4 fourfold increase in speed when scheduling large cluster.
---
 .../java/org/apache/storm/scheduler/Cluster.java   |  65 ++-
 .../strategies/scheduling/TestLargeCluster.java    | 461 +++++++++++++++++++++
 .../scheduling/TestTopologyAnonymizerUtils.java    | 358 ++++++++++++++++
 .../largeCluster01/TopologyName00000-stormcode.ser | Bin 0 -> 2640 bytes
 .../largeCluster01/TopologyName00000-stormconf.ser | Bin 0 -> 7689 bytes
 .../largeCluster01/TopologyName00001-stormcode.ser | Bin 0 -> 13937 bytes
 .../largeCluster01/TopologyName00001-stormconf.ser | Bin 0 -> 7668 bytes
 .../largeCluster01/TopologyName00002-stormcode.ser | Bin 0 -> 4135 bytes
 .../largeCluster01/TopologyName00002-stormconf.ser | Bin 0 -> 7916 bytes
 .../largeCluster01/TopologyName00003-stormcode.ser | Bin 0 -> 3685 bytes
 .../largeCluster01/TopologyName00003-stormconf.ser | Bin 0 -> 7939 bytes
 .../largeCluster01/TopologyName00004-stormcode.ser | Bin 0 -> 14293 bytes
 .../largeCluster01/TopologyName00004-stormconf.ser | Bin 0 -> 7742 bytes
 .../largeCluster01/TopologyName00005-stormcode.ser | Bin 0 -> 26147 bytes
 .../largeCluster01/TopologyName00005-stormconf.ser | Bin 0 -> 6133 bytes
 .../largeCluster01/TopologyName00006-stormcode.ser | Bin 0 -> 26469 bytes
 .../largeCluster01/TopologyName00006-stormconf.ser | Bin 0 -> 6133 bytes
 .../largeCluster01/TopologyName00007-stormcode.ser | Bin 0 -> 48666 bytes
 .../largeCluster01/TopologyName00007-stormconf.ser | Bin 0 -> 6670 bytes
 .../largeCluster01/TopologyName00008-stormcode.ser | Bin 0 -> 18390 bytes
 .../largeCluster01/TopologyName00008-stormconf.ser | Bin 0 -> 6670 bytes
 .../largeCluster01/TopologyName00009-stormcode.ser | Bin 0 -> 3497 bytes
 .../largeCluster01/TopologyName00009-stormconf.ser | Bin 0 -> 6112 bytes
 .../largeCluster01/TopologyName00010-stormcode.ser | Bin 0 -> 3451 bytes
 .../largeCluster01/TopologyName00010-stormconf.ser | Bin 0 -> 6112 bytes
 .../largeCluster01/TopologyName00011-stormcode.ser | Bin 0 -> 47468 bytes
 .../largeCluster01/TopologyName00011-stormconf.ser | Bin 0 -> 8130 bytes
 .../largeCluster01/TopologyName00012-stormcode.ser | Bin 0 -> 47351 bytes
 .../largeCluster01/TopologyName00012-stormconf.ser | Bin 0 -> 8119 bytes
 .../largeCluster01/TopologyName00013-stormcode.ser | Bin 0 -> 25959 bytes
 .../largeCluster01/TopologyName00013-stormconf.ser | Bin 0 -> 7219 bytes
 .../largeCluster01/TopologyName00014-stormcode.ser | Bin 0 -> 26377 bytes
 .../largeCluster01/TopologyName00014-stormconf.ser | Bin 0 -> 5985 bytes
 .../largeCluster01/TopologyName00015-stormcode.ser | Bin 0 -> 2285 bytes
 .../largeCluster01/TopologyName00015-stormconf.ser | Bin 0 -> 5874 bytes
 .../largeCluster01/TopologyName00016-stormcode.ser | Bin 0 -> 13040 bytes
 .../largeCluster01/TopologyName00016-stormconf.ser | Bin 0 -> 5898 bytes
 .../largeCluster01/TopologyName00017-stormcode.ser | Bin 0 -> 2290 bytes
 .../largeCluster01/TopologyName00017-stormconf.ser | Bin 0 -> 5882 bytes
 .../largeCluster01/TopologyName00018-stormcode.ser | Bin 0 -> 2280 bytes
 .../largeCluster01/TopologyName00018-stormconf.ser | Bin 0 -> 5879 bytes
 .../largeCluster01/TopologyName00019-stormcode.ser | Bin 0 -> 2282 bytes
 .../largeCluster01/TopologyName00019-stormconf.ser | Bin 0 -> 5876 bytes
 .../largeCluster01/TopologyName00020-stormcode.ser | Bin 0 -> 2284 bytes
 .../largeCluster01/TopologyName00020-stormconf.ser | Bin 0 -> 5874 bytes
 .../largeCluster01/TopologyName00021-stormcode.ser | Bin 0 -> 2486 bytes
 .../largeCluster01/TopologyName00021-stormconf.ser | Bin 0 -> 5831 bytes
 .../largeCluster01/TopologyName00022-stormcode.ser | Bin 0 -> 3290 bytes
 .../largeCluster01/TopologyName00022-stormconf.ser | Bin 0 -> 6222 bytes
 .../largeCluster01/TopologyName00023-stormcode.ser | Bin 0 -> 57457 bytes
 .../largeCluster01/TopologyName00023-stormconf.ser | Bin 0 -> 5900 bytes
 .../largeCluster01/TopologyName00024-stormcode.ser | Bin 0 -> 44399 bytes
 .../largeCluster01/TopologyName00024-stormconf.ser | Bin 0 -> 5898 bytes
 .../largeCluster01/TopologyName00025-stormcode.ser | Bin 0 -> 7012 bytes
 .../largeCluster01/TopologyName00025-stormconf.ser | Bin 0 -> 6186 bytes
 .../largeCluster01/TopologyName00026-stormcode.ser | Bin 0 -> 4801 bytes
 .../largeCluster01/TopologyName00026-stormconf.ser | Bin 0 -> 6188 bytes
 .../largeCluster01/TopologyName00027-stormcode.ser | Bin 0 -> 27924 bytes
 .../largeCluster01/TopologyName00027-stormconf.ser | Bin 0 -> 5854 bytes
 .../largeCluster01/TopologyName00028-stormcode.ser | Bin 0 -> 172752 bytes
 .../largeCluster01/TopologyName00028-stormconf.ser | Bin 0 -> 5989 bytes
 .../largeCluster01/TopologyName00029-stormcode.ser | Bin 0 -> 829420 bytes
 .../largeCluster01/TopologyName00029-stormconf.ser | Bin 0 -> 6195 bytes
 .../largeCluster01/TopologyName00030-stormcode.ser | Bin 0 -> 1111850 bytes
 .../largeCluster01/TopologyName00030-stormconf.ser | Bin 0 -> 6193 bytes
 .../largeCluster01/TopologyName00031-stormcode.ser | Bin 0 -> 411929 bytes
 .../largeCluster01/TopologyName00031-stormconf.ser | Bin 0 -> 6232 bytes
 .../largeCluster01/TopologyName00032-stormcode.ser | Bin 0 -> 216295 bytes
 .../largeCluster01/TopologyName00032-stormconf.ser | Bin 0 -> 5983 bytes
 .../largeCluster01/TopologyName00033-stormcode.ser | Bin 0 -> 3581 bytes
 .../largeCluster01/TopologyName00033-stormconf.ser | Bin 0 -> 5759 bytes
 .../largeCluster01/TopologyName00034-stormcode.ser | Bin 0 -> 108593 bytes
 .../largeCluster01/TopologyName00034-stormconf.ser | Bin 0 -> 6112 bytes
 .../largeCluster01/TopologyName00035-stormcode.ser | Bin 0 -> 298502 bytes
 .../largeCluster01/TopologyName00035-stormconf.ser | Bin 0 -> 6167 bytes
 .../largeCluster01/TopologyName00036-stormcode.ser | Bin 0 -> 242606 bytes
 .../largeCluster01/TopologyName00036-stormconf.ser | Bin 0 -> 5982 bytes
 .../largeCluster01/TopologyName00037-stormcode.ser | Bin 0 -> 297184 bytes
 .../largeCluster01/TopologyName00037-stormconf.ser | Bin 0 -> 6158 bytes
 .../largeCluster01/TopologyName00038-stormcode.ser | Bin 0 -> 20439 bytes
 .../largeCluster01/TopologyName00038-stormconf.ser | Bin 0 -> 6332 bytes
 .../largeCluster01/TopologyName00039-stormcode.ser | Bin 0 -> 6884 bytes
 .../largeCluster01/TopologyName00039-stormconf.ser | Bin 0 -> 6479 bytes
 83 files changed, 873 insertions(+), 11 deletions(-)

diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index b5fe659..06b922c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -33,6 +33,7 @@ import org.apache.storm.Constants;
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.daemon.nimbus.TopologyResources;
 import org.apache.storm.generated.SharedMemory;
+import org.apache.storm.generated.StormTopology;
 import org.apache.storm.generated.WorkerResources;
 import org.apache.storm.networktopography.DNSToSwitchMapping;
 import org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping;
@@ -86,6 +87,8 @@ public class Cluster implements ISchedulingState {
     private List<String> greyListedSupervisors = new ArrayList<>();
     private INimbus inimbus;
     private double minWorkerCpu = 0.0;
+    private final Map<String, Boolean> topoSharedOffHeapMemoryNodeFlag = new HashMap<>();
+    private final Map<String, Map<String, Map<String, Collection<ExecutorDetails>>>> topoIdToNodeIdToSlotIdToExecutors = new HashMap<>();
 
     private static <K, V> Map<K, V> makeMap(String key) {
         return new HashMap<>();
@@ -651,6 +654,14 @@ public class Cluster implements ISchedulingState {
                 "slot: [" + slot.getNodeId() + ", " + slot.getPort() + "] is already occupied.");
         }
 
+        Collection<ExecutorDetails> executorDetails =
+                topoIdToNodeIdToSlotIdToExecutors
+                        .computeIfAbsent(topologyId, Cluster::makeMap)
+                        .computeIfAbsent(slot.getNodeId(), Cluster::makeMap)
+                        .computeIfAbsent(slot.getId(), Cluster::makeSet);
+        executorDetails.clear();
+        executorDetails.addAll(executors);
+
         TopologyDetails td = topologies.getById(topologyId);
         if (td == null) {
             throw new IllegalArgumentException(
@@ -710,6 +721,29 @@ public class Cluster implements ISchedulingState {
     }
 
     /**
+     * Initialize the flag to true if specified topology uses SharedOffHeapNodeMemory, false otherwise.
+     *
+     * @param td TopologyDetails to examine
+     */
+    private void initializeTopoSharedOffHeapNodeMemoryFlag(TopologyDetails td) {
+        String topoId = td.getId();
+        topoSharedOffHeapMemoryNodeFlag.put(topoId, false);
+        StormTopology topology = td.getTopology();
+        if (topology == null) {
+            return; // accommodate multitenant_scheduler_test.clj
+        }
+        if (topology.is_set_shared_memory()) {
+            for (SharedMemory sharedMemory : topology.get_shared_memory().values()) {
+                double val = sharedMemory.get_off_heap_node();
+                if (val > 0.0) {
+                    topoSharedOffHeapMemoryNodeFlag.put(topoId, true);
+                    return;
+                }
+            }
+        }
+    }
+
+    /**
      * Calculate the amount of shared off heap node memory on a given node with the given assignment.
      *
      * @param nodeId     the id of the node
@@ -723,14 +757,19 @@ public class Cluster implements ISchedulingState {
 
     private double calculateSharedOffHeapNodeMemory(
         String nodeId, SchedulerAssignmentImpl assignment, TopologyDetails td, ExecutorDetails extra) {
-        Set<ExecutorDetails> executorsOnNode = new HashSet<>();
-        if (assignment != null) {
-            for (Entry<WorkerSlot, Collection<ExecutorDetails>> entry : assignment.getSlotToExecutors().entrySet()) {
-                if (nodeId.equals(entry.getKey().getNodeId())) {
-                    executorsOnNode.addAll(entry.getValue());
-                }
-            }
+        // short-circuit calculation if topology does not use SharedOffHeapMemory
+        String topoId = td.getId();
+        if (!topoSharedOffHeapMemoryNodeFlag.containsKey(topoId)) {
+            initializeTopoSharedOffHeapNodeMemoryFlag(td);
         }
+        if (!topoSharedOffHeapMemoryNodeFlag.get(topoId)) {
+            return 0.0;
+        }
+
+        Set<ExecutorDetails> executorsOnNode = new HashSet<>();
+        topoIdToNodeIdToSlotIdToExecutors.computeIfAbsent(td.getId(), Cluster::makeMap).computeIfAbsent(nodeId, Cluster::makeMap)
+                .forEach((k, v) -> executorsOnNode.addAll(v));
+
         if (extra != null) {
             executorsOnNode.add(extra);
         }
@@ -749,12 +788,16 @@ public class Cluster implements ISchedulingState {
      */
     public void freeSlot(WorkerSlot slot) {
         // remove the slot from the existing assignments
+        final String nodeId = slot.getNodeId();
         for (SchedulerAssignmentImpl assignment : assignments.values()) {
             if (assignment.isSlotOccupied(slot)) {
-                assertValidTopologyForModification(assignment.getTopologyId());
+                final String topologyId = assignment.getTopologyId();
+                assertValidTopologyForModification(topologyId);
                 assignment.unassignBySlot(slot);
-                String nodeId = slot.getNodeId();
-                TopologyDetails td = topologies.getById(assignment.getTopologyId());
+                topoIdToNodeIdToSlotIdToExecutors.computeIfAbsent(topologyId, Cluster::makeMap).computeIfAbsent(nodeId, Cluster::makeMap)
+                        .computeIfAbsent(slot.getId(), Cluster::makeSet)
+                        .clear();
+                TopologyDetails td = topologies.getById(topologyId);
                 assignment.setTotalSharedOffHeapNodeMemory(
                     nodeId, calculateSharedOffHeapNodeMemory(nodeId, assignment, td));
                 nodeToScheduledResourcesCache.computeIfAbsent(nodeId, Cluster::makeMap).put(slot, new NormalizedResourceRequest());
@@ -762,7 +805,7 @@ public class Cluster implements ISchedulingState {
             }
         }
         //Invalidate the cache as something on the node changed
-        totalResourcesPerNodeCache.remove(slot.getNodeId());
+        totalResourcesPerNodeCache.remove(nodeId);
     }
 
     /**
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
new file mode 100644
index 0000000..cf18c6c
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
@@ -0,0 +1,461 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.scheduling;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.config.Configurator;
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.ExecutorDetails;
+import org.apache.storm.scheduler.INimbus;
+import org.apache.storm.scheduler.IScheduler;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
+import org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResources;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourcesExtension;
+import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.junit.Assert;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+@ExtendWith({NormalizedResourcesExtension.class})
+public class TestLargeCluster {
+    private static final Logger LOG = LoggerFactory.getLogger(TestLargeCluster.class);
+
+    public static final String TEST_CLUSTER_NAME = "largeCluster01";
+    public static final String TEST_RESOURCE_PATH = "clusterconf/" + TEST_CLUSTER_NAME;
+    public static final String COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING = "code.ser";
+    public static final String COMPRESSED_SERIALIZED_CONFIG_FILENAME_ENDING = "conf.ser";
+
+    private static IScheduler scheduler = null;
+
+    @AfterEach
+    public void cleanup() {
+        if (scheduler != null) {
+            scheduler.cleanup();
+            scheduler = null;
+        }
+    }
+
+    /**
+     * Get the list of serialized topology (*code.ser) and configuration (*conf.ser)
+     * resource files in the path. The resources are sorted so that paired topology and conf
+     * files are sequential. Unpaired files may be ignored by the caller.
+     *
+     * @param path directory in which resources exist.
+     * @return
+     * @throws IOException
+     */
+    public static List<String> getResourceFiles(String path) throws IOException {
+        List<String> fileNames = new ArrayList<>();
+
+        try (
+                InputStream in = getResourceAsStream(path);
+                BufferedReader br = new BufferedReader(new InputStreamReader(in))
+        ) {
+            String resource;
+
+            while ((resource = br.readLine()) != null) {
+                if (resource.endsWith(COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING)
+                        || resource.endsWith(COMPRESSED_SERIALIZED_CONFIG_FILENAME_ENDING)) {
+                    fileNames.add(path + "/" + resource);
+                }
+            }
+            Collections.sort(fileNames);
+        }
+        return fileNames;
+    }
+
+    /**
+     * InputStream to read the fully qualified resource path.
+     *
+     * @param resource
+     * @return
+     */
+    public static InputStream getResourceAsStream(String resource) {
+        final InputStream in = getContextClassLoader().getResourceAsStream(resource);
+        return in == null ? ClassLoader.getSystemClassLoader().getResourceAsStream(resource) : in;
+    }
+
+    /**
+     * Read the contents of the fully qualified resource path.
+     *
+     * @param resource
+     * @return
+     * @throws Exception
+     */
+    public static byte[] getResourceAsBytes(String resource) throws Exception {
+        InputStream in = getResourceAsStream(resource);
+        if (in == null) {
+            return null;
+        }
+        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+            while (in.available() > 0) {
+                out.write(in.read());
+            }
+            return out.toByteArray();
+        }
+    }
+
+    public static ClassLoader getContextClassLoader() {
+        return Thread.currentThread().getContextClassLoader();
+    }
+
+    /**
+     * Create an array of TopologyDetails by reading serialized files for topology and configuration in the
+     * resource path.
+     *
+     * @param failOnParseError throw exception if there are unmatched files, otherwise ignore unmatched and read errors.
+     * @return An array of TopologyDetails representing resource files.
+     * @throws Exception
+     */
+    public static TopologyDetails[] createTopoDetailsArray(boolean failOnParseError) throws Exception {
+        List<TopologyDetails> topoDetailsList = new ArrayList<>();
+        List<String> errors = new ArrayList<>();
+        List<String> resources = getResourceFiles(TEST_RESOURCE_PATH);
+        Map<String, String> codeResourceMap = new TreeMap<>();
+        Map<String, String> confResourceMap = new HashMap<>();
+        for (int i = 0 ; i < resources.size() ; i++) {
+            String resource = resources.get(i);
+            int idxOfSlash = resource.lastIndexOf("/");
+            int idxOfDash = resource.lastIndexOf("-");
+            String nm = idxOfDash > idxOfSlash
+                    ? resource.substring(idxOfSlash + 1, idxOfDash)
+                    : resource.substring(idxOfSlash + 1, resource.length() - COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING.length());
+            if (resource.endsWith(COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING)) {
+                codeResourceMap.put(nm, resource);
+            } else if (resource.endsWith(COMPRESSED_SERIALIZED_CONFIG_FILENAME_ENDING)) {
+                confResourceMap.put(nm, resource);
+            } else {
+                LOG.info("Ignoring unsupported resource file " + resource);
+            }
+        }
+        String[] examinedConfParams = {
+                Config.TOPOLOGY_NAME,
+                Config.TOPOLOGY_SCHEDULER_STRATEGY,
+                Config.TOPOLOGY_PRIORITY,
+                Config.TOPOLOGY_WORKERS,
+                Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB,
+                Config.TOPOLOGY_SUBMITTER_USER,
+                Config.TOPOLOGY_ACKER_CPU_PCORE_PERCENT,
+                Config.TOPOLOGY_ACKER_RESOURCES_OFFHEAP_MEMORY_MB,
+                Config.TOPOLOGY_ACKER_RESOURCES_ONHEAP_MEMORY_MB,
+        };
+
+        for (String nm : codeResourceMap.keySet()) {
+            String codeResource = codeResourceMap.get(nm);
+            if (!confResourceMap.containsKey(nm)) {
+                String err = String.format("Ignoring topology file %s because of missing config file for %s", codeResource, nm);
+                errors.add(err);
+                LOG.error(err);
+                continue;
+            }
+            String confResource = confResourceMap.get(nm);
+            LOG.info("Found matching topology and config files: {}, {}", codeResource, confResource);
+            StormTopology stormTopology;
+            try {
+                stormTopology = Utils.deserialize(getResourceAsBytes(codeResource), StormTopology.class);
+            } catch (Exception ex) {
+                String err = String.format("Cannot read topology from resource %s", codeResource);
+                errors.add(err);
+                LOG.error(err, ex);
+                continue;
+            }
+
+            Map<String, Object> conf;
+            try {
+                conf = Utils.fromCompressedJsonConf(getResourceAsBytes(confResource));
+            } catch (RuntimeException | IOException ex) {
+                String err = String.format("Cannot read configuration from resource %s", confResource);
+                errors.add(err);
+                LOG.error(err, ex);
+                continue;
+            }
+            // fix 0.10 conf class names
+            String[] configParamsToFix = {Config.TOPOLOGY_SCHEDULER_STRATEGY, Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN,
+                    DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY };
+            for (String configParam: configParamsToFix) {
+                if (!conf.containsKey(configParam)) {
+                    continue;
+                }
+                String className = (String) conf.get(configParam);
+                if (className.startsWith("backtype")) {
+                    className = className.replace("backtype", "org.apache");
+                    conf.put(configParam, className);
+                }
+            }
+            // fix conf params used by ConstraintSolverStrategy
+            if (!conf.containsKey(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH)) {
+                conf.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH, 10_000);
+            }
+            if (!conf.containsKey(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH)) {
+                conf.put(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH, 10_000);
+            }
+
+            String topoId = nm;
+            String topoName = (String) conf.getOrDefault(Config.TOPOLOGY_NAME, nm);
+
+            // conf
+            StringBuffer sb = new StringBuffer("Config for " + nm + ": ");
+            for (String param : examinedConfParams) {
+                Object val = conf.getOrDefault(param, "<null>");
+                sb.append(param).append("=").append(val).append(", ");
+            }
+            LOG.info(sb.toString());
+
+            // topo
+            Map<ExecutorDetails, String> execToComp = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology);
+            LOG.info("Topology \"{}\" spouts={}, bolts={}, execToComp size is {}", topoName,
+                    stormTopology.get_spouts_size(), stormTopology.get_bolts_size(), execToComp.size());
+            int numWorkers = Integer.parseInt("" + conf.getOrDefault(Config.TOPOLOGY_WORKERS, "0"));
+            TopologyDetails topo = new TopologyDetails(topoId, conf, stormTopology,  numWorkers,
+                    execToComp, Time.currentTimeSecs(), "user");
+            topo.getComponents(); // sanity check - normally this should not fail
+
+            topoDetailsList.add(topo);
+        }
+        if (!errors.isEmpty() && failOnParseError) {
+            throw new Exception("Unable to parse all serialized objects\n\t" + String.join("\n\t", errors));
+        }
+        return topoDetailsList.toArray(new TopologyDetails[0]);
+    }
+
+    /**
+     * Check if the files in the resource directory are matched, can be read properly, and code/config files occur
+     * in matched pairs.
+     *
+     * @throws Exception showing bad and unmatched resource files.
+     */
+    @Test
+    public void testReadSerializedTopologiesAndConfigs() throws Exception {
+        List<String> resources = getResourceFiles(TEST_RESOURCE_PATH);
+        Assert.assertTrue("No resource files found in " + TEST_RESOURCE_PATH, !resources.isEmpty());
+        TopologyDetails[] topoDetailsArray = createTopoDetailsArray(true);
+    }
+
+    /**
+     * Create one supervisor and add to the supervisors list.
+     *
+     * @param rack rack-number
+     * @param superInRack supervisor number in the rack
+     * @param cpu percentage
+     * @param mem in megabytes
+     * @param numPorts number of ports on this supervisor
+     * @param sups returned map os supervisors
+     */
+    private static void createAndAddOneSupervisor(
+            int rack, int superInRack, double cpu, double mem, int numPorts,
+            Map<String, SupervisorDetails> sups) {
+
+        List<Number> ports = new LinkedList<>();
+        for (int p = 0; p < numPorts; p++) {
+            ports.add(p);
+        }
+        String superId = String.format("r%03ds%03d", rack, superInRack);
+        String hostId  = String.format("host-%03d-rack-%03d", superInRack, rack);
+        Map<String, Double> resourceMap = new HashMap<>();
+        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, cpu);
+        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, mem);
+        resourceMap.put("network.resource.units", 50.0);
+        SupervisorDetails sup = new SupervisorDetails(superId,
+                hostId, null, ports,
+                NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resourceMap));
+        sups.put(sup.getId(), sup);
+    }
+
+    /**
+     * Create supervisors.
+     *
+     * @param uniformSupervisors true if all supervisors are of the same size, false otherwise.
+     * @return supervisor details indexed by id
+     */
+    private static Map<String, SupervisorDetails> createSupervisors(boolean uniformSupervisors) {
+        Map<String, SupervisorDetails> retVal;
+        if (uniformSupervisors) {
+            int numRacks = 16;
+            int numSupersPerRack = 82;
+            int numPorts = 50; // note: scheduling is slower when components with large cpu/mem leave large percent of workerslots unused
+            int rackStart = 0;
+            int superInRackStart = 1;
+            double cpu = 7200; // %percent
+            double mem = 356_000; // MB
+            Map<String, Double> miscResources = new HashMap<>();
+            miscResources.put("network.resource.units", 100.0);
+
+            return TestUtilsForResourceAwareScheduler.genSupervisorsWithRacks(
+                    numRacks, numSupersPerRack, numPorts, rackStart, superInRackStart, cpu, mem, miscResources);
+
+        } else {
+            // this non-uniform supervisor distribution closely (but not exactly) mimics a large cluster in use
+            int numSupersPerRack = 82;
+            int numPorts = 50;
+
+            Map<String, SupervisorDetails> retList = new HashMap<>();
+
+            for (int rack = 0 ; rack < 12 ; rack++) {
+                double cpu = 3600; // %percent
+                double mem = 178_000; // MB
+                for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
+                    createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
+                }
+            }
+            for (int rack = 12 ; rack < 14 ; rack++) {
+                double cpu = 2400; // %percent
+                double mem = 118_100; // MB
+                for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
+                    createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
+                }
+            }
+            for (int rack = 14 ; rack < 16 ; rack++) {
+                double cpu = 1200; // %percent
+                double mem = 42_480; // MB
+                for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
+                    createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
+                }
+            }
+            return retList;
+        }
+    }
+
+    /**
+     * Create a large cluster, read topologies and configuration from resource directory and schedule.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testLargeCluster() throws Exception {
+        boolean uniformSupervisors = false; // false means non-uniform supervisor distribution
+
+        Map<String, SupervisorDetails> supervisors = createSupervisors(uniformSupervisors);
+
+        TopologyDetails[] topoDetailsArray = createTopoDetailsArray(false);
+        Assert.assertTrue("No topologies found", topoDetailsArray.length > 0);
+        Topologies topologies = new Topologies(topoDetailsArray);
+
+        Config confWithDefaultStrategy = new Config();
+        confWithDefaultStrategy.putAll(topoDetailsArray[0].getConf());
+        confWithDefaultStrategy.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, DefaultResourceAwareStrategy.class.getName());
+
+        INimbus iNimbus = new INimbusTest();
+        Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supervisors, new HashMap<>(),
+                topologies, confWithDefaultStrategy);
+
+        scheduler = new ResourceAwareScheduler();
+
+        List<Class> classesToDebug = Arrays.asList(DefaultResourceAwareStrategy.class,
+                GenericResourceAwareStrategy.class, ResourceAwareScheduler.class,
+                Cluster.class
+        );
+        Level logLevel = Level.INFO ; // switch to Level.DEBUG for verbose otherwise Level.INFO
+        classesToDebug.forEach(x -> Configurator.setLevel(x.getName(), logLevel));
+        long startTime = System.currentTimeMillis();
+        scheduler.prepare(confWithDefaultStrategy, new StormMetricsRegistry());
+        scheduler.schedule(topologies, cluster);
+        long endTime = System.currentTimeMillis();
+        LOG.info("Scheduling Time: {} topologies in {} seconds", topoDetailsArray.length, (endTime - startTime) / 1000.0);
+
+        for (TopologyDetails td : topoDetailsArray) {
+            TestUtilsForResourceAwareScheduler.assertTopologiesFullyScheduled(cluster, td.getName());
+        }
+
+        // Remove topology and reschedule it
+        for (int i = 0 ; i < topoDetailsArray.length ; i++) {
+            startTime = System.currentTimeMillis();
+            TopologyDetails topoDetails = topoDetailsArray[i];
+            cluster.unassign(topoDetails.getId());
+            LOG.info("({}) Removed topology {}", i, topoDetails.getName());
+            IScheduler rescheduler = new ResourceAwareScheduler();
+            rescheduler.prepare(confWithDefaultStrategy, new StormMetricsRegistry());
+            rescheduler.schedule(topologies, cluster);
+            TestUtilsForResourceAwareScheduler.assertTopologiesFullyScheduled(cluster, topoDetails.getName());
+            endTime = System.currentTimeMillis();
+            LOG.info("({}) Scheduling Time: Removed topology {} and rescheduled in {} seconds", i, topoDetails.getName(), (endTime - startTime) / 1000.0);
+        }
+        classesToDebug.forEach(x -> Configurator.setLevel(x.getName(), Level.INFO));
+    }
+
+    public static class INimbusTest implements INimbus {
+        @Override
+        public void prepare(Map<String, Object> topoConf, String schedulerLocalDir) {
+
+        }
+
+        @Override
+        public Collection<WorkerSlot> allSlotsAvailableForScheduling(Collection<SupervisorDetails> existingSupervisors,
+                                                                     Topologies topologies, Set<String> topologiesMissingAssignments) {
+            //return null;
+            Set<WorkerSlot> ret = new HashSet<>();
+            for (SupervisorDetails sd : existingSupervisors) {
+                String id = sd.getId();
+                for (Number port : (Collection<Number>) sd.getMeta()) {
+                    ret.add(new WorkerSlot(id, port));
+                }
+            }
+            return ret;
+        }
+
+        @Override
+        public void assignSlots(Topologies topologies, Map<String, Collection<WorkerSlot>> newSlotsByTopologyId) {
+
+        }
+
+        @Override
+        public String getHostName(Map<String, SupervisorDetails> existingSupervisors, String nodeId) {
+            if (existingSupervisors.containsKey(nodeId)) {
+                return existingSupervisors.get(nodeId).getHost();
+            }
+            return null;
+        }
+
+        @Override
+        public IScheduler getForcedScheduler() {
+            return null;
+        }
+    }
+}
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestTopologyAnonymizerUtils.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestTopologyAnonymizerUtils.java
new file mode 100644
index 0000000..ee1b337
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestTopologyAnonymizerUtils.java
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.scheduling;
+
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.serialization.GzipThriftSerializationDelegate;
+import org.apache.storm.utils.Utils;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Anonymize Serialized Topologies and Configs with the goal of taking internally developed topologies and configuration
+ * and make them publicly available for testing.
+ *
+ * Assume that topologies and configurations exist in the specified resource directory with names ending in
+ * {@link #COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING} and {@link #COMPRESSED_SERIALIZED_CONFIG_FILENAME_ENDING}
+ * respectively as they exist in blobstore. Also, when both these files exist for the same topology,
+ * they share the same file name prefix.
+ *
+ * <li> Rename topologies and its corresponding configuration (as identified by its resource name). Ensure that renamed
+ * configuration file for a topology retains the proper linkage so that:
+ *     <p>&lt;old-topo-name&gt;-stormcode.ser -&gt; &lt;new-topo-name&gt;-stormcode.ser</p> and its old conf
+ *     <p>&lt;old-topo-name&gt;-stormconf.ser -&gt; &lt;new-topo-name&gt;-stormconf.ser</p>
+ * </li>
+ *
+ * <li>Rename components in each of the topologies.</li>
+ *
+ * The new converted resource files can be copied to a resource directory under "clusterconf" and made available for use
+ * in TestLargeCluster class.
+ */
+public class TestTopologyAnonymizerUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(TestTopologyAnonymizerUtils.class);
+
+    private static final String DEFAULT_ORIGINAL_RESOURCES_PATH = "clusterconf/iridiumblue";
+    private static final String DEFAULT_ANONYMIZED_RESOURCES_OUTDIR = "src/test/resources/clusterconf/largeCluster01";
+    public static final String COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING = "stormcode.ser";
+    public static final String COMPRESSED_SERIALIZED_CONFIG_FILENAME_ENDING = "stormconf.ser";
+
+    private String originalResourcePath;
+    private String outputDirPath;
+
+    public TestTopologyAnonymizerUtils() {
+        this.originalResourcePath = DEFAULT_ORIGINAL_RESOURCES_PATH;
+        this.outputDirPath = DEFAULT_ANONYMIZED_RESOURCES_OUTDIR;
+    }
+
+    /**
+     * Check if resource files are available in the resource path defined by originalResourcePath.
+     *
+     * @throws Exception if there are no resource files in input directory.
+     */
+    public void testResourceAvailability() throws Exception {
+        List<String> resources = getResourceFiles(originalResourcePath);
+        if (resources.isEmpty()) {
+            throw new Exception("No resource files found in resource path " + originalResourcePath);
+        }
+    }
+
+    /**
+     * Take all compressed serialized files in {@link #originalResourcePath} and create anonymized versions or the
+     * topology and configuration in the {@link #outputDirPath}.
+     *
+     * @throws Exception
+     */
+    public void anonymizeDirectory() throws Exception {
+        Map<String, Integer> seenTopoNameIndex = new HashMap<>();
+        List<String> errs = new ArrayList<>();
+
+        List<String> resources = getResourceFiles(originalResourcePath);
+        for (String resource : resources) {
+            if (resource.length() <= COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING.length()) {
+                String err = String.format("Resource %s name is too short", resource);
+                errs.add(err);
+                LOG.error(err);
+                continue;
+            }
+            String resType = resource.substring(resource.length() - COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING.length());
+            String entryName = getEntryName(
+                    resource.substring(0, resource.length() - COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING.length()),
+                    seenTopoNameIndex);
+            int entryNum = seenTopoNameIndex.get(entryName);
+            String topoName = String.format("TopologyName%05d", entryNum);
+            String topoId = String.format("TopologyId%05d", entryNum);
+            String newResourceName = String.format("%s-%s", topoName, resType);
+
+            switch (resType) {
+                case COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING:
+                    // anonymize StormTopology
+                    LOG.info("Anonymizing Topology {} as {}, with topoId={}", resource, newResourceName, topoId);
+                    StormTopology stormTopology = readAndAnonymizeTopology(resource, errs);
+                    writeCompressedResource(newResourceName, new GzipThriftSerializationDelegate().serialize(stormTopology));
+                    break;
+
+                case COMPRESSED_SERIALIZED_CONFIG_FILENAME_ENDING:
+                    // anonymize config
+                    LOG.info("Anonymizing Config {} as {}", resource, newResourceName);
+                    Map<String, Object> conf = readAndAnonymizeConfig(resource, topoName, errs);
+                    writeCompressedResource(newResourceName, Utils.toCompressedJsonConf(conf));
+                    break;
+
+                default:
+                    String err = String.format("Resource %s is not recognized as one of supported types", resource);
+                    errs.add(err);
+                    LOG.warn(err);
+            }
+        }
+        if (!errs.isEmpty()) {
+            throw new Exception("Unable to parse all serialized objects\n\t" + String.join("\n\t", errs));
+        }
+    }
+
+    /**
+     * InputStream to read the fully qualified resource path.
+     *
+     * @param resourcePath
+     * @return
+     */
+    private static InputStream getResourceAsStream(String resourcePath) {
+        final InputStream in = Thread.currentThread().getContextClassLoader().getResourceAsStream(resourcePath);
+        return in == null ? ClassLoader.getSystemClassLoader().getResourceAsStream(resourcePath) : in;
+    }
+
+    /**
+     * Get the list of serialized topology (ending with {@link #COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING}
+     * and configuration (ending with {@link #COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING})
+     * resource files in the path.
+     *
+     * @param path directory in which resources exist.
+     * @return
+     * @throws IOException
+     */
+    public static List<String> getResourceFiles(String path) throws IOException {
+        List<String> fileNames = new ArrayList<>();
+
+        try (
+                InputStream in = getResourceAsStream(path);
+                BufferedReader br = new BufferedReader(new InputStreamReader(in))
+        ) {
+            String resource;
+
+            while ((resource = br.readLine()) != null) {
+                if (resource.endsWith(COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING)
+                        || resource.endsWith(COMPRESSED_SERIALIZED_CONFIG_FILENAME_ENDING)) {
+                    fileNames.add(path + "/" + resource);
+                }
+            }
+            Collections.sort(fileNames);
+        }
+        return fileNames;
+    }
+
+    /**
+     * Read the contents of the fully qualified resource path.
+     *
+     * @param resourcePath
+     * @return
+     * @throws Exception
+     */
+    private static byte[] getResourceAsBytes(String resourcePath) throws Exception {
+        InputStream in = getResourceAsStream(resourcePath);
+        if (in == null) {
+            return null;
+        }
+        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+            while (in.available() > 0) {
+                out.write(in.read());
+            }
+            return out.toByteArray();
+        }
+    }
+
+    private String getEntryName(String resourceNamePrefix, Map<String, Integer> seenTopoNameIndex) {
+        int lastIdxOfSlash = resourceNamePrefix.lastIndexOf("/");
+        String baseName = resourceNamePrefix.substring(lastIdxOfSlash + 1);
+        seenTopoNameIndex.putIfAbsent(baseName, seenTopoNameIndex.size());
+        return baseName;
+    }
+
+    private StormTopology readAndAnonymizeTopology(String resource, List<String> errors) {
+        StormTopology stormTopology;
+        try {
+            stormTopology = Utils.deserialize(getResourceAsBytes(resource), StormTopology.class);
+        } catch (Exception ex) {
+            String err = String.format("Cannot read topology from resource %s", resource);
+            errors.add(err);
+            LOG.error(err, ex);
+            return null;
+        }
+
+        Map<String, String> renameMap = new HashMap<>();
+        if (stormTopology.get_spouts() != null){
+            for (String name : stormTopology.get_spouts().keySet()) {
+                String newName = String.format("Spout-%d", renameMap.size());
+                renameMap.putIfAbsent(name, newName);
+            }
+        }
+        int spoutCnt = renameMap.size();
+        if (stormTopology.get_bolts() != null) {
+            for (String name : stormTopology.get_bolts().keySet()) {
+                String newName = String.format("Bolt-%d", renameMap.size() - spoutCnt);
+                renameMap.putIfAbsent(name, newName);
+            }
+        }
+        int boltCnt = renameMap.size() - spoutCnt;
+        // rename components
+        StormTopology retVal = stormTopology.deepCopy();
+        if (spoutCnt > 0) {
+            Map<String, SpoutSpec> spouts = retVal.get_spouts();
+            for (String name: renameMap.keySet()) {
+                if (spouts.containsKey(name)) {
+                    spouts.put(renameMap.get(name), spouts.remove(name));
+                }
+            }
+            retVal.get_spouts().values().forEach(spec -> {
+                for (GlobalStreamId globalId : spec.get_common().get_inputs().keySet()) {
+                    if (renameMap.containsKey(globalId.get_componentId())) {
+                        globalId.set_componentId(renameMap.get(globalId.get_componentId()));
+                    }
+                }
+            });
+        }
+
+        if (boltCnt > 0) {
+            Map<String, Bolt> bolts = retVal.get_bolts();
+            for (String name: renameMap.keySet()) {
+                if (bolts.containsKey(name)) {
+                    bolts.put(renameMap.get(name), bolts.remove(name));
+                }
+            }
+            retVal.get_bolts().values().forEach(spec -> {
+                for (GlobalStreamId globalId : spec.get_common().get_inputs().keySet()) {
+                    if (renameMap.containsKey(globalId.get_componentId())) {
+                        globalId.set_componentId(renameMap.get(globalId.get_componentId()));
+                    }
+                }
+            });
+        }
+        return retVal;
+    }
+
+    private Map<String, Object> readAndAnonymizeConfig(String confResource, String topoName, List<String> errors) {
+        Map<String, Object> conf;
+        try {
+            conf = Utils.fromCompressedJsonConf(getResourceAsBytes(confResource));
+        } catch (Exception ex) {
+            String err = String.format("Cannot read configuration from resource %s", confResource);
+            errors.add(err);
+            LOG.error(err, ex);
+            return null;
+        }
+
+        conf.put(Config.TOPOLOGY_NAME, topoName);
+        if (!conf.containsKey(Config.TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER)) {
+            conf.put(Config.TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER, false);
+        }
+        if (!conf.containsKey(Config.TOPOLOGY_RAS_ONE_COMPONENT_PER_WORKER)) {
+            conf.put(Config.TOPOLOGY_RAS_ONE_COMPONENT_PER_WORKER, false);
+        }
+        // Fix 0.10 topology, config param used by ConstraintSolverStrategy
+        if (!conf.containsKey(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH)) {
+            conf.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH, 10_000);
+        }
+        if (!conf.containsKey(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH)) {
+            conf.put(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH, 10_000);
+        }
+        return conf;
+    }
+
+    private void writeCompressedResource(String newResourceName, byte[] compressedBytes) throws IOException {
+        File dir = new File(outputDirPath);
+        if (!dir.exists()) {
+            dir.mkdirs();
+        }
+        try (FileOutputStream fos = new FileOutputStream(new File(dir, newResourceName))
+        ) {
+            fos.write(compressedBytes);
+        }
+    }
+
+    /**
+     * In order to create resources as part of a test run:
+     *    <li>Download compressed topologies and configurations (from blobstore) into resource path
+     *    {@link #DEFAULT_ORIGINAL_RESOURCES_PATH}. The resource names must end with either
+     *    {@link #COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING} or {@link #COMPRESSED_SERIALIZED_CONFIG_FILENAME_ENDING}</li>
+     *    <li>Change pathnames for {@link #DEFAULT_ORIGINAL_RESOURCES_PATH} and {@link #DEFAULT_ANONYMIZED_RESOURCES_OUTDIR}</li>
+     *    <li>Uncomment annotation so that this method is executed as a test</li>
+     *    <li>add files in {@link #DEFAULT_ANONYMIZED_RESOURCES_OUTDIR} to the resource path "clusterconf/new-cluster-name"</li>
+     *    <li>use TestLargeCluster to test these newly generated files after changing
+     *    {@link TestLargeCluster#TEST_CLUSTER_NAME} to "new-cluster-name"</li>
+     *    
+     * @throws Exception
+     */
+    // @Test
+    public void testAnonymizer() throws Exception {
+        String[] args = { DEFAULT_ORIGINAL_RESOURCES_PATH, DEFAULT_ANONYMIZED_RESOURCES_OUTDIR };
+        TestTopologyAnonymizerUtils instance = new TestTopologyAnonymizerUtils();
+        instance.originalResourcePath = args[0];
+        instance.outputDirPath = args[1];
+        instance.testResourceAvailability();
+        instance.anonymizeDirectory();
+        LOG.info("Read resources in {} and wrote anonymized files to {}", instance.originalResourcePath, instance.outputDirPath);
+    }
+
+    public static void main(String[] args) {
+        if (args == null || args.length == 0) {
+            args = new String[]{DEFAULT_ORIGINAL_RESOURCES_PATH, DEFAULT_ANONYMIZED_RESOURCES_OUTDIR};
+        }
+        if (args.length != 2) {
+            LOG.error("Expecting two arguments <sourceResourcePath> <targetDir>, received {} args", args.length);
+            System.exit(-1);
+        }
+
+        TestTopologyAnonymizerUtils instance = new TestTopologyAnonymizerUtils();
+        instance.originalResourcePath = args[0];
+        instance.outputDirPath = args[1];
+        try {
+            instance.testResourceAvailability();
+            instance.anonymizeDirectory();
+        } catch (Exception ex) {
+            LOG.error(ex.getMessage(), ex);
+        }
+    }
+}
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00000-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00000-stormcode.ser
new file mode 100644
index 0000000..230ca0e
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00000-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00000-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00000-stormconf.ser
new file mode 100644
index 0000000..1cff252
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00000-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00001-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00001-stormcode.ser
new file mode 100644
index 0000000..2a952da
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00001-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00001-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00001-stormconf.ser
new file mode 100644
index 0000000..b4a7087
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00001-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00002-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00002-stormcode.ser
new file mode 100644
index 0000000..f2fb021
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00002-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00002-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00002-stormconf.ser
new file mode 100644
index 0000000..7ed5170
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00002-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00003-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00003-stormcode.ser
new file mode 100644
index 0000000..ef1615b
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00003-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00003-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00003-stormconf.ser
new file mode 100644
index 0000000..48683dc
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00003-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00004-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00004-stormcode.ser
new file mode 100644
index 0000000..bf0cbd3
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00004-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00004-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00004-stormconf.ser
new file mode 100644
index 0000000..9227b8a
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00004-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00005-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00005-stormcode.ser
new file mode 100644
index 0000000..71f58d4
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00005-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00005-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00005-stormconf.ser
new file mode 100644
index 0000000..76e951a
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00005-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00006-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00006-stormcode.ser
new file mode 100644
index 0000000..bc450ec
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00006-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00006-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00006-stormconf.ser
new file mode 100644
index 0000000..0a82e80
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00006-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00007-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00007-stormcode.ser
new file mode 100644
index 0000000..e0ceac6
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00007-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00007-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00007-stormconf.ser
new file mode 100644
index 0000000..df9a4ec
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00007-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00008-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00008-stormcode.ser
new file mode 100644
index 0000000..4a8cc08
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00008-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00008-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00008-stormconf.ser
new file mode 100644
index 0000000..5e04c8c
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00008-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00009-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00009-stormcode.ser
new file mode 100644
index 0000000..ca7806e
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00009-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00009-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00009-stormconf.ser
new file mode 100644
index 0000000..7c3433a
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00009-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00010-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00010-stormcode.ser
new file mode 100644
index 0000000..61a399a
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00010-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00010-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00010-stormconf.ser
new file mode 100644
index 0000000..4076062
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00010-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00011-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00011-stormcode.ser
new file mode 100644
index 0000000..b575e72
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00011-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00011-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00011-stormconf.ser
new file mode 100644
index 0000000..5e1bc69
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00011-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00012-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00012-stormcode.ser
new file mode 100644
index 0000000..03da91f
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00012-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00012-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00012-stormconf.ser
new file mode 100644
index 0000000..5ed158b
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00012-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00013-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00013-stormcode.ser
new file mode 100644
index 0000000..da26170
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00013-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00013-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00013-stormconf.ser
new file mode 100644
index 0000000..0b76c15
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00013-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00014-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00014-stormcode.ser
new file mode 100644
index 0000000..d6bbc69
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00014-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00014-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00014-stormconf.ser
new file mode 100644
index 0000000..3bd1971
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00014-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00015-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00015-stormcode.ser
new file mode 100644
index 0000000..bf910fe
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00015-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00015-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00015-stormconf.ser
new file mode 100644
index 0000000..c8afda2
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00015-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00016-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00016-stormcode.ser
new file mode 100644
index 0000000..63c35fc
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00016-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00016-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00016-stormconf.ser
new file mode 100644
index 0000000..10eb6ca
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00016-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00017-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00017-stormcode.ser
new file mode 100644
index 0000000..bf7d530
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00017-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00017-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00017-stormconf.ser
new file mode 100644
index 0000000..12eb9ed
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00017-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00018-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00018-stormcode.ser
new file mode 100644
index 0000000..1ed360d
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00018-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00018-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00018-stormconf.ser
new file mode 100644
index 0000000..97b342b
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00018-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00019-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00019-stormcode.ser
new file mode 100644
index 0000000..6640f04
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00019-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00019-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00019-stormconf.ser
new file mode 100644
index 0000000..b443c29
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00019-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00020-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00020-stormcode.ser
new file mode 100644
index 0000000..afa6660
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00020-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00020-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00020-stormconf.ser
new file mode 100644
index 0000000..ec1ec00
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00020-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00021-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00021-stormcode.ser
new file mode 100644
index 0000000..4515494
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00021-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00021-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00021-stormconf.ser
new file mode 100644
index 0000000..bb3352b
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00021-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00022-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00022-stormcode.ser
new file mode 100644
index 0000000..6b208b0
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00022-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00022-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00022-stormconf.ser
new file mode 100644
index 0000000..ed85ed4
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00022-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00023-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00023-stormcode.ser
new file mode 100644
index 0000000..e57a4d0
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00023-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00023-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00023-stormconf.ser
new file mode 100644
index 0000000..23e8e33
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00023-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00024-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00024-stormcode.ser
new file mode 100644
index 0000000..f7fd6ae
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00024-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00024-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00024-stormconf.ser
new file mode 100644
index 0000000..02f8c6a
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00024-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00025-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00025-stormcode.ser
new file mode 100644
index 0000000..a525528
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00025-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00025-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00025-stormconf.ser
new file mode 100644
index 0000000..c702dbb
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00025-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00026-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00026-stormcode.ser
new file mode 100644
index 0000000..0b90e6e
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00026-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00026-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00026-stormconf.ser
new file mode 100644
index 0000000..a5f5ce6
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00026-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00027-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00027-stormcode.ser
new file mode 100644
index 0000000..4bcc7f5
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00027-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00027-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00027-stormconf.ser
new file mode 100644
index 0000000..e36e10d
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00027-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00028-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00028-stormcode.ser
new file mode 100644
index 0000000..0105cb3
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00028-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00028-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00028-stormconf.ser
new file mode 100644
index 0000000..bd1c57d
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00028-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00029-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00029-stormcode.ser
new file mode 100644
index 0000000..fa2e87e
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00029-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00029-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00029-stormconf.ser
new file mode 100644
index 0000000..a664c13
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00029-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00030-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00030-stormcode.ser
new file mode 100644
index 0000000..aea165f
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00030-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00030-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00030-stormconf.ser
new file mode 100644
index 0000000..0c277c3
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00030-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00031-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00031-stormcode.ser
new file mode 100644
index 0000000..51c66ae
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00031-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00031-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00031-stormconf.ser
new file mode 100644
index 0000000..41e0a6b
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00031-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00032-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00032-stormcode.ser
new file mode 100644
index 0000000..7ee5589
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00032-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00032-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00032-stormconf.ser
new file mode 100644
index 0000000..d2b1366
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00032-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00033-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00033-stormcode.ser
new file mode 100644
index 0000000..6e48889
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00033-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00033-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00033-stormconf.ser
new file mode 100644
index 0000000..baaec65
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00033-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00034-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00034-stormcode.ser
new file mode 100644
index 0000000..0585c6c
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00034-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00034-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00034-stormconf.ser
new file mode 100644
index 0000000..fc6be7d
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00034-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00035-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00035-stormcode.ser
new file mode 100644
index 0000000..d437a0e
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00035-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00035-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00035-stormconf.ser
new file mode 100644
index 0000000..396c5f4
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00035-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00036-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00036-stormcode.ser
new file mode 100644
index 0000000..a649fee
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00036-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00036-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00036-stormconf.ser
new file mode 100644
index 0000000..b04c7bc
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00036-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00037-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00037-stormcode.ser
new file mode 100644
index 0000000..5d7f25b
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00037-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00037-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00037-stormconf.ser
new file mode 100644
index 0000000..92af7cb
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00037-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00038-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00038-stormcode.ser
new file mode 100644
index 0000000..8aec850
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00038-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00038-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00038-stormconf.ser
new file mode 100644
index 0000000..388c278
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00038-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00039-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00039-stormcode.ser
new file mode 100644
index 0000000..536ab4a
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00039-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00039-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00039-stormconf.ser
new file mode 100644
index 0000000..e3447dc
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00039-stormconf.ser differ