You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/02/14 14:07:42 UTC
[7/9] storm git commit: STORM-2306 - Messaging subsystem redesign.
New Backpressure model.
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/LowThroughputTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/LowThroughputTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/LowThroughputTopo.java
new file mode 100644
index 0000000..13488aa
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/LowThroughputTopo.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf;
+
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.perf.utils.Helper;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Utils;
+import org.slf4j.LoggerFactory;
+
+public class LowThroughputTopo {
+ private static final String SPOUT_ID = "ThrottledSpout";
+ private static final String BOLT_ID = "LatencyPrintBolt";
+ private static final Integer SPOUT_COUNT = 1;
+ private static final Integer BOLT_COUNT = 1;
+ private static final String SLEEP_MS = "sleep";
+
+ static StormTopology getTopology(Map<String, Object> conf) {
+
+ Long sleepMs = ObjectReader.getLong(conf.get(SLEEP_MS));
+ // 1 - Setup Spout --------
+ ThrottledSpout spout = new ThrottledSpout(sleepMs).withOutputFields(ThrottledSpout.DEFAULT_FIELD_NAME);
+
+ // 2 - Setup LatencyPrint Bolt --------
+ LatencyPrintBolt bolt = new LatencyPrintBolt();
+
+
+ // 3 - Setup Topology --------
+ TopologyBuilder builder = new TopologyBuilder();
+
+ builder.setSpout(SPOUT_ID, spout, Helper.getInt(conf, SPOUT_COUNT, 1));
+ BoltDeclarer bd = builder.setBolt(BOLT_ID, bolt, Helper.getInt(conf, BOLT_COUNT, 1));
+
+ bd.localOrShuffleGrouping(SPOUT_ID);
+// bd.shuffleGrouping(SPOUT_ID);
+ return builder.createTopology();
+ }
+
+    /**
+     * Entry point. Usage: spoutSleepMs [runDurationSec]
+     */
+    public static void main(String[] args) throws Exception {
+        // Check the argument count before touching args: spoutSleepMs is required, runDurationSec is optional.
+        if (args.length < 1 || args.length > 2) {
+            System.err.println("args: spoutSleepMs [runDurationSec] ");
+            return;
+        }
+        long sleepMs = Long.parseLong(args[0]);
+        int runTime = (args.length > 1) ? Integer.parseInt(args[1]) : -1;
+        // The usage defines no config-file argument, so start from an empty Config rather than reading args[1] as a file.
+        Map<String, Object> topoConf = new Config();
+        topoConf.put(Config.TOPOLOGY_SPOUT_RECVQ_SKIPS, 1);
+        topoConf.put(SLEEP_MS, sleepMs);
+        topoConf.putAll(Utils.readCommandLineOpts());
+        // Submit topology to storm cluster
+        Helper.runOnClusterAndPrintMetrics(runTime, "LowThroughputTopo", topoConf, getTopology(topoConf));
+    }
+
+ private static class ThrottledSpout extends BaseRichSpout {
+
+ static final String DEFAULT_FIELD_NAME = "time";
+ private String fieldName = DEFAULT_FIELD_NAME;
+ private SpoutOutputCollector collector = null;
+ private long sleepTimeMs;
+
+ public ThrottledSpout(long sleepMs) {
+ this.sleepTimeMs = sleepMs;
+ }
+
+ public ThrottledSpout withOutputFields(String fieldName) {
+ this.fieldName = fieldName;
+ return this;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields(fieldName));
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void nextTuple() {
+ Long now = System.currentTimeMillis();
+ List<Object> tuple = Collections.singletonList(now);
+ collector.emit(tuple, now);
+ Utils.sleep(sleepTimeMs);
+ }
+
+ @Override
+ public void ack(Object msgId) {
+ super.ack(msgId);
+ }
+ }
+
+ private static class LatencyPrintBolt extends BaseRichBolt {
+ private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(LatencyPrintBolt.class);
+ private OutputCollector collector;
+
+ @Override
+ public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ Long now = System.currentTimeMillis();
+ Long then = (Long) tuple.getValues().get(0);
+ LOG.warn("Latency {} ", now - then);
+ System.err.println(now - then);
+ collector.ack(tuple);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/SimplifiedWordCountTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/SimplifiedWordCountTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/SimplifiedWordCountTopo.java
new file mode 100644
index 0000000..6d368a0
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/SimplifiedWordCountTopo.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf;
+
+import java.util.Map;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.perf.bolt.CountBolt;
+import org.apache.storm.perf.spout.WordGenSpout;
+import org.apache.storm.perf.utils.Helper;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
+public class SimplifiedWordCountTopo {
+
+ public static final String SPOUT_ID = "spout";
+ public static final String COUNT_ID = "counter";
+ public static final String TOPOLOGY_NAME = "SimplifiedWordCountTopo";
+
+ // Config settings
+ public static final String SPOUT_NUM = "spout.count";
+ public static final String BOLT_NUM = "counter.count";
+ public static final String INPUT_FILE = "input.file";
+
+ public static final int DEFAULT_SPOUT_NUM = 1;
+ public static final int DEFAULT_COUNT_BOLT_NUM = 1;
+
+
+ static StormTopology getTopology(Map config) {
+
+ final int spoutNum = Helper.getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
+ final int cntBoltNum = Helper.getInt(config, BOLT_NUM, DEFAULT_COUNT_BOLT_NUM);
+ final String inputFile = Helper.getStr(config, INPUT_FILE);
+
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout(SPOUT_ID, new WordGenSpout(inputFile), spoutNum);
+ builder.setBolt(COUNT_ID, new CountBolt(), cntBoltNum).fieldsGrouping(SPOUT_ID, new Fields(WordGenSpout.FIELDS));
+
+ return builder.createTopology();
+ }
+
+    // Topology: WordGenSpout -> FieldsGrouping -> CountBolt
+    public static void main(String[] args) throws Exception {
+        // Validate the argument count up front: file.txt is required, the other two are optional.
+        if (args.length == 0 || args.length > 3) {
+            System.err.println("args: file.txt [runDurationSec] [optionalConfFile]");
+            return;
+        }
+        int runTime = (args.length > 1) ? Integer.parseInt(args[1]) : -1;
+        Config topoConf = new Config();
+        topoConf.put(INPUT_FILE, args[0]);
+        // The optional conf file is the third argument; args[1] is the run duration.
+        if (args.length > 2) {
+            topoConf.putAll(Utils.findAndReadConfigFile(args[2]));
+        }
+        topoConf.put(Config.TOPOLOGY_SPOUT_RECVQ_SKIPS, 8);
+        topoConf.put(Config.TOPOLOGY_PRODUCER_BATCH_SIZE, 1000);
+        topoConf.put(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING, true);
+        topoConf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 0.0005);
+        topoConf.put(Config.TOPOLOGY_BOLT_WAIT_STRATEGY, "org.apache.storm.policy.WaitStrategyPark");
+        topoConf.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 0);
+
+        topoConf.putAll(Utils.readCommandLineOpts());
+        // Submit topology to storm cluster
+        Helper.runOnClusterAndPrintMetrics(runTime, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
index 61cf394..a3650c6 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
@@ -21,6 +21,7 @@ package org.apache.storm.perf;
import java.util.Map;
+import org.apache.storm.Config;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
@@ -46,11 +47,11 @@ public class StrGenSpoutHdfsBoltTopo {
// configs - topo parallelism
public static final String SPOUT_NUM = "spout.count";
- public static final String BOLT_NUM = "bolt.count";
+ public static final String BOLT_NUM = "bolt.count";
// configs - hdfs bolt
- public static final String HDFS_URI = "hdfs.uri";
- public static final String HDFS_PATH = "hdfs.dir";
+ public static final String HDFS_URI = "hdfs.uri";
+ public static final String HDFS_PATH = "hdfs.dir";
public static final String HDFS_BATCH = "hdfs.batch";
public static final int DEFAULT_SPOUT_NUM = 1;
@@ -63,7 +64,7 @@ public class StrGenSpoutHdfsBoltTopo {
public static final String BOLT_ID = "hdfsBolt";
- public static StormTopology getTopology(Map<String, Object> topoConf) {
+ static StormTopology getTopology(Map<String, Object> topoConf) {
final int hdfsBatch = Helper.getInt(topoConf, HDFS_BATCH, DEFAULT_HDFS_BATCH);
// 1 - Setup StringGen Spout --------
@@ -79,15 +80,15 @@ public class StrGenSpoutHdfsBoltTopo {
final int boltNum = Helper.getInt(topoConf, BOLT_NUM, DEFAULT_BOLT_NUM);
// Use default, Storm-generated file names
- FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath(Helper.getStr(topoConf, HDFS_PATH) );
+ FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath(Helper.getStr(topoConf, HDFS_PATH));
// Instantiate the HdfsBolt
HdfsBolt bolt = new HdfsBolt()
- .withFsUrl(Hdfs_url)
- .withFileNameFormat(fileNameFormat)
- .withRecordFormat(format)
- .withRotationPolicy(rotationPolicy)
- .withSyncPolicy(syncPolicy);
+ .withFsUrl(Hdfs_url)
+ .withFileNameFormat(fileNameFormat)
+ .withRecordFormat(format)
+ .withRotationPolicy(rotationPolicy)
+ .withSyncPolicy(syncPolicy);
// 3 - Setup Topology --------
@@ -95,13 +96,15 @@ public class StrGenSpoutHdfsBoltTopo {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(SPOUT_ID, spout, spoutNum);
builder.setBolt(BOLT_ID, bolt, boltNum)
- .localOrShuffleGrouping(SPOUT_ID);
+ .localOrShuffleGrouping(SPOUT_ID);
return builder.createTopology();
}
- /** Spout generates random strings and HDFS bolt writes them to a text file */
+ /**
+ * Spout generates random strings and HDFS bolt writes them to a text file
+ */
public static void main(String[] args) throws Exception {
String confFile = "conf/HdfsSpoutTopo.yaml";
int runTime = -1; //Run until Ctrl-C
@@ -120,7 +123,13 @@ public class StrGenSpoutHdfsBoltTopo {
}
Map<String, Object> topoConf = Utils.findAndReadConfigFile(confFile);
+ topoConf.put(Config.TOPOLOGY_PRODUCER_BATCH_SIZE, 1000);
+ topoConf.put(Config.TOPOLOGY_BOLT_WAIT_STRATEGY, "org.apache.storm.policy.WaitStrategyPark");
+ topoConf.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 0);
+ topoConf.put(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING, true);
+ topoConf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 0.0005);
+ topoConf.putAll(Utils.readCommandLineOpts());
Helper.runOnClusterAndPrintMetrics(runTime, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
}
@@ -140,13 +149,13 @@ public class StrGenSpoutHdfsBoltTopo {
* @param delimiter
* @return
*/
- public LineWriter withLineDelimiter(String delimiter){
+ public LineWriter withLineDelimiter(String delimiter) {
this.lineDelimiter = delimiter;
return this;
}
public byte[] format(Tuple tuple) {
- return (tuple.getValueByField(fieldName).toString() + this.lineDelimiter).getBytes();
+ return (tuple.getValueByField(fieldName).toString() + this.lineDelimiter).getBytes();
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/ThroughputMeter.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/ThroughputMeter.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/ThroughputMeter.java
new file mode 100644
index 0000000..09aec7d
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/ThroughputMeter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf;
+
+public class ThroughputMeter {
+    // Counts recorded events and reports events per unit of wall-clock time; performs no synchronization itself.
+    private final String name;
+    private long startTime = 0;
+    private long count; // long rather than int so a long-running tally cannot wrap around
+    private long endTime = 0;
+
+    public ThroughputMeter(String name) {
+        this.name = name;
+        this.startTime = System.currentTimeMillis();
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void record() {
+        ++count;
+    }
+
+    public double stop() {
+        if (startTime == 0) {
+            return 0;
+        }
+        if (endTime == 0) {
+            this.endTime = System.currentTimeMillis();
+        }
+        return calcThroughput(count, startTime, endTime);
+    }
+
+    // Returns the recorded throughput (in K events/sec) since the last call to getCurrentThroughput()
+    // or since this meter was instantiated if being called for the first time. Resets the count and window.
+    public double getCurrentThroughput() {
+        if (startTime == 0) {
+            return 0;
+        }
+        long currTime = (endTime == 0) ? System.currentTimeMillis() : endTime;
+
+        double result = calcThroughput(count, startTime, currTime) / 1000; // K/sec
+        startTime = currTime;
+        count = 0;
+        return result;
+    }
+
+    /**
+     * @return events/sec as a floating-point rate; 0 when no time has elapsed (no truncation, no divide-by-zero)
+     */
+    private static double calcThroughput(long count, long startTime, long endTime) {
+        long gap = endTime - startTime;
+        return (gap <= 0) ? 0 : (count * 1000.0) / gap;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java
index 3286ac3..368699b 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java
@@ -18,6 +18,8 @@
package org.apache.storm.perf.bolt;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
@@ -27,8 +29,6 @@ import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
-import java.util.HashMap;
-import java.util.Map;
public class CountBolt extends BaseBasicBolt {
public static final String FIELDS_WORD = "word";
@@ -44,8 +44,9 @@ public class CountBolt extends BaseBasicBolt {
public void execute(Tuple tuple, BasicOutputCollector collector) {
String word = tuple.getString(0);
Integer count = counts.get(word);
- if (count == null)
+ if (count == null) {
count = 0;
+ }
count++;
counts.put(word, count);
collector.emit(new Values(word, count));
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java
index f9d045e..5f9c710 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java
@@ -18,26 +18,37 @@
package org.apache.storm.perf.bolt;
+import java.util.Map;
+import java.util.concurrent.locks.LockSupport;
+
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
-
-import java.util.Map;
+import org.apache.storm.utils.ObjectReader;
+import org.slf4j.LoggerFactory;
public class DevNullBolt extends BaseRichBolt {
+ private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(DevNullBolt.class);
private OutputCollector collector;
+ private Long sleepNanos;
+ private int eCount = 0;
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
+ this.sleepNanos = ObjectReader.getLong(topoConf.get("nullbolt.sleep.micros"), 0L) * 1_000;
}
@Override
public void execute(Tuple tuple) {
collector.ack(tuple);
+ if (sleepNanos > 0) {
+ LockSupport.parkNanos(sleepNanos);
+ }
+ ++eCount;
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java
index 3735447..0644e31 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java
@@ -18,6 +18,8 @@
package org.apache.storm.perf.bolt;
+import java.util.Map;
+
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -26,8 +28,6 @@ import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
-import java.util.Map;
-
public class IdBolt extends BaseRichBolt {
private OutputCollector collector;
@@ -38,7 +38,7 @@ public class IdBolt extends BaseRichBolt {
@Override
public void execute(Tuple tuple) {
- collector.emit(tuple, new Values( tuple.getValues() ) );
+ collector.emit(tuple, new Values(tuple.getValues()));
collector.ack(tuple);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java
index f32628d..abb5af8 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java
@@ -18,6 +18,8 @@
package org.apache.storm.perf.bolt;
+import java.util.Map;
+
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -26,12 +28,17 @@ import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
-import java.util.Map;
-
public class SplitSentenceBolt extends BaseBasicBolt {
public static final String FIELDS = "word";
+ public static String[] splitSentence(String sentence) {
+ if (sentence != null) {
+ return sentence.split("\\s+");
+ }
+ return null;
+ }
+
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context) {
}
@@ -47,12 +54,4 @@ public class SplitSentenceBolt extends BaseBasicBolt {
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(FIELDS));
}
-
-
- public static String[] splitSentence(String sentence) {
- if (sentence != null) {
- return sentence.split("\\s+");
- }
- return null;
- }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java
index afd2ebc..46f12ab 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java
@@ -18,16 +18,15 @@
package org.apache.storm.perf.spout;
+import java.util.ArrayList;
+import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
+import org.apache.storm.utils.ObjectReader;
public class ConstSpout extends BaseRichSpout {
@@ -35,7 +34,9 @@ public class ConstSpout extends BaseRichSpout {
private String value;
private String fieldName = DEFAUT_FIELD_NAME;
private SpoutOutputCollector collector = null;
- private int count=0;
+ private int count = 0;
+ private Long sleep = 0L;
+ private int ackCount = 0;
public ConstSpout(String value) {
this.value = value;
@@ -54,16 +55,26 @@ public class ConstSpout extends BaseRichSpout {
@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
+ this.sleep = ObjectReader.getLong(conf.get("spout.sleep"), 0L);
}
@Override
public void nextTuple() {
- List<Object> tuple = Collections.singletonList((Object) value);
+ ArrayList<Object> tuple = new ArrayList<Object>(1);
+ tuple.add(value);
collector.emit(tuple, count++);
+ try {
+ if (sleep > 0) {
+ Thread.sleep(sleep);
+ }
+ } catch (InterruptedException e) {
+ return;
+ }
}
@Override
public void ack(Object msgId) {
+ ++ackCount;
super.ack(msgId);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java
index 4815a02..4f27d3b 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java
@@ -18,13 +18,6 @@
package org.apache.storm.perf.spout;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichSpout;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
@@ -35,6 +28,13 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
public class FileReadSpout extends BaseRichSpout {
public static final String FIELDS = "sentence";
private static final long serialVersionUID = -2582705611472467172L;
@@ -55,6 +55,26 @@ public class FileReadSpout extends BaseRichSpout {
this.reader = reader;
}
+ public static List<String> readLines(InputStream input) {
+ List<String> lines = new ArrayList<>();
+ try {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(input));
+ try {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ lines.add(line);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Reading file failed", e);
+ } finally {
+ reader.close();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Error closing reader", e);
+ }
+ return lines;
+ }
+
@Override
public void open(Map<String, Object> conf, TopologyContext context,
SpoutOutputCollector collector) {
@@ -84,26 +104,6 @@ public class FileReadSpout extends BaseRichSpout {
declarer.declare(new Fields(FIELDS));
}
- public static List<String> readLines(InputStream input) {
- List<String> lines = new ArrayList<>();
- try {
- BufferedReader reader = new BufferedReader(new InputStreamReader(input));
- try {
- String line;
- while ((line = reader.readLine()) != null) {
- lines.add(line);
- }
- } catch (IOException e) {
- throw new RuntimeException("Reading file failed", e);
- } finally {
- reader.close();
- }
- } catch (IOException e) {
- throw new RuntimeException("Error closing reader", e);
- }
- return lines;
- }
-
public static class FileReader implements Serializable {
private static final long serialVersionUID = -7012334600647556267L;
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/StringGenSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/StringGenSpout.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/StringGenSpout.java
index 6adb2e3..530f7b6 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/StringGenSpout.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/StringGenSpout.java
@@ -18,6 +18,10 @@
package org.apache.storm.perf.spout;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.storm.spout.SpoutOutputCollector;
@@ -26,11 +30,6 @@ import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
/** Spout pre-computes a list with 30k fixed length random strings.
* Emits sequentially from this list, over and over again.
*/
@@ -43,8 +42,8 @@ public class StringGenSpout extends BaseRichSpout {
private String fieldName = DEFAULT_FIELD_NAME;
private SpoutOutputCollector collector = null;
ArrayList<String> records;
- private int curr=0;
- private int count=0;
+ private int curr = 0;
+ private int count = 0;
public StringGenSpout(int strLen) {
this.strLen = strLen;
@@ -57,7 +56,7 @@ public class StringGenSpout extends BaseRichSpout {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare( new Fields(fieldName) );
+ declarer.declare(new Fields(fieldName));
}
@Override
@@ -78,7 +77,7 @@ public class StringGenSpout extends BaseRichSpout {
@Override
public void nextTuple() {
List<Object> tuple;
- if( curr < strCount ) {
+ if(curr < strCount) {
tuple = Collections.singletonList((Object) records.get(curr));
++curr;
collector.emit(tuple, ++count);
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/WordGenSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/WordGenSpout.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/WordGenSpout.java
new file mode 100644
index 0000000..92a2f53
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/WordGenSpout.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.spout;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Map;
+
+import org.apache.storm.perf.utils.Helper;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.perf.ThroughputMeter;
+
+/** Spout that reads all words of a text file in open() and then emits them
+ * one per nextTuple() call, cycling through the list.
+ */
+public class WordGenSpout extends BaseRichSpout {
+    public static final String FIELDS = "word";
+    private static final long serialVersionUID = -2582705611472467172L;
+    private String file;
+    private boolean ackEnabled = true;
+    private SpoutOutputCollector collector;
+    private long count = 0; // message id attached to the next emit when acking is on
+    private int index = 0;  // position in words of the next word to emit
+    private ThroughputMeter emitMeter;
+    private ArrayList<String> words;
+
+    public WordGenSpout(String file) {
+        this.file = file;
+    }
+
+    /** Loads the word list; acking is turned off when Helper.getInt yields 0 for "topology.acker.executors". */
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        this.collector = collector;
+        Integer ackers = Helper.getInt(conf, "topology.acker.executors", 0);
+        if (ackers.equals(0)) {
+            this.ackEnabled = false;
+        }
+        words = readWords(file);
+        if (words.isEmpty()) { // otherwise every nextTuple() would die on words.get()
+            throw new IllegalStateException("No words found in file: " + file);
+        }
+        emitMeter = new ThroughputMeter("WordGenSpout emits");
+    }
+
+    /** Emits the next word, wrapping to the start of the list after the last one. */
+    @Override
+    public void nextTuple() {
+        String word = words.get(index);
+        index = (index + 1) % words.size();
+        if (ackEnabled) {
+            collector.emit(new Values(word), count);
+            count++;
+        } else {
+            collector.emit(new Values(word));
+        }
+        emitMeter.record();
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields(FIELDS));
+    }
+
+    /** Returns every whitespace-separated word in the file, in order and with duplicates (blank
+     * lines add none); failures to open or read the file are rethrown as RuntimeException.
+     */
+    public static ArrayList<String> readWords(String file) {
+        ArrayList<String> result = new ArrayList<>();
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(file)))) {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                for (String word : line.split("\\s+")) {
+                    if (!word.isEmpty()) { // split() yields "" for blank lines and leading whitespace
+                        result.add(word);
+                    }
+                }
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to read words from file: " + file, e);
+        }
+        return result;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java
index 97c1aa9..7cfd354 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java
@@ -18,27 +18,20 @@
package org.apache.storm.perf.utils;
-import org.apache.storm.generated.Nimbus;
-import org.apache.storm.utils.Utils;
-import org.apache.log4j.Logger;
-
import java.io.PrintWriter;
-import java.util.*;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.log4j.Logger;
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.utils.Utils;
public class BasicMetricsCollector implements AutoCloseable {
- private PrintWriter dataWriter;
- private long startTime=0;
-
- public enum MetricsItem {
- TOPOLOGY_STATS,
- XSFER_RATE,
- SPOUT_THROUGHPUT,
- SPOUT_LATENCY,
- ALL
- }
-
-
/* headers */
public static final String TIME = "elapsed (sec)";
public static final String TIME_FORMAT = "%d";
@@ -54,29 +47,27 @@ public class BasicMetricsCollector implements AutoCloseable {
public static final String SPOUT_ACKED = "spout_acks";
public static final String SPOUT_THROUGHPUT = "spout_throughput (acks/s)";
public static final String SPOUT_AVG_COMPLETE_LATENCY = "spout_avg_complete_latency(ms)";
- public static final String SPOUT_AVG_LATENCY_FORMAT = "%.1f";
+ public static final String SPOUT_AVG_LATENCY_FORMAT = "%.3f";
public static final String SPOUT_MAX_COMPLETE_LATENCY = "spout_max_complete_latency(ms)";
- public static final String SPOUT_MAX_LATENCY_FORMAT = "%.1f";
+ public static final String SPOUT_MAX_LATENCY_FORMAT = "%.3f";
private static final Logger LOG = Logger.getLogger(BasicMetricsCollector.class);
final MetricsCollectorConfig config;
// final StormTopology topology;
final Set<String> header = new LinkedHashSet<String>();
final Map<String, String> metrics = new HashMap<String, String>();
- int lineNumber = 0;
-
final boolean collectTopologyStats;
final boolean collectExecutorStats;
final boolean collectThroughput;
-
final boolean collectSpoutThroughput;
final boolean collectSpoutLatency;
-
+ int lineNumber = 0;
+ boolean first = true;
+ private PrintWriter dataWriter;
+ private long startTime = 0;
private MetricsSample lastSample;
private MetricsSample curSample;
private double maxLatency = 0;
- boolean first = true;
-
public BasicMetricsCollector(String topoName, Map<String, Object> topoConfig) {
Set<MetricsItem> items = getMetricsToCollect();
this.config = new MetricsCollectorConfig(topoName, topoConfig);
@@ -88,7 +79,7 @@ public class BasicMetricsCollector implements AutoCloseable {
dataWriter = new PrintWriter(System.err);
}
- private Set<MetricsItem> getMetricsToCollect() {
+ private Set<MetricsItem> getMetricsToCollect() {
Set<MetricsItem> result = new HashSet<>();
result.add(MetricsItem.ALL);
return result;
@@ -119,7 +110,7 @@ public class BasicMetricsCollector implements AutoCloseable {
}
boolean updateStats(PrintWriter writer)
- throws Exception {
+ throws Exception {
if (collectTopologyStats) {
updateTopologyStats();
}
@@ -169,14 +160,13 @@ public class BasicMetricsCollector implements AutoCloseable {
this.maxLatency = latency;
}
metrics.put(SPOUT_AVG_COMPLETE_LATENCY,
- String.format(SPOUT_AVG_LATENCY_FORMAT, latency));
+ String.format(SPOUT_AVG_LATENCY_FORMAT, latency));
metrics.put(SPOUT_MAX_COMPLETE_LATENCY,
- String.format(SPOUT_MAX_LATENCY_FORMAT, this.maxLatency));
+ String.format(SPOUT_MAX_LATENCY_FORMAT, this.maxLatency));
}
}
-
void writeHeader(PrintWriter writer) {
header.add(TIME);
if (collectTopologyStats) {
@@ -219,34 +209,39 @@ public class BasicMetricsCollector implements AutoCloseable {
writer.flush();
}
-
boolean collectTopologyStats(Set<MetricsItem> items) {
- return items.contains(MetricsItem.ALL) ||
- items.contains(MetricsItem.TOPOLOGY_STATS);
+ return items.contains(MetricsItem.ALL)
+ || items.contains(MetricsItem.TOPOLOGY_STATS);
}
boolean collectExecutorStats(Set<MetricsItem> items) {
- return items.contains(MetricsItem.ALL) ||
- items.contains(MetricsItem.XSFER_RATE) ||
- items.contains(MetricsItem.SPOUT_LATENCY);
+ return items.contains(MetricsItem.ALL)
+ || items.contains(MetricsItem.XSFER_RATE)
+ || items.contains(MetricsItem.SPOUT_LATENCY);
}
boolean collectThroughput(Set<MetricsItem> items) {
- return items.contains(MetricsItem.ALL) ||
- items.contains(MetricsItem.XSFER_RATE);
+ return items.contains(MetricsItem.ALL)
+ || items.contains(MetricsItem.XSFER_RATE);
}
boolean collectSpoutThroughput(Set<MetricsItem> items) {
- return items.contains(MetricsItem.ALL) ||
- items.contains(MetricsItem.SPOUT_THROUGHPUT);
+ return items.contains(MetricsItem.ALL)
+ || items.contains(MetricsItem.SPOUT_THROUGHPUT);
}
boolean collectSpoutLatency(Set<MetricsItem> items) {
- return items.contains(MetricsItem.ALL) ||
- items.contains(MetricsItem.SPOUT_LATENCY);
+ return items.contains(MetricsItem.ALL)
+ || items.contains(MetricsItem.SPOUT_LATENCY);
}
-
+ public enum MetricsItem {
+ TOPOLOGY_STATS,
+ XSFER_RATE,
+ SPOUT_THROUGHPUT,
+ SPOUT_LATENCY,
+ ALL
+ }
public static class MetricsCollectorConfig {
private static final Logger LOG = Logger.getLogger(MetricsCollectorConfig.class);
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java
index e34cb6e..73bfdda 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java
@@ -75,6 +75,7 @@ public class Helper {
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
try {
+ System.out.println("Killing...");
Helper.kill(client, topoName);
System.out.println("Killed Topology");
} catch (Exception e) {
@@ -84,7 +85,8 @@ public class Helper {
});
}
- public static void runOnClusterAndPrintMetrics(int durationSec, String topoName, Map<String, Object> topoConf, StormTopology topology) throws Exception {
+ public static void runOnClusterAndPrintMetrics(int durationSec, String topoName, Map<String, Object> topoConf, StormTopology topology)
+ throws Exception {
// submit topology
StormSubmitter.submitTopologyWithProgressBar(topoName, topoConf, topology);
setupShutdownHook(topoName); // handle Ctrl-C
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/IdentityBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/IdentityBolt.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/IdentityBolt.java
index 54f8ee1..f950203 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/IdentityBolt.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/IdentityBolt.java
@@ -18,16 +18,13 @@
package org.apache.storm.perf.utils;
+import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-import java.util.Map;
-
public class IdentityBolt extends BaseRichBolt {
private OutputCollector collector;
@@ -45,7 +42,6 @@ public class IdentityBolt extends BaseRichBolt {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
index f1177b6..f462bc7 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
@@ -33,16 +33,16 @@ import org.apache.storm.utils.Utils;
public class MetricsSample {
- private long sampleTime = -1;
- private long totalTransferred = 0l;
- private long totalEmitted = 0l;
- private long totalAcked = 0l;
- private long totalFailed = 0l;
+ private long sampleTime = -1L;
+ private long totalTransferred = 0L;
+ private long totalEmitted = 0L;
+ private long totalAcked = 0L;
+ private long totalFailed = 0L;
private double totalLatency;
- private long spoutEmitted = 0l;
- private long spoutTransferred = 0l;
+ private long spoutEmitted = 0L;
+ private long spoutTransferred = 0L;
private int spoutExecutors = 0;
private int numSupervisors = 0;
@@ -64,7 +64,7 @@ public class MetricsSample {
int topologyTasks = topSummary.get_num_tasks();
TopologyInfo topInfo = client.getTopologyInfo(topSummary.get_id());
- MetricsSample sample = getMetricsSample( topInfo);
+ MetricsSample sample = getMetricsSample(topInfo);
sample.numWorkers = topologyWorkers;
sample.numExecutors = topologyExecutors;
sample.numTasks = topologyTasks;
@@ -75,62 +75,62 @@ public class MetricsSample {
List<ExecutorSummary> executorSummaries = topInfo.get_executors();
// totals
- long totalTransferred = 0l;
- long totalEmitted = 0l;
- long totalAcked = 0l;
- long totalFailed = 0l;
+ long totalTransferred = 0L;
+ long totalEmitted = 0L;
+ long totalAcked = 0L;
+ long totalFailed = 0L;
// number of spout executors
int spoutExecCount = 0;
double spoutLatencySum = 0.0;
- long spoutEmitted = 0l;
- long spoutTransferred = 0l;
+ long spoutEmitted = 0L;
+ long spoutTransferred = 0L;
// Executor summaries
- for(ExecutorSummary executorSummary : executorSummaries){
+ for (ExecutorSummary executorSummary : executorSummaries) {
ExecutorStats executorStats = executorSummary.get_stats();
- if(executorStats == null){
+ if (executorStats == null) {
continue;
}
ExecutorSpecificStats executorSpecificStats = executorStats.get_specific();
- if(executorSpecificStats == null){
+ if (executorSpecificStats == null) {
// bail out
continue;
}
// transferred totals
- Map<String,Map<String,Long>> transferred = executorStats.get_transferred();
+ Map<String, Map<String, Long>> transferred = executorStats.get_transferred();
Map<String, Long> txMap = transferred.get(":all-time");
- if(txMap == null){
+ if (txMap == null) {
continue;
}
- for(String key : txMap.keySet()){
+ for (String key : txMap.keySet()) {
// todo, ignore the master batch coordinator ?
- if(!Utils.isSystemId(key)){
+ if (!Utils.isSystemId(key)) {
Long count = txMap.get(key);
totalTransferred += count;
- if(executorSpecificStats.is_set_spout()){
+ if (executorSpecificStats.is_set_spout()) {
spoutTransferred += count;
}
}
}
// we found a spout
- if(executorSpecificStats.isSet(2)) { // spout
+ if (executorSpecificStats.isSet(2)) { // spout
SpoutStats spoutStats = executorSpecificStats.get_spout();
Map<String, Long> acked = spoutStats.get_acked().get(":all-time");
- if(acked != null){
- for(String key : acked.keySet()) {
+ if (acked != null) {
+ for (String key : acked.keySet()) {
totalAcked += acked.get(key);
}
}
Map<String, Long> failed = spoutStats.get_failed().get(":all-time");
- if(failed != null){
- for(String key : failed.keySet()) {
+ if (failed != null) {
+ for (String key : failed.keySet()) {
totalFailed += failed.get(key);
}
}
@@ -154,9 +154,9 @@ public class MetricsSample {
MetricsSample ret = new MetricsSample();
ret.totalEmitted = totalEmitted;
ret.totalTransferred = totalTransferred;
- ret.totalAcked = totalAcked;
+ ret.totalAcked = totalAcked;
ret.totalFailed = totalFailed;
- ret.totalLatency = spoutLatencySum/spoutExecCount;
+ ret.totalLatency = spoutLatencySum / spoutExecCount;
ret.spoutEmitted = spoutEmitted;
ret.spoutTransferred = spoutTransferred;
ret.sampleTime = System.currentTimeMillis();
@@ -178,7 +178,6 @@ public class MetricsSample {
}
-
// getters
public long getSampleTime() {
return sampleTime;
@@ -228,7 +227,7 @@ public class MetricsSample {
return totalSlots;
}
- public int getSpoutExecutors(){
+ public int getSpoutExecutors() {
return this.spoutExecutors;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
index 7d8c51f..6712768 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
@@ -69,7 +69,7 @@ public class EsTestUtil {
return new Fields("source", "index", "type", "id");
}
};
- return new TupleImpl(topologyContext, new Values(source, index, type, id), 1, "");
+ return new TupleImpl(topologyContext, new Values(source, index, type, id), source, 1, "");
}
public static TridentTuple generateTestTridentTuple(String source, String index, String type, String id) {
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
index 88bafd2..ac724e8 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
@@ -56,6 +56,11 @@ public class SpoutOutputCollectorMock implements ISpoutOutputCollector {
}
@Override
+ public void flush() {
+ // NO-OP
+ }
+
+ @Override
public void reportError(Throwable arg0) {
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java
index 3fe5de0..c42f769 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java
@@ -39,6 +39,11 @@ public class TridentCollectorMock implements TridentCollector {
}
@Override
+ public void flush() {
+ // NO-OP
+ }
+
+ @Override
public void reportError(Throwable arg0) {
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java
index 0f6bd28..cac55c9 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java
@@ -229,7 +229,7 @@ public class AvroGenericRecordBoltTest {
return new Fields("record");
}
};
- return new TupleImpl(topologyContext, new Values(record), 1, "");
+ return new TupleImpl(topologyContext, new Values(record), topologyContext.getComponentId(1), 1, "");
}
private void verifyAllAvroFiles(String path) throws IOException {
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
index de7316f..b872029 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
@@ -251,7 +251,7 @@ public class TestHdfsBolt {
return new Fields("id", "msg","city","state");
}
};
- return new TupleImpl(topologyContext, new Values(id, msg,city,state), 1, "");
+ return new TupleImpl(topologyContext, new Values(id, msg,city,state), topologyContext.getComponentId(1), 1, "");
}
// Generally used to compare how files were actually written and compare to expectations based on total
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java
index f297ba9..71f5d4e 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java
@@ -167,7 +167,7 @@ public class TestSequenceFileBolt {
return new Fields("key", "value");
}
};
- return new TupleImpl(topologyContext, new Values(key, value), 1, "");
+ return new TupleImpl(topologyContext, new Values(key, value), topologyContext.getComponentId(1), 1, "");
}
// Generally used to compare how files were actually written and compare to expectations based on total
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/format/TestSimpleFileNameFormat.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/format/TestSimpleFileNameFormat.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/format/TestSimpleFileNameFormat.java
index 051e4b7..64f5dd8 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/format/TestSimpleFileNameFormat.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/format/TestSimpleFileNameFormat.java
@@ -31,8 +31,9 @@ public class TestSimpleFileNameFormat {
@Test
public void testDefaults() {
+ Map<String, Object> topoConf = new HashMap();
SimpleFileNameFormat format = new SimpleFileNameFormat();
- format.prepare(null, createTopologyContext());
+ format.prepare(null, createTopologyContext(topoConf));
long now = System.currentTimeMillis();
String path = format.getPath();
String name = format.getName(1, now);
@@ -48,7 +49,8 @@ public class TestSimpleFileNameFormat {
.withName("$TIME.$HOST.$COMPONENT.$TASK.$NUM.txt")
.withPath("/mypath")
.withTimeFormat("yyyy-MM-dd HH:mm:ss");
- format.prepare(null, createTopologyContext());
+ Map<String, Object> topoConf = new HashMap();
+ format.prepare(null, createTopologyContext(topoConf));
long now = System.currentTimeMillis();
String path = format.getPath();
String name = format.getName(1, now);
@@ -66,14 +68,15 @@ public class TestSimpleFileNameFormat {
@Test(expected=IllegalArgumentException.class)
public void testTimeFormat() {
+ Map<String, Object> topoConf = new HashMap();
SimpleFileNameFormat format = new SimpleFileNameFormat()
.withTimeFormat("xyz");
- format.prepare(null, createTopologyContext());
+ format.prepare(null, createTopologyContext(topoConf));
}
- private TopologyContext createTopologyContext(){
+ private TopologyContext createTopologyContext(Map<String, Object> topoConf){
Map<Integer, String> taskToComponent = new HashMap<Integer, String>();
taskToComponent.put(7, "Xcom");
- return new TopologyContext(null, null, taskToComponent, null, null, null, null, null, null, 7, 6703, null, null, null, null, null, null);
+ return new TopologyContext(null, topoConf, taskToComponent, null, null, null, null, null, null, 7, 6703, null, null, null, null, null, null);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
index 6e2b140..b3909f3 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
@@ -589,9 +589,9 @@ public class TestHdfsSpout {
}
private Map getCommonConfigs() {
- Map<String, Object> conf = new HashMap();
- conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "0");
- return conf;
+ Map<String, Object> topoConf = new HashMap();
+ topoConf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "0");
+ return topoConf;
}
private AutoCloseableHdfsSpout makeSpout(String readerType, String[] outputFields) {
@@ -619,9 +619,9 @@ public class TestHdfsSpout {
}
}
- private void openSpout(HdfsSpout spout, int spoutId, Map<String, Object> conf) {
+ private void openSpout(HdfsSpout spout, int spoutId, Map<String, Object> topoConf) {
MockCollector collector = new MockCollector();
- spout.open(conf, new MockTopologyContext(spoutId), collector);
+ spout.open(topoConf, new MockTopologyContext(spoutId, topoConf), collector);
}
/**
@@ -698,13 +698,18 @@ public class TestHdfsSpout {
}
@Override
- public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
+ public List<Integer> emit(List<Object> tuple, Object messageId) {
lines.add(tuple.toString());
items.add(HdfsUtils.Pair.of(messageId, tuple));
return null;
}
@Override
+ public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
+ return emit(tuple, messageId);
+ }
+
+ @Override
public void emitDirect(int arg0, String arg1, List<Object> arg2, Object arg3) {
throw new UnsupportedOperationException("NOT Implemented");
}
@@ -747,9 +752,9 @@ public class TestHdfsSpout {
private final int componentId;
- public MockTopologyContext(int componentId) {
+ public MockTopologyContext(int componentId, Map<String, Object> topoConf) {
// StormTopology topology, Map<String, Object> topoConf, Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks, Map<String, Map<String, Fields>> componentToStreamToFields, String stormId, String codeDir, String pidDir, Integer taskId, Integer workerPort, List<Integer> workerTasks, Map<String, Object> defaultResources, Map<String, Object> userResources, Map<String, Object> executorData, Map<Integer, Map<Integer, Map<String, IMetric>>> registeredMetrics, Atom openOrPrepareWasCalled
- super(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
+ super(null, topoConf, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
this.componentId = componentId;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
index f7cd046..dfa62ee 100644
--- a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
@@ -464,7 +464,7 @@ public class TestHiveBolt {
return new Fields("id", "msg","city","state");
}
};
- return new TupleImpl(topologyContext, new Values(id, msg,city,state), 1, "");
+ return new TupleImpl(topologyContext, new Values(id, msg,city,state), topologyContext.getComponentId(1), 1, "");
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java b/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
index a53033f..345be58 100644
--- a/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
@@ -168,7 +168,7 @@ public class TestHiveWriter {
return new Fields("id", "msg");
}
};
- return new TupleImpl(topologyContext, new Values(id, msg), 1, "");
+ return new TupleImpl(topologyContext, new Values(id, msg), topologyContext.getComponentId(1), 1, "");
}
private void writeTuples(HiveWriter writer, HiveMapper mapper, int count)
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java
index a5a6c51..4e05646 100644
--- a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java
+++ b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java
@@ -37,6 +37,11 @@ public class MockSpoutOutputCollector implements ISpoutOutputCollector {
}
@Override
+ public void flush() {
+ //NO-OP
+ }
+
+ @Override
public void reportError(Throwable error) {
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index abd6774..39bdb93 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -440,7 +440,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
} else {
final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
if (isAtLeastOnceProcessing()
- && committedOffset != null
+ && committedOffset != null
&& committedOffset.offset() > record.offset()
&& commitMetadataManager.isOffsetCommittedByThisTopology(tp,
committedOffset,
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/external/storm-kafka/src/test/org/apache/storm/kafka/PartitionManagerTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/PartitionManagerTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/PartitionManagerTest.java
index 888ecde..805913d 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/PartitionManagerTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/PartitionManagerTest.java
@@ -238,6 +238,10 @@ public class PartitionManagerTest {
public long getPendingCount() {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public void flush() {
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java
index 5f3753d..ed26157 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java
@@ -296,7 +296,7 @@ public class KafkaBoltTest {
return new Fields("key", "message");
}
};
- return new TupleImpl(topologyContext, new Values(key, message), 1, "");
+ return new TupleImpl(topologyContext, new Values(key, message), topologyContext.getComponentId(1), 1, "");
}
private Tuple generateTestTuple(Object message) {
@@ -307,7 +307,7 @@ public class KafkaBoltTest {
return new Fields("message");
}
};
- return new TupleImpl(topologyContext, new Values(message), 1, "");
+ return new TupleImpl(topologyContext, new Values(message), topologyContext.getComponentId(1), 1, "");
}
private Tuple mockTickTuple() {
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index cb89376..2260d66 100644
--- a/pom.xml
+++ b/pom.xml
@@ -259,6 +259,7 @@
<snakeyaml.version>1.11</snakeyaml.version>
<httpclient.version>4.3.3</httpclient.version>
<clojure.tools.cli.version>0.2.4</clojure.tools.cli.version>
+ <jctools.version>2.0.1</jctools.version>
<disruptor.version>3.3.2</disruptor.version>
<jgrapht.version>0.9.0</jgrapht.version>
<guava.version>16.0.1</guava.version>
@@ -876,6 +877,11 @@
<version>${clojure.tools.cli.version}</version>
</dependency>
<dependency>
+ <groupId>org.jctools</groupId>
+ <artifactId>jctools-core</artifactId>
+ <version>${jctools.version}</version>
+ </dependency>
+ <dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>${disruptor.version}</version>
@@ -1094,7 +1100,7 @@
<includes>
<include>${java.unit.test.include}</include>
</includes>
- <argLine>-Xmx1536m</argLine>
+ <argLine>-Xmx3g</argLine>
<trimStackTrace>false</trimStackTrace>
<forkCount>1.0C</forkCount>
<reuseForks>true</reuseForks>
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/pom.xml
----------------------------------------------------------------------
diff --git a/storm-client/pom.xml b/storm-client/pom.xml
index e0c9609..2e073a8 100644
--- a/storm-client/pom.xml
+++ b/storm-client/pom.xml
@@ -110,10 +110,10 @@
<artifactId>commons-collections</artifactId>
</dependency>
- <!-- disruptor -->
+ <!-- jctools -->
<dependency>
- <groupId>com.lmax</groupId>
- <artifactId>disruptor</artifactId>
+ <groupId>org.jctools</groupId>
+ <artifactId>jctools-core</artifactId>
</dependency>
<!-- JAXB -->
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index 5e9ea48..84825cf 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -15,10 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm;
import java.util.Arrays;
import org.apache.storm.metric.IEventLogger;
+import org.apache.storm.policy.IWaitStrategy;
import org.apache.storm.serialization.IKryoDecorator;
import org.apache.storm.serialization.IKryoFactory;
import org.apache.storm.validation.ConfigValidation;
@@ -120,53 +122,14 @@ public class Config extends HashMap<String, Object> {
public static final String TASK_CREDENTIALS_POLL_SECS = "task.credentials.poll.secs";
/**
- * How often to poll for changed topology backpressure flag from ZK
- */
- @isInteger
- @isPositiveNumber
- public static final String TASK_BACKPRESSURE_POLL_SECS = "task.backpressure.poll.secs";
-
- /**
- * Whether to enable backpressure in for a certain topology
+ * Whether to enable backpressure for a certain topology.
+ * @deprecated In Storm 2.0. Retained for enabling transition from 1.x. Will be removed soon.
*/
+ @Deprecated
@isBoolean
public static final String TOPOLOGY_BACKPRESSURE_ENABLE = "topology.backpressure.enable";
/**
- * This signifies the tuple congestion in a disruptor queue.
- * When the used ratio of a disruptor queue is higher than the high watermark,
- * the backpressure scheme, if enabled, should slow down the tuple sending speed of
- * the spouts until reaching the low watermark.
- */
- @isPositiveNumber
- public static final String BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK="backpressure.disruptor.high.watermark";
-
- /**
- * This signifies a state that a disruptor queue has left the congestion.
- * If the used ratio of a disruptor queue is lower than the low watermark,
- * it will unset the backpressure flag.
- */
- @isPositiveNumber
- public static final String BACKPRESSURE_DISRUPTOR_LOW_WATERMARK="backpressure.disruptor.low.watermark";
-
- /**
- * How long until the backpressure znode is invalid.
- * It's measured by the data (timestamp) of the znode, not the ctime (creation time) or mtime (modification time), etc.
- * This must be larger than BACKPRESSURE_ZNODE_UPDATE_FREQ_SECS.
- */
- @isInteger
- @isPositiveNumber
- public static final String BACKPRESSURE_ZNODE_TIMEOUT_SECS = "backpressure.znode.timeout.secs";
-
- /**
- * How often will the data (timestamp) of backpressure znode be updated.
- * But if the worker backpressure status (on/off) changes, the znode will be updated anyway.
- */
- @isInteger
- @isPositiveNumber
- public static final String BACKPRESSURE_ZNODE_UPDATE_FREQ_SECS = "backpressure.znode.update.freq.secs";
-
- /**
* A list of users that are allowed to interact with the topology. To use this set
* nimbus.authorizer to org.apache.storm.security.auth.authorizer.SimpleACLAuthorizer
*/
@@ -630,16 +593,6 @@ public class Config extends HashMap<String, Object> {
public static final String TOPOLOGY_ENVIRONMENT="topology.environment";
/*
- * Topology-specific option to disable/enable bolt's outgoing overflow buffer.
- * Enabling this option ensures that the bolt can always clear the incoming messages,
- * preventing live-lock for the topology with cyclic flow.
- * The overflow buffer can fill degrading the performance gradually,
- * eventually running out of memory.
- */
- @isBoolean
- public static final String TOPOLOGY_BOLTS_OUTGOING_OVERFLOW_BUFFER_ENABLE="topology.bolts.outgoing.overflow.buffer.enable";
-
- /*
* Bolt-specific configuration for windowed bolts to specify the window length as a count of number of tuples
* in the window.
*/
@@ -719,23 +672,25 @@ public class Config extends HashMap<String, Object> {
public static final String TOPOLOGY_AUTO_TASK_HOOKS="topology.auto.task.hooks";
/**
- * The size of the Disruptor receive queue for each executor. Must be a power of 2.
+ * The size of the receive queue for each executor.
*/
- @isPowerOf2
+ @isPositiveNumber
+ @isInteger
public static final String TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE="topology.executor.receive.buffer.size";
/**
- * The size of the Disruptor send queue for each executor. Must be a power of 2.
+ * The size of the transfer queue for each worker.
*/
- @isPowerOf2
- public static final String TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE="topology.executor.send.buffer.size";
+ @isPositiveNumber
+ @isInteger
+ public static final String TOPOLOGY_TRANSFER_BUFFER_SIZE="topology.transfer.buffer.size";
/**
- * The size of the Disruptor transfer queue for each worker.
+ * The batch size for tuples written to the transfer queue of each worker.
*/
+ @isPositiveNumber
@isInteger
- @isPowerOf2
- public static final String TOPOLOGY_TRANSFER_BUFFER_SIZE="topology.transfer.buffer.size";
+ public static final String TOPOLOGY_TRANSFER_BATCH_SIZE="topology.transfer.batch.size";
/**
* How often a tick tuple from the "__system" component and "__tick" stream should be sent
@@ -745,13 +700,40 @@ public class Config extends HashMap<String, Object> {
public static final String TOPOLOGY_TICK_TUPLE_FREQ_SECS="topology.tick.tuple.freq.secs";
/**
- * @deprecated this is no longer supported
- * Configure the wait strategy used for internal queuing. Can be used to tradeoff latency
- * vs. throughput
+ * The number of tuples to batch before sending to the destination executor.
*/
- @Deprecated
- @isString
- public static final String TOPOLOGY_DISRUPTOR_WAIT_STRATEGY="topology.disruptor.wait.strategy";
+ @isInteger
+ @isPositiveNumber
+ @NotNull
+ public static final String TOPOLOGY_PRODUCER_BATCH_SIZE="topology.producer.batch.size";
+
+ /**
+ * If the number of items in a task's overflowQ exceeds this, new messages coming from other workers to this task will be dropped.
+ * This prevents an OutOfMemoryError that can occur in rare scenarios in the presence of BackPressure. This affects
+ * only inter-worker messages. Messages originating from within the same worker will not be dropped.
+ */
+ @isInteger
+ @isPositiveNumber(includeZero = true)
+ @NotNull
+ public static final String TOPOLOGY_EXECUTOR_OVERFLOW_LIMIT="topology.executor.overflow.limit";
+
+ /**
+ * How often a worker should check and notify upstream workers about its tasks that are no longer experiencing BP
+ * and are able to receive new messages.
+ */
+ @isInteger
+ @isPositiveNumber
+ @NotNull
+ public static final String TOPOLOGY_BACKPRESSURE_CHECK_MILLIS ="topology.backpressure.check.millis";
+
+ /**
+ * How often to send flush tuple to the executors for flushing out batched events.
+ */
+ @isInteger
+ @isPositiveNumber(includeZero = true)
+ @NotNull
+ public static final String TOPOLOGY_BATCH_FLUSH_INTERVAL_MILLIS ="topology.batch.flush.interval.millis";
+
/**
* The size of the shared thread pool for worker tasks to make use of. The thread pool can be accessed
@@ -890,30 +872,93 @@ public class Config extends HashMap<String, Object> {
public static final String TOPOLOGY_ISOLATED_MACHINES = "topology.isolate.machines";
/**
- * Configure timeout milliseconds used for disruptor queue wait strategy. Can be used to tradeoff latency
- * vs. CPU usage
+ * Selects the Bolt's Wait Strategy to use when there are no incoming msgs. Used to trade off latency vs CPU usage.
+ */
+ @isString
+ @isDerivedFrom(baseType = IWaitStrategy.class)
+ public static final String TOPOLOGY_BOLT_WAIT_STRATEGY = "topology.bolt.wait.strategy";
+
+ /**
+ * Configures park time for WaitStrategyPark. If set to 0, returns immediately (i.e. busy wait).
*/
- @isInteger
@NotNull
- public static final String TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS="topology.disruptor.wait.timeout.millis";
+ @isPositiveNumber(includeZero = true)
+ public static final String TOPOLOGY_BOLT_WAIT_PARK_MICROSEC = "topology.bolt.wait.park.microsec";
/**
- * The number of tuples to batch before sending to the next thread. This number is just an initial suggestion and
- * the code may adjust it as your topology runs.
+ * Configures number of iterations to spend in level 1 of WaitStrategyProgressive, before progressing to level 2
*/
+ @NotNull
@isInteger
@isPositiveNumber
+ public static final String TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL1_COUNT = "topology.bolt.wait.progressive.level1.count";
+
+ /**
+ * Configures number of iterations to spend in level 2 of WaitStrategyProgressive, before progressing to level 3
+ */
@NotNull
- public static final String TOPOLOGY_DISRUPTOR_BATCH_SIZE="topology.disruptor.batch.size";
+ @isInteger
+ @isPositiveNumber
+ public static final String TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL2_COUNT = "topology.bolt.wait.progressive.level2.count";
/**
- * The maximum age in milliseconds a batch can be before being sent to the next thread. This number is just an
- * initial suggestion and the code may adjust it as your topology runs.
+ * Configures sleep time for WaitStrategyProgressive.
*/
+ @NotNull
+ @isPositiveNumber(includeZero = true)
+ public static final String TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS = "topology.bolt.wait.progressive.level3.sleep.millis";
+
+
+ /**
+ * A class that implements a wait strategy for an upstream component (spout/bolt) trying to write to a downstream component
+ * whose recv queue is full
+ *
+ * 1. nextTuple emits no tuples
+ * 2. The spout has hit maxSpoutPending and can't emit any more tuples
+ */
+ @isString
+ @isDerivedFrom(baseType = IWaitStrategy.class)
+ public static final String TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY="topology.backpressure.wait.strategy";
+
+ /**
+ * Configures park time if using WaitStrategyPark for BackPressure. If set to 0, returns immediately (i.e. busy wait).
+ */
+ @NotNull
+ @isPositiveNumber(includeZero = true)
+ public static final String TOPOLOGY_BACKPRESSURE_WAIT_PARK_MICROSEC = "topology.backpressure.wait.park.microsec";
+
+ /**
+ * Configures sleep time if using WaitStrategyProgressive for BackPressure.
+ */
+ @NotNull
+ @isPositiveNumber(includeZero = true)
+ public static final String TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS = "topology.backpressure.wait.progressive.level3.sleep.millis";
+
+ /**
+ * Configures number of iterations to spend in level 1 of WaitStrategyProgressive, before progressing to level 2, if using WaitStrategyProgressive for BackPressure.
+ */
+ @NotNull
@isInteger
@isPositiveNumber
+ public static final String TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL1_COUNT = "topology.backpressure.wait.progressive.level1.count";
+
+ /**
+ * Configures number of iterations to spend in level 2 of WaitStrategyProgressive, before progressing to level 3, if using WaitStrategyProgressive for BackPressure.
+ */
+ @NotNull
+ @isInteger
+ @isPositiveNumber
+ public static final String TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL2_COUNT = "topology.backpressure.wait.progressive.level2.count";
+
+
+ /**
+ * Check recvQ after every N invocations of Spout's nextTuple() [when ACKing is disabled].
+ * Spouts receive very few msgs if ACK is disabled. This avoids checking the recvQ after each nextTuple().
+ */
+ @isInteger
+ @isPositiveNumber(includeZero = true)
@NotNull
- public static final String TOPOLOGY_DISRUPTOR_BATCH_TIMEOUT_MILLIS="topology.disruptor.batch.timeout.millis";
+ public static final String TOPOLOGY_SPOUT_RECVQ_SKIPS = "topology.spout.recvq.skips";
/**
* Minimum number of nimbus hosts where the code must be replicated before leader nimbus
@@ -933,17 +978,6 @@ public class Config extends HashMap<String, Object> {
public static final String TOPOLOGY_MAX_REPLICATION_WAIT_TIME_SEC = "topology.max.replication.wait.time.sec";
/**
- * This is a config that is not likely to be used. Internally the disruptor queue will batch entries written
- * into the queue. A background thread pool will flush those batches if they get too old. By default that
- * pool can grow rather large, and sacrifice some CPU time to keep the latency low. In some cases you may
- * want the queue to be smaller so there is less CPU used, but the latency will increase in some situations.
- * This configs is on a per cluster bases, if you want to control this on a per topology bases you need to set
- * the java System property for the worker "num_flusher_pool_threads" to the value you want.
- */
- @isInteger
- public static final String STORM_WORKER_DISRUPTOR_FLUSHER_MAX_POOL_SIZE = "storm.worker.disruptor.flusher.max.pool.size";
-
- /**
* The list of servers that Pacemaker is running on.
*/
@isStringList
@@ -1382,6 +1416,28 @@ public class Config extends HashMap<String, Object> {
public static final String STORM_MESSAGING_NETTY_BUFFER_SIZE = "storm.messaging.netty.buffer_size";
/**
+ * Netty based messaging: The netty write buffer high watermark in bytes.
+ * <p>
+ * If the number of bytes queued in the netty's write buffer exceeds this value, the netty {@code Channel.isWritable()}
+ * will start to return {@code false}. The client will wait until the value falls below the {@linkplain #STORM_MESSAGING_NETTY_BUFFER_LOW_WATERMARK low water mark}.
+ * </p>
+ */
+ @isInteger
+ @isPositiveNumber
+ public static final String STORM_MESSAGING_NETTY_BUFFER_HIGH_WATERMARK = "storm.messaging.netty.buffer.high.watermark";
+
+ /**
+ * Netty based messaging: The netty write buffer low watermark in bytes.
+ * <p>
+ * Once the number of bytes queued in the write buffer exceeded the {@linkplain #STORM_MESSAGING_NETTY_BUFFER_HIGH_WATERMARK high water mark} and then
+ * dropped down below this value, the netty {@code Channel.isWritable()} will start to return true.
+ * </p>
+ */
+ @isInteger
+ @isPositiveNumber
+ public static final String STORM_MESSAGING_NETTY_BUFFER_LOW_WATERMARK = "storm.messaging.netty.buffer.low.watermark";
+
+ /**
* Netty based messaging: Sets the backlog value to specify when the channel binds to a local address
*/
@isInteger
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/Constants.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/Constants.java b/storm-client/src/jvm/org/apache/storm/Constants.java
index a8e2c01..623acfb 100644
--- a/storm-client/src/jvm/org/apache/storm/Constants.java
+++ b/storm-client/src/jvm/org/apache/storm/Constants.java
@@ -33,6 +33,7 @@ public class Constants {
public static final List<Long> SYSTEM_EXECUTOR_ID = Arrays.asList(-1L, -1L);
public static final String SYSTEM_COMPONENT_ID = "__system";
public static final String SYSTEM_TICK_STREAM_ID = "__tick";
+ public static final String SYSTEM_FLUSH_STREAM_ID = "__flush";
public static final String METRICS_COMPONENT_ID_PREFIX = "__metrics_";
public static final String METRICS_STREAM_ID = "__metrics";
public static final String METRICS_TICK_STREAM_ID = "__metrics_tick";
@@ -54,7 +55,6 @@ public class Constants {
public static final String USER_TIMER = "user-timer";
public static final String TRANSFER_FN = "transfer-fn";
public static final String SUICIDE_FN = "suicide-fn";
- public static final String THROTTLE_ON = "throttle-on";
public static final String EXECUTOR_RECEIVE_QUEUE_MAP = "executor-receive-queue-map";
public static final String STORM_ACTIVE_ATOM = "storm-active-atom";
public static final String COMPONENT_TO_DEBUG_ATOM = "storm-component->debug-atom";