You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/07 19:14:49 UTC

[08/18] storm git commit: STORM-2702: some code cleanup for unused stuff and added some RAS support

STORM-2702: some code cleanup for unused stuff and added some RAS support


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6bc32138
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6bc32138
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6bc32138

Branch: refs/heads/master
Commit: 6bc3213806c67818c8e7c0c8d72c0b8a234468b7
Parents: 5238df2
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Aug 24 10:02:03 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Aug 24 10:07:21 2017 -0500

----------------------------------------------------------------------
 examples/storm-loadgen/README.md                |   2 +
 .../org/apache/storm/loadgen/CaptureLoad.java   | 131 ++++++++++++++++++-
 .../org/apache/storm/loadgen/CompStats.java     |  52 --------
 .../java/org/apache/storm/loadgen/GenLoad.java  |  20 ++-
 .../org/apache/storm/loadgen/LoadCompConf.java  |  42 +++---
 .../org/apache/storm/loadgen/LoadEngine.java    |  45 -------
 6 files changed, 173 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/6bc32138/examples/storm-loadgen/README.md
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/README.md b/examples/storm-loadgen/README.md
index 6e9c3ed..50a618f 100644
--- a/examples/storm-loadgen/README.md
+++ b/examples/storm-loadgen/README.md
@@ -159,6 +159,8 @@ Spouts and bolts have the same format.
 | id | The id of the bolt or spout.  This should be unique within the topology |
 | parallelism | How many instances of this component should be a part of the topology |
 | streams | The streams that are produced by this bolt or spout |
+| cpuLoad | The amount of CPU this component needs for resource aware scheduling, as a percentage of one core (100 = 1 core) |
+| memoryLoad | The amount of memory in MB that this component needs for resource aware scheduling |
 
 ### Output Streams
 

http://git-wip-us.apache.org/repos/asf/storm/blob/6bc32138/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java
index e748efa..649a4c0 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java
@@ -49,7 +49,10 @@ import org.apache.storm.generated.TopologyPageInfo;
 import org.apache.storm.generated.TopologySummary;
 import org.apache.storm.generated.WorkerSummary;
 import org.apache.storm.utils.NimbusClient;
+import org.apache.storm.utils.ObjectReader;
+import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
+import org.json.simple.parser.JSONParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -150,6 +153,22 @@ public class CaptureLoad {
                     .withId(boltComp);
                 boltBuilders.put(boltComp, builder);
             }
+
+            Map<String, Map<String, Double>> boltResources = getBoltsResources(topo, topoConf);
+            for (Map.Entry<String, Map<String, Double>> entry: boltResources.entrySet()) {
+                LoadCompConf.Builder bd = boltBuilders.get(entry.getKey());
+                if (bd != null) {
+                    Map<String, Double> resources = entry.getValue();
+                    Double cpu = resources.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
+                    if (cpu != null) {
+                        bd.withCpuLoad(cpu);
+                    }
+                    Double mem = resources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+                    if (mem != null) {
+                        bd.withMemoryLoad(mem);
+                    }
+                }
+            }
         }
 
         //Spouts
@@ -175,6 +194,22 @@ public class CaptureLoad {
                     .withId(spoutComp);
                 spoutBuilders.put(spoutComp, builder);
             }
+
+            Map<String, Map<String, Double>> spoutResources = getSpoutsResources(topo, topoConf);
+            for (Map.Entry<String, Map<String, Double>> entry: spoutResources.entrySet()) {
+                LoadCompConf.Builder sd = spoutBuilders.get(entry.getKey());
+                if (sd != null) {
+                    Map<String, Double> resources = entry.getValue();
+                    Double cpu = resources.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
+                    if (cpu != null) {
+                        sd.withCpuLoad(cpu);
+                    }
+                    Double mem = resources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+                    if (mem != null) {
+                        sd.withMemoryLoad(mem);
+                    }
+                }
+            }
         }
 
         //Stats...
@@ -247,8 +282,6 @@ public class CaptureLoad {
                 }
             }
             builder.withRate(new NormalDistStats(emittedRate));
-            //TODO to know if the output keys are skewed we have to guess by looking
-            // at the down stream executed stats, but for now we are going to ignore it
 
             //The OutputStream is done
             LoadCompConf.Builder comp = boltBuilders.get(id.get_componentId());
@@ -338,4 +371,98 @@ public class CaptureLoad {
             System.exit(exitStatus);
         }
     }
