You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/07 19:14:49 UTC
[08/18] storm git commit: STORM-2702: some code cleanup for unused
stuff and added some RAS support
STORM-2702: some code cleanup for unused stuff and added some RAS support
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6bc32138
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6bc32138
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6bc32138
Branch: refs/heads/master
Commit: 6bc3213806c67818c8e7c0c8d72c0b8a234468b7
Parents: 5238df2
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Aug 24 10:02:03 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Aug 24 10:07:21 2017 -0500
----------------------------------------------------------------------
examples/storm-loadgen/README.md | 2 +
.../org/apache/storm/loadgen/CaptureLoad.java | 131 ++++++++++++++++++-
.../org/apache/storm/loadgen/CompStats.java | 52 --------
.../java/org/apache/storm/loadgen/GenLoad.java | 20 ++-
.../org/apache/storm/loadgen/LoadCompConf.java | 42 +++---
.../org/apache/storm/loadgen/LoadEngine.java | 45 -------
6 files changed, 173 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/6bc32138/examples/storm-loadgen/README.md
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/README.md b/examples/storm-loadgen/README.md
index 6e9c3ed..50a618f 100644
--- a/examples/storm-loadgen/README.md
+++ b/examples/storm-loadgen/README.md
@@ -159,6 +159,8 @@ Spouts and bolts have the same format.
| id | The id of the bolt or spout. This should be unique within the topology |
| parallelism | How many instances of this component should be a part of the topology |
| streams | The streams that are produced by this bolt or spout |
+| cpuLoad | The amount of CPU this component needs for resource aware scheduling, as a percentage of one core (100 is one full core) |
+| memoryLoad | The amount of memory in MB that this component needs for resource aware scheduling |
### Output Streams
http://git-wip-us.apache.org/repos/asf/storm/blob/6bc32138/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java
index e748efa..649a4c0 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java
@@ -49,7 +49,10 @@ import org.apache.storm.generated.TopologyPageInfo;
import org.apache.storm.generated.TopologySummary;
import org.apache.storm.generated.WorkerSummary;
import org.apache.storm.utils.NimbusClient;
+import org.apache.storm.utils.ObjectReader;
+import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
+import org.json.simple.parser.JSONParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -150,6 +153,22 @@ public class CaptureLoad {
.withId(boltComp);
boltBuilders.put(boltComp, builder);
}
+
+ Map<String, Map<String, Double>> boltResources = getBoltsResources(topo, topoConf);
+ for (Map.Entry<String, Map<String, Double>> entry: boltResources.entrySet()) {
+ LoadCompConf.Builder bd = boltBuilders.get(entry.getKey());
+ if (bd != null) {
+ Map<String, Double> resources = entry.getValue();
+ Double cpu = resources.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
+ if (cpu != null) {
+ bd.withCpuLoad(cpu);
+ }
+ Double mem = resources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+ if (mem != null) {
+ bd.withMemoryLoad(mem);
+ }
+ }
+ }
}
//Spouts
@@ -175,6 +194,22 @@ public class CaptureLoad {
.withId(spoutComp);
spoutBuilders.put(spoutComp, builder);
}
+
+ Map<String, Map<String, Double>> spoutResources = getSpoutsResources(topo, topoConf);
+ for (Map.Entry<String, Map<String, Double>> entry: spoutResources.entrySet()) {
+ LoadCompConf.Builder sd = spoutBuilders.get(entry.getKey());
+ if (sd != null) {
+ Map<String, Double> resources = entry.getValue();
+ Double cpu = resources.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
+ if (cpu != null) {
+ sd.withCpuLoad(cpu);
+ }
+ Double mem = resources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+ if (mem != null) {
+ sd.withMemoryLoad(mem);
+ }
+ }
+ }
}
//Stats...
@@ -247,8 +282,6 @@ public class CaptureLoad {
}
}
builder.withRate(new NormalDistStats(emittedRate));
- //TODO to know if the output keys are skewed we have to guess by looking
- // at the down stream executed stats, but for now we are going to ignore it
//The OutputStream is done
LoadCompConf.Builder comp = boltBuilders.get(id.get_componentId());
@@ -338,4 +371,98 @@ public class CaptureLoad {
System.exit(exitStatus);
}
}
+
+ //ResourceUtils.java is not available on the classpath, so we cannot use it to parse out the resources we want.
+ // Instead we have copied and pasted some of the needed methods here (with a few changes to logging).
+ static Map<String, Map<String, Double>> getBoltsResources(StormTopology topology,
+ Map<String, Object> topologyConf) {
+ Map<String, Map<String, Double>> boltResources = new HashMap<>();
+ if (topology.get_bolts() != null) {
+ for (Map.Entry<String, Bolt> bolt : topology.get_bolts().entrySet()) {
+ Map<String, Double> topologyResources = parseResources(bolt.getValue().get_common().get_json_conf());
+ checkIntialization(topologyResources, bolt.getValue().toString(), topologyConf);
+ boltResources.put(bolt.getKey(), topologyResources);
+ }
+ }
+ return boltResources;
+ }
+
+ static Map<String, Map<String, Double>> getSpoutsResources(StormTopology topology,
+ Map<String, Object> topologyConf) {
+ Map<String, Map<String, Double>> spoutResources = new HashMap<>();
+ if (topology.get_spouts() != null) {
+ for (Map.Entry<String, SpoutSpec> spout : topology.get_spouts().entrySet()) {
+ Map<String, Double> topologyResources = parseResources(spout.getValue().get_common().get_json_conf());
+ checkIntialization(topologyResources, spout.getValue().toString(), topologyConf);
+ spoutResources.put(spout.getKey(), topologyResources);
+ }
+ }
+ return spoutResources;
+ }
+
+ static Map<String, Double> parseResources(String input) {
+ Map<String, Double> topologyResources = new HashMap<>();
+ JSONParser parser = new JSONParser();
+ LOG.debug("Input to parseResources {}", input);
+ try {
+ if (input != null) {
+ Object obj = parser.parse(input);
+ JSONObject jsonObject = (JSONObject) obj;
+ if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)) {
+ Double topoMemOnHeap = ObjectReader
+ .getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
+ topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, topoMemOnHeap);
+ }
+ if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)) {
+ Double topoMemOffHeap = ObjectReader
+ .getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
+ topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, topoMemOffHeap);
+ }
+ if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) {
+ Double topoCpu = ObjectReader.getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT),
+ null);
+ topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, topoCpu);
+ }
+ LOG.debug("Topology Resources {}", topologyResources);
+ }
+ } catch (org.json.simple.parser.ParseException e) {
+ LOG.error("Failed to parse component resources is:" + e.toString(), e);
+ return null;
+ }
+ return topologyResources;
+ }
+
+ static void checkIntialization(Map<String, Double> topologyResources, String com,
+ Map<String, Object> topologyConf) {
+ checkInitMem(topologyResources, com, topologyConf);
+ checkInitCpu(topologyResources, com, topologyConf);
+ }
+
+ static void checkInitMem(Map<String, Double> topologyResources, String com,
+ Map<String, Object> topologyConf) {
+ if (!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)) {
+ Double onHeap = ObjectReader.getDouble(
+ topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
+ if (onHeap != null) {
+ topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeap);
+ }
+ }
+ if (!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)) {
+ Double offHeap = ObjectReader.getDouble(
+ topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
+ if (offHeap != null) {
+ topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeap);
+ }
+ }
+ }
+
+ static void checkInitCpu(Map<String, Double> topologyResources, String com,
+ Map<String, Object> topologyConf) {
+ if (!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) {
+ Double cpu = ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT), null);
+ if (cpu != null) {
+ topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, cpu);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/6bc32138/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CompStats.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CompStats.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CompStats.java
deleted file mode 100644
index d0e0bd3..0000000
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CompStats.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.loadgen;
-
-import java.io.Serializable;
-import java.util.Map;
-import org.apache.storm.utils.ObjectReader;
-
-/**
- * A set of measurements about a component (bolt/spout) so we can statistically reproduce it.
- */
-public class CompStats implements Serializable {
- public final double cpuPercent; // Right now we don't have a good way to measure any kind of a distribution, this is all approximate
- public final double memoryMb; //again no good way to get a distribution...
-
- /**
- * Parse out a CompStats from a config map.
- * @param conf the map holding the CompStats values
- * @return the parsed CompStats
- */
- public static CompStats fromConf(Map<String, Object> conf) {
- double cpu = ObjectReader.getDouble(conf.get("cpuPercent"), 0.0);
- double memory = ObjectReader.getDouble(conf.get("memoryMb"), 0.0);
- return new CompStats(cpu, memory);
- }
-
- public void addToConf(Map<String, Object> ret) {
- ret.put("cpuPercent", cpuPercent);
- ret.put("memoryMb", memoryMb);
- }
-
- public CompStats(double cpuPercent, double memoryMb) {
- this.cpuPercent = cpuPercent;
- this.memoryMb = memoryMb;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/6bc32138/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java
index 95ba8dd..f535b32 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java
@@ -40,6 +40,7 @@ import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.metric.LoggingMetricsConsumer;
import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.SpoutDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.NimbusClient;
import org.slf4j.Logger;
@@ -227,7 +228,7 @@ public class GenLoad {
return tlc;
}
- static int uniquifier = 0;
+ private static int uniquifier = 0;
private static String parseAndSubmit(TopologyLoadConf tlc, String url) throws IOException, InvalidTopologyException,
AuthorizationException, AlreadyAliveException {
@@ -260,7 +261,13 @@ public class GenLoad {
TopologyBuilder builder = new TopologyBuilder();
for (LoadCompConf spoutConf : tlc.spouts) {
System.out.println("ADDING SPOUT " + spoutConf.id);
- builder.setSpout(spoutConf.id, new LoadSpout(spoutConf), spoutConf.parallelism);
+ SpoutDeclarer sd = builder.setSpout(spoutConf.id, new LoadSpout(spoutConf), spoutConf.parallelism);
+ if (spoutConf.memoryLoad > 0) {
+ sd.setMemoryLoad(spoutConf.memoryLoad);
+ }
+ if (spoutConf.cpuLoad > 0) {
+ sd.setCPULoad(spoutConf.cpuLoad);
+ }
}
Map<String, BoltDeclarer> boltDeclarers = new HashMap<>();
@@ -270,7 +277,14 @@ public class GenLoad {
System.out.println("ADDING BOLT " + boltConf.id);
LoadBolt lb = new LoadBolt(boltConf);
bolts.put(boltConf.id, lb);
- boltDeclarers.put(boltConf.id, builder.setBolt(boltConf.id, lb, boltConf.parallelism));
+ BoltDeclarer bd = builder.setBolt(boltConf.id, lb, boltConf.parallelism);
+ if (boltConf.memoryLoad > 0) {
+ bd.setMemoryLoad(boltConf.memoryLoad);
+ }
+ if (boltConf.cpuLoad > 0) {
+ bd.setCPULoad(boltConf.cpuLoad);
+ }
+ boltDeclarers.put(boltConf.id, bd);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/6bc32138/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadCompConf.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadCompConf.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadCompConf.java
index baead0f..6548cc8 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadCompConf.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadCompConf.java
@@ -33,7 +33,8 @@ public class LoadCompConf {
public final String id;
public final int parallelism;
public final List<OutputStream> streams;
- public final CompStats stats;
+ public final double cpuLoad;
+ public final double memoryLoad;
/**
* Parse the LoadCompConf from a config Map.
@@ -50,8 +51,10 @@ public class LoadCompConf {
streams.add(OutputStream.fromConf(streamInfo));
}
}
+ double memoryMb = ObjectReader.getDouble(conf.get("memoryLoad"), 0.0);
+ double cpuPercent = ObjectReader.getDouble(conf.get("cpuLoad"), 0.0);
- return new LoadCompConf(id, parallelism, streams, CompStats.fromConf(conf));
+ return new LoadCompConf(id, parallelism, streams, memoryMb, cpuPercent);
}
/**
@@ -62,6 +65,12 @@ public class LoadCompConf {
Map<String, Object> ret = new HashMap<>();
ret.put("id", id);
ret.put("parallelism", parallelism);
+ if (memoryLoad > 0) {
+ ret.put("memoryLoad", memoryLoad);
+ }
+ if (cpuLoad > 0) {
+ ret.put("cpuLoad", cpuLoad);
+ }
if (streams != null) {
List<Map<String, Object>> streamData = new ArrayList<>();
@@ -70,9 +79,6 @@ public class LoadCompConf {
}
ret.put("streams", streamData);
}
- if (stats != null) {
- stats.addToConf(ret);
- }
return ret;
}
@@ -89,7 +95,7 @@ public class LoadCompConf {
.map((orig) -> orig.remap(id, remappedStreams))
.collect(Collectors.toList());
- return new LoadCompConf(remappedId, parallelism, remappedOutStreams, stats);
+ return new LoadCompConf(remappedId, parallelism, remappedOutStreams, cpuLoad, memoryLoad);
}
/**
@@ -110,7 +116,7 @@ public class LoadCompConf {
public LoadCompConf setParallel(int newParallelism) {
//We need to adjust the throughput accordingly (so that it stays the same in aggregate)
double throughputAdjustment = ((double)parallelism) / newParallelism;
- return new LoadCompConf(id, newParallelism, streams, stats).scaleThroughput(throughputAdjustment);
+ return new LoadCompConf(id, newParallelism, streams, cpuLoad, memoryLoad).scaleThroughput(throughputAdjustment);
}
/**
@@ -121,7 +127,7 @@ public class LoadCompConf {
public LoadCompConf scaleThroughput(double v) {
if (streams != null) {
List<OutputStream> newStreams = streams.stream().map((s) -> s.scaleThroughput(v)).collect(Collectors.toList());
- return new LoadCompConf(id, parallelism, newStreams, stats);
+ return new LoadCompConf(id, parallelism, newStreams, cpuLoad, memoryLoad);
} else {
return this;
}
@@ -147,7 +153,8 @@ public class LoadCompConf {
private String id;
private int parallelism = 1;
private List<OutputStream> streams;
- private CompStats stats;
+ private double cpuLoad = 0.0;
+ private double memoryLoad = 0.0;
public String getId() {
return id;
@@ -189,17 +196,18 @@ public class LoadCompConf {
return this;
}
- public CompStats getStats() {
- return stats;
+ public Builder withCpuLoad(double cpuLoad) {
+ this.cpuLoad = cpuLoad;
+ return this;
}
- public Builder withStats(CompStats stats) {
- this.stats = stats;
+ public Builder withMemoryLoad(double memoryLoad) {
+ this.memoryLoad = memoryLoad;
return this;
}
public LoadCompConf build() {
- return new LoadCompConf(id, parallelism, streams, stats);
+ return new LoadCompConf(id, parallelism, streams, cpuLoad, memoryLoad);
}
}
@@ -208,15 +216,15 @@ public class LoadCompConf {
* @param id the id of the component.
* @param parallelism tha parallelism of the component.
* @param streams the output streams of the component.
- * @param stats the stats of the component.
*/
- public LoadCompConf(String id, int parallelism, List<OutputStream> streams, CompStats stats) {
+ public LoadCompConf(String id, int parallelism, List<OutputStream> streams, double cpuLoad, double memoryLoad) {
this.id = id;
if (id == null) {
throw new IllegalArgumentException("A spout ID cannot be null");
}
this.parallelism = parallelism;
this.streams = streams;
- this.stats = stats;
+ this.cpuLoad = cpuLoad;
+ this.memoryLoad = memoryLoad;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/6bc32138/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadEngine.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadEngine.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadEngine.java
deleted file mode 100644
index 9030379..0000000
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadEngine.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.loadgen;
-
-import java.util.Random;
-import java.util.concurrent.ThreadLocalRandom;
-import org.apache.storm.loadgen.CompStats;
-
-/**
- * The goal of this class is to provide a set of "Tuples" to send that will match as closely as possible the characteristics
- * measured from a production topology.
- */
-public class LoadEngine {
-
- //TODO need to do a lot...
-
- /**
- * Provides an API to simulate the timings and CPU utilization of a bolt or spout.
- */
- public static class InputTimingEngine {
- private final Random rand;
- private final CompStats stats;
-
- public InputTimingEngine(CompStats stats) {
- this.stats = stats;
- rand = ThreadLocalRandom.current();
- }
- }
-}