You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ki...@apache.org on 2020/04/15 21:10:47 UTC
[storm] branch master updated: [STORM-3600] Add caching in
Cluster.calculateSharedOffHeapNodeMemory (#3244)
This is an automated email from the ASF dual-hosted git repository.
kishorvpatil pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 7ffc44d [STORM-3600] Add caching in Cluster.calculateSharedOffHeapNodeMemory (#3244)
7ffc44d is described below
commit 7ffc44d083707bc60749fe6821f662fc730c6466
Author: Bipin Prasad <bi...@yahoo.com>
AuthorDate: Wed Apr 15 16:10:34 2020 -0500
[STORM-3600] Add caching in Cluster.calculateSharedOffHeapNodeMemory (#3244)
* [STORM-3600] Add caching in Cluster.calculateSharedOffHeapNodeMemory to yield a fourfold increase in speed when scheduling a large cluster.
---
.../java/org/apache/storm/scheduler/Cluster.java | 65 ++-
.../strategies/scheduling/TestLargeCluster.java | 461 +++++++++++++++++++++
.../scheduling/TestTopologyAnonymizerUtils.java | 358 ++++++++++++++++
.../largeCluster01/TopologyName00000-stormcode.ser | Bin 0 -> 2640 bytes
.../largeCluster01/TopologyName00000-stormconf.ser | Bin 0 -> 7689 bytes
.../largeCluster01/TopologyName00001-stormcode.ser | Bin 0 -> 13937 bytes
.../largeCluster01/TopologyName00001-stormconf.ser | Bin 0 -> 7668 bytes
.../largeCluster01/TopologyName00002-stormcode.ser | Bin 0 -> 4135 bytes
.../largeCluster01/TopologyName00002-stormconf.ser | Bin 0 -> 7916 bytes
.../largeCluster01/TopologyName00003-stormcode.ser | Bin 0 -> 3685 bytes
.../largeCluster01/TopologyName00003-stormconf.ser | Bin 0 -> 7939 bytes
.../largeCluster01/TopologyName00004-stormcode.ser | Bin 0 -> 14293 bytes
.../largeCluster01/TopologyName00004-stormconf.ser | Bin 0 -> 7742 bytes
.../largeCluster01/TopologyName00005-stormcode.ser | Bin 0 -> 26147 bytes
.../largeCluster01/TopologyName00005-stormconf.ser | Bin 0 -> 6133 bytes
.../largeCluster01/TopologyName00006-stormcode.ser | Bin 0 -> 26469 bytes
.../largeCluster01/TopologyName00006-stormconf.ser | Bin 0 -> 6133 bytes
.../largeCluster01/TopologyName00007-stormcode.ser | Bin 0 -> 48666 bytes
.../largeCluster01/TopologyName00007-stormconf.ser | Bin 0 -> 6670 bytes
.../largeCluster01/TopologyName00008-stormcode.ser | Bin 0 -> 18390 bytes
.../largeCluster01/TopologyName00008-stormconf.ser | Bin 0 -> 6670 bytes
.../largeCluster01/TopologyName00009-stormcode.ser | Bin 0 -> 3497 bytes
.../largeCluster01/TopologyName00009-stormconf.ser | Bin 0 -> 6112 bytes
.../largeCluster01/TopologyName00010-stormcode.ser | Bin 0 -> 3451 bytes
.../largeCluster01/TopologyName00010-stormconf.ser | Bin 0 -> 6112 bytes
.../largeCluster01/TopologyName00011-stormcode.ser | Bin 0 -> 47468 bytes
.../largeCluster01/TopologyName00011-stormconf.ser | Bin 0 -> 8130 bytes
.../largeCluster01/TopologyName00012-stormcode.ser | Bin 0 -> 47351 bytes
.../largeCluster01/TopologyName00012-stormconf.ser | Bin 0 -> 8119 bytes
.../largeCluster01/TopologyName00013-stormcode.ser | Bin 0 -> 25959 bytes
.../largeCluster01/TopologyName00013-stormconf.ser | Bin 0 -> 7219 bytes
.../largeCluster01/TopologyName00014-stormcode.ser | Bin 0 -> 26377 bytes
.../largeCluster01/TopologyName00014-stormconf.ser | Bin 0 -> 5985 bytes
.../largeCluster01/TopologyName00015-stormcode.ser | Bin 0 -> 2285 bytes
.../largeCluster01/TopologyName00015-stormconf.ser | Bin 0 -> 5874 bytes
.../largeCluster01/TopologyName00016-stormcode.ser | Bin 0 -> 13040 bytes
.../largeCluster01/TopologyName00016-stormconf.ser | Bin 0 -> 5898 bytes
.../largeCluster01/TopologyName00017-stormcode.ser | Bin 0 -> 2290 bytes
.../largeCluster01/TopologyName00017-stormconf.ser | Bin 0 -> 5882 bytes
.../largeCluster01/TopologyName00018-stormcode.ser | Bin 0 -> 2280 bytes
.../largeCluster01/TopologyName00018-stormconf.ser | Bin 0 -> 5879 bytes
.../largeCluster01/TopologyName00019-stormcode.ser | Bin 0 -> 2282 bytes
.../largeCluster01/TopologyName00019-stormconf.ser | Bin 0 -> 5876 bytes
.../largeCluster01/TopologyName00020-stormcode.ser | Bin 0 -> 2284 bytes
.../largeCluster01/TopologyName00020-stormconf.ser | Bin 0 -> 5874 bytes
.../largeCluster01/TopologyName00021-stormcode.ser | Bin 0 -> 2486 bytes
.../largeCluster01/TopologyName00021-stormconf.ser | Bin 0 -> 5831 bytes
.../largeCluster01/TopologyName00022-stormcode.ser | Bin 0 -> 3290 bytes
.../largeCluster01/TopologyName00022-stormconf.ser | Bin 0 -> 6222 bytes
.../largeCluster01/TopologyName00023-stormcode.ser | Bin 0 -> 57457 bytes
.../largeCluster01/TopologyName00023-stormconf.ser | Bin 0 -> 5900 bytes
.../largeCluster01/TopologyName00024-stormcode.ser | Bin 0 -> 44399 bytes
.../largeCluster01/TopologyName00024-stormconf.ser | Bin 0 -> 5898 bytes
.../largeCluster01/TopologyName00025-stormcode.ser | Bin 0 -> 7012 bytes
.../largeCluster01/TopologyName00025-stormconf.ser | Bin 0 -> 6186 bytes
.../largeCluster01/TopologyName00026-stormcode.ser | Bin 0 -> 4801 bytes
.../largeCluster01/TopologyName00026-stormconf.ser | Bin 0 -> 6188 bytes
.../largeCluster01/TopologyName00027-stormcode.ser | Bin 0 -> 27924 bytes
.../largeCluster01/TopologyName00027-stormconf.ser | Bin 0 -> 5854 bytes
.../largeCluster01/TopologyName00028-stormcode.ser | Bin 0 -> 172752 bytes
.../largeCluster01/TopologyName00028-stormconf.ser | Bin 0 -> 5989 bytes
.../largeCluster01/TopologyName00029-stormcode.ser | Bin 0 -> 829420 bytes
.../largeCluster01/TopologyName00029-stormconf.ser | Bin 0 -> 6195 bytes
.../largeCluster01/TopologyName00030-stormcode.ser | Bin 0 -> 1111850 bytes
.../largeCluster01/TopologyName00030-stormconf.ser | Bin 0 -> 6193 bytes
.../largeCluster01/TopologyName00031-stormcode.ser | Bin 0 -> 411929 bytes
.../largeCluster01/TopologyName00031-stormconf.ser | Bin 0 -> 6232 bytes
.../largeCluster01/TopologyName00032-stormcode.ser | Bin 0 -> 216295 bytes
.../largeCluster01/TopologyName00032-stormconf.ser | Bin 0 -> 5983 bytes
.../largeCluster01/TopologyName00033-stormcode.ser | Bin 0 -> 3581 bytes
.../largeCluster01/TopologyName00033-stormconf.ser | Bin 0 -> 5759 bytes
.../largeCluster01/TopologyName00034-stormcode.ser | Bin 0 -> 108593 bytes
.../largeCluster01/TopologyName00034-stormconf.ser | Bin 0 -> 6112 bytes
.../largeCluster01/TopologyName00035-stormcode.ser | Bin 0 -> 298502 bytes
.../largeCluster01/TopologyName00035-stormconf.ser | Bin 0 -> 6167 bytes
.../largeCluster01/TopologyName00036-stormcode.ser | Bin 0 -> 242606 bytes
.../largeCluster01/TopologyName00036-stormconf.ser | Bin 0 -> 5982 bytes
.../largeCluster01/TopologyName00037-stormcode.ser | Bin 0 -> 297184 bytes
.../largeCluster01/TopologyName00037-stormconf.ser | Bin 0 -> 6158 bytes
.../largeCluster01/TopologyName00038-stormcode.ser | Bin 0 -> 20439 bytes
.../largeCluster01/TopologyName00038-stormconf.ser | Bin 0 -> 6332 bytes
.../largeCluster01/TopologyName00039-stormcode.ser | Bin 0 -> 6884 bytes
.../largeCluster01/TopologyName00039-stormconf.ser | Bin 0 -> 6479 bytes
83 files changed, 873 insertions(+), 11 deletions(-)
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index b5fe659..06b922c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -33,6 +33,7 @@ import org.apache.storm.Constants;
import org.apache.storm.DaemonConfig;
import org.apache.storm.daemon.nimbus.TopologyResources;
import org.apache.storm.generated.SharedMemory;
+import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.WorkerResources;
import org.apache.storm.networktopography.DNSToSwitchMapping;
import org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping;
@@ -86,6 +87,8 @@ public class Cluster implements ISchedulingState {
private List<String> greyListedSupervisors = new ArrayList<>();
private INimbus inimbus;
private double minWorkerCpu = 0.0;
+ private final Map<String, Boolean> topoSharedOffHeapMemoryNodeFlag = new HashMap<>();
+ private final Map<String, Map<String, Map<String, Collection<ExecutorDetails>>>> topoIdToNodeIdToSlotIdToExecutors = new HashMap<>();
private static <K, V> Map<K, V> makeMap(String key) {
return new HashMap<>();
@@ -651,6 +654,14 @@ public class Cluster implements ISchedulingState {
"slot: [" + slot.getNodeId() + ", " + slot.getPort() + "] is already occupied.");
}
+ Collection<ExecutorDetails> executorDetails =
+ topoIdToNodeIdToSlotIdToExecutors
+ .computeIfAbsent(topologyId, Cluster::makeMap)
+ .computeIfAbsent(slot.getNodeId(), Cluster::makeMap)
+ .computeIfAbsent(slot.getId(), Cluster::makeSet);
+ executorDetails.clear();
+ executorDetails.addAll(executors);
+
TopologyDetails td = topologies.getById(topologyId);
if (td == null) {
throw new IllegalArgumentException(
@@ -710,6 +721,29 @@ public class Cluster implements ISchedulingState {
}
/**
+ * Record in {@code topoSharedOffHeapMemoryNodeFlag} whether the specified topology uses SharedOffHeapNodeMemory.
+ *
+ * <p>The flag is stored as false first and switched to true as soon as one of the topology's
+ * {@link SharedMemory} entries reports an off-heap node value greater than zero. When the
+ * TopologyDetails has no StormTopology, the flag stays false.
+ *
+ * @param td TopologyDetails to examine
+ */
+ private void initializeTopoSharedOffHeapNodeMemoryFlag(TopologyDetails td) {
+ String topoId = td.getId();
+ topoSharedOffHeapMemoryNodeFlag.put(topoId, false);
+ StormTopology topology = td.getTopology();
+ if (topology == null) {
+ return; // accommodate multitenant_scheduler_test.clj (presumably it supplies no StormTopology); flag stays false
+ }
+ if (topology.is_set_shared_memory()) {
+ for (SharedMemory sharedMemory : topology.get_shared_memory().values()) {
+ double val = sharedMemory.get_off_heap_node();
+ if (val > 0.0) {
+ topoSharedOffHeapMemoryNodeFlag.put(topoId, true);
+ return;
+ }
+ }
+ }
+ }
+
+ /**
* Calculate the amount of shared off heap node memory on a given node with the given assignment.
*
* @param nodeId the id of the node
@@ -723,14 +757,19 @@ public class Cluster implements ISchedulingState {
private double calculateSharedOffHeapNodeMemory(
String nodeId, SchedulerAssignmentImpl assignment, TopologyDetails td, ExecutorDetails extra) {
- Set<ExecutorDetails> executorsOnNode = new HashSet<>();
- if (assignment != null) {
- for (Entry<WorkerSlot, Collection<ExecutorDetails>> entry : assignment.getSlotToExecutors().entrySet()) {
- if (nodeId.equals(entry.getKey().getNodeId())) {
- executorsOnNode.addAll(entry.getValue());
- }
- }
+ // short-circuit calculation if topology does not use SharedOffHeapMemory
+ String topoId = td.getId();
+ if (!topoSharedOffHeapMemoryNodeFlag.containsKey(topoId)) {
+ initializeTopoSharedOffHeapNodeMemoryFlag(td);
}
+ if (!topoSharedOffHeapMemoryNodeFlag.get(topoId)) {
+ return 0.0;
+ }
+
+ Set<ExecutorDetails> executorsOnNode = new HashSet<>();
+ topoIdToNodeIdToSlotIdToExecutors.computeIfAbsent(td.getId(), Cluster::makeMap).computeIfAbsent(nodeId, Cluster::makeMap)
+ .forEach((k, v) -> executorsOnNode.addAll(v));
+
if (extra != null) {
executorsOnNode.add(extra);
}
@@ -749,12 +788,16 @@ public class Cluster implements ISchedulingState {
*/
public void freeSlot(WorkerSlot slot) {
// remove the slot from the existing assignments
+ final String nodeId = slot.getNodeId();
for (SchedulerAssignmentImpl assignment : assignments.values()) {
if (assignment.isSlotOccupied(slot)) {
- assertValidTopologyForModification(assignment.getTopologyId());
+ final String topologyId = assignment.getTopologyId();
+ assertValidTopologyForModification(topologyId);
assignment.unassignBySlot(slot);
- String nodeId = slot.getNodeId();
- TopologyDetails td = topologies.getById(assignment.getTopologyId());
+ topoIdToNodeIdToSlotIdToExecutors.computeIfAbsent(topologyId, Cluster::makeMap).computeIfAbsent(nodeId, Cluster::makeMap)
+ .computeIfAbsent(slot.getId(), Cluster::makeSet)
+ .clear();
+ TopologyDetails td = topologies.getById(topologyId);
assignment.setTotalSharedOffHeapNodeMemory(
nodeId, calculateSharedOffHeapNodeMemory(nodeId, assignment, td));
nodeToScheduledResourcesCache.computeIfAbsent(nodeId, Cluster::makeMap).put(slot, new NormalizedResourceRequest());
@@ -762,7 +805,7 @@ public class Cluster implements ISchedulingState {
}
}
//Invalidate the cache as something on the node changed
- totalResourcesPerNodeCache.remove(slot.getNodeId());
+ totalResourcesPerNodeCache.remove(nodeId);
}
/**
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
new file mode 100644
index 0000000..cf18c6c
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
@@ -0,0 +1,461 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.scheduling;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.config.Configurator;
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.ExecutorDetails;
+import org.apache.storm.scheduler.INimbus;
+import org.apache.storm.scheduler.IScheduler;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
+import org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResources;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourcesExtension;
+import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.junit.Assert;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+@ExtendWith({NormalizedResourcesExtension.class})
+public class TestLargeCluster {
+ private static final Logger LOG = LoggerFactory.getLogger(TestLargeCluster.class);
+
+ public static final String TEST_CLUSTER_NAME = "largeCluster01";
+ public static final String TEST_RESOURCE_PATH = "clusterconf/" + TEST_CLUSTER_NAME;
+ public static final String COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING = "code.ser";
+ public static final String COMPRESSED_SERIALIZED_CONFIG_FILENAME_ENDING = "conf.ser";
+
+ private static IScheduler scheduler = null;
+
+ /**
+ * Clean up the scheduler left behind by a test, if any, and drop the reference to it.
+ */
+ @AfterEach
+ public void cleanup() {
+ if (scheduler != null) {
+ scheduler.cleanup();
+ scheduler = null;
+ }
+ }
+
+ /**
+ * Get the list of serialized topology (*code.ser) and configuration (*conf.ser)
+ * resource files in the path. The entries are read line by line from the resource stream opened on the
+ * path itself, and the collected names are sorted so that paired topology and conf
+ * files are sequential. Unpaired files may be ignored by the caller.
+ *
+ * <p>NOTE(review): if the path cannot be resolved, getResourceAsStream returns null and building the
+ * reader presumably fails with a NullPointerException rather than an IOException - confirm.
+ *
+ * @param path directory in which resources exist.
+ * @return sorted list of the matching resource names, each prefixed with the path and "/"; empty if none match.
+ * @throws IOException if reading the resource listing fails.
+ */
+ public static List<String> getResourceFiles(String path) throws IOException {
+ List<String> fileNames = new ArrayList<>();
+
+ try (
+ InputStream in = getResourceAsStream(path);
+ BufferedReader br = new BufferedReader(new InputStreamReader(in))
+ ) {
+ String resource;
+
+ while ((resource = br.readLine()) != null) {
+ if (resource.endsWith(COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING)
+ || resource.endsWith(COMPRESSED_SERIALIZED_CONFIG_FILENAME_ENDING)) {
+ fileNames.add(path + "/" + resource);
+ }
+ }
+ Collections.sort(fileNames);
+ }
+ return fileNames;
+ }
+
+ /**
+ * Open an InputStream on the fully qualified resource path. The context class loader of the
+ * current thread is tried first; the system class loader is used as a fallback.
+ *
+ * @param resource fully qualified resource path.
+ * @return stream for reading the resource, or null if neither class loader can find it.
+ */
+ public static InputStream getResourceAsStream(String resource) {
+ final InputStream in = getContextClassLoader().getResourceAsStream(resource);
+ return in == null ? ClassLoader.getSystemClassLoader().getResourceAsStream(resource) : in;
+ }
+
+ /**
+ * Read the entire contents of the fully qualified resource path.
+ *
+ * <p>The resource stream is read until it reports end-of-stream and is always closed before
+ * this method returns.
+ *
+ * @param resource fully qualified resource path.
+ * @return the bytes of the resource, or null if the resource cannot be found.
+ * @throws Exception if reading the resource fails.
+ */
+ public static byte[] getResourceAsBytes(String resource) throws Exception {
+ try (InputStream in = getResourceAsStream(resource)) {
+ if (in == null) {
+ return null;
+ }
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ byte[] buffer = new byte[8192];
+ int bytesRead;
+ // read() returning -1 is the only reliable end-of-stream signal; available() may report 0 before the end
+ while ((bytesRead = in.read(buffer)) != -1) {
+ out.write(buffer, 0, bytesRead);
+ }
+ return out.toByteArray();
+ }
+ }
+
+ /**
+ * Get the context class loader of the current thread.
+ *
+ * @return the current thread's context class loader.
+ */
+ public static ClassLoader getContextClassLoader() {
+ return Thread.currentThread().getContextClassLoader();
+ }
+
+ /**
+ * Create an array of TopologyDetails by reading serialized files for topology and configuration in the
+ * resource path. Topology (code) and configuration (conf) files are paired by the part of the file name
+ * that precedes its last "-"; that name is also used as the topology id. Configuration files without a
+ * matching topology file are not examined.
+ *
+ * @param failOnParseError if true, throw an exception when a topology file has no matching config file or a
+ * file cannot be read; if false, such problems are logged and the affected topology is skipped.
+ * @return An array of TopologyDetails, one per topology/config pair that was read successfully.
+ * @throws Exception if failOnParseError is true and any problem was recorded, or if listing the resources fails.
+ */
+ public static TopologyDetails[] createTopoDetailsArray(boolean failOnParseError) throws Exception {
+ List<TopologyDetails> topoDetailsList = new ArrayList<>();
+ List<String> errors = new ArrayList<>();
+ List<String> resources = getResourceFiles(TEST_RESOURCE_PATH);
+ Map<String, String> codeResourceMap = new TreeMap<>();
+ Map<String, String> confResourceMap = new HashMap<>();
+ for (int i = 0 ; i < resources.size() ; i++) {
+ String resource = resources.get(i);
+ int idxOfSlash = resource.lastIndexOf("/");
+ int idxOfDash = resource.lastIndexOf("-");
+ String nm = idxOfDash > idxOfSlash
+ ? resource.substring(idxOfSlash + 1, idxOfDash)
+ : resource.substring(idxOfSlash + 1, resource.length() - COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING.length());
+ if (resource.endsWith(COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING)) {
+ codeResourceMap.put(nm, resource);
+ } else if (resource.endsWith(COMPRESSED_SERIALIZED_CONFIG_FILENAME_ENDING)) {
+ confResourceMap.put(nm, resource);
+ } else {
+ LOG.info("Ignoring unsupported resource file " + resource);
+ }
+ }
+ String[] examinedConfParams = {
+ Config.TOPOLOGY_NAME,
+ Config.TOPOLOGY_SCHEDULER_STRATEGY,
+ Config.TOPOLOGY_PRIORITY,
+ Config.TOPOLOGY_WORKERS,
+ Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB,
+ Config.TOPOLOGY_SUBMITTER_USER,
+ Config.TOPOLOGY_ACKER_CPU_PCORE_PERCENT,
+ Config.TOPOLOGY_ACKER_RESOURCES_OFFHEAP_MEMORY_MB,
+ Config.TOPOLOGY_ACKER_RESOURCES_ONHEAP_MEMORY_MB,
+ };
+
+ for (String nm : codeResourceMap.keySet()) {
+ String codeResource = codeResourceMap.get(nm);
+ if (!confResourceMap.containsKey(nm)) {
+ String err = String.format("Ignoring topology file %s because of missing config file for %s", codeResource, nm);
+ errors.add(err);
+ LOG.error(err);
+ continue;
+ }
+ String confResource = confResourceMap.get(nm);
+ LOG.info("Found matching topology and config files: {}, {}", codeResource, confResource);
+ StormTopology stormTopology;
+ try {
+ stormTopology = Utils.deserialize(getResourceAsBytes(codeResource), StormTopology.class);
+ } catch (Exception ex) {
+ String err = String.format("Cannot read topology from resource %s", codeResource);
+ errors.add(err);
+ LOG.error(err, ex);
+ continue;
+ }
+
+ Map<String, Object> conf;
+ try {
+ conf = Utils.fromCompressedJsonConf(getResourceAsBytes(confResource));
+ } catch (RuntimeException | IOException ex) {
+ String err = String.format("Cannot read configuration from resource %s", confResource);
+ errors.add(err);
+ LOG.error(err, ex);
+ continue;
+ }
+ // fix 0.10 conf class names: rewrite the legacy "backtype" prefix to "org.apache"
+ String[] configParamsToFix = {Config.TOPOLOGY_SCHEDULER_STRATEGY, Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN,
+ DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY };
+ for (String configParam: configParamsToFix) {
+ if (!conf.containsKey(configParam)) {
+ continue;
+ }
+ String className = (String) conf.get(configParam);
+ if (className.startsWith("backtype")) {
+ className = className.replace("backtype", "org.apache");
+ conf.put(configParam, className);
+ }
+ }
+ // fix conf params used by ConstraintSolverStrategy: supply a max state search of 10_000 when the conf has none
+ if (!conf.containsKey(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH)) {
+ conf.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH, 10_000);
+ }
+ if (!conf.containsKey(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH)) {
+ conf.put(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH, 10_000);
+ }
+
+ String topoId = nm;
+ String topoName = (String) conf.getOrDefault(Config.TOPOLOGY_NAME, nm);
+
+ // conf: log the examined configuration parameters of this topology
+ StringBuffer sb = new StringBuffer("Config for " + nm + ": ");
+ for (String param : examinedConfParams) {
+ Object val = conf.getOrDefault(param, "<null>");
+ sb.append(param).append("=").append(val).append(", ");
+ }
+ LOG.info(sb.toString());
+
+ // topo: build the TopologyDetails from the deserialized topology and its configuration
+ Map<ExecutorDetails, String> execToComp = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology);
+ LOG.info("Topology \"{}\" spouts={}, bolts={}, execToComp size is {}", topoName,
+ stormTopology.get_spouts_size(), stormTopology.get_bolts_size(), execToComp.size());
+ int numWorkers = Integer.parseInt("" + conf.getOrDefault(Config.TOPOLOGY_WORKERS, "0"));
+ TopologyDetails topo = new TopologyDetails(topoId, conf, stormTopology, numWorkers,
+ execToComp, Time.currentTimeSecs(), "user");
+ topo.getComponents(); // sanity check - normally this should not fail
+
+ topoDetailsList.add(topo);
+ }
+ if (!errors.isEmpty() && failOnParseError) {
+ throw new Exception("Unable to parse all serialized objects\n\t" + String.join("\n\t", errors));
+ }
+ return topoDetailsList.toArray(new TopologyDetails[0]);
+ }
+
+ /**
+ * Check that the resource directory contains serialized files, that they can be read properly, and that
+ * code/config files occur in matched pairs.
+ *
+ * @throws Exception showing bad and unmatched resource files.
+ */
+ @Test
+ public void testReadSerializedTopologiesAndConfigs() throws Exception {
+ List<String> resources = getResourceFiles(TEST_RESOURCE_PATH);
+ Assert.assertTrue("No resource files found in " + TEST_RESOURCE_PATH, !resources.isEmpty());
+ TopologyDetails[] topoDetailsArray = createTopoDetailsArray(true);
+ }
+
+ /**
+ * Create one supervisor and add it to the supervisors map. The supervisor id is formatted as
+ * "r&lt;rack&gt;s&lt;supervisor&gt;" and the host as "host-&lt;supervisor&gt;-rack-&lt;rack&gt;", each number
+ * zero-padded to three digits. Ports are numbered from 0 to numPorts - 1.
+ *
+ * @param rack rack-number
+ * @param superInRack supervisor number in the rack
+ * @param cpu cpu capacity of the supervisor, in percent
+ * @param mem memory capacity of the supervisor, in megabytes
+ * @param numPorts number of ports on this supervisor
+ * @param sups map of supervisors, keyed by supervisor id, to which the new supervisor is added
+ */
+ private static void createAndAddOneSupervisor(
+ int rack, int superInRack, double cpu, double mem, int numPorts,
+ Map<String, SupervisorDetails> sups) {
+
+ List<Number> ports = new LinkedList<>();
+ for (int p = 0; p < numPorts; p++) {
+ ports.add(p);
+ }
+ String superId = String.format("r%03ds%03d", rack, superInRack);
+ String hostId = String.format("host-%03d-rack-%03d", superInRack, rack);
+ Map<String, Double> resourceMap = new HashMap<>();
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, cpu);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, mem);
+ resourceMap.put("network.resource.units", 50.0);
+ SupervisorDetails sup = new SupervisorDetails(superId,
+ hostId, null, ports,
+ NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resourceMap));
+ sups.put(sup.getId(), sup);
+ }
+
+ /**
+ * Create supervisors.
+ *
+ * <p>The uniform layout has 16 racks of 82 supervisors, each with 50 ports, 7200 percent cpu and
+ * 356,000 MB of memory. The non-uniform layout also has 16 racks of 82 supervisors with 50 ports each,
+ * but capacity varies by rack: racks 0-11 use 3600 percent cpu and 178,000 MB, racks 12-13 use
+ * 2400 percent cpu and 118,100 MB, and racks 14-15 use 1200 percent cpu and 42,480 MB. In the
+ * non-uniform layout every odd-numbered supervisor in a rack gets 100 percent less cpu.
+ *
+ * @param uniformSupervisors true if all supervisors are of the same size, false otherwise.
+ * @return supervisor details indexed by id
+ */
+ private static Map<String, SupervisorDetails> createSupervisors(boolean uniformSupervisors) {
+ Map<String, SupervisorDetails> retVal;
+ if (uniformSupervisors) {
+ int numRacks = 16;
+ int numSupersPerRack = 82;
+ int numPorts = 50; // note: scheduling is slower when components with large cpu/mem leave large percent of workerslots unused
+ int rackStart = 0;
+ int superInRackStart = 1;
+ double cpu = 7200; // %percent
+ double mem = 356_000; // MB
+ Map<String, Double> miscResources = new HashMap<>();
+ miscResources.put("network.resource.units", 100.0);
+
+ return TestUtilsForResourceAwareScheduler.genSupervisorsWithRacks(
+ numRacks, numSupersPerRack, numPorts, rackStart, superInRackStart, cpu, mem, miscResources);
+
+ } else {
+ // this non-uniform supervisor distribution closely (but not exactly) mimics a large cluster in use
+ int numSupersPerRack = 82;
+ int numPorts = 50;
+
+ Map<String, SupervisorDetails> retList = new HashMap<>();
+
+ for (int rack = 0 ; rack < 12 ; rack++) {
+ double cpu = 3600; // %percent
+ double mem = 178_000; // MB
+ for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
+ createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
+ }
+ }
+ for (int rack = 12 ; rack < 14 ; rack++) {
+ double cpu = 2400; // %percent
+ double mem = 118_100; // MB
+ for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
+ createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
+ }
+ }
+ for (int rack = 14 ; rack < 16 ; rack++) {
+ double cpu = 1200; // %percent
+ double mem = 42_480; // MB
+ for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
+ createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
+ }
+ }
+ return retList;
+ }
+ }
+
+ /**
+ * Create a large cluster, read topologies and configuration from resource directory and schedule.
+ * After the initial scheduling pass, each topology in turn is unassigned and the cluster is scheduled
+ * again, asserting every time that the topology ends up fully scheduled. Elapsed times are logged.
+ *
+ * @throws Exception if reading the resources or scheduling fails.
+ */
+ @Test
+ public void testLargeCluster() throws Exception {
+ boolean uniformSupervisors = false; // false means non-uniform supervisor distribution
+
+ Map<String, SupervisorDetails> supervisors = createSupervisors(uniformSupervisors);
+
+ TopologyDetails[] topoDetailsArray = createTopoDetailsArray(false);
+ Assert.assertTrue("No topologies found", topoDetailsArray.length > 0);
+ Topologies topologies = new Topologies(topoDetailsArray);
+
+ Config confWithDefaultStrategy = new Config();
+ confWithDefaultStrategy.putAll(topoDetailsArray[0].getConf());
+ confWithDefaultStrategy.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, DefaultResourceAwareStrategy.class.getName());
+
+ INimbus iNimbus = new INimbusTest();
+ Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supervisors, new HashMap<>(),
+ topologies, confWithDefaultStrategy);
+
+ scheduler = new ResourceAwareScheduler();
+
+ List<Class> classesToDebug = Arrays.asList(DefaultResourceAwareStrategy.class,
+ GenericResourceAwareStrategy.class, ResourceAwareScheduler.class,
+ Cluster.class
+ );
+ Level logLevel = Level.INFO ; // switch to Level.DEBUG for verbose otherwise Level.INFO
+ classesToDebug.forEach(x -> Configurator.setLevel(x.getName(), logLevel));
+ long startTime = System.currentTimeMillis();
+ scheduler.prepare(confWithDefaultStrategy, new StormMetricsRegistry());
+ scheduler.schedule(topologies, cluster);
+ long endTime = System.currentTimeMillis();
+ LOG.info("Scheduling Time: {} topologies in {} seconds", topoDetailsArray.length, (endTime - startTime) / 1000.0);
+
+ for (TopologyDetails td : topoDetailsArray) {
+ TestUtilsForResourceAwareScheduler.assertTopologiesFullyScheduled(cluster, td.getName());
+ }
+
+ // Remove each topology in turn and reschedule it, timing every round
+ for (int i = 0 ; i < topoDetailsArray.length ; i++) {
+ startTime = System.currentTimeMillis();
+ TopologyDetails topoDetails = topoDetailsArray[i];
+ cluster.unassign(topoDetails.getId());
+ LOG.info("({}) Removed topology {}", i, topoDetails.getName());
+ IScheduler rescheduler = new ResourceAwareScheduler();
+ rescheduler.prepare(confWithDefaultStrategy, new StormMetricsRegistry());
+ rescheduler.schedule(topologies, cluster);
+ TestUtilsForResourceAwareScheduler.assertTopologiesFullyScheduled(cluster, topoDetails.getName());
+ endTime = System.currentTimeMillis();
+ LOG.info("({}) Scheduling Time: Removed topology {} and rescheduled in {} seconds", i, topoDetails.getName(), (endTime - startTime) / 1000.0);
+ }
+ classesToDebug.forEach(x -> Configurator.setLevel(x.getName(), Level.INFO));
+ }
+
+ /**
+ * Minimal INimbus implementation for this test. It offers every port listed in a supervisor's
+ * metadata as an available slot, resolves host names from the supplied supervisor map, and leaves
+ * prepare and assignSlots empty.
+ */
+ public static class INimbusTest implements INimbus {
+ @Override
+ public void prepare(Map<String, Object> topoConf, String schedulerLocalDir) {
+
+ }
+
+ @Override
+ public Collection<WorkerSlot> allSlotsAvailableForScheduling(Collection<SupervisorDetails> existingSupervisors,
+ Topologies topologies, Set<String> topologiesMissingAssignments) {
+ // offer one WorkerSlot per port found in each supervisor's metadata
+ Set<WorkerSlot> ret = new HashSet<>();
+ for (SupervisorDetails sd : existingSupervisors) {
+ String id = sd.getId();
+ for (Number port : (Collection<Number>) sd.getMeta()) {
+ ret.add(new WorkerSlot(id, port));
+ }
+ }
+ return ret;
+ }
+
+ @Override
+ public void assignSlots(Topologies topologies, Map<String, Collection<WorkerSlot>> newSlotsByTopologyId) {
+
+ }
+
+ @Override
+ public String getHostName(Map<String, SupervisorDetails> existingSupervisors, String nodeId) {
+ if (existingSupervisors.containsKey(nodeId)) {
+ return existingSupervisors.get(nodeId).getHost();
+ }
+ return null;
+ }
+
+ @Override
+ public IScheduler getForcedScheduler() {
+ return null;
+ }
+ }
+}
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestTopologyAnonymizerUtils.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestTopologyAnonymizerUtils.java
new file mode 100644
index 0000000..ee1b337
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestTopologyAnonymizerUtils.java
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.scheduling;
+
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.serialization.GzipThriftSerializationDelegate;
+import org.apache.storm.utils.Utils;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Anonymize Serialized Topologies and Configs with the goal of taking internally developed topologies and configuration
+ * and make them publicly available for testing.
+ *
+ * Assume that topologies and configurations exist in the specified resource directory with names ending in
+ * {@link #COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING} and {@link #COMPRESSED_SERIALIZED_CONFIG_FILENAME_ENDING}
+ * respectively as they exist in blobstore. Also, when both these files exist for the same topology,
+ * they share the same file name prefix.
+ *
+ * <li> Rename topologies and its corresponding configuration (as identified by its resource name). Ensure that renamed
+ * configuration file for a topology retains the proper linkage so that:
+ * <p><old-topo-name>-stormcode.ser -> <new-topo-name>-stormcode.ser</p> and its old conf
+ * <p><old-topo-name>-stormconf.ser -> <new-topo-name>-stormconf.ser</p>
+ * </li>
+ *
+ * <li>Rename components in each of the topologies.</li>
+ *
+ * The new converted resource files can be copied to a resource directory under "clusterconf" and made available for use
+ * in TestLargeCluster class.
+ */
+public class TestTopologyAnonymizerUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(TestTopologyAnonymizerUtils.class);
+
+ /** File name ending of a compressed serialized topology resource. */
+ public static final String COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING = "stormcode.ser";
+ /** File name ending of a compressed serialized topology configuration resource. */
+ public static final String COMPRESSED_SERIALIZED_CONFIG_FILENAME_ENDING = "stormconf.ser";
+
+ /** Default resource directory from which the original (non-anonymized) files are read. */
+ private static final String DEFAULT_ORIGINAL_RESOURCES_PATH = "clusterconf/iridiumblue";
+ /** Default directory into which the anonymized files are written. */
+ private static final String DEFAULT_ANONYMIZED_RESOURCES_OUTDIR = "src/test/resources/clusterconf/largeCluster01";
+
+ /** Directory into which the anonymized files are written. */
+ private String outputDirPath;
+ /** Resource directory that is scanned for the original files. */
+ private String originalResourcePath;
+
+ /**
+  * Create an anonymizer that reads from and writes to the default locations.
+  */
+ public TestTopologyAnonymizerUtils() {
+     this.outputDirPath = DEFAULT_ANONYMIZED_RESOURCES_OUTDIR;
+     this.originalResourcePath = DEFAULT_ORIGINAL_RESOURCES_PATH;
+ }
+
+ /**
+ * Check if resource files are available in the resource path defined by originalResourcePath.
+ *
+ * @throws Exception if there are no resource files in input directory.
+ */
+ public void testResourceAvailability() throws Exception {
+ List<String> resources = getResourceFiles(originalResourcePath);
+ if (resources.isEmpty()) {
+ throw new Exception("No resource files found in resource path " + originalResourcePath);
+ }
+ }
+
+ /**
+ * Take all compressed serialized files in {@link #originalResourcePath} and create anonymized versions or the
+ * topology and configuration in the {@link #outputDirPath}.
+ *
+ * @throws Exception
+ */
+ public void anonymizeDirectory() throws Exception {
+ Map<String, Integer> seenTopoNameIndex = new HashMap<>();
+ List<String> errs = new ArrayList<>();
+
+ List<String> resources = getResourceFiles(originalResourcePath);
+ for (String resource : resources) {
+ if (resource.length() <= COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING.length()) {
+ String err = String.format("Resource %s name is too short", resource);
+ errs.add(err);
+ LOG.error(err);
+ continue;
+ }
+ String resType = resource.substring(resource.length() - COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING.length());
+ String entryName = getEntryName(
+ resource.substring(0, resource.length() - COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING.length()),
+ seenTopoNameIndex);
+ int entryNum = seenTopoNameIndex.get(entryName);
+ String topoName = String.format("TopologyName%05d", entryNum);
+ String topoId = String.format("TopologyId%05d", entryNum);
+ String newResourceName = String.format("%s-%s", topoName, resType);
+
+ switch (resType) {
+ case COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING:
+ // anonymize StormTopology
+ LOG.info("Anonymizing Topology {} as {}, with topoId={}", resource, newResourceName, topoId);
+ StormTopology stormTopology = readAndAnonymizeTopology(resource, errs);
+ writeCompressedResource(newResourceName, new GzipThriftSerializationDelegate().serialize(stormTopology));
+ break;
+
+ case COMPRESSED_SERIALIZED_CONFIG_FILENAME_ENDING:
+ // anonymize config
+ LOG.info("Anonymizing Config {} as {}", resource, newResourceName);
+ Map<String, Object> conf = readAndAnonymizeConfig(resource, topoName, errs);
+ writeCompressedResource(newResourceName, Utils.toCompressedJsonConf(conf));
+ break;
+
+ default:
+ String err = String.format("Resource %s is not recognized as one of supported types", resource);
+ errs.add(err);
+ LOG.warn(err);
+ }
+ }
+ if (!errs.isEmpty()) {
+ throw new Exception("Unable to parse all serialized objects\n\t" + String.join("\n\t", errs));
+ }
+ }
+
+ /**
+  * Open the given resource path for reading, asking the context class loader of the current thread first and
+  * the system class loader second.
+  *
+  * @param resourcePath resource path to open.
+  * @return a stream over the resource, or null when neither class loader finds it.
+  */
+ private static InputStream getResourceAsStream(String resourcePath) {
+     InputStream stream = Thread.currentThread().getContextClassLoader().getResourceAsStream(resourcePath);
+     if (stream != null) {
+         return stream;
+     }
+     return ClassLoader.getSystemClassLoader().getResourceAsStream(resourcePath);
+ }
+
+ /**
+  * Get the list of serialized topology (ending with {@link #COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING})
+  * and configuration (ending with {@link #COMPRESSED_SERIALIZED_CONFIG_FILENAME_ENDING})
+  * resource files in the path.
+  *
+  * <p>The directory resource is opened as a stream and each line read from it is taken as the name of one entry
+  * (NOTE(review): this presumes the class loader serves a directory as a line-per-entry listing, which is not
+  * guaranteed for every class path layout - confirm for resources packaged in a jar).
+  *
+  * @param path directory in which resources exist.
+  * @return sorted list of matching resource names, each prefixed with {@code path + "/"}; empty when the path
+  *     cannot be found or holds no matching entry.
+  * @throws IOException if reading the listing fails.
+  */
+ public static List<String> getResourceFiles(String path) throws IOException {
+     List<String> fileNames = new ArrayList<>();
+
+     try (InputStream in = getResourceAsStream(path)) {
+         if (in == null) {
+             // unknown resource path: report "no files" instead of failing with a NullPointerException
+             return fileNames;
+         }
+         try (BufferedReader br = new BufferedReader(new InputStreamReader(in))) {
+             String resource;
+             while ((resource = br.readLine()) != null) {
+                 if (resource.endsWith(COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING)
+                     || resource.endsWith(COMPRESSED_SERIALIZED_CONFIG_FILENAME_ENDING)) {
+                     fileNames.add(path + "/" + resource);
+                 }
+             }
+         }
+     }
+     Collections.sort(fileNames);
+     return fileNames;
+ }
+
+ /**
+  * Read the complete contents of the fully qualified resource path.
+  *
+  * <p>The stream is read until end-of-stream is reported. {@link InputStream#available()} is deliberately not
+  * used as an end-of-data test, because it only estimates how many bytes can be read without blocking and may
+  * return 0 before the end of the data is reached.
+  *
+  * @param resourcePath resource path to read.
+  * @return all bytes of the resource, or null when the resource cannot be found.
+  * @throws Exception if reading the resource fails.
+  */
+ private static byte[] getResourceAsBytes(String resourcePath) throws Exception {
+     // try-with-resources skips close() for a null resource, so the null check can live inside the block
+     try (InputStream in = getResourceAsStream(resourcePath)) {
+         if (in == null) {
+             return null;
+         }
+         ByteArrayOutputStream out = new ByteArrayOutputStream();
+         byte[] buffer = new byte[8192];
+         int bytesRead;
+         while ((bytesRead = in.read(buffer)) != -1) {
+             out.write(buffer, 0, bytesRead);
+         }
+         return out.toByteArray();
+     }
+ }
+
+ /**
+  * Strip the directory part from a resource name prefix and register the remaining base name.
+  *
+  * @param resourceNamePrefix resource name without its file name ending.
+  * @param seenTopoNameIndex base names seen so far, mapped to their index; a base name that is not yet present is
+  *     added with the current size of the map as its index.
+  * @return the base name, i.e. everything after the last "/" of the prefix.
+  */
+ private String getEntryName(String resourceNamePrefix, Map<String, Integer> seenTopoNameIndex) {
+     String baseName = resourceNamePrefix.substring(resourceNamePrefix.lastIndexOf("/") + 1);
+     int nextIndex = seenTopoNameIndex.size();
+     seenTopoNameIndex.putIfAbsent(baseName, nextIndex);
+     return baseName;
+ }
+
+ /**
+  * Read a serialized topology and return a copy in which every spout and bolt has a generic name.
+  *
+  * <p>Spouts are renamed to {@code Spout-<n>} and bolts to {@code Bolt-<n>}, each numbered from 0 in the iteration
+  * order of the topology's component maps. Component ids referenced by the inputs of each spout and bolt are
+  * renamed consistently.
+  *
+  * @param resource resource path of the serialized topology.
+  * @param errors list to which an error message is appended when the topology cannot be read.
+  * @return the anonymized copy, or null when the topology cannot be read.
+  */
+ private StormTopology readAndAnonymizeTopology(String resource, List<String> errors) {
+     StormTopology stormTopology;
+     try {
+         stormTopology = Utils.deserialize(getResourceAsBytes(resource), StormTopology.class);
+     } catch (Exception ex) {
+         String err = String.format("Cannot read topology from resource %s", resource);
+         errors.add(err);
+         LOG.error(err, ex);
+         return null;
+     }
+
+     // original component name -> anonymized name
+     Map<String, String> renameMap = new HashMap<>();
+     if (stormTopology.get_spouts() != null) {
+         for (String name : stormTopology.get_spouts().keySet()) {
+             renameMap.putIfAbsent(name, String.format("Spout-%d", renameMap.size()));
+         }
+     }
+     int spoutCnt = renameMap.size();
+     if (stormTopology.get_bolts() != null) {
+         for (String name : stormTopology.get_bolts().keySet()) {
+             renameMap.putIfAbsent(name, String.format("Bolt-%d", renameMap.size() - spoutCnt));
+         }
+     }
+
+     // rename components on a copy so that the deserialized original stays untouched
+     StormTopology retVal = stormTopology.deepCopy();
+     Map<String, SpoutSpec> spouts = retVal.get_spouts();
+     if (spouts != null) {
+         renameComponents(spouts, renameMap);
+         spouts.values().forEach(spec -> renameInputs(spec.get_common().get_inputs(), renameMap));
+     }
+     Map<String, Bolt> bolts = retVal.get_bolts();
+     if (bolts != null) {
+         renameComponents(bolts, renameMap);
+         bolts.values().forEach(spec -> renameInputs(spec.get_common().get_inputs(), renameMap));
+     }
+     return retVal;
+ }
+
+ /**
+  * Replace the keys of a component map according to renameMap. The renamed entries are collected in a separate
+  * map first, so an original name that equals one of the generated names cannot overwrite an entry that has not
+  * been renamed yet.
+  */
+ private static <V> void renameComponents(Map<String, V> components, Map<String, String> renameMap) {
+     Map<String, V> renamed = new HashMap<>();
+     components.forEach((name, spec) -> renamed.put(renameMap.getOrDefault(name, name), spec));
+     components.clear();
+     components.putAll(renamed);
+ }
+
+ /**
+  * Rename the component ids used as keys of an inputs map. Keys are replaced by modified copies instead of being
+  * changed in place, because changing a key that is stored in a hash-based map can make the entry unreachable.
+  */
+ private static <V> void renameInputs(Map<GlobalStreamId, V> inputs, Map<String, String> renameMap) {
+     if (inputs == null) {
+         return;
+     }
+     Map<GlobalStreamId, V> renamed = new HashMap<>();
+     inputs.forEach((streamId, grouping) -> {
+         String newComponentId = renameMap.get(streamId.get_componentId());
+         GlobalStreamId key = streamId;
+         if (newComponentId != null) {
+             key = streamId.deepCopy();
+             key.set_componentId(newComponentId);
+         }
+         renamed.put(key, grouping);
+     });
+     inputs.clear();
+     inputs.putAll(renamed);
+ }
+
+ /**
+  * Read a compressed topology configuration, replace its topology name and add the settings that the
+  * configuration does not define yet.
+  *
+  * @param confResource resource path of the compressed configuration.
+  * @param topoName anonymized topology name to store in the configuration.
+  * @param errors list to which an error message is appended when the configuration cannot be read.
+  * @return the adjusted configuration, or null when the configuration cannot be read.
+  */
+ private Map<String, Object> readAndAnonymizeConfig(String confResource, String topoName, List<String> errors) {
+     Map<String, Object> topoConf;
+     try {
+         topoConf = Utils.fromCompressedJsonConf(getResourceAsBytes(confResource));
+     } catch (Exception ex) {
+         String err = String.format("Cannot read configuration from resource %s", confResource);
+         errors.add(err);
+         LOG.error(err, ex);
+         return null;
+     }
+
+     topoConf.put(Config.TOPOLOGY_NAME, topoName);
+     setIfMissing(topoConf, Config.TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER, false);
+     setIfMissing(topoConf, Config.TOPOLOGY_RAS_ONE_COMPONENT_PER_WORKER, false);
+     // Fix 0.10 topology, config param used by ConstraintSolverStrategy
+     setIfMissing(topoConf, DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH, 10_000);
+     setIfMissing(topoConf, Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH, 10_000);
+     return topoConf;
+ }
+
+ /**
+  * Store value under key unless the configuration already contains that key.
+  */
+ private static void setIfMissing(Map<String, Object> conf, String key, Object value) {
+     if (!conf.containsKey(key)) {
+         conf.put(key, value);
+     }
+ }
+
+ /**
+  * Write the bytes to a file with the given name inside the output directory, creating the directory (and any
+  * missing parents) when it does not exist yet.
+  *
+  * @param newResourceName name of the file to create in the output directory.
+  * @param compressedBytes contents of the file.
+  * @throws IOException if the file cannot be opened or written.
+  */
+ private void writeCompressedResource(String newResourceName, byte[] compressedBytes) throws IOException {
+     File outputDir = new File(outputDirPath);
+     if (!outputDir.exists()) {
+         outputDir.mkdirs();
+     }
+     File target = new File(outputDir, newResourceName);
+     try (FileOutputStream out = new FileOutputStream(target)) {
+         out.write(compressedBytes);
+     }
+ }
+
+ /**
+  * Anonymize the files in the default input path into the default output directory.
+  *
+  * <p>In order to create resources as part of a test run:
+  * <ol>
+  * <li>Download compressed topologies and configurations (from blobstore) into resource path
+  * {@link #DEFAULT_ORIGINAL_RESOURCES_PATH}. The resource names must end with either
+  * {@link #COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING} or {@link #COMPRESSED_SERIALIZED_CONFIG_FILENAME_ENDING}</li>
+  * <li>Change pathnames for {@link #DEFAULT_ORIGINAL_RESOURCES_PATH} and {@link #DEFAULT_ANONYMIZED_RESOURCES_OUTDIR}</li>
+  * <li>Uncomment annotation so that this method is executed as a test</li>
+  * <li>Add files in {@link #DEFAULT_ANONYMIZED_RESOURCES_OUTDIR} to the resource path "clusterconf/new-cluster-name"</li>
+  * <li>Use TestLargeCluster to test these newly generated files after changing
+  * {@link TestLargeCluster#TEST_CLUSTER_NAME} to "new-cluster-name"</li>
+  * </ol>
+  *
+  * @throws Exception if no input resources are found or a resource cannot be anonymized.
+  */
+ // @Test
+ public void testAnonymizer() throws Exception {
+     TestTopologyAnonymizerUtils anonymizer = new TestTopologyAnonymizerUtils();
+     anonymizer.originalResourcePath = DEFAULT_ORIGINAL_RESOURCES_PATH;
+     anonymizer.outputDirPath = DEFAULT_ANONYMIZED_RESOURCES_OUTDIR;
+
+     anonymizer.testResourceAvailability();
+     anonymizer.anonymizeDirectory();
+     LOG.info("Read resources in {} and wrote anonymized files to {}", anonymizer.originalResourcePath, anonymizer.outputDirPath);
+ }
+
+ /**
+  * Command line entry point: anonymize the resources found in a source resource path into a target directory.
+  *
+  * <p>Without arguments the default source path and target directory are used. Otherwise exactly two arguments,
+  * {@code <sourceResourcePath> <targetDir>}, are expected. The process exits with status -1 when the arguments
+  * are invalid or when anonymization fails, so that calling scripts can detect the failure.
+  *
+  * @param args optional source resource path and target directory.
+  */
+ public static void main(String[] args) {
+     if (args == null || args.length == 0) {
+         args = new String[]{DEFAULT_ORIGINAL_RESOURCES_PATH, DEFAULT_ANONYMIZED_RESOURCES_OUTDIR};
+     }
+     if (args.length != 2) {
+         LOG.error("Expecting two arguments <sourceResourcePath> <targetDir>, received {} args", args.length);
+         System.exit(-1);
+     }
+
+     TestTopologyAnonymizerUtils instance = new TestTopologyAnonymizerUtils();
+     instance.originalResourcePath = args[0];
+     instance.outputDirPath = args[1];
+     try {
+         instance.testResourceAvailability();
+         instance.anonymizeDirectory();
+     } catch (Exception ex) {
+         LOG.error(ex.getMessage(), ex);
+         // report the failure through the exit status as well, consistent with the argument check above
+         System.exit(-1);
+     }
+ }
+}
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00000-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00000-stormcode.ser
new file mode 100644
index 0000000..230ca0e
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00000-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00000-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00000-stormconf.ser
new file mode 100644
index 0000000..1cff252
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00000-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00001-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00001-stormcode.ser
new file mode 100644
index 0000000..2a952da
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00001-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00001-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00001-stormconf.ser
new file mode 100644
index 0000000..b4a7087
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00001-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00002-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00002-stormcode.ser
new file mode 100644
index 0000000..f2fb021
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00002-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00002-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00002-stormconf.ser
new file mode 100644
index 0000000..7ed5170
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00002-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00003-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00003-stormcode.ser
new file mode 100644
index 0000000..ef1615b
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00003-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00003-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00003-stormconf.ser
new file mode 100644
index 0000000..48683dc
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00003-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00004-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00004-stormcode.ser
new file mode 100644
index 0000000..bf0cbd3
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00004-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00004-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00004-stormconf.ser
new file mode 100644
index 0000000..9227b8a
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00004-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00005-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00005-stormcode.ser
new file mode 100644
index 0000000..71f58d4
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00005-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00005-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00005-stormconf.ser
new file mode 100644
index 0000000..76e951a
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00005-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00006-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00006-stormcode.ser
new file mode 100644
index 0000000..bc450ec
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00006-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00006-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00006-stormconf.ser
new file mode 100644
index 0000000..0a82e80
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00006-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00007-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00007-stormcode.ser
new file mode 100644
index 0000000..e0ceac6
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00007-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00007-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00007-stormconf.ser
new file mode 100644
index 0000000..df9a4ec
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00007-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00008-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00008-stormcode.ser
new file mode 100644
index 0000000..4a8cc08
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00008-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00008-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00008-stormconf.ser
new file mode 100644
index 0000000..5e04c8c
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00008-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00009-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00009-stormcode.ser
new file mode 100644
index 0000000..ca7806e
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00009-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00009-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00009-stormconf.ser
new file mode 100644
index 0000000..7c3433a
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00009-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00010-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00010-stormcode.ser
new file mode 100644
index 0000000..61a399a
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00010-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00010-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00010-stormconf.ser
new file mode 100644
index 0000000..4076062
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00010-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00011-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00011-stormcode.ser
new file mode 100644
index 0000000..b575e72
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00011-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00011-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00011-stormconf.ser
new file mode 100644
index 0000000..5e1bc69
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00011-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00012-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00012-stormcode.ser
new file mode 100644
index 0000000..03da91f
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00012-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00012-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00012-stormconf.ser
new file mode 100644
index 0000000..5ed158b
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00012-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00013-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00013-stormcode.ser
new file mode 100644
index 0000000..da26170
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00013-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00013-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00013-stormconf.ser
new file mode 100644
index 0000000..0b76c15
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00013-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00014-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00014-stormcode.ser
new file mode 100644
index 0000000..d6bbc69
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00014-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00014-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00014-stormconf.ser
new file mode 100644
index 0000000..3bd1971
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00014-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00015-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00015-stormcode.ser
new file mode 100644
index 0000000..bf910fe
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00015-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00015-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00015-stormconf.ser
new file mode 100644
index 0000000..c8afda2
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00015-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00016-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00016-stormcode.ser
new file mode 100644
index 0000000..63c35fc
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00016-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00016-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00016-stormconf.ser
new file mode 100644
index 0000000..10eb6ca
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00016-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00017-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00017-stormcode.ser
new file mode 100644
index 0000000..bf7d530
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00017-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00017-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00017-stormconf.ser
new file mode 100644
index 0000000..12eb9ed
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00017-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00018-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00018-stormcode.ser
new file mode 100644
index 0000000..1ed360d
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00018-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00018-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00018-stormconf.ser
new file mode 100644
index 0000000..97b342b
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00018-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00019-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00019-stormcode.ser
new file mode 100644
index 0000000..6640f04
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00019-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00019-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00019-stormconf.ser
new file mode 100644
index 0000000..b443c29
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00019-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00020-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00020-stormcode.ser
new file mode 100644
index 0000000..afa6660
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00020-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00020-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00020-stormconf.ser
new file mode 100644
index 0000000..ec1ec00
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00020-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00021-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00021-stormcode.ser
new file mode 100644
index 0000000..4515494
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00021-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00021-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00021-stormconf.ser
new file mode 100644
index 0000000..bb3352b
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00021-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00022-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00022-stormcode.ser
new file mode 100644
index 0000000..6b208b0
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00022-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00022-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00022-stormconf.ser
new file mode 100644
index 0000000..ed85ed4
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00022-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00023-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00023-stormcode.ser
new file mode 100644
index 0000000..e57a4d0
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00023-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00023-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00023-stormconf.ser
new file mode 100644
index 0000000..23e8e33
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00023-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00024-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00024-stormcode.ser
new file mode 100644
index 0000000..f7fd6ae
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00024-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00024-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00024-stormconf.ser
new file mode 100644
index 0000000..02f8c6a
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00024-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00025-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00025-stormcode.ser
new file mode 100644
index 0000000..a525528
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00025-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00025-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00025-stormconf.ser
new file mode 100644
index 0000000..c702dbb
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00025-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00026-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00026-stormcode.ser
new file mode 100644
index 0000000..0b90e6e
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00026-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00026-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00026-stormconf.ser
new file mode 100644
index 0000000..a5f5ce6
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00026-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00027-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00027-stormcode.ser
new file mode 100644
index 0000000..4bcc7f5
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00027-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00027-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00027-stormconf.ser
new file mode 100644
index 0000000..e36e10d
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00027-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00028-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00028-stormcode.ser
new file mode 100644
index 0000000..0105cb3
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00028-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00028-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00028-stormconf.ser
new file mode 100644
index 0000000..bd1c57d
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00028-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00029-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00029-stormcode.ser
new file mode 100644
index 0000000..fa2e87e
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00029-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00029-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00029-stormconf.ser
new file mode 100644
index 0000000..a664c13
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00029-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00030-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00030-stormcode.ser
new file mode 100644
index 0000000..aea165f
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00030-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00030-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00030-stormconf.ser
new file mode 100644
index 0000000..0c277c3
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00030-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00031-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00031-stormcode.ser
new file mode 100644
index 0000000..51c66ae
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00031-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00031-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00031-stormconf.ser
new file mode 100644
index 0000000..41e0a6b
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00031-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00032-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00032-stormcode.ser
new file mode 100644
index 0000000..7ee5589
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00032-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00032-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00032-stormconf.ser
new file mode 100644
index 0000000..d2b1366
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00032-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00033-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00033-stormcode.ser
new file mode 100644
index 0000000..6e48889
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00033-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00033-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00033-stormconf.ser
new file mode 100644
index 0000000..baaec65
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00033-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00034-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00034-stormcode.ser
new file mode 100644
index 0000000..0585c6c
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00034-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00034-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00034-stormconf.ser
new file mode 100644
index 0000000..fc6be7d
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00034-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00035-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00035-stormcode.ser
new file mode 100644
index 0000000..d437a0e
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00035-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00035-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00035-stormconf.ser
new file mode 100644
index 0000000..396c5f4
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00035-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00036-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00036-stormcode.ser
new file mode 100644
index 0000000..a649fee
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00036-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00036-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00036-stormconf.ser
new file mode 100644
index 0000000..b04c7bc
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00036-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00037-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00037-stormcode.ser
new file mode 100644
index 0000000..5d7f25b
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00037-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00037-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00037-stormconf.ser
new file mode 100644
index 0000000..92af7cb
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00037-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00038-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00038-stormcode.ser
new file mode 100644
index 0000000..8aec850
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00038-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00038-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00038-stormconf.ser
new file mode 100644
index 0000000..388c278
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00038-stormconf.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00039-stormcode.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00039-stormcode.ser
new file mode 100644
index 0000000..536ab4a
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00039-stormcode.ser differ
diff --git a/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00039-stormconf.ser b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00039-stormconf.ser
new file mode 100644
index 0000000..e3447dc
Binary files /dev/null and b/storm-server/src/test/resources/clusterconf/largeCluster01/TopologyName00039-stormconf.ser differ