+
+    // ResourceUtils.java is not available on the classpath to let us parse out the resources we want, so we have
+    // copied and pasted some of the needed methods here (with a few changes to logging). Returns bolt id -> resources.
+    static Map<String, Map<String, Double>> getBoltsResources(StormTopology topology,
+                                                                     Map<String, Object> topologyConf) {
+        Map<String, Map<String, Double>> boltResources = new HashMap<>();
+        if (topology.get_bolts() != null) { // a topology with no bolts yields an empty map
+            for (Map.Entry<String, Bolt> bolt : topology.get_bolts().entrySet()) {
+                Map<String, Double> topologyResources = parseResources(bolt.getValue().get_common().get_json_conf());
+                checkIntialization(topologyResources, bolt.getValue().toString(), topologyConf);
+                boltResources.put(bolt.getKey(), topologyResources);
+            }
+        }
+        return boltResources;
+    }
+
+    static Map<String, Map<String, Double>> getSpoutsResources(StormTopology topology,
+                                                                      Map<String, Object> topologyConf) {
+        Map<String, Map<String, Double>> spoutResources = new HashMap<>(); // spout id -> resource key -> value
+        if (topology.get_spouts() != null) { // a topology with no spouts yields an empty map
+            for (Map.Entry<String, SpoutSpec> spout : topology.get_spouts().entrySet()) {
+                Map<String, Double> topologyResources = parseResources(spout.getValue().get_common().get_json_conf());
+                checkIntialization(topologyResources, spout.getValue().toString(), topologyConf);
+                spoutResources.put(spout.getKey(), topologyResources); // component values plus topology-level fill-ins
+            }
+        }
+        return spoutResources;
+    }
+
+    static Map<String, Double> parseResources(String input) { // never null: null/unparseable input gives an empty map
+        Map<String, Double> topologyResources = new HashMap<>();
+        JSONParser parser = new JSONParser();
+        LOG.debug("Input to parseResources {}", input);
+        try {
+            if (input != null) {
+                Object obj = parser.parse(input);
+                JSONObject jsonObject = (JSONObject) obj;
+                if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)) {
+                    Double topoMemOnHeap = ObjectReader
+                        .getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
+                    topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, topoMemOnHeap);
+                }
+                if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)) {
+                    Double topoMemOffHeap = ObjectReader
+                        .getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
+                    topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, topoMemOffHeap);
+                }
+                if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) {
+                    Double topoCpu = ObjectReader.getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT),
+                        null);
+                    topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, topoCpu);
+                }
+                LOG.debug("Topology Resources {}", topologyResources);
+            }
+        } catch (org.json.simple.parser.ParseException e) {
+            LOG.error("Failed to parse component resources is:" + e.toString(), e);
+            // Fall through with the (empty) map: callers invoke containsKey() on the result, so null would NPE.
+        }
+        return topologyResources;
+    }
+
+    static void checkIntialization(Map<String, Double> topologyResources, String com,
+                                          Map<String, Object> topologyConf) {
+        checkInitMem(topologyResources, com, topologyConf); // fill in on-heap/off-heap memory if the component set none
+        checkInitCpu(topologyResources, com, topologyConf); // fill in CPU if the component set none
+    }
+
+    static void checkInitMem(Map<String, Double> topologyResources, String com, // com is not used in this copy
+                                     Map<String, Object> topologyConf) {
+        if (!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)) {
+            Double onHeap = ObjectReader.getDouble(
+                topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
+            if (onHeap != null) { // topology conf has no value either: leave the key absent
+                topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeap);
+            }
+        }
+        if (!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)) {
+            Double offHeap = ObjectReader.getDouble(
+                topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
+            if (offHeap != null) { // topology conf has no value either: leave the key absent
+                topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeap);
+            }
+        }
+    }
+
+    static void checkInitCpu(Map<String, Double> topologyResources, String com, // com is not used in this copy
+                                     Map<String, Object> topologyConf) {
+        if (!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) {
+            Double cpu = ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT), null);
+            if (cpu != null) { // topology conf has no value either: leave the key absent
+                topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, cpu);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/6bc32138/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CompStats.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CompStats.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CompStats.java
deleted file mode 100644
index d0e0bd3..0000000
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CompStats.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.loadgen;
-
-import java.io.Serializable;
-import java.util.Map;
-import org.apache.storm.utils.ObjectReader;
-
-/**
- * A set of measurements about a component (bolt/spout) so we can statistically reproduce it.
- */
-public class CompStats implements Serializable {
-    public final double cpuPercent; // Right now we don't have a good way to measure any kind of a distribution, this is all approximate
-    public final double memoryMb; //again no good way to get a distribution...
-
-    /**
-     * Parse out a CompStats from a config map.
-     * @param conf the map holding the CompStats values
-     * @return the parsed CompStats
-     */
-    public static CompStats fromConf(Map<String, Object> conf) {
-        double cpu = ObjectReader.getDouble(conf.get("cpuPercent"), 0.0);
-        double memory = ObjectReader.getDouble(conf.get("memoryMb"), 0.0);
-        return new CompStats(cpu, memory);
-    }
-
-    public void addToConf(Map<String, Object> ret) {
-        ret.put("cpuPercent", cpuPercent);
-        ret.put("memoryMb", memoryMb);
-    }
-
-    public CompStats(double cpuPercent, double memoryMb) {
-        this.cpuPercent = cpuPercent;
-        this.memoryMb = memoryMb;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/6bc32138/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java
index 95ba8dd..f535b32 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java
@@ -40,6 +40,7 @@ import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.InvalidTopologyException;
 import org.apache.storm.metric.LoggingMetricsConsumer;
 import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.SpoutDeclarer;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.utils.NimbusClient;
 import org.slf4j.Logger;
@@ -227,7 +228,7 @@ public class GenLoad {
         return tlc;
     }
 
-    static int uniquifier = 0;
+    private static int uniquifier = 0;
 
     private static String parseAndSubmit(TopologyLoadConf tlc, String url) throws IOException, InvalidTopologyException,
         AuthorizationException, AlreadyAliveException {
@@ -260,7 +261,13 @@ public class GenLoad {
         TopologyBuilder builder = new TopologyBuilder();
         for (LoadCompConf spoutConf : tlc.spouts) {
             System.out.println("ADDING SPOUT " + spoutConf.id);
-            builder.setSpout(spoutConf.id, new LoadSpout(spoutConf), spoutConf.parallelism);
+            SpoutDeclarer sd = builder.setSpout(spoutConf.id, new LoadSpout(spoutConf), spoutConf.parallelism);
+            if (spoutConf.memoryLoad > 0) {
+                sd.setMemoryLoad(spoutConf.memoryLoad);
+            }
+            if (spoutConf.cpuLoad > 0) {
+                sd.setCPULoad(spoutConf.cpuLoad);
+            }
         }
 
         Map<String, BoltDeclarer> boltDeclarers = new HashMap<>();
@@ -270,7 +277,14 @@ public class GenLoad {
                 System.out.println("ADDING BOLT " + boltConf.id);
                 LoadBolt lb = new LoadBolt(boltConf);
                 bolts.put(boltConf.id, lb);
-                boltDeclarers.put(boltConf.id, builder.setBolt(boltConf.id, lb, boltConf.parallelism));
+                BoltDeclarer bd = builder.setBolt(boltConf.id, lb, boltConf.parallelism);
+                if (boltConf.memoryLoad > 0) {
+                    bd.setMemoryLoad(boltConf.memoryLoad);
+                }
+                if (boltConf.cpuLoad > 0) {
+                    bd.setCPULoad(boltConf.cpuLoad);
+                }
+                boltDeclarers.put(boltConf.id, bd);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/6bc32138/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadCompConf.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadCompConf.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadCompConf.java
index baead0f..6548cc8 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadCompConf.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadCompConf.java
@@ -33,7 +33,8 @@ public class LoadCompConf {
     public final String id;
     public final int parallelism;
     public final List<OutputStream> streams;
-    public final CompStats stats;
+    public final double cpuLoad;
+    public final double memoryLoad;
 
     /**
      * Parse the LoadCompConf from a config Map.
@@ -50,8 +51,10 @@ public class LoadCompConf {
                 streams.add(OutputStream.fromConf(streamInfo));
             }
         }
+        double memoryMb = ObjectReader.getDouble(conf.get("memoryLoad"), 0.0);
+        double cpuPercent = ObjectReader.getDouble(conf.get("cpuLoad"), 0.0);
 
-        return new LoadCompConf(id, parallelism, streams, CompStats.fromConf(conf));
+        return new LoadCompConf(id, parallelism, streams, cpuPercent, memoryMb); // ctor order: cpuLoad, memoryLoad
     }
 
     /**
@@ -62,6 +65,12 @@ public class LoadCompConf {
         Map<String, Object> ret = new HashMap<>();
         ret.put("id", id);
         ret.put("parallelism", parallelism);
+        if (memoryLoad > 0) {
+            ret.put("memoryLoad", memoryLoad);
+        }
+        if (cpuLoad > 0) {
+            ret.put("cpuLoad", cpuLoad);
+        }
 
         if (streams != null) {
             List<Map<String, Object>> streamData = new ArrayList<>();
@@ -70,9 +79,6 @@ public class LoadCompConf {
             }
             ret.put("streams", streamData);
         }
-        if (stats != null) {
-            stats.addToConf(ret);
-        }
         return ret;
     }
 
@@ -89,7 +95,7 @@ public class LoadCompConf {
                 .map((orig) -> orig.remap(id, remappedStreams))
                 .collect(Collectors.toList());
 
-        return new LoadCompConf(remappedId, parallelism, remappedOutStreams, stats);
+        return new LoadCompConf(remappedId, parallelism, remappedOutStreams, cpuLoad, memoryLoad);
     }
 
     /**
@@ -110,7 +116,7 @@ public class LoadCompConf {
     public LoadCompConf setParallel(int newParallelism) {
         //We need to adjust the throughput accordingly (so that it stays the same in aggregate)
         double throughputAdjustment = ((double)parallelism) / newParallelism;
-        return new LoadCompConf(id, newParallelism, streams, stats).scaleThroughput(throughputAdjustment);
+        return new LoadCompConf(id, newParallelism, streams, cpuLoad, memoryLoad).scaleThroughput(throughputAdjustment);
     }
 
     /**
@@ -121,7 +127,7 @@ public class LoadCompConf {
     public LoadCompConf scaleThroughput(double v) {
         if (streams != null) {
             List<OutputStream> newStreams = streams.stream().map((s) -> s.scaleThroughput(v)).collect(Collectors.toList());
-            return new LoadCompConf(id, parallelism, newStreams, stats);
+            return new LoadCompConf(id, parallelism, newStreams, cpuLoad, memoryLoad);
         } else {
             return this;
         }
@@ -147,7 +153,8 @@ public class LoadCompConf {
         private String id;
         private int parallelism = 1;
         private List<OutputStream> streams;
-        private CompStats stats;
+        private double cpuLoad = 0.0;
+        private double memoryLoad = 0.0;
 
         public String getId() {
             return id;
@@ -189,17 +196,18 @@ public class LoadCompConf {
             return this;
         }
 
-        public CompStats getStats() {
-            return stats;
+        public Builder withCpuLoad(double cpuLoad) {
+            this.cpuLoad = cpuLoad;
+            return this;
         }
 
-        public Builder withStats(CompStats stats) {
-            this.stats = stats;
+        public Builder withMemoryLoad(double memoryLoad) {
+            this.memoryLoad = memoryLoad;
             return this;
         }
 
         public LoadCompConf build() {
-            return new LoadCompConf(id, parallelism, streams, stats);
+            return new LoadCompConf(id, parallelism, streams, cpuLoad, memoryLoad);
         }
     }
 
@@ -208,15 +216,15 @@ public class LoadCompConf {
      * @param id the id of the component.
      * @param parallelism tha parallelism of the component.
      * @param streams the output streams of the component.
-     * @param stats the stats of the component.
      */
-    public LoadCompConf(String id, int parallelism, List<OutputStream> streams, CompStats stats) {
+    public LoadCompConf(String id, int parallelism, List<OutputStream> streams, double cpuLoad, double memoryLoad) {
         this.id = id;
         if (id == null) {
             throw new IllegalArgumentException("A spout ID cannot be null");
         }
         this.parallelism = parallelism;
         this.streams = streams;
-        this.stats = stats;
+        this.cpuLoad = cpuLoad;
+        this.memoryLoad = memoryLoad;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/6bc32138/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadEngine.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadEngine.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadEngine.java
deleted file mode 100644
index 9030379..0000000
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadEngine.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.loadgen;
-
-import java.util.Random;
-import java.util.concurrent.ThreadLocalRandom;
-import org.apache.storm.loadgen.CompStats;
-
-/**
- * The goal of this class is to provide a set of "Tuples" to send that will match as closely as possible the characteristics
- * measured from a production topology.
- */
-public class LoadEngine {
-
-    //TODO need to do a lot...
-
-    /**
-     * Provides an API to simulate the timings and CPU utilization of a bolt or spout.
-     */
-    public static class InputTimingEngine {
-        private final Random rand;
-        private final CompStats stats;
-
-        public InputTimingEngine(CompStats stats) {
-            this.stats = stats;
-            rand = ThreadLocalRandom.current();
-        }
-    }
-}