You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/04/06 17:33:38 UTC

[49/52] [partial] storm git commit: STORM-2441 Break down 'storm-core' to extract client (worker) artifacts

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulWindowingTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulWindowingTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulWindowingTopology.java
new file mode 100644
index 0000000..0e9b6ea
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulWindowingTopology.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.starter.bolt.PrinterBolt;
+import org.apache.storm.starter.spout.RandomIntegerSpout;
+import org.apache.storm.state.KeyValueState;
+import org.apache.storm.state.State;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseStatefulWindowedBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.windowing.TupleWindow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+import static org.apache.storm.topology.base.BaseWindowedBolt.Count;
+
+/**
+ * A simple example that demonstrates the usage of {@link org.apache.storm.topology.IStatefulWindowedBolt} to
+ * save the state of the windowing operation to avoid re-computation in case of failures.
+ * <p>
+ * The framework internally manages the window boundaries and does not invoke
+ * {@link org.apache.storm.topology.IWindowedBolt#execute(TupleWindow)} for the already evaluated windows in case of restarts
+ * during failures. The {@link org.apache.storm.topology.IStatefulBolt#initState(State)}
+ * is invoked with the previously saved state of the bolt after prepare, before the execute() method is invoked.
+ * </p>
+ */
+public class StatefulWindowingTopology {
+    private static final Logger LOG = LoggerFactory.getLogger(StatefulWindowingTopology.class);
+
+    private static class WindowSumBolt extends BaseStatefulWindowedBolt<KeyValueState<String, Long>> {
+        private KeyValueState<String, Long> state;
+        private long sum;
+
+        private OutputCollector collector;
+
+        @Override
+        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+            this.collector = collector;
+        }
+
+        @Override
+        public void initState(KeyValueState<String, Long> state) {
+            this.state = state;
+            sum = state.get("sum", 0L);
+            LOG.debug("initState with state [" + state + "] current sum [" + sum + "]");
+        }
+
+        @Override
+        public void execute(TupleWindow inputWindow) {
+            for (Tuple tuple : inputWindow.get()) {
+                sum += tuple.getIntegerByField("value");
+            }
+            state.put("sum", sum);
+            collector.emit(new Values(sum));
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("sum"));
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout("spout", new RandomIntegerSpout());
+        builder.setBolt("sumbolt", new WindowSumBolt().withWindow(new Count(5), new Count(3))
+                .withMessageIdField("msgid"), 1).shuffleGrouping("spout");
+        builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("sumbolt");
+        Config conf = new Config();
+        conf.setDebug(false);
+        //conf.put(Config.TOPOLOGY_STATE_PROVIDER, "org.apache.storm.redis.state.RedisKeyValueStateProvider");
+        if (args != null && args.length > 0) {
+            conf.setNumWorkers(1);
+            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
+        } else {
+            try (LocalCluster cluster = new LocalCluster();
+                 LocalTopology topo = cluster.submitTopology("test", conf, builder.createTopology());) {
+                Utils.sleep(40000);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java
index 9e709a1..d8137b0 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java
@@ -20,16 +20,13 @@ package org.apache.storm.starter;
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
 import org.apache.storm.StormSubmitter;
-import org.apache.storm.metric.HttpForwardingMetricsServer;
-import org.apache.storm.metric.HttpForwardingMetricsConsumer;
-import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.misc.metric.HttpForwardingMetricsServer;
 import org.apache.storm.metric.api.IMetricsConsumer.TaskInfo;
 import org.apache.storm.metric.api.IMetricsConsumer.DataPoint;
 import org.apache.storm.generated.*;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.IRichBolt;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.topology.base.BaseBasicBolt;
@@ -37,9 +34,8 @@ import org.apache.storm.topology.base.BaseRichSpout;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.NimbusClient;
 import org.apache.storm.utils.Utils;
-import org.apache.storm.StormSubmitter;
+import org.apache.storm.utils.NimbusClient;
 
 import java.util.Collection;
 import java.util.HashMap;
@@ -397,7 +393,7 @@ public class ThroughputVsLatency {
     C cluster = new C(conf);
     conf.setNumWorkers(parallelism);
     conf.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class);
-    conf.registerMetricsConsumer(org.apache.storm.metric.HttpForwardingMetricsConsumer.class, url, 1);
+    conf.registerMetricsConsumer(org.apache.storm.misc.metric.HttpForwardingMetricsConsumer.class, url, 1);
     Map<String, String> workerMetrics = new HashMap<String, String>();
     if (!cluster.isLocal()) {
       //sigar uses JNI and does not work in local mode

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java
index 7467634..cbc5d45 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java
@@ -22,21 +22,13 @@ import org.apache.storm.LocalCluster;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.starter.spout.RandomIntegerSpout;
 import org.apache.storm.streams.Pair;
-import org.apache.storm.streams.PairStream;
-import org.apache.storm.streams.Stream;
 import org.apache.storm.streams.StreamBuilder;
 import org.apache.storm.streams.operations.CombinerAggregator;
-import org.apache.storm.streams.operations.mappers.TupleValueMapper;
-import org.apache.storm.streams.operations.mappers.TupleValueMappers;
 import org.apache.storm.streams.operations.mappers.ValueMapper;
-import org.apache.storm.streams.tuple.Tuple3;
 import org.apache.storm.streams.windowing.TumblingWindows;
 import org.apache.storm.topology.base.BaseWindowedBolt;
-import org.apache.storm.trident.windowing.config.TumblingDurationWindow;
 import org.apache.storm.utils.Utils;
 
-import static org.apache.storm.topology.base.BaseWindowedBolt.Count;
-
 /**
  * An example that illustrates the global aggregate
  */

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java b/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java
deleted file mode 100644
index d073350..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.starter.spout.RandomIntegerSpout;
-import org.apache.storm.state.KeyValueState;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.topology.base.BaseStatefulBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-/**
- * An example topology that demonstrates the use of {@link org.apache.storm.topology.IStatefulBolt}
- * to manage state. To run the example,
- * <pre>
- * $ storm jar examples/storm-starter/storm-starter-topologies-*.jar storm.starter.StatefulTopology statetopology
- * </pre>
- * <p/>
- * The default state used is 'InMemoryKeyValueState' which does not persist the state across restarts. You could use
- * 'RedisKeyValueState' to test state persistence by setting below property in conf/storm.yaml
- * <pre>
- * topology.state.provider: org.apache.storm.redis.state.RedisKeyValueStateProvider
- * </pre>
- * <p/>
- * You should also start a local redis instance before running the 'storm jar' command. The default
- * RedisKeyValueStateProvider parameters can be overridden in conf/storm.yaml, for e.g.
- * <p/>
- * <pre>
- * topology.state.provider.config: '{"keyClass":"...", "valueClass":"...",
- *                                   "keySerializerClass":"...", "valueSerializerClass":"...",
- *                                   "jedisPoolConfig":{"host":"localhost", "port":6379,
- *                                      "timeout":2000, "database":0, "password":"xyz"}}'
- *
- * </pre>
- * </p>
- */
-public class StatefulTopology {
-    private static final Logger LOG = LoggerFactory.getLogger(StatefulTopology.class);
-    /**
-     * A bolt that uses {@link KeyValueState} to save its state.
-     */
-    private static class StatefulSumBolt extends BaseStatefulBolt<KeyValueState<String, Long>> {
-        String name;
-        KeyValueState<String, Long> kvState;
-        long sum;
-        private OutputCollector collector;
-
-        StatefulSumBolt(String name) {
-            this.name = name;
-        }
-
-        @Override
-        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-            this.collector = collector;
-        }
-
-        @Override
-        public void execute(Tuple input) {
-            sum += ((Number) input.getValueByField("value")).longValue();
-            LOG.debug("{} sum = {}", name, sum);
-            kvState.put("sum", sum);
-            collector.emit(input, new Values(sum));
-            collector.ack(input);
-        }
-
-        @Override
-        public void initState(KeyValueState<String, Long> state) {
-            kvState = state;
-            sum = kvState.get("sum", 0L);
-            LOG.debug("Initstate, sum from saved state = {} ", sum);
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("value"));
-        }
-    }
-
-    public static class PrinterBolt extends BaseBasicBolt {
-        @Override
-        public void execute(Tuple tuple, BasicOutputCollector collector) {
-            System.out.println(tuple);
-            LOG.debug("Got tuple {}", tuple);
-            collector.emit(tuple.getValues());
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer ofd) {
-            ofd.declare(new Fields("value"));
-        }
-
-    }
-
-    public static void main(String[] args) throws Exception {
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.setSpout("spout", new RandomIntegerSpout());
-        builder.setBolt("partialsum", new StatefulSumBolt("partial"), 1).shuffleGrouping("spout");
-        builder.setBolt("printer", new PrinterBolt(), 2).shuffleGrouping("partialsum");
-        builder.setBolt("total", new StatefulSumBolt("total"), 1).shuffleGrouping("printer");
-        Config conf = new Config();
-        conf.setDebug(false);
-
-        if (args != null && args.length > 0) {
-            conf.setNumWorkers(1);
-            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
-        } else {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalTopology topology = cluster.submitTopology("test", conf, builder.createTopology());) {
-                Utils.sleep(40000);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/examples/storm-starter/src/jvm/storm/starter/StatefulWindowingTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/StatefulWindowingTopology.java b/examples/storm-starter/src/jvm/storm/starter/StatefulWindowingTopology.java
deleted file mode 100644
index e43759c..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/StatefulWindowingTopology.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.starter.bolt.PrinterBolt;
-import org.apache.storm.starter.spout.RandomIntegerSpout;
-import org.apache.storm.state.KeyValueState;
-import org.apache.storm.state.State;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.topology.base.BaseStatefulWindowedBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-import org.apache.storm.windowing.TupleWindow;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-import static org.apache.storm.topology.base.BaseWindowedBolt.Count;
-
-/**
- * A simple example that demonstrates the usage of {@link org.apache.storm.topology.IStatefulWindowedBolt} to
- * save the state of the windowing operation to avoid re-computation in case of failures.
- * <p>
- * The framework internally manages the window boundaries and does not invoke
- * {@link org.apache.storm.topology.IWindowedBolt#execute(TupleWindow)} for the already evaluated windows in case of restarts
- * during failures. The {@link org.apache.storm.topology.IStatefulBolt#initState(State)}
- * is invoked with the previously saved state of the bolt after prepare, before the execute() method is invoked.
- * </p>
- */
-public class StatefulWindowingTopology {
-    private static final Logger LOG = LoggerFactory.getLogger(StatefulWindowingTopology.class);
-
-    private static class WindowSumBolt extends BaseStatefulWindowedBolt<KeyValueState<String, Long>> {
-        private KeyValueState<String, Long> state;
-        private long sum;
-
-        private OutputCollector collector;
-
-        @Override
-        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-            this.collector = collector;
-        }
-
-        @Override
-        public void initState(KeyValueState<String, Long> state) {
-            this.state = state;
-            sum = state.get("sum", 0L);
-            LOG.debug("initState with state [" + state + "] current sum [" + sum + "]");
-        }
-
-        @Override
-        public void execute(TupleWindow inputWindow) {
-            for (Tuple tuple : inputWindow.get()) {
-                sum += tuple.getIntegerByField("value");
-            }
-            state.put("sum", sum);
-            collector.emit(new Values(sum));
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("sum"));
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.setSpout("spout", new RandomIntegerSpout());
-        builder.setBolt("sumbolt", new WindowSumBolt().withWindow(new Count(5), new Count(3))
-                .withMessageIdField("msgid"), 1).shuffleGrouping("spout");
-        builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("sumbolt");
-        Config conf = new Config();
-        conf.setDebug(false);
-        //conf.put(Config.TOPOLOGY_STATE_PROVIDER, "org.apache.storm.redis.state.RedisKeyValueStateProvider");
-        if (args != null && args.length > 0) {
-            conf.setNumWorkers(1);
-            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
-        } else {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalTopology topo = cluster.submitTopology("test", conf, builder.createTopology());) {
-                Utils.sleep(40000);
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/pom.xml b/external/storm-cassandra/pom.xml
index 8fc882a..c4e341b 100644
--- a/external/storm-cassandra/pom.xml
+++ b/external/storm-cassandra/pom.xml
@@ -79,12 +79,19 @@
 
         <dependency>
             <groupId>org.apache.storm</groupId>
-            <artifactId>storm-core</artifactId>
+            <artifactId>storm-client</artifactId>
             <version>${project.version}</version>
             <scope>${provided.scope}</scope>
         </dependency>
 
         <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-server</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
             <version>${org.slf4j.version}</version>

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
index ea7750f..7d34b16 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
@@ -35,11 +35,9 @@ import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
 import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.TupleUtils;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java
index 0f0de53..28e47bd 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java
@@ -19,6 +19,7 @@
 package org.apache.storm.cassandra.client;
 
 import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.ObjectReader;
 import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.policies.DefaultRetryPolicy;
 import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
@@ -106,14 +107,14 @@ public class CassandraConf implements Serializable {
      */
     public CassandraConf(Map<String, Object> conf) {
 
-        this.username = (String)Utils.get(conf, CASSANDRA_USERNAME, null);
-        this.password = (String)Utils.get(conf, CASSANDRA_PASSWORD, null);
+        this.username = (String) Utils.get(conf, CASSANDRA_USERNAME, null);
+        this.password = (String) Utils.get(conf, CASSANDRA_PASSWORD, null);
         this.keyspace = get(conf, CASSANDRA_KEYSPACE);
-        this.consistencyLevel = ConsistencyLevel.valueOf((String)Utils.get(conf, CASSANDRA_CONSISTENCY_LEVEL, ConsistencyLevel.ONE.name()));
-        this.nodes    = ((String)Utils.get(conf, CASSANDRA_NODES, "localhost")).split(",");
-        this.batchSizeRows = Utils.getInt(conf.get(CASSANDRA_BATCH_SIZE_ROWS), 100);
-        this.port = Utils.getInt(conf.get(CASSANDRA_PORT), 9042);
-        this.retryPolicyName = (String)Utils.get(conf, CASSANDRA_RETRY_POLICY, DefaultRetryPolicy.class.getSimpleName());
+        this.consistencyLevel = ConsistencyLevel.valueOf((String) Utils.get(conf, CASSANDRA_CONSISTENCY_LEVEL, ConsistencyLevel.ONE.name()));
+        this.nodes    = ((String) Utils.get(conf, CASSANDRA_NODES, "localhost")).split(",");
+        this.batchSizeRows = ObjectReader.getInt(conf.get(CASSANDRA_BATCH_SIZE_ROWS), 100);
+        this.port = ObjectReader.getInt(conf.get(CASSANDRA_PORT), 9042);
+        this.retryPolicyName = (String) Utils.get(conf, CASSANDRA_RETRY_POLICY, DefaultRetryPolicy.class.getSimpleName());
         this.reconnectionPolicyBaseMs = getLong(conf.get(CASSANDRA_RECONNECT_POLICY_BASE_MS), 100L);
         this.reconnectionPolicyMaxMs  = getLong(conf.get(CASSANDRA_RECONNECT_POLICY_MAX_MS), TimeUnit.MINUTES.toMillis(1));
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-druid/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-druid/pom.xml b/external/storm-druid/pom.xml
index 3f9601c..6414ccc 100644
--- a/external/storm-druid/pom.xml
+++ b/external/storm-druid/pom.xml
@@ -32,11 +32,17 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.storm</groupId>
-            <artifactId>storm-core</artifactId>
+            <artifactId>storm-client</artifactId>
             <version>${project.version}</version>
             <scope>${provided.scope}</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-server</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>io.druid</groupId>
             <artifactId>tranquility-core_2.11</artifactId>
             <version>0.8.2</version>

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/pom.xml b/external/storm-elasticsearch/pom.xml
index c676179..bab0426 100644
--- a/external/storm-elasticsearch/pom.xml
+++ b/external/storm-elasticsearch/pom.xml
@@ -48,7 +48,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.storm</groupId>
-            <artifactId>storm-core</artifactId>
+            <artifactId>storm-client</artifactId>
             <version>${project.version}</version>
             <scope>${provided.scope}</scope>
         </dependency>

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-eventhubs/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/pom.xml b/external/storm-eventhubs/pom.xml
index 05d8c3b..7fe6071 100755
--- a/external/storm-eventhubs/pom.xml
+++ b/external/storm-eventhubs/pom.xml
@@ -58,13 +58,20 @@
         </dependency>
         <dependency>
             <groupId>org.apache.storm</groupId>
-            <artifactId>storm-core</artifactId>
+            <artifactId>storm-client</artifactId>
             <version>${project.version}</version>
             <!-- keep storm out of the jar-with-dependencies -->
             <type>jar</type>
             <scope>${provided.scope}</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-server</artifactId>
+            <version>${project.version}</version>
+            <type>jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.curator</groupId>
             <artifactId>curator-framework</artifactId>
             <version>${curator.version}</version>

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/AtMostOnceEventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/AtMostOnceEventCount.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/AtMostOnceEventCount.java
deleted file mode 100755
index b0dd33a..0000000
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/AtMostOnceEventCount.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.samples;
-
-import java.io.Serializable;
-
-import org.apache.storm.eventhubs.spout.EventHubSpout;
-import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
-import org.apache.storm.eventhubs.spout.IEventHubReceiver;
-import org.apache.storm.eventhubs.spout.IPartitionManager;
-import org.apache.storm.eventhubs.spout.IPartitionManagerFactory;
-import org.apache.storm.eventhubs.spout.IStateStore;
-import org.apache.storm.eventhubs.spout.SimplePartitionManager;
-
-public class AtMostOnceEventCount extends EventCount implements Serializable {
-  @Override
-  protected EventHubSpout createEventHubSpout() {
-    IPartitionManagerFactory pmFactory = new IPartitionManagerFactory() {
-      private static final long serialVersionUID = 1L;
-
-      @Override
-      public IPartitionManager create(EventHubSpoutConfig spoutConfig,
-          String partitionId, IStateStore stateStore,
-          IEventHubReceiver receiver) {
-        return new SimplePartitionManager(spoutConfig, partitionId,
-            stateStore, receiver);
-      }
-    };
-    EventHubSpout eventHubSpout = new EventHubSpout(
-        spoutConfig, null, pmFactory, null);
-    return eventHubSpout;
-  }
-  
-  public static void main(String[] args) throws Exception {
-    AtMostOnceEventCount scenario = new AtMostOnceEventCount();
-
-    scenario.runScenario(args);
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventCount.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventCount.java
deleted file mode 100755
index ae15634..0000000
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventCount.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.samples;
-
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.topology.TopologyBuilder;
-
-import org.apache.storm.eventhubs.samples.bolt.GlobalCountBolt;
-import org.apache.storm.eventhubs.samples.bolt.PartialCountBolt;
-import org.apache.storm.eventhubs.spout.EventHubSpout;
-import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
-
-import java.io.FileReader;
-import java.util.Properties;
-
-/**
- * The basic scenario topology that uses EventHubSpout with PartialCountBolt
- * and GlobalCountBolt.
- * To submit this topology:
- * storm jar {jarfile} {classname} {topologyname} {spoutconffile}
- */
-public class EventCount {
-  protected EventHubSpoutConfig spoutConfig;
-  protected int numWorkers;
-  
-  public EventCount() {
-  }
-  
-  protected void readEHConfig(String[] args) throws Exception {
-	  Properties properties = new Properties();
-    if(args.length > 1) {
-      properties.load(new FileReader(args[1]));
-    }
-    else {
-      properties.load(EventCount.class.getClassLoader().getResourceAsStream(
-          "Config.properties"));
-    }
-
-    String username = properties.getProperty("eventhubspout.username");
-    String password = properties.getProperty("eventhubspout.password");
-    String namespaceName = properties.getProperty("eventhubspout.namespace");
-    String entityPath = properties.getProperty("eventhubspout.entitypath");
-    String targetFqnAddress = properties.getProperty("eventhubspout.targetfqnaddress");
-    String zkEndpointAddress = properties.getProperty("zookeeper.connectionstring");
-    int partitionCount = Integer.parseInt(properties.getProperty("eventhubspout.partitions.count"));
-    int checkpointIntervalInSeconds = Integer.parseInt(properties.getProperty("eventhubspout.checkpoint.interval"));
-    int receiverCredits = Integer.parseInt(properties.getProperty("eventhub.receiver.credits"));
-    String maxPendingMsgsPerPartitionStr = properties.getProperty("eventhubspout.max.pending.messages.per.partition");
-    if(maxPendingMsgsPerPartitionStr == null) {
-      maxPendingMsgsPerPartitionStr = "1024";
-    }
-    int maxPendingMsgsPerPartition = Integer.parseInt(maxPendingMsgsPerPartitionStr);
-    String enqueueTimeDiffStr = properties.getProperty("eventhub.receiver.filter.timediff");
-    if(enqueueTimeDiffStr == null) {
-      enqueueTimeDiffStr = "0";
-    }
-    int enqueueTimeDiff = Integer.parseInt(enqueueTimeDiffStr);
-    long enqueueTimeFilter = 0;
-    if(enqueueTimeDiff != 0) {
-      enqueueTimeFilter = System.currentTimeMillis() - enqueueTimeDiff*1000;
-    }
-    String consumerGroupName = properties.getProperty("eventhubspout.consumer.group.name");
-    
-    System.out.println("Eventhub spout config: ");
-    System.out.println("  partition count: " + partitionCount);
-    System.out.println("  checkpoint interval: " + checkpointIntervalInSeconds);
-    System.out.println("  receiver credits: " + receiverCredits);
-    spoutConfig = new EventHubSpoutConfig(username, password,
-      namespaceName, entityPath, partitionCount, zkEndpointAddress,
-      checkpointIntervalInSeconds, receiverCredits, maxPendingMsgsPerPartition,
-      enqueueTimeFilter);
-
-    if(targetFqnAddress != null)
-    {
-      spoutConfig.setTargetAddress(targetFqnAddress);      
-    }
-    spoutConfig.setConsumerGroupName(consumerGroupName);
-
-    //set the number of workers to be the same as partition number.
-    //the idea is to have a spout and a partial count bolt co-exist in one
-    //worker to avoid shuffling messages across workers in storm cluster.
-    numWorkers = spoutConfig.getPartitionCount();
-    
-    if(args.length > 0) {
-      //set topology name so that sample Trident topology can use it as stream name.
-      spoutConfig.setTopologyName(args[0]);
-    }
-	}
-  
-  protected EventHubSpout createEventHubSpout() {
-    EventHubSpout eventHubSpout = new EventHubSpout(spoutConfig);
-    return eventHubSpout;
-  }
-	
-  protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
-    TopologyBuilder topologyBuilder = new TopologyBuilder();
-
-    topologyBuilder.setSpout("EventHubsSpout", eventHubSpout, spoutConfig.getPartitionCount())
-      .setNumTasks(spoutConfig.getPartitionCount());
-    topologyBuilder.setBolt("PartialCountBolt", new PartialCountBolt(), spoutConfig.getPartitionCount())
-      .localOrShuffleGrouping("EventHubsSpout").setNumTasks(spoutConfig.getPartitionCount());
-    topologyBuilder.setBolt("GlobalCountBolt", new GlobalCountBolt(), 1)
-      .globalGrouping("PartialCountBolt").setNumTasks(1);
-    return topologyBuilder.createTopology();
-  }
-	
-  protected void submitTopology(String[] args, StormTopology topology) throws Exception {
-	  Config config = new Config();
-    config.setDebug(false);
-    //Enable metrics
-    config.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class, 1);
-
-    
-	  if (args != null && args.length > 0) {
-      config.setNumWorkers(numWorkers);
-      StormSubmitter.submitTopology(args[0], config, topology);
-    } else {
-      config.setMaxTaskParallelism(2);
-
-      try (LocalCluster localCluster = new LocalCluster();
-           LocalTopology topo = localCluster.submitTopology("test", config, topology);) {
-        Thread.sleep(5000000);
-      }
-    }
-  }
-  
-  protected void runScenario(String[] args) throws Exception{
-    readEHConfig(args);
-    EventHubSpout eventHubSpout = createEventHubSpout();
-    StormTopology topology = buildTopology(eventHubSpout);
-    submitTopology(args, topology);
-  }
-
-  public static void main(String[] args) throws Exception {
-    EventCount scenario = new EventCount();
-    scenario.runScenario(args);
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
deleted file mode 100755
index 665fef9..0000000
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.samples;
-
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.topology.TopologyBuilder;
-
-import org.apache.storm.eventhubs.bolt.EventHubBolt;
-import org.apache.storm.eventhubs.bolt.EventHubBoltConfig;
-import org.apache.storm.eventhubs.spout.EventHubSpout;
-
-/**
- * A sample topology that loops message back to EventHub
- */
-public class EventHubLoop extends EventCount {
-
-  @Override
-  protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
-    TopologyBuilder topologyBuilder = new TopologyBuilder();
-
-    topologyBuilder.setSpout("EventHubsSpout", eventHubSpout, spoutConfig.getPartitionCount())
-      .setNumTasks(spoutConfig.getPartitionCount());
-    EventHubBoltConfig boltConfig = new EventHubBoltConfig(spoutConfig.getConnectionString(),
-        spoutConfig.getEntityPath(), true);
-    
-    EventHubBolt eventHubBolt = new EventHubBolt(boltConfig);
-    int boltTasks = spoutConfig.getPartitionCount();
-    topologyBuilder.setBolt("EventHubsBolt", eventHubBolt, boltTasks)
-      .localOrShuffleGrouping("EventHubsSpout").setNumTasks(boltTasks);
-    return topologyBuilder.createTopology();
-  }
-  
-  public static void main(String[] args) throws Exception {
-    EventHubLoop scenario = new EventHubLoop();
-    scenario.runScenario(args);
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java
deleted file mode 100755
index e8538c1..0000000
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.samples;
-
-import org.apache.storm.trident.TridentState;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.operation.builtin.Count;
-import org.apache.storm.trident.operation.builtin.Sum;
-import org.apache.storm.trident.testing.MemoryMapState;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.tuple.Fields;
-
-import org.apache.storm.eventhubs.samples.TransactionalTridentEventCount.LoggingFilter;
-import org.apache.storm.eventhubs.spout.EventHubSpout;
-import org.apache.storm.eventhubs.trident.OpaqueTridentEventHubSpout;
-
-/**
- * A simple Trident topology uses OpaqueTridentEventHubSpout
- */
-public class OpaqueTridentEventCount extends EventCount {
-  @Override
-  protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
-    TridentTopology topology = new TridentTopology();
-    
-    OpaqueTridentEventHubSpout spout = new OpaqueTridentEventHubSpout(spoutConfig);
-    TridentState state = topology.newStream("stream-" + spoutConfig.getTopologyName(), spout)
-        .parallelismHint(spoutConfig.getPartitionCount())
-        .aggregate(new Count(), new Fields("partial-count"))
-        .persistentAggregate(new MemoryMapState.Factory(), new Fields("partial-count"), new Sum(), new Fields("count"));
-    state.newValuesStream().each(new Fields("count"), new LoggingFilter("got count: ", 10000));
-    return topology.build();
-  }
-  
-  public static void main(String[] args) throws Exception {
-    OpaqueTridentEventCount scenario = new OpaqueTridentEventCount();
-    scenario.runScenario(args);
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java
deleted file mode 100755
index 0a5295f..0000000
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.samples;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.tuple.Fields;
-
-import org.apache.storm.eventhubs.spout.EventHubSpout;
-import org.apache.storm.eventhubs.trident.TransactionalTridentEventHubSpout;
-
-import org.apache.storm.trident.TridentState;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.operation.BaseFilter;
-import org.apache.storm.trident.operation.builtin.Count;
-import org.apache.storm.trident.operation.builtin.Sum;
-import org.apache.storm.trident.testing.MemoryMapState;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-/**
- * A simple Trident topology uses TransactionalTridentEventHubSpout
- */
-public class TransactionalTridentEventCount extends EventCount {
-  public static class LoggingFilter extends BaseFilter {
-    private static final long serialVersionUID = 1L;
-    private static final Logger logger = LoggerFactory.getLogger(LoggingFilter.class);
-    private final String prefix;
-    private final long logIntervalMs;
-    private long lastTime;
-    public LoggingFilter(String prefix, int logIntervalMs) {
-      this.prefix = prefix;
-      this.logIntervalMs = logIntervalMs;
-      lastTime = System.nanoTime();
-    }
-
-    @Override
-    public boolean isKeep(TridentTuple tuple) {
-      long now = System.nanoTime();
-      if(logIntervalMs < (now - lastTime) / 1000000) {
-        logger.info(prefix + tuple.toString());
-        lastTime = now;
-      }
-      return false;
-    }
-  }
-  
-  @Override
-  protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
-    TridentTopology topology = new TridentTopology();
-    
-    TransactionalTridentEventHubSpout spout = new TransactionalTridentEventHubSpout(spoutConfig);
-    TridentState state = topology.newStream("stream-" + spoutConfig.getTopologyName(), spout)
-        .parallelismHint(spoutConfig.getPartitionCount())
-        .aggregate(new Count(), new Fields("partial-count"))
-        .persistentAggregate(new MemoryMapState.Factory(), new Fields("partial-count"), new Sum(), new Fields("count"));
-    state.newValuesStream().each(new Fields("count"), new LoggingFilter("got count: ", 10000));
-    return topology.build();
-  }
-  
-  public static void main(String[] args) throws Exception {
-    TransactionalTridentEventCount scenario = new TransactionalTridentEventCount();
-    scenario.runScenario(args);
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
deleted file mode 100755
index bc9219e..0000000
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.samples.bolt;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.storm.utils.TupleUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.storm.Config;
-import org.apache.storm.metric.api.IMetric;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Tuple;
-
-/**
- * Globally count number of messages
- */
-public class GlobalCountBolt extends BaseBasicBolt {
-  private static final long serialVersionUID = 1L;
-  private static final Logger logger = LoggerFactory
-      .getLogger(GlobalCountBolt.class);
-  private long globalCount;
-  private long globalCountDiff;
-  private long lastMetricsTime;
-  private long throughput;
-  
-  @Override
-  public void prepare(Map config, TopologyContext context) {
-    globalCount = 0;
-    globalCountDiff = 0;
-    lastMetricsTime = System.nanoTime();
-    context.registerMetric("GlobalMessageCount", new IMetric() {
-      @Override
-      public Object getValueAndReset() {
-        long now = System.nanoTime();
-        long millis = (now - lastMetricsTime) / 1000000;
-        throughput = globalCountDiff / millis * 1000;
-        Map values = new HashMap();
-        values.put("global_count", globalCount);
-        values.put("throughput", throughput);
-        lastMetricsTime = now;
-        globalCountDiff = 0;
-        return values;
-      }
-  }, (Integer)config.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));
-  }
-
-  @Override
-  public void execute(Tuple tuple, BasicOutputCollector collector) {
-    if (TupleUtils.isTick(tuple)) {
-      return;
-    }
-
-    int partial = (Integer)tuple.getValueByField("partial_count");
-    globalCount += partial;
-    globalCountDiff += partial;
-    if((globalCountDiff == partial) && (globalCount != globalCountDiff)) {
-      //metrics has just been collected, let's also log it
-      logger.info("Current throughput (messages/second): " + throughput);
-    }
-  }
-
-  @Override
-  public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
deleted file mode 100755
index eaf2b65..0000000
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.samples.bolt;
-
-import java.util.Map;
-
-import org.apache.storm.utils.TupleUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-/**
- * Partially count number of messages from EventHubs
- */
-public class PartialCountBolt extends BaseBasicBolt {
-  private static final long serialVersionUID = 1L;
-  private static final Logger logger = LoggerFactory
-      .getLogger(PartialCountBolt.class);
-  private static final int PartialCountBatchSize = 1000; 
-  
-  private int partialCount;
-  
-  @Override
-  public void prepare(Map stormConf, TopologyContext context) {
-    partialCount = 0;
-  }
-  
-  @Override
-  public void execute(Tuple tuple, BasicOutputCollector collector) {
-    if (TupleUtils.isTick(tuple)) {
-      return;
-    }
-
-    partialCount++;
-    if(partialCount == PartialCountBatchSize) {
-      collector.emit(new Values(PartialCountBatchSize));
-      partialCount = 0;
-    }
-  }
-
-  @Override
-  public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    declarer.declare(new Fields("partial_count"));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/AtMostOnceEventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/AtMostOnceEventCount.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/AtMostOnceEventCount.java
new file mode 100755
index 0000000..b0dd33a
--- /dev/null
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/AtMostOnceEventCount.java
@@ -0,0 +1,54 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples;
+
+import java.io.Serializable;
+
+import org.apache.storm.eventhubs.spout.EventHubSpout;
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+import org.apache.storm.eventhubs.spout.IEventHubReceiver;
+import org.apache.storm.eventhubs.spout.IPartitionManager;
+import org.apache.storm.eventhubs.spout.IPartitionManagerFactory;
+import org.apache.storm.eventhubs.spout.IStateStore;
+import org.apache.storm.eventhubs.spout.SimplePartitionManager;
+
+public class AtMostOnceEventCount extends EventCount implements Serializable {
+  @Override
+  protected EventHubSpout createEventHubSpout() {
+    IPartitionManagerFactory pmFactory = new IPartitionManagerFactory() {
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public IPartitionManager create(EventHubSpoutConfig spoutConfig,
+          String partitionId, IStateStore stateStore,
+          IEventHubReceiver receiver) {
+        return new SimplePartitionManager(spoutConfig, partitionId,
+            stateStore, receiver);
+      }
+    };
+    EventHubSpout eventHubSpout = new EventHubSpout(
+        spoutConfig, null, pmFactory, null);
+    return eventHubSpout;
+  }
+  
+  public static void main(String[] args) throws Exception {
+    AtMostOnceEventCount scenario = new AtMostOnceEventCount();
+
+    scenario.runScenario(args);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventCount.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventCount.java
new file mode 100755
index 0000000..ae15634
--- /dev/null
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventCount.java
@@ -0,0 +1,157 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples;
+
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.TopologyBuilder;
+
+import org.apache.storm.eventhubs.samples.bolt.GlobalCountBolt;
+import org.apache.storm.eventhubs.samples.bolt.PartialCountBolt;
+import org.apache.storm.eventhubs.spout.EventHubSpout;
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+
+import java.io.FileReader;
+import java.util.Properties;
+
+/**
+ * The basic scenario topology that uses EventHubSpout with PartialCountBolt
+ * and GlobalCountBolt.
+ * To submit this topology:
+ * storm jar {jarfile} {classname} {topologyname} {spoutconffile}
+ */
+public class EventCount {
+  protected EventHubSpoutConfig spoutConfig;
+  protected int numWorkers;
+  
+  public EventCount() {
+  }
+  
+  protected void readEHConfig(String[] args) throws Exception {
+	  Properties properties = new Properties();
+    if(args.length > 1) {
+      properties.load(new FileReader(args[1]));
+    }
+    else {
+      properties.load(EventCount.class.getClassLoader().getResourceAsStream(
+          "Config.properties"));
+    }
+
+    String username = properties.getProperty("eventhubspout.username");
+    String password = properties.getProperty("eventhubspout.password");
+    String namespaceName = properties.getProperty("eventhubspout.namespace");
+    String entityPath = properties.getProperty("eventhubspout.entitypath");
+    String targetFqnAddress = properties.getProperty("eventhubspout.targetfqnaddress");
+    String zkEndpointAddress = properties.getProperty("zookeeper.connectionstring");
+    int partitionCount = Integer.parseInt(properties.getProperty("eventhubspout.partitions.count"));
+    int checkpointIntervalInSeconds = Integer.parseInt(properties.getProperty("eventhubspout.checkpoint.interval"));
+    int receiverCredits = Integer.parseInt(properties.getProperty("eventhub.receiver.credits"));
+    String maxPendingMsgsPerPartitionStr = properties.getProperty("eventhubspout.max.pending.messages.per.partition");
+    if(maxPendingMsgsPerPartitionStr == null) {
+      maxPendingMsgsPerPartitionStr = "1024";
+    }
+    int maxPendingMsgsPerPartition = Integer.parseInt(maxPendingMsgsPerPartitionStr);
+    String enqueueTimeDiffStr = properties.getProperty("eventhub.receiver.filter.timediff");
+    if(enqueueTimeDiffStr == null) {
+      enqueueTimeDiffStr = "0";
+    }
+    int enqueueTimeDiff = Integer.parseInt(enqueueTimeDiffStr);
+    long enqueueTimeFilter = 0;
+    if(enqueueTimeDiff != 0) {
+      enqueueTimeFilter = System.currentTimeMillis() - enqueueTimeDiff*1000;
+    }
+    String consumerGroupName = properties.getProperty("eventhubspout.consumer.group.name");
+    
+    System.out.println("Eventhub spout config: ");
+    System.out.println("  partition count: " + partitionCount);
+    System.out.println("  checkpoint interval: " + checkpointIntervalInSeconds);
+    System.out.println("  receiver credits: " + receiverCredits);
+    spoutConfig = new EventHubSpoutConfig(username, password,
+      namespaceName, entityPath, partitionCount, zkEndpointAddress,
+      checkpointIntervalInSeconds, receiverCredits, maxPendingMsgsPerPartition,
+      enqueueTimeFilter);
+
+    if(targetFqnAddress != null)
+    {
+      spoutConfig.setTargetAddress(targetFqnAddress);      
+    }
+    spoutConfig.setConsumerGroupName(consumerGroupName);
+
+    //set the number of workers to be the same as partition number.
+    //the idea is to have a spout and a partial count bolt co-exist in one
+    //worker to avoid shuffling messages across workers in storm cluster.
+    numWorkers = spoutConfig.getPartitionCount();
+    
+    if(args.length > 0) {
+      //set topology name so that sample Trident topology can use it as stream name.
+      spoutConfig.setTopologyName(args[0]);
+    }
+	}
+  
+  protected EventHubSpout createEventHubSpout() {
+    EventHubSpout eventHubSpout = new EventHubSpout(spoutConfig);
+    return eventHubSpout;
+  }
+	
+  protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
+    TopologyBuilder topologyBuilder = new TopologyBuilder();
+
+    topologyBuilder.setSpout("EventHubsSpout", eventHubSpout, spoutConfig.getPartitionCount())
+      .setNumTasks(spoutConfig.getPartitionCount());
+    topologyBuilder.setBolt("PartialCountBolt", new PartialCountBolt(), spoutConfig.getPartitionCount())
+      .localOrShuffleGrouping("EventHubsSpout").setNumTasks(spoutConfig.getPartitionCount());
+    topologyBuilder.setBolt("GlobalCountBolt", new GlobalCountBolt(), 1)
+      .globalGrouping("PartialCountBolt").setNumTasks(1);
+    return topologyBuilder.createTopology();
+  }
+	
+  protected void submitTopology(String[] args, StormTopology topology) throws Exception {
+	  Config config = new Config();
+    config.setDebug(false);
+    //Enable metrics
+    config.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class, 1);
+
+    
+	  if (args != null && args.length > 0) {
+      config.setNumWorkers(numWorkers);
+      StormSubmitter.submitTopology(args[0], config, topology);
+    } else {
+      config.setMaxTaskParallelism(2);
+
+      try (LocalCluster localCluster = new LocalCluster();
+           LocalTopology topo = localCluster.submitTopology("test", config, topology);) {
+        Thread.sleep(5000000);
+      }
+    }
+  }
+  
+  protected void runScenario(String[] args) throws Exception{
+    readEHConfig(args);
+    EventHubSpout eventHubSpout = createEventHubSpout();
+    StormTopology topology = buildTopology(eventHubSpout);
+    submitTopology(args, topology);
+  }
+
+  public static void main(String[] args) throws Exception {
+    EventCount scenario = new EventCount();
+    scenario.runScenario(args);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventHubLoop.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
new file mode 100755
index 0000000..665fef9
--- /dev/null
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
@@ -0,0 +1,52 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples;
+
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.TopologyBuilder;
+
+import org.apache.storm.eventhubs.bolt.EventHubBolt;
+import org.apache.storm.eventhubs.bolt.EventHubBoltConfig;
+import org.apache.storm.eventhubs.spout.EventHubSpout;
+
+/**
+ * A sample topology that loops message back to EventHub
+ */
+public class EventHubLoop extends EventCount {
+
+  @Override
+  protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
+    TopologyBuilder topologyBuilder = new TopologyBuilder();
+
+    topologyBuilder.setSpout("EventHubsSpout", eventHubSpout, spoutConfig.getPartitionCount())
+      .setNumTasks(spoutConfig.getPartitionCount());
+    EventHubBoltConfig boltConfig = new EventHubBoltConfig(spoutConfig.getConnectionString(),
+        spoutConfig.getEntityPath(), true);
+    
+    EventHubBolt eventHubBolt = new EventHubBolt(boltConfig);
+    int boltTasks = spoutConfig.getPartitionCount();
+    topologyBuilder.setBolt("EventHubsBolt", eventHubBolt, boltTasks)
+      .localOrShuffleGrouping("EventHubsSpout").setNumTasks(boltTasks);
+    return topologyBuilder.createTopology();
+  }
+  
+  public static void main(String[] args) throws Exception {
+    EventHubLoop scenario = new EventHubLoop();
+    scenario.runScenario(args);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java
new file mode 100755
index 0000000..e8538c1
--- /dev/null
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java
@@ -0,0 +1,53 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples;
+
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.builtin.Count;
+import org.apache.storm.trident.operation.builtin.Sum;
+import org.apache.storm.trident.testing.MemoryMapState;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
+
+import org.apache.storm.eventhubs.samples.TransactionalTridentEventCount.LoggingFilter;
+import org.apache.storm.eventhubs.spout.EventHubSpout;
+import org.apache.storm.eventhubs.trident.OpaqueTridentEventHubSpout;
+
+/**
+ * A simple Trident topology uses OpaqueTridentEventHubSpout
+ */
+public class OpaqueTridentEventCount extends EventCount {
+  @Override
+  protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
+    TridentTopology topology = new TridentTopology();
+    
+    OpaqueTridentEventHubSpout spout = new OpaqueTridentEventHubSpout(spoutConfig);
+    TridentState state = topology.newStream("stream-" + spoutConfig.getTopologyName(), spout)
+        .parallelismHint(spoutConfig.getPartitionCount())
+        .aggregate(new Count(), new Fields("partial-count"))
+        .persistentAggregate(new MemoryMapState.Factory(), new Fields("partial-count"), new Sum(), new Fields("count"));
+    state.newValuesStream().each(new Fields("count"), new LoggingFilter("got count: ", 10000));
+    return topology.build();
+  }
+  
+  public static void main(String[] args) throws Exception {
+    OpaqueTridentEventCount scenario = new OpaqueTridentEventCount();
+    scenario.runScenario(args);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java
new file mode 100755
index 0000000..0a5295f
--- /dev/null
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java
@@ -0,0 +1,81 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
+
+import org.apache.storm.eventhubs.spout.EventHubSpout;
+import org.apache.storm.eventhubs.trident.TransactionalTridentEventHubSpout;
+
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseFilter;
+import org.apache.storm.trident.operation.builtin.Count;
+import org.apache.storm.trident.operation.builtin.Sum;
+import org.apache.storm.trident.testing.MemoryMapState;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+/**
+ * A simple Trident topology uses TransactionalTridentEventHubSpout
+ */
+public class TransactionalTridentEventCount extends EventCount {
+  public static class LoggingFilter extends BaseFilter {
+    private static final long serialVersionUID = 1L;
+    private static final Logger logger = LoggerFactory.getLogger(LoggingFilter.class);
+    private final String prefix;
+    private final long logIntervalMs;
+    private long lastTime;
+    public LoggingFilter(String prefix, int logIntervalMs) {
+      this.prefix = prefix;
+      this.logIntervalMs = logIntervalMs;
+      lastTime = System.nanoTime();
+    }
+
+    @Override
+    public boolean isKeep(TridentTuple tuple) {
+      long now = System.nanoTime();
+      if(logIntervalMs < (now - lastTime) / 1000000) {
+        logger.info(prefix + tuple.toString());
+        lastTime = now;
+      }
+      return false;
+    }
+  }
+  
+  @Override
+  protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
+    TridentTopology topology = new TridentTopology();
+    
+    TransactionalTridentEventHubSpout spout = new TransactionalTridentEventHubSpout(spoutConfig);
+    TridentState state = topology.newStream("stream-" + spoutConfig.getTopologyName(), spout)
+        .parallelismHint(spoutConfig.getPartitionCount())
+        .aggregate(new Count(), new Fields("partial-count"))
+        .persistentAggregate(new MemoryMapState.Factory(), new Fields("partial-count"), new Sum(), new Fields("count"));
+    state.newValuesStream().each(new Fields("count"), new LoggingFilter("got count: ", 10000));
+    return topology.build();
+  }
+  
+  public static void main(String[] args) throws Exception {
+    TransactionalTridentEventCount scenario = new TransactionalTridentEventCount();
+    scenario.runScenario(args);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
new file mode 100755
index 0000000..bc9219e
--- /dev/null
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
@@ -0,0 +1,88 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples.bolt;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.storm.utils.TupleUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.Config;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * Globally count number of messages
+ */
+public class GlobalCountBolt extends BaseBasicBolt {
+  private static final long serialVersionUID = 1L;
+  private static final Logger logger = LoggerFactory
+      .getLogger(GlobalCountBolt.class);
+  private long globalCount;
+  private long globalCountDiff;
+  private long lastMetricsTime;
+  private long throughput;
+  
+  @Override
+  public void prepare(Map config, TopologyContext context) {
+    globalCount = 0;
+    globalCountDiff = 0;
+    lastMetricsTime = System.nanoTime();
+    context.registerMetric("GlobalMessageCount", new IMetric() {
+      @Override
+      public Object getValueAndReset() {
+        long now = System.nanoTime();
+        long millis = (now - lastMetricsTime) / 1000000;
+        throughput = globalCountDiff / millis * 1000;
+        Map values = new HashMap();
+        values.put("global_count", globalCount);
+        values.put("throughput", throughput);
+        lastMetricsTime = now;
+        globalCountDiff = 0;
+        return values;
+      }
+  }, (Integer)config.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));
+  }
+
+  @Override
+  public void execute(Tuple tuple, BasicOutputCollector collector) {
+    if (TupleUtils.isTick(tuple)) {
+      return;
+    }
+
+    int partial = (Integer)tuple.getValueByField("partial_count");
+    globalCount += partial;
+    globalCountDiff += partial;
+    if((globalCountDiff == partial) && (globalCount != globalCountDiff)) {
+      //metrics has just been collected, let's also log it
+      logger.info("Current throughput (messages/second): " + throughput);
+    }
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
new file mode 100755
index 0000000..eaf2b65
--- /dev/null
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
@@ -0,0 +1,68 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples.bolt;
+
+import java.util.Map;
+
+import org.apache.storm.utils.TupleUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+/**
+ * Partially count number of messages from EventHubs
+ */
+public class PartialCountBolt extends BaseBasicBolt {
+  private static final long serialVersionUID = 1L;
+  private static final Logger logger = LoggerFactory
+      .getLogger(PartialCountBolt.class);
+  private static final int PartialCountBatchSize = 1000; 
+  
+  private int partialCount;
+  
+  @Override
+  public void prepare(Map stormConf, TopologyContext context) {
+    partialCount = 0;
+  }
+  
+  @Override
+  public void execute(Tuple tuple, BasicOutputCollector collector) {
+    if (TupleUtils.isTick(tuple)) {
+      return;
+    }
+
+    partialCount++;
+    if(partialCount == PartialCountBatchSize) {
+      collector.emit(new Values(PartialCountBatchSize));
+      partialCount = 0;
+    }
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(new Fields("partial_count"));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-hbase/pom.xml b/external/storm-hbase/pom.xml
index 3d6aa0c..fd6bfe5 100644
--- a/external/storm-hbase/pom.xml
+++ b/external/storm-hbase/pom.xml
@@ -44,7 +44,7 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.storm</groupId>
-            <artifactId>storm-core</artifactId>
+            <artifactId>storm-client</artifactId>
             <version>${project.version}</version>
             <scope>${provided.scope}</scope>
         </dependency>

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/pom.xml b/external/storm-hdfs/pom.xml
index 281732e..8b3a792 100644
--- a/external/storm-hdfs/pom.xml
+++ b/external/storm-hdfs/pom.xml
@@ -45,7 +45,7 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.storm</groupId>
-            <artifactId>storm-core</artifactId>
+            <artifactId>storm-client</artifactId>
             <version>${project.version}</version>
             <scope>${provided.scope}</scope>
             <exclusions>
@@ -57,9 +57,10 @@
                 </exclusion>
             </exclusions>
         </dependency>
+
         <dependency>
             <groupId>org.apache.storm</groupId>
-            <artifactId>storm-core</artifactId>
+            <artifactId>storm-client</artifactId>
             <version>${project.version}</version>
             <type>test-jar</type>
             <scope>test</scope>
@@ -72,6 +73,7 @@
                 </exclusion>
             </exclusions>
         </dependency>
+
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-client</artifactId>

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
index a4c88ce..9acae35 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
@@ -19,7 +19,7 @@ package org.apache.storm.hdfs.blobstore;
 
 import org.apache.storm.Config;
 import org.apache.storm.blobstore.BlobStoreFile;
-import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.ObjectReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -139,7 +139,7 @@ public class HdfsBlobStoreImpl {
         }
 
         Object shouldCleanup = conf.get(Config.BLOBSTORE_CLEANUP_ENABLE);
-        if (Utils.getBoolean(shouldCleanup, false)) {
+        if (ObjectReader.getBoolean(shouldCleanup, false)) {
             LOG.debug("Starting hdfs blobstore cleaner");
             _cleanup = new TimerTask() {
                 @Override