You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/04/06 17:33:38 UTC
[49/52] [partial] storm git commit: STORM-2441 Break down
'storm-core' to extract client (worker) artifacts
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulWindowingTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulWindowingTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulWindowingTopology.java
new file mode 100644
index 0000000..0e9b6ea
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulWindowingTopology.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.starter.bolt.PrinterBolt;
+import org.apache.storm.starter.spout.RandomIntegerSpout;
+import org.apache.storm.state.KeyValueState;
+import org.apache.storm.state.State;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseStatefulWindowedBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.windowing.TupleWindow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+import static org.apache.storm.topology.base.BaseWindowedBolt.Count;
+
+/**
+ * A simple example that demonstrates the usage of {@link org.apache.storm.topology.IStatefulWindowedBolt} to
+ * save the state of the windowing operation to avoid re-computation in case of failures.
+ * <p>
+ * The framework internally manages the window boundaries and does not invoke
+ * {@link org.apache.storm.topology.IWindowedBolt#execute(TupleWindow)} for the already evaluated windows in case of restarts
+ * during failures. The {@link org.apache.storm.topology.IStatefulBolt#initState(State)}
+ * is invoked with the previously saved state of the bolt after prepare, before the execute() method is invoked.
+ * </p>
+ */
+public class StatefulWindowingTopology {
+ private static final Logger LOG = LoggerFactory.getLogger(StatefulWindowingTopology.class);
+
+ private static class WindowSumBolt extends BaseStatefulWindowedBolt<KeyValueState<String, Long>> {
+ private KeyValueState<String, Long> state;
+ private long sum;
+
+ private OutputCollector collector;
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void initState(KeyValueState<String, Long> state) {
+ this.state = state;
+ sum = state.get("sum", 0L);
+ LOG.debug("initState with state [" + state + "] current sum [" + sum + "]");
+ }
+
+ @Override
+ public void execute(TupleWindow inputWindow) {
+ for (Tuple tuple : inputWindow.get()) {
+ sum += tuple.getIntegerByField("value");
+ }
+ state.put("sum", sum);
+ collector.emit(new Values(sum));
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("sum"));
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout("spout", new RandomIntegerSpout());
+ builder.setBolt("sumbolt", new WindowSumBolt().withWindow(new Count(5), new Count(3))
+ .withMessageIdField("msgid"), 1).shuffleGrouping("spout");
+ builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("sumbolt");
+ Config conf = new Config();
+ conf.setDebug(false);
+ //conf.put(Config.TOPOLOGY_STATE_PROVIDER, "org.apache.storm.redis.state.RedisKeyValueStateProvider");
+ if (args != null && args.length > 0) {
+ conf.setNumWorkers(1);
+ StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
+ } else {
+ try (LocalCluster cluster = new LocalCluster();
+ LocalTopology topo = cluster.submitTopology("test", conf, builder.createTopology());) {
+ Utils.sleep(40000);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java
index 9e709a1..d8137b0 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java
@@ -20,16 +20,13 @@ package org.apache.storm.starter;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
-import org.apache.storm.metric.HttpForwardingMetricsServer;
-import org.apache.storm.metric.HttpForwardingMetricsConsumer;
-import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.misc.metric.HttpForwardingMetricsServer;
import org.apache.storm.metric.api.IMetricsConsumer.TaskInfo;
import org.apache.storm.metric.api.IMetricsConsumer.DataPoint;
import org.apache.storm.generated.*;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
@@ -37,9 +34,8 @@ import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.NimbusClient;
import org.apache.storm.utils.Utils;
-import org.apache.storm.StormSubmitter;
+import org.apache.storm.utils.NimbusClient;
import java.util.Collection;
import java.util.HashMap;
@@ -397,7 +393,7 @@ public class ThroughputVsLatency {
C cluster = new C(conf);
conf.setNumWorkers(parallelism);
conf.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class);
- conf.registerMetricsConsumer(org.apache.storm.metric.HttpForwardingMetricsConsumer.class, url, 1);
+ conf.registerMetricsConsumer(org.apache.storm.misc.metric.HttpForwardingMetricsConsumer.class, url, 1);
Map<String, String> workerMetrics = new HashMap<String, String>();
if (!cluster.isLocal()) {
//sigar uses JNI and does not work in local mode
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java
index 7467634..cbc5d45 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java
@@ -22,21 +22,13 @@ import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.starter.spout.RandomIntegerSpout;
import org.apache.storm.streams.Pair;
-import org.apache.storm.streams.PairStream;
-import org.apache.storm.streams.Stream;
import org.apache.storm.streams.StreamBuilder;
import org.apache.storm.streams.operations.CombinerAggregator;
-import org.apache.storm.streams.operations.mappers.TupleValueMapper;
-import org.apache.storm.streams.operations.mappers.TupleValueMappers;
import org.apache.storm.streams.operations.mappers.ValueMapper;
-import org.apache.storm.streams.tuple.Tuple3;
import org.apache.storm.streams.windowing.TumblingWindows;
import org.apache.storm.topology.base.BaseWindowedBolt;
-import org.apache.storm.trident.windowing.config.TumblingDurationWindow;
import org.apache.storm.utils.Utils;
-import static org.apache.storm.topology.base.BaseWindowedBolt.Count;
-
/**
* An example that illustrates the global aggregate
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java b/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java
deleted file mode 100644
index d073350..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.starter.spout.RandomIntegerSpout;
-import org.apache.storm.state.KeyValueState;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.topology.base.BaseStatefulBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-/**
- * An example topology that demonstrates the use of {@link org.apache.storm.topology.IStatefulBolt}
- * to manage state. To run the example,
- * <pre>
- * $ storm jar examples/storm-starter/storm-starter-topologies-*.jar storm.starter.StatefulTopology statetopology
- * </pre>
- * <p/>
- * The default state used is 'InMemoryKeyValueState' which does not persist the state across restarts. You could use
- * 'RedisKeyValueState' to test state persistence by setting below property in conf/storm.yaml
- * <pre>
- * topology.state.provider: org.apache.storm.redis.state.RedisKeyValueStateProvider
- * </pre>
- * <p/>
- * You should also start a local redis instance before running the 'storm jar' command. The default
- * RedisKeyValueStateProvider parameters can be overridden in conf/storm.yaml, for e.g.
- * <p/>
- * <pre>
- * topology.state.provider.config: '{"keyClass":"...", "valueClass":"...",
- * "keySerializerClass":"...", "valueSerializerClass":"...",
- * "jedisPoolConfig":{"host":"localhost", "port":6379,
- * "timeout":2000, "database":0, "password":"xyz"}}'
- *
- * </pre>
- * </p>
- */
-public class StatefulTopology {
- private static final Logger LOG = LoggerFactory.getLogger(StatefulTopology.class);
- /**
- * A bolt that uses {@link KeyValueState} to save its state.
- */
- private static class StatefulSumBolt extends BaseStatefulBolt<KeyValueState<String, Long>> {
- String name;
- KeyValueState<String, Long> kvState;
- long sum;
- private OutputCollector collector;
-
- StatefulSumBolt(String name) {
- this.name = name;
- }
-
- @Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- }
-
- @Override
- public void execute(Tuple input) {
- sum += ((Number) input.getValueByField("value")).longValue();
- LOG.debug("{} sum = {}", name, sum);
- kvState.put("sum", sum);
- collector.emit(input, new Values(sum));
- collector.ack(input);
- }
-
- @Override
- public void initState(KeyValueState<String, Long> state) {
- kvState = state;
- sum = kvState.get("sum", 0L);
- LOG.debug("Initstate, sum from saved state = {} ", sum);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("value"));
- }
- }
-
- public static class PrinterBolt extends BaseBasicBolt {
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- System.out.println(tuple);
- LOG.debug("Got tuple {}", tuple);
- collector.emit(tuple.getValues());
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer ofd) {
- ofd.declare(new Fields("value"));
- }
-
- }
-
- public static void main(String[] args) throws Exception {
- TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout("spout", new RandomIntegerSpout());
- builder.setBolt("partialsum", new StatefulSumBolt("partial"), 1).shuffleGrouping("spout");
- builder.setBolt("printer", new PrinterBolt(), 2).shuffleGrouping("partialsum");
- builder.setBolt("total", new StatefulSumBolt("total"), 1).shuffleGrouping("printer");
- Config conf = new Config();
- conf.setDebug(false);
-
- if (args != null && args.length > 0) {
- conf.setNumWorkers(1);
- StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
- } else {
- try (LocalCluster cluster = new LocalCluster();
- LocalTopology topology = cluster.submitTopology("test", conf, builder.createTopology());) {
- Utils.sleep(40000);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/examples/storm-starter/src/jvm/storm/starter/StatefulWindowingTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/StatefulWindowingTopology.java b/examples/storm-starter/src/jvm/storm/starter/StatefulWindowingTopology.java
deleted file mode 100644
index e43759c..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/StatefulWindowingTopology.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.starter.bolt.PrinterBolt;
-import org.apache.storm.starter.spout.RandomIntegerSpout;
-import org.apache.storm.state.KeyValueState;
-import org.apache.storm.state.State;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.topology.base.BaseStatefulWindowedBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-import org.apache.storm.windowing.TupleWindow;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-import static org.apache.storm.topology.base.BaseWindowedBolt.Count;
-
-/**
- * A simple example that demonstrates the usage of {@link org.apache.storm.topology.IStatefulWindowedBolt} to
- * save the state of the windowing operation to avoid re-computation in case of failures.
- * <p>
- * The framework internally manages the window boundaries and does not invoke
- * {@link org.apache.storm.topology.IWindowedBolt#execute(TupleWindow)} for the already evaluated windows in case of restarts
- * during failures. The {@link org.apache.storm.topology.IStatefulBolt#initState(State)}
- * is invoked with the previously saved state of the bolt after prepare, before the execute() method is invoked.
- * </p>
- */
-public class StatefulWindowingTopology {
- private static final Logger LOG = LoggerFactory.getLogger(StatefulWindowingTopology.class);
-
- private static class WindowSumBolt extends BaseStatefulWindowedBolt<KeyValueState<String, Long>> {
- private KeyValueState<String, Long> state;
- private long sum;
-
- private OutputCollector collector;
-
- @Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- }
-
- @Override
- public void initState(KeyValueState<String, Long> state) {
- this.state = state;
- sum = state.get("sum", 0L);
- LOG.debug("initState with state [" + state + "] current sum [" + sum + "]");
- }
-
- @Override
- public void execute(TupleWindow inputWindow) {
- for (Tuple tuple : inputWindow.get()) {
- sum += tuple.getIntegerByField("value");
- }
- state.put("sum", sum);
- collector.emit(new Values(sum));
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("sum"));
- }
- }
-
- public static void main(String[] args) throws Exception {
- TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout("spout", new RandomIntegerSpout());
- builder.setBolt("sumbolt", new WindowSumBolt().withWindow(new Count(5), new Count(3))
- .withMessageIdField("msgid"), 1).shuffleGrouping("spout");
- builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("sumbolt");
- Config conf = new Config();
- conf.setDebug(false);
- //conf.put(Config.TOPOLOGY_STATE_PROVIDER, "org.apache.storm.redis.state.RedisKeyValueStateProvider");
- if (args != null && args.length > 0) {
- conf.setNumWorkers(1);
- StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
- } else {
- try (LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("test", conf, builder.createTopology());) {
- Utils.sleep(40000);
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/pom.xml b/external/storm-cassandra/pom.xml
index 8fc882a..c4e341b 100644
--- a/external/storm-cassandra/pom.xml
+++ b/external/storm-cassandra/pom.xml
@@ -79,12 +79,19 @@
<dependency>
<groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
+ <artifactId>storm-client</artifactId>
<version>${project.version}</version>
<scope>${provided.scope}</scope>
</dependency>
<dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-server</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${org.slf4j.version}</version>
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
index ea7750f..7d34b16 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
@@ -35,11 +35,9 @@ import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.TupleUtils;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java
index 0f0de53..28e47bd 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java
@@ -19,6 +19,7 @@
package org.apache.storm.cassandra.client;
import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.ObjectReader;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.policies.DefaultRetryPolicy;
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
@@ -106,14 +107,14 @@ public class CassandraConf implements Serializable {
*/
public CassandraConf(Map<String, Object> conf) {
- this.username = (String)Utils.get(conf, CASSANDRA_USERNAME, null);
- this.password = (String)Utils.get(conf, CASSANDRA_PASSWORD, null);
+ this.username = (String) Utils.get(conf, CASSANDRA_USERNAME, null);
+ this.password = (String) Utils.get(conf, CASSANDRA_PASSWORD, null);
this.keyspace = get(conf, CASSANDRA_KEYSPACE);
- this.consistencyLevel = ConsistencyLevel.valueOf((String)Utils.get(conf, CASSANDRA_CONSISTENCY_LEVEL, ConsistencyLevel.ONE.name()));
- this.nodes = ((String)Utils.get(conf, CASSANDRA_NODES, "localhost")).split(",");
- this.batchSizeRows = Utils.getInt(conf.get(CASSANDRA_BATCH_SIZE_ROWS), 100);
- this.port = Utils.getInt(conf.get(CASSANDRA_PORT), 9042);
- this.retryPolicyName = (String)Utils.get(conf, CASSANDRA_RETRY_POLICY, DefaultRetryPolicy.class.getSimpleName());
+ this.consistencyLevel = ConsistencyLevel.valueOf((String) Utils.get(conf, CASSANDRA_CONSISTENCY_LEVEL, ConsistencyLevel.ONE.name()));
+ this.nodes = ((String) Utils.get(conf, CASSANDRA_NODES, "localhost")).split(",");
+ this.batchSizeRows = ObjectReader.getInt(conf.get(CASSANDRA_BATCH_SIZE_ROWS), 100);
+ this.port = ObjectReader.getInt(conf.get(CASSANDRA_PORT), 9042);
+ this.retryPolicyName = (String) Utils.get(conf, CASSANDRA_RETRY_POLICY, DefaultRetryPolicy.class.getSimpleName());
this.reconnectionPolicyBaseMs = getLong(conf.get(CASSANDRA_RECONNECT_POLICY_BASE_MS), 100L);
this.reconnectionPolicyMaxMs = getLong(conf.get(CASSANDRA_RECONNECT_POLICY_MAX_MS), TimeUnit.MINUTES.toMillis(1));
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-druid/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-druid/pom.xml b/external/storm-druid/pom.xml
index 3f9601c..6414ccc 100644
--- a/external/storm-druid/pom.xml
+++ b/external/storm-druid/pom.xml
@@ -32,11 +32,17 @@
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
+ <artifactId>storm-client</artifactId>
<version>${project.version}</version>
<scope>${provided.scope}</scope>
</dependency>
<dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-server</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>io.druid</groupId>
<artifactId>tranquility-core_2.11</artifactId>
<version>0.8.2</version>
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/pom.xml b/external/storm-elasticsearch/pom.xml
index c676179..bab0426 100644
--- a/external/storm-elasticsearch/pom.xml
+++ b/external/storm-elasticsearch/pom.xml
@@ -48,7 +48,7 @@
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
+ <artifactId>storm-client</artifactId>
<version>${project.version}</version>
<scope>${provided.scope}</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-eventhubs/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/pom.xml b/external/storm-eventhubs/pom.xml
index 05d8c3b..7fe6071 100755
--- a/external/storm-eventhubs/pom.xml
+++ b/external/storm-eventhubs/pom.xml
@@ -58,13 +58,20 @@
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
+ <artifactId>storm-client</artifactId>
<version>${project.version}</version>
<!-- keep storm out of the jar-with-dependencies -->
<type>jar</type>
<scope>${provided.scope}</scope>
</dependency>
<dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-server</artifactId>
+ <version>${project.version}</version>
+ <type>jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>${curator.version}</version>
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/AtMostOnceEventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/AtMostOnceEventCount.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/AtMostOnceEventCount.java
deleted file mode 100755
index b0dd33a..0000000
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/AtMostOnceEventCount.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.samples;
-
-import java.io.Serializable;
-
-import org.apache.storm.eventhubs.spout.EventHubSpout;
-import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
-import org.apache.storm.eventhubs.spout.IEventHubReceiver;
-import org.apache.storm.eventhubs.spout.IPartitionManager;
-import org.apache.storm.eventhubs.spout.IPartitionManagerFactory;
-import org.apache.storm.eventhubs.spout.IStateStore;
-import org.apache.storm.eventhubs.spout.SimplePartitionManager;
-
-public class AtMostOnceEventCount extends EventCount implements Serializable {
- @Override
- protected EventHubSpout createEventHubSpout() {
- IPartitionManagerFactory pmFactory = new IPartitionManagerFactory() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public IPartitionManager create(EventHubSpoutConfig spoutConfig,
- String partitionId, IStateStore stateStore,
- IEventHubReceiver receiver) {
- return new SimplePartitionManager(spoutConfig, partitionId,
- stateStore, receiver);
- }
- };
- EventHubSpout eventHubSpout = new EventHubSpout(
- spoutConfig, null, pmFactory, null);
- return eventHubSpout;
- }
-
- public static void main(String[] args) throws Exception {
- AtMostOnceEventCount scenario = new AtMostOnceEventCount();
-
- scenario.runScenario(args);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventCount.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventCount.java
deleted file mode 100755
index ae15634..0000000
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventCount.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.samples;
-
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.topology.TopologyBuilder;
-
-import org.apache.storm.eventhubs.samples.bolt.GlobalCountBolt;
-import org.apache.storm.eventhubs.samples.bolt.PartialCountBolt;
-import org.apache.storm.eventhubs.spout.EventHubSpout;
-import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
-
-import java.io.FileReader;
-import java.util.Properties;
-
-/**
- * The basic scenario topology that uses EventHubSpout with PartialCountBolt
- * and GlobalCountBolt.
- * To submit this topology:
- * storm jar {jarfile} {classname} {topologyname} {spoutconffile}
- */
-public class EventCount {
- protected EventHubSpoutConfig spoutConfig;
- protected int numWorkers;
-
- public EventCount() {
- }
-
- protected void readEHConfig(String[] args) throws Exception {
- Properties properties = new Properties();
- if(args.length > 1) {
- properties.load(new FileReader(args[1]));
- }
- else {
- properties.load(EventCount.class.getClassLoader().getResourceAsStream(
- "Config.properties"));
- }
-
- String username = properties.getProperty("eventhubspout.username");
- String password = properties.getProperty("eventhubspout.password");
- String namespaceName = properties.getProperty("eventhubspout.namespace");
- String entityPath = properties.getProperty("eventhubspout.entitypath");
- String targetFqnAddress = properties.getProperty("eventhubspout.targetfqnaddress");
- String zkEndpointAddress = properties.getProperty("zookeeper.connectionstring");
- int partitionCount = Integer.parseInt(properties.getProperty("eventhubspout.partitions.count"));
- int checkpointIntervalInSeconds = Integer.parseInt(properties.getProperty("eventhubspout.checkpoint.interval"));
- int receiverCredits = Integer.parseInt(properties.getProperty("eventhub.receiver.credits"));
- String maxPendingMsgsPerPartitionStr = properties.getProperty("eventhubspout.max.pending.messages.per.partition");
- if(maxPendingMsgsPerPartitionStr == null) {
- maxPendingMsgsPerPartitionStr = "1024";
- }
- int maxPendingMsgsPerPartition = Integer.parseInt(maxPendingMsgsPerPartitionStr);
- String enqueueTimeDiffStr = properties.getProperty("eventhub.receiver.filter.timediff");
- if(enqueueTimeDiffStr == null) {
- enqueueTimeDiffStr = "0";
- }
- int enqueueTimeDiff = Integer.parseInt(enqueueTimeDiffStr);
- long enqueueTimeFilter = 0;
- if(enqueueTimeDiff != 0) {
- enqueueTimeFilter = System.currentTimeMillis() - enqueueTimeDiff*1000;
- }
- String consumerGroupName = properties.getProperty("eventhubspout.consumer.group.name");
-
- System.out.println("Eventhub spout config: ");
- System.out.println(" partition count: " + partitionCount);
- System.out.println(" checkpoint interval: " + checkpointIntervalInSeconds);
- System.out.println(" receiver credits: " + receiverCredits);
- spoutConfig = new EventHubSpoutConfig(username, password,
- namespaceName, entityPath, partitionCount, zkEndpointAddress,
- checkpointIntervalInSeconds, receiverCredits, maxPendingMsgsPerPartition,
- enqueueTimeFilter);
-
- if(targetFqnAddress != null)
- {
- spoutConfig.setTargetAddress(targetFqnAddress);
- }
- spoutConfig.setConsumerGroupName(consumerGroupName);
-
- //set the number of workers to be the same as partition number.
- //the idea is to have a spout and a partial count bolt co-exist in one
- //worker to avoid shuffling messages across workers in storm cluster.
- numWorkers = spoutConfig.getPartitionCount();
-
- if(args.length > 0) {
- //set topology name so that sample Trident topology can use it as stream name.
- spoutConfig.setTopologyName(args[0]);
- }
- }
-
- protected EventHubSpout createEventHubSpout() {
- EventHubSpout eventHubSpout = new EventHubSpout(spoutConfig);
- return eventHubSpout;
- }
-
- protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
- TopologyBuilder topologyBuilder = new TopologyBuilder();
-
- topologyBuilder.setSpout("EventHubsSpout", eventHubSpout, spoutConfig.getPartitionCount())
- .setNumTasks(spoutConfig.getPartitionCount());
- topologyBuilder.setBolt("PartialCountBolt", new PartialCountBolt(), spoutConfig.getPartitionCount())
- .localOrShuffleGrouping("EventHubsSpout").setNumTasks(spoutConfig.getPartitionCount());
- topologyBuilder.setBolt("GlobalCountBolt", new GlobalCountBolt(), 1)
- .globalGrouping("PartialCountBolt").setNumTasks(1);
- return topologyBuilder.createTopology();
- }
-
- protected void submitTopology(String[] args, StormTopology topology) throws Exception {
- Config config = new Config();
- config.setDebug(false);
- //Enable metrics
- config.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class, 1);
-
-
- if (args != null && args.length > 0) {
- config.setNumWorkers(numWorkers);
- StormSubmitter.submitTopology(args[0], config, topology);
- } else {
- config.setMaxTaskParallelism(2);
-
- try (LocalCluster localCluster = new LocalCluster();
- LocalTopology topo = localCluster.submitTopology("test", config, topology);) {
- Thread.sleep(5000000);
- }
- }
- }
-
- protected void runScenario(String[] args) throws Exception{
- readEHConfig(args);
- EventHubSpout eventHubSpout = createEventHubSpout();
- StormTopology topology = buildTopology(eventHubSpout);
- submitTopology(args, topology);
- }
-
- public static void main(String[] args) throws Exception {
- EventCount scenario = new EventCount();
- scenario.runScenario(args);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
deleted file mode 100755
index 665fef9..0000000
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.samples;
-
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.topology.TopologyBuilder;
-
-import org.apache.storm.eventhubs.bolt.EventHubBolt;
-import org.apache.storm.eventhubs.bolt.EventHubBoltConfig;
-import org.apache.storm.eventhubs.spout.EventHubSpout;
-
-/**
- * A sample topology that loops message back to EventHub
- */
-public class EventHubLoop extends EventCount {
-
- @Override
- protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
- TopologyBuilder topologyBuilder = new TopologyBuilder();
-
- topologyBuilder.setSpout("EventHubsSpout", eventHubSpout, spoutConfig.getPartitionCount())
- .setNumTasks(spoutConfig.getPartitionCount());
- EventHubBoltConfig boltConfig = new EventHubBoltConfig(spoutConfig.getConnectionString(),
- spoutConfig.getEntityPath(), true);
-
- EventHubBolt eventHubBolt = new EventHubBolt(boltConfig);
- int boltTasks = spoutConfig.getPartitionCount();
- topologyBuilder.setBolt("EventHubsBolt", eventHubBolt, boltTasks)
- .localOrShuffleGrouping("EventHubsSpout").setNumTasks(boltTasks);
- return topologyBuilder.createTopology();
- }
-
- public static void main(String[] args) throws Exception {
- EventHubLoop scenario = new EventHubLoop();
- scenario.runScenario(args);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java
deleted file mode 100755
index e8538c1..0000000
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.samples;
-
-import org.apache.storm.trident.TridentState;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.operation.builtin.Count;
-import org.apache.storm.trident.operation.builtin.Sum;
-import org.apache.storm.trident.testing.MemoryMapState;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.tuple.Fields;
-
-import org.apache.storm.eventhubs.samples.TransactionalTridentEventCount.LoggingFilter;
-import org.apache.storm.eventhubs.spout.EventHubSpout;
-import org.apache.storm.eventhubs.trident.OpaqueTridentEventHubSpout;
-
-/**
- * A simple Trident topology uses OpaqueTridentEventHubSpout
- */
-public class OpaqueTridentEventCount extends EventCount {
- @Override
- protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
- TridentTopology topology = new TridentTopology();
-
- OpaqueTridentEventHubSpout spout = new OpaqueTridentEventHubSpout(spoutConfig);
- TridentState state = topology.newStream("stream-" + spoutConfig.getTopologyName(), spout)
- .parallelismHint(spoutConfig.getPartitionCount())
- .aggregate(new Count(), new Fields("partial-count"))
- .persistentAggregate(new MemoryMapState.Factory(), new Fields("partial-count"), new Sum(), new Fields("count"));
- state.newValuesStream().each(new Fields("count"), new LoggingFilter("got count: ", 10000));
- return topology.build();
- }
-
- public static void main(String[] args) throws Exception {
- OpaqueTridentEventCount scenario = new OpaqueTridentEventCount();
- scenario.runScenario(args);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java
deleted file mode 100755
index 0a5295f..0000000
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.samples;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.tuple.Fields;
-
-import org.apache.storm.eventhubs.spout.EventHubSpout;
-import org.apache.storm.eventhubs.trident.TransactionalTridentEventHubSpout;
-
-import org.apache.storm.trident.TridentState;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.operation.BaseFilter;
-import org.apache.storm.trident.operation.builtin.Count;
-import org.apache.storm.trident.operation.builtin.Sum;
-import org.apache.storm.trident.testing.MemoryMapState;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-/**
- * A simple Trident topology uses TransactionalTridentEventHubSpout
- */
-public class TransactionalTridentEventCount extends EventCount {
- public static class LoggingFilter extends BaseFilter {
- private static final long serialVersionUID = 1L;
- private static final Logger logger = LoggerFactory.getLogger(LoggingFilter.class);
- private final String prefix;
- private final long logIntervalMs;
- private long lastTime;
- public LoggingFilter(String prefix, int logIntervalMs) {
- this.prefix = prefix;
- this.logIntervalMs = logIntervalMs;
- lastTime = System.nanoTime();
- }
-
- @Override
- public boolean isKeep(TridentTuple tuple) {
- long now = System.nanoTime();
- if(logIntervalMs < (now - lastTime) / 1000000) {
- logger.info(prefix + tuple.toString());
- lastTime = now;
- }
- return false;
- }
- }
-
- @Override
- protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
- TridentTopology topology = new TridentTopology();
-
- TransactionalTridentEventHubSpout spout = new TransactionalTridentEventHubSpout(spoutConfig);
- TridentState state = topology.newStream("stream-" + spoutConfig.getTopologyName(), spout)
- .parallelismHint(spoutConfig.getPartitionCount())
- .aggregate(new Count(), new Fields("partial-count"))
- .persistentAggregate(new MemoryMapState.Factory(), new Fields("partial-count"), new Sum(), new Fields("count"));
- state.newValuesStream().each(new Fields("count"), new LoggingFilter("got count: ", 10000));
- return topology.build();
- }
-
- public static void main(String[] args) throws Exception {
- TransactionalTridentEventCount scenario = new TransactionalTridentEventCount();
- scenario.runScenario(args);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
deleted file mode 100755
index bc9219e..0000000
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.samples.bolt;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.storm.utils.TupleUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.storm.Config;
-import org.apache.storm.metric.api.IMetric;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Tuple;
-
-/**
- * Globally count number of messages
- */
-public class GlobalCountBolt extends BaseBasicBolt {
- private static final long serialVersionUID = 1L;
- private static final Logger logger = LoggerFactory
- .getLogger(GlobalCountBolt.class);
- private long globalCount;
- private long globalCountDiff;
- private long lastMetricsTime;
- private long throughput;
-
- @Override
- public void prepare(Map config, TopologyContext context) {
- globalCount = 0;
- globalCountDiff = 0;
- lastMetricsTime = System.nanoTime();
- context.registerMetric("GlobalMessageCount", new IMetric() {
- @Override
- public Object getValueAndReset() {
- long now = System.nanoTime();
- long millis = (now - lastMetricsTime) / 1000000;
- throughput = globalCountDiff / millis * 1000;
- Map values = new HashMap();
- values.put("global_count", globalCount);
- values.put("throughput", throughput);
- lastMetricsTime = now;
- globalCountDiff = 0;
- return values;
- }
- }, (Integer)config.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));
- }
-
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- if (TupleUtils.isTick(tuple)) {
- return;
- }
-
- int partial = (Integer)tuple.getValueByField("partial_count");
- globalCount += partial;
- globalCountDiff += partial;
- if((globalCountDiff == partial) && (globalCount != globalCountDiff)) {
- //metrics has just been collected, let's also log it
- logger.info("Current throughput (messages/second): " + throughput);
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
deleted file mode 100755
index eaf2b65..0000000
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.samples.bolt;
-
-import java.util.Map;
-
-import org.apache.storm.utils.TupleUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-/**
- * Partially count number of messages from EventHubs
- */
-public class PartialCountBolt extends BaseBasicBolt {
- private static final long serialVersionUID = 1L;
- private static final Logger logger = LoggerFactory
- .getLogger(PartialCountBolt.class);
- private static final int PartialCountBatchSize = 1000;
-
- private int partialCount;
-
- @Override
- public void prepare(Map stormConf, TopologyContext context) {
- partialCount = 0;
- }
-
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- if (TupleUtils.isTick(tuple)) {
- return;
- }
-
- partialCount++;
- if(partialCount == PartialCountBatchSize) {
- collector.emit(new Values(PartialCountBatchSize));
- partialCount = 0;
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("partial_count"));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/AtMostOnceEventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/AtMostOnceEventCount.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/AtMostOnceEventCount.java
new file mode 100755
index 0000000..b0dd33a
--- /dev/null
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/AtMostOnceEventCount.java
@@ -0,0 +1,54 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples;
+
+import java.io.Serializable;
+
+import org.apache.storm.eventhubs.spout.EventHubSpout;
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+import org.apache.storm.eventhubs.spout.IEventHubReceiver;
+import org.apache.storm.eventhubs.spout.IPartitionManager;
+import org.apache.storm.eventhubs.spout.IPartitionManagerFactory;
+import org.apache.storm.eventhubs.spout.IStateStore;
+import org.apache.storm.eventhubs.spout.SimplePartitionManager;
+
+public class AtMostOnceEventCount extends EventCount implements Serializable {
+ @Override
+ protected EventHubSpout createEventHubSpout() {
+ IPartitionManagerFactory pmFactory = new IPartitionManagerFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IPartitionManager create(EventHubSpoutConfig spoutConfig,
+ String partitionId, IStateStore stateStore,
+ IEventHubReceiver receiver) {
+ return new SimplePartitionManager(spoutConfig, partitionId,
+ stateStore, receiver);
+ }
+ };
+ EventHubSpout eventHubSpout = new EventHubSpout(
+ spoutConfig, null, pmFactory, null);
+ return eventHubSpout;
+ }
+
+ public static void main(String[] args) throws Exception {
+ AtMostOnceEventCount scenario = new AtMostOnceEventCount();
+
+ scenario.runScenario(args);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventCount.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventCount.java
new file mode 100755
index 0000000..ae15634
--- /dev/null
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventCount.java
@@ -0,0 +1,157 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples;
+
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.TopologyBuilder;
+
+import org.apache.storm.eventhubs.samples.bolt.GlobalCountBolt;
+import org.apache.storm.eventhubs.samples.bolt.PartialCountBolt;
+import org.apache.storm.eventhubs.spout.EventHubSpout;
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+
+import java.io.FileReader;
+import java.util.Properties;
+
+/**
+ * The basic scenario topology that uses EventHubSpout with PartialCountBolt
+ * and GlobalCountBolt.
+ * To submit this topology:
+ * storm jar {jarfile} {classname} {topologyname} {spoutconffile}
+ */
+public class EventCount {
+ protected EventHubSpoutConfig spoutConfig;
+ protected int numWorkers;
+
+ public EventCount() {
+ }
+
+ protected void readEHConfig(String[] args) throws Exception {
+ Properties properties = new Properties();
+ if(args.length > 1) {
+ properties.load(new FileReader(args[1]));
+ }
+ else {
+ properties.load(EventCount.class.getClassLoader().getResourceAsStream(
+ "Config.properties"));
+ }
+
+ String username = properties.getProperty("eventhubspout.username");
+ String password = properties.getProperty("eventhubspout.password");
+ String namespaceName = properties.getProperty("eventhubspout.namespace");
+ String entityPath = properties.getProperty("eventhubspout.entitypath");
+ String targetFqnAddress = properties.getProperty("eventhubspout.targetfqnaddress");
+ String zkEndpointAddress = properties.getProperty("zookeeper.connectionstring");
+ int partitionCount = Integer.parseInt(properties.getProperty("eventhubspout.partitions.count"));
+ int checkpointIntervalInSeconds = Integer.parseInt(properties.getProperty("eventhubspout.checkpoint.interval"));
+ int receiverCredits = Integer.parseInt(properties.getProperty("eventhub.receiver.credits"));
+ String maxPendingMsgsPerPartitionStr = properties.getProperty("eventhubspout.max.pending.messages.per.partition");
+ if(maxPendingMsgsPerPartitionStr == null) {
+ maxPendingMsgsPerPartitionStr = "1024";
+ }
+ int maxPendingMsgsPerPartition = Integer.parseInt(maxPendingMsgsPerPartitionStr);
+ String enqueueTimeDiffStr = properties.getProperty("eventhub.receiver.filter.timediff");
+ if(enqueueTimeDiffStr == null) {
+ enqueueTimeDiffStr = "0";
+ }
+ int enqueueTimeDiff = Integer.parseInt(enqueueTimeDiffStr);
+ long enqueueTimeFilter = 0;
+ if(enqueueTimeDiff != 0) {
+ enqueueTimeFilter = System.currentTimeMillis() - enqueueTimeDiff*1000;
+ }
+ String consumerGroupName = properties.getProperty("eventhubspout.consumer.group.name");
+
+ System.out.println("Eventhub spout config: ");
+ System.out.println(" partition count: " + partitionCount);
+ System.out.println(" checkpoint interval: " + checkpointIntervalInSeconds);
+ System.out.println(" receiver credits: " + receiverCredits);
+ spoutConfig = new EventHubSpoutConfig(username, password,
+ namespaceName, entityPath, partitionCount, zkEndpointAddress,
+ checkpointIntervalInSeconds, receiverCredits, maxPendingMsgsPerPartition,
+ enqueueTimeFilter);
+
+ if(targetFqnAddress != null)
+ {
+ spoutConfig.setTargetAddress(targetFqnAddress);
+ }
+ spoutConfig.setConsumerGroupName(consumerGroupName);
+
+ //set the number of workers to be the same as partition number.
+ //the idea is to have a spout and a partial count bolt co-exist in one
+ //worker to avoid shuffling messages across workers in storm cluster.
+ numWorkers = spoutConfig.getPartitionCount();
+
+ if(args.length > 0) {
+ //set topology name so that sample Trident topology can use it as stream name.
+ spoutConfig.setTopologyName(args[0]);
+ }
+ }
+
+ protected EventHubSpout createEventHubSpout() {
+ EventHubSpout eventHubSpout = new EventHubSpout(spoutConfig);
+ return eventHubSpout;
+ }
+
+ protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
+ TopologyBuilder topologyBuilder = new TopologyBuilder();
+
+ topologyBuilder.setSpout("EventHubsSpout", eventHubSpout, spoutConfig.getPartitionCount())
+ .setNumTasks(spoutConfig.getPartitionCount());
+ topologyBuilder.setBolt("PartialCountBolt", new PartialCountBolt(), spoutConfig.getPartitionCount())
+ .localOrShuffleGrouping("EventHubsSpout").setNumTasks(spoutConfig.getPartitionCount());
+ topologyBuilder.setBolt("GlobalCountBolt", new GlobalCountBolt(), 1)
+ .globalGrouping("PartialCountBolt").setNumTasks(1);
+ return topologyBuilder.createTopology();
+ }
+
+ protected void submitTopology(String[] args, StormTopology topology) throws Exception {
+ Config config = new Config();
+ config.setDebug(false);
+ //Enable metrics
+ config.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class, 1);
+
+
+ if (args != null && args.length > 0) {
+ config.setNumWorkers(numWorkers);
+ StormSubmitter.submitTopology(args[0], config, topology);
+ } else {
+ config.setMaxTaskParallelism(2);
+
+ try (LocalCluster localCluster = new LocalCluster();
+ LocalTopology topo = localCluster.submitTopology("test", config, topology);) {
+ Thread.sleep(5000000);
+ }
+ }
+ }
+
+ protected void runScenario(String[] args) throws Exception{
+ readEHConfig(args);
+ EventHubSpout eventHubSpout = createEventHubSpout();
+ StormTopology topology = buildTopology(eventHubSpout);
+ submitTopology(args, topology);
+ }
+
+ public static void main(String[] args) throws Exception {
+ EventCount scenario = new EventCount();
+ scenario.runScenario(args);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventHubLoop.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
new file mode 100755
index 0000000..665fef9
--- /dev/null
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
@@ -0,0 +1,52 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples;
+
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.TopologyBuilder;
+
+import org.apache.storm.eventhubs.bolt.EventHubBolt;
+import org.apache.storm.eventhubs.bolt.EventHubBoltConfig;
+import org.apache.storm.eventhubs.spout.EventHubSpout;
+
+/**
+ * A sample topology that loops message back to EventHub
+ */
+public class EventHubLoop extends EventCount {
+
+ @Override
+ protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
+ TopologyBuilder topologyBuilder = new TopologyBuilder();
+
+ topologyBuilder.setSpout("EventHubsSpout", eventHubSpout, spoutConfig.getPartitionCount())
+ .setNumTasks(spoutConfig.getPartitionCount());
+ EventHubBoltConfig boltConfig = new EventHubBoltConfig(spoutConfig.getConnectionString(),
+ spoutConfig.getEntityPath(), true);
+
+ EventHubBolt eventHubBolt = new EventHubBolt(boltConfig);
+ int boltTasks = spoutConfig.getPartitionCount();
+ topologyBuilder.setBolt("EventHubsBolt", eventHubBolt, boltTasks)
+ .localOrShuffleGrouping("EventHubsSpout").setNumTasks(boltTasks);
+ return topologyBuilder.createTopology();
+ }
+
+ public static void main(String[] args) throws Exception {
+ EventHubLoop scenario = new EventHubLoop();
+ scenario.runScenario(args);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java
new file mode 100755
index 0000000..e8538c1
--- /dev/null
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java
@@ -0,0 +1,53 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples;
+
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.builtin.Count;
+import org.apache.storm.trident.operation.builtin.Sum;
+import org.apache.storm.trident.testing.MemoryMapState;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
+
+import org.apache.storm.eventhubs.samples.TransactionalTridentEventCount.LoggingFilter;
+import org.apache.storm.eventhubs.spout.EventHubSpout;
+import org.apache.storm.eventhubs.trident.OpaqueTridentEventHubSpout;
+
+/**
+ * A simple Trident topology uses OpaqueTridentEventHubSpout
+ */
+public class OpaqueTridentEventCount extends EventCount {
+ @Override
+ protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
+ TridentTopology topology = new TridentTopology();
+
+ OpaqueTridentEventHubSpout spout = new OpaqueTridentEventHubSpout(spoutConfig);
+ TridentState state = topology.newStream("stream-" + spoutConfig.getTopologyName(), spout)
+ .parallelismHint(spoutConfig.getPartitionCount())
+ .aggregate(new Count(), new Fields("partial-count"))
+ .persistentAggregate(new MemoryMapState.Factory(), new Fields("partial-count"), new Sum(), new Fields("count"));
+ state.newValuesStream().each(new Fields("count"), new LoggingFilter("got count: ", 10000));
+ return topology.build();
+ }
+
+ public static void main(String[] args) throws Exception {
+ OpaqueTridentEventCount scenario = new OpaqueTridentEventCount();
+ scenario.runScenario(args);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java
new file mode 100755
index 0000000..0a5295f
--- /dev/null
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java
@@ -0,0 +1,81 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
+
+import org.apache.storm.eventhubs.spout.EventHubSpout;
+import org.apache.storm.eventhubs.trident.TransactionalTridentEventHubSpout;
+
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseFilter;
+import org.apache.storm.trident.operation.builtin.Count;
+import org.apache.storm.trident.operation.builtin.Sum;
+import org.apache.storm.trident.testing.MemoryMapState;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+/**
+ * A simple Trident topology uses TransactionalTridentEventHubSpout
+ */
+public class TransactionalTridentEventCount extends EventCount {
+ public static class LoggingFilter extends BaseFilter {
+ private static final long serialVersionUID = 1L;
+ private static final Logger logger = LoggerFactory.getLogger(LoggingFilter.class);
+ private final String prefix;
+ private final long logIntervalMs;
+ private long lastTime;
+ public LoggingFilter(String prefix, int logIntervalMs) {
+ this.prefix = prefix;
+ this.logIntervalMs = logIntervalMs;
+ lastTime = System.nanoTime();
+ }
+
+ @Override
+ public boolean isKeep(TridentTuple tuple) {
+ long now = System.nanoTime();
+ if(logIntervalMs < (now - lastTime) / 1000000) {
+ logger.info(prefix + tuple.toString());
+ lastTime = now;
+ }
+ return false;
+ }
+ }
+
+ @Override
+ protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
+ TridentTopology topology = new TridentTopology();
+
+ TransactionalTridentEventHubSpout spout = new TransactionalTridentEventHubSpout(spoutConfig);
+ TridentState state = topology.newStream("stream-" + spoutConfig.getTopologyName(), spout)
+ .parallelismHint(spoutConfig.getPartitionCount())
+ .aggregate(new Count(), new Fields("partial-count"))
+ .persistentAggregate(new MemoryMapState.Factory(), new Fields("partial-count"), new Sum(), new Fields("count"));
+ state.newValuesStream().each(new Fields("count"), new LoggingFilter("got count: ", 10000));
+ return topology.build();
+ }
+
+ public static void main(String[] args) throws Exception {
+ TransactionalTridentEventCount scenario = new TransactionalTridentEventCount();
+ scenario.runScenario(args);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
new file mode 100755
index 0000000..bc9219e
--- /dev/null
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
@@ -0,0 +1,88 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples.bolt;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.storm.utils.TupleUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.Config;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * Globally count number of messages
+ */
+public class GlobalCountBolt extends BaseBasicBolt {
+ private static final long serialVersionUID = 1L;
+ private static final Logger logger = LoggerFactory
+ .getLogger(GlobalCountBolt.class);
+ private long globalCount;
+ private long globalCountDiff;
+ private long lastMetricsTime;
+ private long throughput;
+
+ @Override
+ public void prepare(Map config, TopologyContext context) {
+ globalCount = 0;
+ globalCountDiff = 0;
+ lastMetricsTime = System.nanoTime();
+ context.registerMetric("GlobalMessageCount", new IMetric() {
+ @Override
+ public Object getValueAndReset() {
+ long now = System.nanoTime();
+ long millis = (now - lastMetricsTime) / 1000000;
+ throughput = globalCountDiff / millis * 1000;
+ Map values = new HashMap();
+ values.put("global_count", globalCount);
+ values.put("throughput", throughput);
+ lastMetricsTime = now;
+ globalCountDiff = 0;
+ return values;
+ }
+ }, (Integer)config.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));
+ }
+
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ if (TupleUtils.isTick(tuple)) {
+ return;
+ }
+
+ int partial = (Integer)tuple.getValueByField("partial_count");
+ globalCount += partial;
+ globalCountDiff += partial;
+ if((globalCountDiff == partial) && (globalCount != globalCountDiff)) {
+ //metrics has just been collected, let's also log it
+ logger.info("Current throughput (messages/second): " + throughput);
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
new file mode 100755
index 0000000..eaf2b65
--- /dev/null
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
@@ -0,0 +1,68 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples.bolt;
+
+import java.util.Map;
+
+import org.apache.storm.utils.TupleUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+/**
+ * Partially count number of messages from EventHubs
+ */
+public class PartialCountBolt extends BaseBasicBolt {
+ private static final long serialVersionUID = 1L;
+ private static final Logger logger = LoggerFactory
+ .getLogger(PartialCountBolt.class);
+ private static final int PartialCountBatchSize = 1000;
+
+ private int partialCount;
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context) {
+ partialCount = 0;
+ }
+
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ if (TupleUtils.isTick(tuple)) {
+ return;
+ }
+
+ partialCount++;
+ if(partialCount == PartialCountBatchSize) {
+ collector.emit(new Values(PartialCountBatchSize));
+ partialCount = 0;
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("partial_count"));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-hbase/pom.xml b/external/storm-hbase/pom.xml
index 3d6aa0c..fd6bfe5 100644
--- a/external/storm-hbase/pom.xml
+++ b/external/storm-hbase/pom.xml
@@ -44,7 +44,7 @@
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
+ <artifactId>storm-client</artifactId>
<version>${project.version}</version>
<scope>${provided.scope}</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/pom.xml b/external/storm-hdfs/pom.xml
index 281732e..8b3a792 100644
--- a/external/storm-hdfs/pom.xml
+++ b/external/storm-hdfs/pom.xml
@@ -45,7 +45,7 @@
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
+ <artifactId>storm-client</artifactId>
<version>${project.version}</version>
<scope>${provided.scope}</scope>
<exclusions>
@@ -57,9 +57,10 @@
</exclusion>
</exclusions>
</dependency>
+
<dependency>
<groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
+ <artifactId>storm-client</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
@@ -72,6 +73,7 @@
</exclusion>
</exclusions>
</dependency>
+
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
index a4c88ce..9acae35 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
@@ -19,7 +19,7 @@ package org.apache.storm.hdfs.blobstore;
import org.apache.storm.Config;
import org.apache.storm.blobstore.BlobStoreFile;
-import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.ObjectReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -139,7 +139,7 @@ public class HdfsBlobStoreImpl {
}
Object shouldCleanup = conf.get(Config.BLOBSTORE_CLEANUP_ENABLE);
- if (Utils.getBoolean(shouldCleanup, false)) {
+ if (ObjectReader.getBoolean(shouldCleanup, false)) {
LOG.debug("Starting hdfs blobstore cleaner");
_cleanup = new TimerTask() {
@Override