Posted to commits@storm.apache.org by sr...@apache.org on 2018/03/23 17:10:04 UTC

[1/5] storm git commit: set '* text=auto' in .gitattributes in order to avoid merge work because of line feed changes

Repository: storm
Updated Branches:
  refs/heads/master 0fdad2c0f -> adf4efb71
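
For reference, the setting the commit message describes is the single attributes rule quoted below; the actual `.gitattributes` change touches 11 lines in total (per the diffstat later in this thread), and the other entries are not reproduced here.

```
* text=auto
```

With this rule, git treats files it detects as text as subject to end-of-line normalization, storing LF in the repository, which is what keeps line-feed-only differences out of future merges.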


http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateUpdater.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateUpdater.java
index 77e00ed..41311b3 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateUpdater.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateUpdater.java
@@ -1,35 +1,35 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.trident.state;
-
-import java.util.List;
-
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.state.BaseStateUpdater;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-public class MongoStateUpdater extends BaseStateUpdater<MongoState>  {
-
-    @Override
-    public void updateState(MongoState state, List<TridentTuple> tuples,
-            TridentCollector collector) {
-        state.updateState(tuples, collector);
-    }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.trident.state;
+
+import java.util.List;
+
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.BaseStateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+public class MongoStateUpdater extends BaseStateUpdater<MongoState>  {
+
+    @Override
+    public void updateState(MongoState state, List<TridentTuple> tuples,
+            TridentCollector collector) {
+        state.updateState(tuples, collector);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/integration-test/README.md
----------------------------------------------------------------------
diff --git a/integration-test/README.md b/integration-test/README.md
index 8805f33..1668e66 100644
--- a/integration-test/README.md
+++ b/integration-test/README.md
@@ -1,59 +1,59 @@
-End to end storm integration tests
-==================================
-
-Running tests end-to-end
-------------------------
-Assumption:
-A single version of the Storm binary zip, such as `storm-dist/binary/target/apache-storm-2.0.0-SNAPSHOT.zip`, is present.
-The following command will bring up a Vagrant cluster.
-```sh
-cd integration-test/config
-vagrant up
-```
-This automatically runs `integration-test/run-it.sh`, which brings up a Vagrant machine with Storm and ZooKeeper daemons and runs all the tests against it.
-
-Running tests for development & debugging
-=========================================
-The `vagrant up` command is set up to run on complete auto-pilot.
-The following describes how to run individual tests against this Vagrant cluster or any other cluster.
-
-Configs for running
--------------------
-The supplied configuration runs tests against the Vagrant setup. However, it can be changed to use a different cluster.
-Change `integration-test/src/test/resources/storm.yaml` as necessary.
-
-Running all tests manually
---------------------------
-To run all tests:
-```sh
-mvn clean package -DskipTests && mvn test
-```
-
-To run a single test:
-```sh
-mvn clean package -DskipTests && mvn test -Dtest=SlidingWindowCountTest
-```
-
-Running tests from IDE
-----------------------
-You might have to enable the intellij profile to make your IDE happy.
-Make sure that the following is run before tests are launched.
-```sh
-mvn package -DskipTests
-```
-
-Running tests with custom storm version
----------------------------------------
-You can supply a custom Storm version by passing the `-Dstorm.version=<storm-version>` property to all the Maven commands.
-```sh
-mvn clean package -DskipTests -Dstorm.version=<storm-version>
-mvn test -Dtest=DemoTest -Dstorm.version=<storm-version>
-```
-
-To find the version of Storm that you are running, run the `storm version` command.
-
-Code
-----
-Start off by looking at [DemoTest.java](https://github.com/apache/storm/blob/master/integration-test/src/test/java/org/apache/storm/st/DemoTest.java).
+End to end storm integration tests
+==================================
+
+Running tests end-to-end
+------------------------
+Assumption:
+A single version of the Storm binary zip, such as `storm-dist/binary/target/apache-storm-2.0.0-SNAPSHOT.zip`, is present.
+The following command will bring up a Vagrant cluster.
+```sh
+cd integration-test/config
+vagrant up
+```
+This automatically runs `integration-test/run-it.sh`, which brings up a Vagrant machine with Storm and ZooKeeper daemons and runs all the tests against it.
+
+Running tests for development & debugging
+=========================================
+The `vagrant up` command is set up to run on complete auto-pilot.
+The following describes how to run individual tests against this Vagrant cluster or any other cluster.
+
+Configs for running
+-------------------
+The supplied configuration runs tests against the Vagrant setup. However, it can be changed to use a different cluster.
+Change `integration-test/src/test/resources/storm.yaml` as necessary.
+
+Running all tests manually
+--------------------------
+To run all tests:
+```sh
+mvn clean package -DskipTests && mvn test
+```
+
+To run a single test:
+```sh
+mvn clean package -DskipTests && mvn test -Dtest=SlidingWindowCountTest
+```
+
+Running tests from IDE
+----------------------
+You might have to enable the intellij profile to make your IDE happy.
+Make sure that the following is run before tests are launched.
+```sh
+mvn package -DskipTests
+```
+
+Running tests with custom storm version
+---------------------------------------
+You can supply a custom Storm version by passing the `-Dstorm.version=<storm-version>` property to all the Maven commands.
+```sh
+mvn clean package -DskipTests -Dstorm.version=<storm-version>
+mvn test -Dtest=DemoTest -Dstorm.version=<storm-version>
+```
+
+To find the version of Storm that you are running, run the `storm version` command.
+
+Code
+----
+Start off by looking at [DemoTest.java](https://github.com/apache/storm/blob/master/integration-test/src/test/java/org/apache/storm/st/DemoTest.java).
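
As a rough illustration of what the README's test-selection commands operate on, here is a hypothetical, minimal JUnit 4 test in the same style as the classes under `integration-test/src/test` (the class name and the asserted value are made up for illustration; see the real `DemoTest.java` linked above for an actual cluster-driven test):

```java
import org.junit.Assert;
import org.junit.Test;

//Hypothetical placeholder, not the actual DemoTest: Surefire selects it by
//simple class name, e.g. `mvn test -Dtest=ExampleSmokeTest`.
public class ExampleSmokeTest {

    @Test
    public void topologyEmitsTuples() {
        //A real integration test would submit a topology to the cluster
        //configured in storm.yaml and poll for the tuples it emits; here we
        //only assert on a stand-in value so the sketch stays self-contained.
        int observedTuples = 1;
        Assert.assertTrue("expected the topology to emit at least one tuple",
                observedTuples > 0);
    }
}
```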

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/storm-client/test/jvm/org/apache/storm/trident/TestTridentTopology.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/trident/TestTridentTopology.java b/storm-client/test/jvm/org/apache/storm/trident/TestTridentTopology.java
index 7adc8a0..b0303dc 100644
--- a/storm-client/test/jvm/org/apache/storm/trident/TestTridentTopology.java
+++ b/storm-client/test/jvm/org/apache/storm/trident/TestTridentTopology.java
@@ -1,76 +1,76 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.trident;
-
-import org.apache.storm.generated.Bolt;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.storm.trident.operation.builtin.Count;
-import org.apache.storm.trident.operation.builtin.Sum;
-import org.apache.storm.trident.testing.FixedBatchSpout;
-import org.apache.storm.trident.testing.Split;
-import org.apache.storm.trident.testing.StringLength;
-
-import java.util.Map;
-import java.util.Set;
-
-public class TestTridentTopology {
-
-    private StormTopology buildTopology() {
-        FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
-                new Values("the cow jumped over the moon"),
-                new Values("the man went to the store and bought some candy"),
-                new Values("four score and seven years ago"),
-                new Values("how many apples can you eat"));
-        spout.setCycle(true);
-
-        TridentTopology topology = new TridentTopology();
-        topology.newStream("spout", spout)
-                //no name
-                .each(new Fields("sentence"), new Split(), new Fields("word"))
-                .partitionBy(new Fields("word"))
-                .name("abc")
-                .each(new Fields("word"), new StringLength(), new Fields("length"))
-                .partitionBy(new Fields("length"))
-                .name("def")
-                .aggregate(new Fields("length"), new Count(), new Fields("count"))
-                .partitionBy(new Fields("count"))
-                .name("ghi")
-                .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
-        return topology.build();
-    }
-
-    @Test
-    public void testGenBoltId() {
-        Set<String> pre = null;
-        for (int i = 0; i < 100; i++) {
-            StormTopology topology = buildTopology();
-            Map<String, Bolt> cur = topology.get_bolts();
-            System.out.println(cur.keySet());
-            if (pre != null) {
-                Assert.assertTrue("bolt id not consistent with group name", pre.equals(cur.keySet()));
-            }
-            pre = cur.keySet();
-        }
-    }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.trident;
+
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.storm.trident.operation.builtin.Count;
+import org.apache.storm.trident.operation.builtin.Sum;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.testing.Split;
+import org.apache.storm.trident.testing.StringLength;
+
+import java.util.Map;
+import java.util.Set;
+
+public class TestTridentTopology {
+
+    private StormTopology buildTopology() {
+        FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
+                new Values("the cow jumped over the moon"),
+                new Values("the man went to the store and bought some candy"),
+                new Values("four score and seven years ago"),
+                new Values("how many apples can you eat"));
+        spout.setCycle(true);
+
+        TridentTopology topology = new TridentTopology();
+        topology.newStream("spout", spout)
+                //no name
+                .each(new Fields("sentence"), new Split(), new Fields("word"))
+                .partitionBy(new Fields("word"))
+                .name("abc")
+                .each(new Fields("word"), new StringLength(), new Fields("length"))
+                .partitionBy(new Fields("length"))
+                .name("def")
+                .aggregate(new Fields("length"), new Count(), new Fields("count"))
+                .partitionBy(new Fields("count"))
+                .name("ghi")
+                .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
+        return topology.build();
+    }
+
+    @Test
+    public void testGenBoltId() {
+        Set<String> pre = null;
+        for (int i = 0; i < 100; i++) {
+            StormTopology topology = buildTopology();
+            Map<String, Bolt> cur = topology.get_bolts();
+            System.out.println(cur.keySet());
+            if (pre != null) {
+                Assert.assertTrue("bolt id not consistent with group name", pre.equals(cur.keySet()));
+            }
+            pre = cur.keySet();
+        }
+    }
+
+}


[5/5] storm git commit: Merge branch 'gitattributes' of https://github.com/krichter722/storm into fix-eol-to-auto

Posted by sr...@apache.org.
Merge branch 'gitattributes' of https://github.com/krichter722/storm into fix-eol-to-auto


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/adf4efb7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/adf4efb7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/adf4efb7

Branch: refs/heads/master
Commit: adf4efb715b33abe7107175dc893a9036232a166
Parents: 0fdad2c e9c427c
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Fri Mar 23 18:08:23 2018 +0100
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Fri Mar 23 18:08:23 2018 +0100

----------------------------------------------------------------------
 .gitattributes                                  |  11 +-
 docs/storm-eventhubs.md                         |  80 +--
 external/storm-eventhubs/README.md              |  90 +--
 .../eventhubs/bolt/DefaultEventDataFormat.java  |  94 +--
 .../storm/eventhubs/bolt/EventHubBolt.java      | 292 ++++-----
 .../eventhubs/bolt/EventHubBoltConfig.java      | 222 +++----
 .../storm/eventhubs/bolt/IEventDataFormat.java  |  56 +-
 .../storm/eventhubs/spout/EventDataScheme.java  | 156 ++---
 .../storm/eventhubs/spout/FieldConstants.java   |  56 +-
 .../storm/eventhubs/spout/IEventDataScheme.java |  88 +--
 .../spout/IEventHubReceiverFactory.java         |  60 +-
 .../eventhubs/spout/IPartitionCoordinator.java  |  54 +-
 .../eventhubs/spout/IPartitionManager.java      |  74 +--
 .../storm/eventhubs/spout/IStateStore.java      |  62 +-
 .../apache/storm/eventhubs/spout/MessageId.java | 112 ++--
 .../eventhubs/spout/ZookeeperStateStore.java    | 190 +++---
 .../storm/eventhubs/trident/Coordinator.java    | 120 ++--
 .../trident/ITridentPartitionManager.java       |  70 +-
 .../ITridentPartitionManagerFactory.java        |  52 +-
 .../trident/OpaqueTridentEventHubEmitter.java   | 136 ++--
 .../trident/OpaqueTridentEventHubSpout.java     | 128 ++--
 .../storm/eventhubs/trident/Partition.java      |  78 +--
 .../storm/eventhubs/trident/Partitions.java     |  82 +--
 .../TransactionalTridentEventHubSpout.java      | 132 ++--
 .../storm/eventhubs/samples/EventHubLoop.java   | 104 +--
 .../samples/OpaqueTridentEventCount.java        | 106 +--
 .../samples/TransactionalTridentEventCount.java | 162 ++---
 .../eventhubs/samples/bolt/GlobalCountBolt.java | 176 ++---
 .../samples/bolt/PartialCountBolt.java          | 136 ++--
 .../spout/SpoutOutputCollectorMock.java         | 142 ++--
 .../storm/eventhubs/spout/StateStoreMock.java   | 108 +--
 .../storm/eventhubs/spout/TestEventData.java    |  94 +--
 .../eventhubs/trident/TridentCollectorMock.java | 114 ++--
 external/storm-mongodb/README.md                | 650 +++++++++----------
 .../storm/mongodb/bolt/AbstractMongoBolt.java   | 124 ++--
 .../storm/mongodb/bolt/MongoInsertBolt.java     | 248 +++----
 .../storm/mongodb/bolt/MongoUpdateBolt.java     | 186 +++---
 .../storm/mongodb/common/MongoDbClient.java     | 218 +++----
 .../mongodb/common/QueryFilterCreator.java      |  94 +--
 .../common/SimpleQueryFilterCreator.java        |  94 +--
 .../mongodb/common/mapper/MongoMapper.java      |  94 +--
 .../common/mapper/SimpleMongoMapper.java        | 110 ++--
 .../common/mapper/SimpleMongoUpdateMapper.java  |  92 +--
 .../storm/mongodb/trident/state/MongoState.java | 290 ++++-----
 .../trident/state/MongoStateFactory.java        |  86 +--
 .../trident/state/MongoStateUpdater.java        |  70 +-
 integration-test/README.md                      | 118 ++--
 .../storm/trident/TestTridentTopology.java      | 152 ++---
 48 files changed, 3136 insertions(+), 3127 deletions(-)
----------------------------------------------------------------------



[2/5] storm git commit: set '* text=auto' in .gitattributes in order to avoid merge work because of line feed changes

Posted by sr...@apache.org.
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/README.md
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/README.md b/external/storm-mongodb/README.md
index b5b9e96..5c81710 100644
--- a/external/storm-mongodb/README.md
+++ b/external/storm-mongodb/README.md
@@ -1,325 +1,325 @@
-# Storm MongoDB
-
-Storm/Trident integration for [MongoDB](https://www.mongodb.org/). This package includes core bolts and trident states that allow a storm topology to insert storm tuples into a database collection or to execute select/update queries against a database collection.
-
-## Insert into Database
-This package includes a bolt and a trident state for inserting data into a database collection.
-
-### MongoMapper
-The main API for inserting data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoMapper` interface:
-
-```java
-public interface MongoMapper extends Serializable {
-    Document toDocument(ITuple tuple);
-    Document toDocumentByKeys(List<Object> keys);
-}
-```
-
-### SimpleMongoMapper
-`storm-mongodb` includes a general purpose `MongoMapper` implementation called `SimpleMongoMapper` that can map a Storm tuple to a database document. `SimpleMongoMapper` assumes that the storm tuple has fields with the same names as the document fields in the database collection that you intend to write to.
-
-```java
-public class SimpleMongoMapper implements MongoMapper {
-    private String[] fields;
-
-    @Override
-    public Document toDocument(ITuple tuple) {
-        Document document = new Document();
-        for(String field : fields){
-            document.append(field, tuple.getValueByField(field));
-        }
-        return document;
-    }
-
-    @Override
-    public Document toDocumentByKeys(List<Object> keys) {
-        Document document = new Document();
-        document.append("_id", MongoUtils.getID(keys));
-        return document;
-    }
-
-    public SimpleMongoMapper withFields(String... fields) {
-        this.fields = fields;
-        return this;
-    }
-}
-```
-
-### MongoInsertBolt
-To use the `MongoInsertBolt`, you construct an instance of it by specifying the url, the collectionName, and a `MongoMapper` implementation that converts a storm tuple to a DB document. The following is the standard URI connection scheme:
- `mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]`
-
-For more information about Mongo URI options (e.g. Write Concern options), see https://docs.mongodb.org/manual/reference/connection-string/#connections-connection-options
-
- ```java
-String url = "mongodb://127.0.0.1:27017/test";
-String collectionName = "wordcount";
-
-MongoMapper mapper = new SimpleMongoMapper()
-        .withFields("word", "count");
-
-MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper);
- ```
-
-
-## Update from Database
-This package includes a bolt for updating data in a database collection.
-
-### MongoUpdateMapper
-The main API for updating data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoUpdateMapper` interface:
-
-```java
-public interface MongoUpdateMapper extends MongoMapper { }
-```
-
-### SimpleMongoUpdateMapper
-`storm-mongodb` includes a general purpose `MongoUpdateMapper` implementation called `SimpleMongoUpdateMapper` that can map a Storm tuple to a database document. `SimpleMongoUpdateMapper` assumes that the storm tuple has fields with the same names as the document fields in the database collection that you intend to write to.
-`SimpleMongoUpdateMapper` uses the `$set` operator to set the value of a field in a document. For more information about update operators, see
-https://docs.mongodb.org/manual/reference/operator/update/
-
-```java
-public class SimpleMongoUpdateMapper extends SimpleMongoMapper implements MongoUpdateMapper {
-
-    private String[] fields;
-
-    @Override
-    public Document toDocument(ITuple tuple) {
-        Document document = new Document();
-        for(String field : fields){
-            document.append(field, tuple.getValueByField(field));
-        }
-        //$set operator: Sets the value of a field in a document.
-        return new Document("$set", document);
-    }
-
-    public SimpleMongoUpdateMapper withFields(String... fields) {
-        this.fields = fields;
-        return this;
-    }
-}
-```
-
-
-### QueryFilterCreator
-The main API for creating a MongoDB query Filter is the `org.apache.storm.mongodb.common.QueryFilterCreator` interface:
-
- ```java
-public interface QueryFilterCreator extends Serializable {
-    Bson createFilter(ITuple tuple);
-    Bson createFilterByKeys(List<Object> keys);
-}
- ```
-
-### SimpleQueryFilterCreator
-`storm-mongodb` includes a general purpose `QueryFilterCreator` implementation called `SimpleQueryFilterCreator` that can create a MongoDB query filter from a given tuple. `SimpleQueryFilterCreator` uses the `$eq` operator to match values that are equal to a specified value. For more information about query operators, see
-https://docs.mongodb.org/manual/reference/operator/query/
-
- ```java
-public class SimpleQueryFilterCreator implements QueryFilterCreator {
-
-    private String field;
-    
-    @Override
-    public Bson createFilter(ITuple tuple) {
-        return Filters.eq(field, tuple.getValueByField(field));
-    }
-
-    @Override
-    public Bson createFilterByKeys(List<Object> keys) {
-        return Filters.eq("_id", MongoUtils.getID(keys));
-    }
-
-    public SimpleQueryFilterCreator withField(String field) {
-        this.field = field;
-        return this;
-    }
-
-}
- ```
-
-### MongoUpdateBolt
-To use the `MongoUpdateBolt`, you construct an instance of it by specifying the Mongo url, the collectionName, a `QueryFilterCreator` implementation, and a `MongoUpdateMapper` implementation that converts a storm tuple to a DB document.
-
- ```java
-        MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
-                .withFields("word", "count");
-
-        QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator()
-                .withField("word");
-        
-        MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper);
-
-        //insert a new document if there are no matches to the query filter
-        //updateBolt.withUpsert(true);
-
-        //whether to update all documents that match the query filter
-        //updateBolt.withMany(true);
- ```
- 
- Or use an anonymous inner class implementation of `QueryFilterCreator`:
- 
-  ```java
-        MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
-                .withFields("word", "count");
-
-        QueryFilterCreator updateQueryCreator = new QueryFilterCreator() {
-            @Override
-            public Bson createFilter(ITuple tuple) {
-                return Filters.gt("count", 3);
-            }
-
-            //the interface also declares createFilterByKeys, so it must be implemented as well
-            @Override
-            public Bson createFilterByKeys(List<Object> keys) {
-                return Filters.eq("_id", MongoUtils.getID(keys));
-            }
-        };
-        
-        MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper);
-
-        //if a new document should be inserted if there are no matches to the query filter
-        //updateBolt.withUpsert(true);
- ```
-
-
-## Lookup from Database
-This package includes a bolt for selecting data from a database collection.
-
-### MongoLookupMapper
-The main API for selecting data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoLookupMapper` interface:
-
-```java
-public interface MongoLookupMapper extends Serializable {
-
-    List<Values> toTuple(ITuple input, Document doc);
-
-    void declareOutputFields(OutputFieldsDeclarer declarer);
-}
-```
-
-### SimpleMongoLookupMapper
-`storm-mongodb` includes a general purpose `MongoLookupMapper` implementation called `SimpleMongoLookupMapper` that converts a Mongo document into a list of storm values.
-
-```java
-public class SimpleMongoLookupMapper implements MongoLookupMapper {
-
-    private String[] fields;
-
-    @Override
-    public List<Values> toTuple(ITuple input, Document doc) {
-        Values values = new Values();
-
-        for(String field : fields) {
-            if(input.contains(field)) {
-                values.add(input.getValueByField(field));
-            } else {
-                values.add(doc.get(field));
-            }
-        }
-        List<Values> result = new ArrayList<Values>();
-        result.add(values);
-        return result;
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields(fields));
-    }
-
-    public SimpleMongoLookupMapper withFields(String... fields) {
-        this.fields = fields;
-        return this;
-    }
-
-}
-```
-
-### MongoLookupBolt
-To use the `MongoLookupBolt`, you construct an instance of it by specifying the Mongo url, the collectionName, a `QueryFilterCreator` implementation, and a `MongoLookupMapper` implementation that converts a Mongo document into a list of storm values.
-
- ```java
-        MongoLookupMapper mapper = new SimpleMongoLookupMapper()
-                .withFields("word", "count");
-
-        QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
-                .withField("word");
-
-        MongoLookupBolt lookupBolt = new MongoLookupBolt(url, collectionName, filterCreator, mapper);
- ```
-
-## Mongo Trident State & MapState
-### Trident State
-We support trident persistent state that can be used with trident topologies. To create a Mongo persistent trident state, you need to initialize it with the url, the collectionName, and a `MongoMapper` instance. See the example below:
-
- ```java
-        MongoMapper mapper = new SimpleMongoMapper()
-                .withFields("word", "count");
-
-        MongoState.Options options = new MongoState.Options()
-                .withUrl(url)
-                .withCollectionName(collectionName)
-                .withMapper(mapper);
-
-        StateFactory factory = new MongoStateFactory(options);
-
-        TridentTopology topology = new TridentTopology();
-        Stream stream = topology.newStream("spout1", spout);
-
-        stream.partitionPersist(factory, fields,
-                new MongoStateUpdater(), new Fields());
-
-        TridentState state = topology.newStaticState(factory);
-        stream = stream.stateQuery(state, new Fields("word"),
-                new MongoStateQuery(), new Fields("columnName", "columnValue"));
-        stream.each(new Fields("word", "columnValue"), new PrintFunction(), new Fields());
- ```
- **NOTE**:
- >If no unique index is provided, trident state inserts may result in duplicate documents when batches are retried after failures.
-
-### Trident MapState
-We also support trident `MapState`. To create a Mongo trident `MapState`, you need to initialize it with the url, the collectionName, and `MongoMapper` and `QueryFilterCreator` instances. See the example below:
-
- ```java
-        MongoMapper mapper = new SimpleMongoMapper()
-                .withFields("word", "count");
-
-        QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
-                .withField("word");
-
-        MongoMapState.Options options = new MongoMapState.Options();
-        options.url = url;
-        options.collectionName = collectionName;
-        options.mapper = mapper;
-        options.queryCreator = filterCreator;
-
-        StateFactory factory = MongoMapState.transactional(options);
-
-        TridentTopology topology = new TridentTopology();
-        Stream stream = topology.newStream("spout1", spout);
-
-        TridentState state = stream.groupBy(new Fields("word"))
-                .persistentAggregate(factory, new Fields("count"), new Sum(), new Fields("sum"));
-
-        stream.stateQuery(state, new Fields("word"), new MapGet(), new Fields("sum"))
-                .each(new Fields("word", "sum"), new PrintFunction(), new Fields());
- ```
- 
-
-## License
-
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-
-## Committer Sponsors
-
- * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))
- * Xin Wang ([xinwang@apache.org](mailto:xinwang@apache.org))
- 
+# Storm MongoDB
+
+Storm/Trident integration for [MongoDB](https://www.mongodb.org/). This package includes core bolts and trident states that allow a storm topology to insert storm tuples into a database collection or to execute select/update queries against a database collection.
+
+## Insert into Database
+This package includes a bolt and a trident state for inserting data into a database collection.
+
+### MongoMapper
+The main API for inserting data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoMapper` interface:
+
+```java
+public interface MongoMapper extends Serializable {
+    Document toDocument(ITuple tuple);
+    Document toDocumentByKeys(List<Object> keys);
+}
+```
+
+### SimpleMongoMapper
+`storm-mongodb` includes a general purpose `MongoMapper` implementation called `SimpleMongoMapper` that can map a Storm tuple to a database document. `SimpleMongoMapper` assumes that the storm tuple has fields with the same names as the document fields in the database collection that you intend to write to.
+
+```java
+public class SimpleMongoMapper implements MongoMapper {
+    private String[] fields;
+
+    @Override
+    public Document toDocument(ITuple tuple) {
+        Document document = new Document();
+        for(String field : fields){
+            document.append(field, tuple.getValueByField(field));
+        }
+        return document;
+    }
+
+    @Override
+    public Document toDocumentByKeys(List<Object> keys) {
+        Document document = new Document();
+        document.append("_id", MongoUtils.getID(keys));
+        return document;
+    }
+
+    public SimpleMongoMapper withFields(String... fields) {
+        this.fields = fields;
+        return this;
+    }
+}
+```
+
+### MongoInsertBolt
+To use the `MongoInsertBolt`, you construct an instance of it by specifying the url, the collectionName, and a `MongoMapper` implementation that converts a storm tuple to a DB document. The following is the standard URI connection scheme:
+ `mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]`
+
+For more information about Mongo URI options (e.g. Write Concern options), see https://docs.mongodb.org/manual/reference/connection-string/#connections-connection-options
+
+ ```java
+String url = "mongodb://127.0.0.1:27017/test";
+String collectionName = "wordcount";
+
+MongoMapper mapper = new SimpleMongoMapper()
+        .withFields("word", "count");
+
+MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper);
+ ```
+
+
+## Update from Database
+This package includes a bolt for updating data in a database collection.
+
+### MongoUpdateMapper
+The main API for updating data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoUpdateMapper` interface:
+
+```java
+public interface MongoUpdateMapper extends MongoMapper { }
+```
+
+### SimpleMongoUpdateMapper
+`storm-mongodb` includes a general purpose `MongoUpdateMapper` implementation called `SimpleMongoUpdateMapper` that can map a Storm tuple to a database document. `SimpleMongoUpdateMapper` assumes that the storm tuple has fields with the same names as the document fields in the database collection that you intend to write to.
+`SimpleMongoUpdateMapper` uses the `$set` operator to set the value of a field in a document. For more information about update operators, see
+https://docs.mongodb.org/manual/reference/operator/update/
+
+```java
+public class SimpleMongoUpdateMapper extends SimpleMongoMapper implements MongoUpdateMapper {
+
+    private String[] fields;
+
+    @Override
+    public Document toDocument(ITuple tuple) {
+        Document document = new Document();
+        for(String field : fields){
+            document.append(field, tuple.getValueByField(field));
+        }
+        //$set operator: Sets the value of a field in a document.
+        return new Document("$set", document);
+    }
+
+    public SimpleMongoUpdateMapper withFields(String... fields) {
+        this.fields = fields;
+        return this;
+    }
+}
+```
+
+
+### QueryFilterCreator
+The main API for creating a MongoDB query Filter is the `org.apache.storm.mongodb.common.QueryFilterCreator` interface:
+
+ ```java
+public interface QueryFilterCreator extends Serializable {
+    Bson createFilter(ITuple tuple);
+    Bson createFilterByKeys(List<Object> keys);
+}
+ ```
+
+### SimpleQueryFilterCreator
+`storm-mongodb` includes a general purpose `QueryFilterCreator` implementation called `SimpleQueryFilterCreator` that can create a MongoDB query filter from a given tuple. `SimpleQueryFilterCreator` uses the `$eq` operator to match values that are equal to a specified value. For more information about query operators, see
+https://docs.mongodb.org/manual/reference/operator/query/
+
+ ```java
+public class SimpleQueryFilterCreator implements QueryFilterCreator {
+
+    private String field;
+    
+    @Override
+    public Bson createFilter(ITuple tuple) {
+        return Filters.eq(field, tuple.getValueByField(field));
+    }
+
+    @Override
+    public Bson createFilterByKeys(List<Object> keys) {
+        return Filters.eq("_id", MongoUtils.getID(keys));
+    }
+
+    public SimpleQueryFilterCreator withField(String field) {
+        this.field = field;
+        return this;
+    }
+
+}
+ ```
+
+### MongoUpdateBolt
+To use the `MongoUpdateBolt`, you construct an instance of it by specifying the Mongo url, the collectionName, a `QueryFilterCreator` implementation, and a `MongoUpdateMapper` implementation that converts a storm tuple to a DB document.
+
+ ```java
+        MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
+                .withFields("word", "count");
+
+        QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator()
+                .withField("word");
+        
+        MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper);
+
+        //insert a new document if there are no matches to the query filter
+        //updateBolt.withUpsert(true);
+
+        //whether to update all documents that match the query filter
+        //updateBolt.withMany(true);
+ ```
+ 
+ Or use an anonymous inner class implementation of `QueryFilterCreator`:
+ 
+  ```java
+        MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
+                .withFields("word", "count");
+
+        QueryFilterCreator updateQueryCreator = new QueryFilterCreator() {
+            @Override
+            public Bson createFilter(ITuple tuple) {
+                return Filters.gt("count", 3);
+            }
+
+            //the interface also declares createFilterByKeys, so it must be implemented as well
+            @Override
+            public Bson createFilterByKeys(List<Object> keys) {
+                return Filters.eq("_id", MongoUtils.getID(keys));
+            }
+        };
+        
+        MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper);
+
+        //if a new document should be inserted if there are no matches to the query filter
+        //updateBolt.withUpsert(true);
+ ```
+
+
+## Lookup from Database
+This package includes a bolt for selecting data from a database collection.
+
+### MongoLookupMapper
+The main API for selecting data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoLookupMapper` interface:
+
+```java
+public interface MongoLookupMapper extends Serializable {
+
+    List<Values> toTuple(ITuple input, Document doc);
+
+    void declareOutputFields(OutputFieldsDeclarer declarer);
+}
+```
+
+### SimpleMongoLookupMapper
+`storm-mongodb` includes a general purpose `MongoLookupMapper` implementation called `SimpleMongoLookupMapper` that converts a Mongo document into a list of storm values.
+
+```java
+public class SimpleMongoLookupMapper implements MongoLookupMapper {
+
+    private String[] fields;
+
+    @Override
+    public List<Values> toTuple(ITuple input, Document doc) {
+        Values values = new Values();
+
+        for(String field : fields) {
+            if(input.contains(field)) {
+                values.add(input.getValueByField(field));
+            } else {
+                values.add(doc.get(field));
+            }
+        }
+        List<Values> result = new ArrayList<Values>();
+        result.add(values);
+        return result;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields(fields));
+    }
+
+    public SimpleMongoLookupMapper withFields(String... fields) {
+        this.fields = fields;
+        return this;
+    }
+
+}
+```
+
+### MongoLookupBolt
+To use the `MongoLookupBolt`, you construct an instance of it by specifying the Mongo url, the collectionName, a `QueryFilterCreator` implementation, and a `MongoLookupMapper` implementation that converts a Mongo document into a list of storm values.
+
+ ```java
+        MongoLookupMapper mapper = new SimpleMongoLookupMapper()
+                .withFields("word", "count");
+
+        QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
+                .withField("word");
+
+        MongoLookupBolt lookupBolt = new MongoLookupBolt(url, collectionName, filterCreator, mapper);
+ ```
+
+## Mongo Trident State & MapState
+### Trident State
+We support trident persistent state that can be used with trident topologies. To create a Mongo persistent trident state, you need to initialize it with the url, the collectionName, and a `MongoMapper` instance. See the example below:
+
+ ```java
+        MongoMapper mapper = new SimpleMongoMapper()
+                .withFields("word", "count");
+
+        MongoState.Options options = new MongoState.Options()
+                .withUrl(url)
+                .withCollectionName(collectionName)
+                .withMapper(mapper);
+
+        StateFactory factory = new MongoStateFactory(options);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        stream.partitionPersist(factory, fields,
+                new MongoStateUpdater(), new Fields());
+
+        TridentState state = topology.newStaticState(factory);
+        stream = stream.stateQuery(state, new Fields("word"),
+                new MongoStateQuery(), new Fields("columnName", "columnValue"));
+        stream.each(new Fields("word", "columnValue"), new PrintFunction(), new Fields());
+ ```
+ **NOTE**:
+ >If no unique index is provided, trident state inserts may result in duplicate documents when batches are retried after failures.
+
+### Trident MapState
+We also support trident `MapState`. To create a Mongo trident `MapState`, you need to initialize it with the url, the collectionName, and `MongoMapper` and `QueryFilterCreator` instances. See the example below:
+
+ ```java
+        MongoMapper mapper = new SimpleMongoMapper()
+                .withFields("word", "count");
+
+        QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
+                .withField("word");
+
+        MongoMapState.Options options = new MongoMapState.Options();
+        options.url = url;
+        options.collectionName = collectionName;
+        options.mapper = mapper;
+        options.queryCreator = filterCreator;
+
+        StateFactory factory = MongoMapState.transactional(options);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        TridentState state = stream.groupBy(new Fields("word"))
+                .persistentAggregate(factory, new Fields("count"), new Sum(), new Fields("sum"));
+
+        stream.stateQuery(state, new Fields("word"), new MapGet(), new Fields("sum"))
+                .each(new Fields("word", "sum"), new PrintFunction(), new Fields());
+ ```
+ 
+
+## License
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+## Committer Sponsors
+
+ * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))
+ * Xin Wang ([xinwang@apache.org](mailto:xinwang@apache.org))
+ 
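
The NOTE in the README above recommends a unique index so that retried trident batches do not produce duplicate documents. A minimal sketch of creating such an index with the MongoDB Java driver is shown below; it assumes the `mongodb-driver-sync` client API, reuses the `mongodb://127.0.0.1:27017/test` url and `wordcount` collection from the README examples, and is setup code you would run against the database yourself, not part of storm-mongodb.

```java
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.Indexes;
import org.bson.Document;

public class UniqueIndexSetup {
    public static void main(String[] args) {
        //connect to the same server and database the README examples use
        try (MongoClient client = MongoClients.create("mongodb://127.0.0.1:27017")) {
            MongoCollection<Document> collection =
                    client.getDatabase("test").getCollection("wordcount");
            //a unique index on "word" makes a retried insert of the same key fail
            //instead of silently creating a duplicate document
            collection.createIndex(Indexes.ascending("word"),
                    new IndexOptions().unique(true));
        }
    }
}
```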

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java
index ee66811..13049de 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java
@@ -1,62 +1,62 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.bolt;
-
-import java.util.Map;
-
-import org.apache.commons.lang.Validate;
-import org.apache.storm.mongodb.common.MongoDbClient;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.base.BaseRichBolt;
-
-public abstract class AbstractMongoBolt extends BaseRichBolt {
-
-    private String url;
-    private String collectionName;
-
-    protected OutputCollector collector;
-    protected MongoDbClient mongoClient;
-
-    /**
-     * AbstractMongoBolt Constructor.
-     * @param url The MongoDB server url
-     * @param collectionName The collection where reading/writing data
-     */
-    public AbstractMongoBolt(String url, String collectionName) {
-        Validate.notEmpty(url, "url can not be blank or null");
-        Validate.notEmpty(collectionName, "collectionName can not be blank or null");
-
-        this.url = url;
-        this.collectionName = collectionName;
-    }
-
-    @Override
-    public void prepare(Map<String, Object> topoConf, TopologyContext context,
-            OutputCollector collector) {
-        this.collector = collector;
-        this.mongoClient = new MongoDbClient(url, collectionName);
-    }
-
-    @Override
-    public void cleanup() {
-        this.mongoClient.close();
-    }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.bolt;
+
+import java.util.Map;
+
+import org.apache.commons.lang.Validate;
+import org.apache.storm.mongodb.common.MongoDbClient;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.base.BaseRichBolt;
+
+public abstract class AbstractMongoBolt extends BaseRichBolt {
+
+    private String url;
+    private String collectionName;
+
+    protected OutputCollector collector;
+    protected MongoDbClient mongoClient;
+
+    /**
+     * AbstractMongoBolt Constructor.
+     * @param url The MongoDB server url
+     * @param collectionName The collection where reading/writing data
+     */
+    public AbstractMongoBolt(String url, String collectionName) {
+        Validate.notEmpty(url, "url can not be blank or null");
+        Validate.notEmpty(collectionName, "collectionName can not be blank or null");
+
+        this.url = url;
+        this.collectionName = collectionName;
+    }
+
+    @Override
+    public void prepare(Map<String, Object> topoConf, TopologyContext context,
+            OutputCollector collector) {
+        this.collector = collector;
+        this.mongoClient = new MongoDbClient(url, collectionName);
+    }
+
+    @Override
+    public void cleanup() {
+        this.mongoClient.close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java
index afd1142..5d233da 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java
@@ -1,124 +1,124 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.bolt;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang.Validate;
-import org.apache.storm.mongodb.common.mapper.MongoMapper;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.BatchHelper;
-import org.apache.storm.utils.TupleUtils;
-import org.bson.Document;
-
-/**
- * Basic bolt for writing to MongoDB.
- * Note: Each MongoInsertBolt defined in a topology is tied to a specific collection.
- */
-public class MongoInsertBolt extends AbstractMongoBolt {
-
-    private static final int DEFAULT_FLUSH_INTERVAL_SECS = 1;
-
-    private MongoMapper mapper;
-
-    private boolean ordered = true;  //default is ordered.
-
-    private int batchSize;
-
-    private BatchHelper batchHelper;
-
-    private int flushIntervalSecs = DEFAULT_FLUSH_INTERVAL_SECS;
-
-    /**
-     * MongoInsertBolt Constructor.
-     * @param url The MongoDB server url
-     * @param collectionName The collection where reading/writing data
-     * @param mapper MongoMapper converting tuple to an MongoDB document
-     */
-    public MongoInsertBolt(String url, String collectionName, MongoMapper mapper) {
-        super(url, collectionName);
-
-        Validate.notNull(mapper, "MongoMapper can not be null");
-
-        this.mapper = mapper;
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-        try {
-            if (batchHelper.shouldHandle(tuple)) {
-                batchHelper.addBatch(tuple);
-            }
-
-            if (batchHelper.shouldFlush()) {
-                flushTuples();
-                batchHelper.ack();
-            }
-        } catch (Exception e) {
-            batchHelper.fail(e);
-        }
-    }
-
-    private void flushTuples() {
-        List<Document> docs = new LinkedList<>();
-        for (Tuple t : batchHelper.getBatchTuples()) {
-            Document doc = mapper.toDocument(t);
-            docs.add(doc);
-        }
-        mongoClient.insert(docs, ordered);
-    }
-
-    public MongoInsertBolt withBatchSize(int batchSize) {
-        this.batchSize = batchSize;
-        return this;
-    }
-
-    public MongoInsertBolt withOrdered(boolean ordered) {
-        this.ordered = ordered;
-        return this;
-    }
-
-    public MongoInsertBolt withFlushIntervalSecs(int flushIntervalSecs) {
-        this.flushIntervalSecs = flushIntervalSecs;
-        return this;
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(), flushIntervalSecs);
-    }
-
-    @Override
-    public void prepare(Map<String, Object> topoConf, TopologyContext context,
-            OutputCollector collector) {
-        super.prepare(topoConf, context, collector);
-        this.batchHelper = new BatchHelper(batchSize, collector);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        
-    }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.bolt;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.Validate;
+import org.apache.storm.mongodb.common.mapper.MongoMapper;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.BatchHelper;
+import org.apache.storm.utils.TupleUtils;
+import org.bson.Document;
+
+/**
+ * Basic bolt for writing to MongoDB.
+ * Note: Each MongoInsertBolt defined in a topology is tied to a specific collection.
+ */
+public class MongoInsertBolt extends AbstractMongoBolt {
+
+    private static final int DEFAULT_FLUSH_INTERVAL_SECS = 1;
+
+    private MongoMapper mapper;
+
+    private boolean ordered = true;  //default is ordered.
+
+    private int batchSize;
+
+    private BatchHelper batchHelper;
+
+    private int flushIntervalSecs = DEFAULT_FLUSH_INTERVAL_SECS;
+
+    /**
+     * MongoInsertBolt Constructor.
+     * @param url The MongoDB server url
+     * @param collectionName The collection where reading/writing data
+     * @param mapper MongoMapper converting tuple to an MongoDB document
+     */
+    public MongoInsertBolt(String url, String collectionName, MongoMapper mapper) {
+        super(url, collectionName);
+
+        Validate.notNull(mapper, "MongoMapper can not be null");
+
+        this.mapper = mapper;
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        try {
+            if (batchHelper.shouldHandle(tuple)) {
+                batchHelper.addBatch(tuple);
+            }
+
+            if (batchHelper.shouldFlush()) {
+                flushTuples();
+                batchHelper.ack();
+            }
+        } catch (Exception e) {
+            batchHelper.fail(e);
+        }
+    }
+
+    private void flushTuples() {
+        List<Document> docs = new LinkedList<>();
+        for (Tuple t : batchHelper.getBatchTuples()) {
+            Document doc = mapper.toDocument(t);
+            docs.add(doc);
+        }
+        mongoClient.insert(docs, ordered);
+    }
+
+    public MongoInsertBolt withBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+        return this;
+    }
+
+    public MongoInsertBolt withOrdered(boolean ordered) {
+        this.ordered = ordered;
+        return this;
+    }
+
+    public MongoInsertBolt withFlushIntervalSecs(int flushIntervalSecs) {
+        this.flushIntervalSecs = flushIntervalSecs;
+        return this;
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(), flushIntervalSecs);
+    }
+
+    @Override
+    public void prepare(Map<String, Object> topoConf, TopologyContext context,
+            OutputCollector collector) {
+        super.prepare(topoConf, context, collector);
+        this.batchHelper = new BatchHelper(batchSize, collector);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        
+    }
+
+}
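
For orientation, a minimal wiring sketch for the insert bolt above (not part of this commit): the URL, collection and field names are placeholders, and the spout is assumed to emit "word" and "count" fields.

    import org.apache.storm.mongodb.bolt.MongoInsertBolt;
    import org.apache.storm.mongodb.common.mapper.MongoMapper;
    import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper;
    import org.apache.storm.topology.TopologyBuilder;

    public class MongoInsertSketch {
        public static void main(String[] args) {
            String url = "mongodb://127.0.0.1:27017/test";   // placeholder URL
            String collectionName = "wordcount";             // placeholder collection

            // map the "word" and "count" tuple fields one-to-one into a Document
            MongoMapper mapper = new SimpleMongoMapper("word", "count");

            // buffer up to 100 tuples; tick tuples flush the batch every 2 seconds
            MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper)
                    .withBatchSize(100)
                    .withFlushIntervalSecs(2);

            TopologyBuilder builder = new TopologyBuilder();
            // builder.setSpout("word-spout", ...);  // any spout emitting "word" and "count"
            builder.setBolt("mongo-insert", insertBolt, 1).shuffleGrouping("word-spout");
        }
    }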

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
index 5579b8e..4f92b32 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
@@ -1,93 +1,93 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.bolt;
-
-import org.apache.commons.lang.Validate;
-import org.apache.storm.mongodb.common.QueryFilterCreator;
-import org.apache.storm.mongodb.common.mapper.MongoUpdateMapper;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.TupleUtils;
-import org.bson.Document;
-import org.bson.conversions.Bson;
-
-/**
- * Basic bolt for updating from MongoDB.
- * Note: Each MongoUpdateBolt defined in a topology is tied to a specific collection.
- */
-public class MongoUpdateBolt extends AbstractMongoBolt {
-
-    private QueryFilterCreator queryCreator;
-    private MongoUpdateMapper mapper;
-
-    private boolean upsert;  //the default is false.
-    private boolean many;  //the default is false.
-
-    /**
-     * MongoUpdateBolt Constructor.
-     * @param url The MongoDB server url
-     * @param collectionName The collection where reading/writing data
-     * @param queryCreator QueryFilterCreator
-     * @param mapper MongoMapper converting tuple to an MongoDB document
-     */
-    public MongoUpdateBolt(String url, String collectionName, QueryFilterCreator queryCreator, MongoUpdateMapper mapper) {
-        super(url, collectionName);
-
-        Validate.notNull(queryCreator, "QueryFilterCreator can not be null");
-        Validate.notNull(mapper, "MongoUpdateMapper can not be null");
-
-        this.queryCreator = queryCreator;
-        this.mapper = mapper;
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-        if (TupleUtils.isTick(tuple)) {
-            return;
-        }
-
-        try {
-            //get document
-            Document doc = mapper.toDocument(tuple);
-            //get query filter
-            Bson filter = queryCreator.createFilter(tuple);
-            mongoClient.update(filter, doc, upsert, many);
-            this.collector.ack(tuple);
-        } catch (Exception e) {
-            this.collector.reportError(e);
-            this.collector.fail(tuple);
-        }
-    }
-
-    public MongoUpdateBolt withUpsert(boolean upsert) {
-        this.upsert = upsert;
-        return this;
-    }
-
-    public MongoUpdateBolt withMany(boolean many) {
-        this.many = many;
-        return this;
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        
-    }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.bolt;
+
+import org.apache.commons.lang.Validate;
+import org.apache.storm.mongodb.common.QueryFilterCreator;
+import org.apache.storm.mongodb.common.mapper.MongoUpdateMapper;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+/**
+ * Basic bolt for updating documents in MongoDB.
+ * Note: Each MongoUpdateBolt defined in a topology is tied to a specific collection.
+ */
+public class MongoUpdateBolt extends AbstractMongoBolt {
+
+    private QueryFilterCreator queryCreator;
+    private MongoUpdateMapper mapper;
+
+    private boolean upsert;  //the default is false.
+    private boolean many;  //the default is false.
+
+    /**
+     * MongoUpdateBolt Constructor.
+     * @param url The MongoDB server url
+     * @param collectionName The collection to read/write data
+     * @param queryCreator QueryFilterCreator that builds the update filter from a tuple
+     * @param mapper MongoUpdateMapper converting a tuple to a MongoDB update document
+     */
+    public MongoUpdateBolt(String url, String collectionName, QueryFilterCreator queryCreator, MongoUpdateMapper mapper) {
+        super(url, collectionName);
+
+        Validate.notNull(queryCreator, "QueryFilterCreator can not be null");
+        Validate.notNull(mapper, "MongoUpdateMapper can not be null");
+
+        this.queryCreator = queryCreator;
+        this.mapper = mapper;
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        if (TupleUtils.isTick(tuple)) {
+            return;
+        }
+
+        try {
+            //get document
+            Document doc = mapper.toDocument(tuple);
+            //get query filter
+            Bson filter = queryCreator.createFilter(tuple);
+            mongoClient.update(filter, doc, upsert, many);
+            this.collector.ack(tuple);
+        } catch (Exception e) {
+            this.collector.reportError(e);
+            this.collector.fail(tuple);
+        }
+    }
+
+    public MongoUpdateBolt withUpsert(boolean upsert) {
+        this.upsert = upsert;
+        return this;
+    }
+
+    public MongoUpdateBolt withMany(boolean many) {
+        this.many = many;
+        return this;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        
+    }
+
+}
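
Similarly, a hedged usage sketch for the update bolt above (placeholder URL, collection and field names; the spout is assumed to emit "word" and "count"):

    import org.apache.storm.mongodb.bolt.MongoUpdateBolt;
    import org.apache.storm.mongodb.common.QueryFilterCreator;
    import org.apache.storm.mongodb.common.SimpleQueryFilterCreator;
    import org.apache.storm.mongodb.common.mapper.MongoUpdateMapper;
    import org.apache.storm.mongodb.common.mapper.SimpleMongoUpdateMapper;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;

    public class MongoUpdateSketch {
        public static void main(String[] args) {
            String url = "mongodb://127.0.0.1:27017/test";   // placeholder URL

            // match the document whose "word" field equals the tuple's "word" value
            QueryFilterCreator filterCreator = new SimpleQueryFilterCreator().withField("word");
            // $set "word" and "count" from the tuple
            MongoUpdateMapper updateMapper = new SimpleMongoUpdateMapper("word", "count");

            MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, "wordcount", filterCreator, updateMapper)
                    .withUpsert(true);   // insert when no document matches the filter

            TopologyBuilder builder = new TopologyBuilder();
            // builder.setSpout("count-spout", ...);  // any spout emitting "word" and "count"
            builder.setBolt("mongo-update", updateBolt, 1)
                    .fieldsGrouping("count-spout", new Fields("word"));
        }
    }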

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDbClient.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDbClient.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDbClient.java
index c264f03..52ed237 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDbClient.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDbClient.java
@@ -1,109 +1,109 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.common;
-
-import com.mongodb.MongoClient;
-import com.mongodb.MongoClientURI;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoDatabase;
-import com.mongodb.client.model.InsertManyOptions;
-import com.mongodb.client.model.UpdateOptions;
-
-import java.util.List;
-
-import org.bson.Document;
-import org.bson.conversions.Bson;
-
-public class MongoDbClient {
-
-    private MongoClient client;
-    private MongoCollection<Document> collection;
-
-    /**
-     * The MongoDbClient constructor.
-     * @param url The Mongo server url
-     * @param collectionName The Mongo collection to read/write data
-     */
-    public MongoDbClient(String url, String collectionName) {
-        //Creates a MongoURI from the given string.
-        MongoClientURI uri = new MongoClientURI(url);
-        //Creates a MongoClient described by a URI.
-        this.client = new MongoClient(uri);
-        //Gets a Database.
-        MongoDatabase db = client.getDatabase(uri.getDatabase());
-        //Gets a collection.
-        this.collection = db.getCollection(collectionName);
-    }
-
-    /**
-     * Inserts one or more documents.
-     * This method is equivalent to a call to the bulkWrite method.
-     * The documents will be inserted in the order provided, 
-     * stopping on the first failed insertion. 
-     * 
-     * @param documents documents
-     */
-    public void insert(List<Document> documents, boolean ordered) {
-        InsertManyOptions options = new InsertManyOptions();
-        if (!ordered) {
-            options.ordered(false);
-        }
-        collection.insertMany(documents, options);
-    }
-
-    /**
-     * Update a single or all documents in the collection according to the specified arguments.
-     * When upsert set to true, the new document will be inserted if there are no matches to the query filter.
-     * 
-     * @param filter Bson filter
-     * @param document Bson document
-     * @param upsert a new document should be inserted if there are no matches to the query filter
-     * @param many whether find all documents according to the query filter
-     */
-    public void update(Bson filter, Bson document, boolean upsert, boolean many) {
-        //TODO batch updating
-        UpdateOptions options = new UpdateOptions();
-        if (upsert) {
-            options.upsert(true);
-        }
-        if (many) {
-            collection.updateMany(filter, document, options);
-        } else {
-            collection.updateOne(filter, document, options);
-        }
-    }
-
-    /**
-     * Finds a single document in the collection according to the specified arguments.
-     *
-     * @param filter Bson filter
-     */
-    public Document find(Bson filter) {
-        //TODO batch finding
-        return collection.find(filter).first();
-    }
-
-    /**
-     * Closes all resources associated with this instance.
-     */
-    public void close() {
-        client.close();
-    }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.common;
+
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientURI;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.InsertManyOptions;
+import com.mongodb.client.model.UpdateOptions;
+
+import java.util.List;
+
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+public class MongoDbClient {
+
+    private MongoClient client;
+    private MongoCollection<Document> collection;
+
+    /**
+     * The MongoDbClient constructor.
+     * @param url The Mongo server url
+     * @param collectionName The Mongo collection to read/write data
+     */
+    public MongoDbClient(String url, String collectionName) {
+        //Creates a MongoURI from the given string.
+        MongoClientURI uri = new MongoClientURI(url);
+        //Creates a MongoClient described by a URI.
+        this.client = new MongoClient(uri);
+        //Gets a Database.
+        MongoDatabase db = client.getDatabase(uri.getDatabase());
+        //Gets a collection.
+        this.collection = db.getCollection(collectionName);
+    }
+
+    /**
+     * Inserts one or more documents.
+     * This method is equivalent to a call to the bulkWrite method.
+     * When ordered is true, the documents are inserted in the order provided,
+     * stopping at the first failed insertion.
+     *
+     * @param documents the documents to insert
+     * @param ordered whether to insert the documents in the order provided
+     */
+    public void insert(List<Document> documents, boolean ordered) {
+        InsertManyOptions options = new InsertManyOptions();
+        if (!ordered) {
+            options.ordered(false);
+        }
+        collection.insertMany(documents, options);
+    }
+
+    /**
+     * Updates a single document, or all matching documents, in the collection according to the specified arguments.
+     * When upsert is set to true, a new document is inserted if there are no matches to the query filter.
+     *
+     * @param filter Bson filter
+     * @param document Bson update document
+     * @param upsert whether a new document should be inserted if there are no matches to the query filter
+     * @param many whether to update all documents matching the query filter or only the first match
+     */
+    public void update(Bson filter, Bson document, boolean upsert, boolean many) {
+        //TODO batch updating
+        UpdateOptions options = new UpdateOptions();
+        if (upsert) {
+            options.upsert(true);
+        }
+        if (many) {
+            collection.updateMany(filter, document, options);
+        } else {
+            collection.updateOne(filter, document, options);
+        }
+    }
+
+    /**
+     * Finds a single document in the collection according to the specified arguments.
+     *
+     * @param filter Bson filter
+     * @return the first document matching the filter, or null if no document matches
+     */
+    public Document find(Bson filter) {
+        //TODO batch finding
+        return collection.find(filter).first();
+    }
+
+    /**
+     * Closes all resources associated with this instance.
+     */
+    public void close() {
+        client.close();
+    }
+
+}
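
A short stand-alone sketch of how the client above can be exercised directly (placeholder URL and collection; the values are illustrative):

    import java.util.Arrays;

    import com.mongodb.client.model.Filters;
    import org.apache.storm.mongodb.common.MongoDbClient;
    import org.bson.Document;

    public class MongoDbClientSketch {
        public static void main(String[] args) {
            MongoDbClient client = new MongoDbClient("mongodb://127.0.0.1:27017/test", "wordcount");
            try {
                // ordered insert: stops at the first failed insertion
                client.insert(Arrays.asList(
                        new Document("word", "storm").append("count", 1),
                        new Document("word", "trident").append("count", 2)), true);

                // upsert the count for "storm"; the last flag chooses updateOne over updateMany
                client.update(Filters.eq("word", "storm"),
                        new Document("$set", new Document("count", 5)), true, false);

                Document found = client.find(Filters.eq("word", "storm"));
                System.out.println(found);
            } finally {
                client.close();
            }
        }
    }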

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java
index 371b1ed..521678e 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java
@@ -1,47 +1,47 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.common;
-
-import java.io.Serializable;
-import java.util.List;
-
-import org.apache.storm.tuple.ITuple;
-import org.bson.conversions.Bson;
-
-/**
- * Create a MongoDB query Filter by given Tuple/trident keys.
- */
-public interface QueryFilterCreator extends Serializable {
-
-    /**
-     * Create a query Filter by given Tuple.
-     * 
-     * @param tuple ITuple tuple
-     * @return query Filter
-     */
-    Bson createFilter(ITuple tuple);
-
-    /**
-     * Create a query Filter by given trident keys.
-     *
-     * @param keys keys
-     * @return query Filter
-     */
-    Bson createFilterByKeys(List<Object> keys);
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.common;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.storm.tuple.ITuple;
+import org.bson.conversions.Bson;
+
+/**
+ * Creates a MongoDB query Filter from a given Tuple or trident keys.
+ */
+public interface QueryFilterCreator extends Serializable {
+
+    /**
+     * Creates a query Filter from a given Tuple.
+     * 
+     * @param tuple ITuple tuple
+     * @return query Filter
+     */
+    Bson createFilter(ITuple tuple);
+
+    /**
+     * Creates a query Filter from the given trident keys.
+     *
+     * @param keys keys
+     * @return query Filter
+     */
+    Bson createFilterByKeys(List<Object> keys);
+}
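
Implementations beyond the bundled SimpleQueryFilterCreator are possible; the following sketch (not part of this commit, field names illustrative) builds a composite filter from two tuple fields while keeping the "_id" convention for trident keys:

    import java.util.List;

    import com.mongodb.client.model.Filters;
    import org.apache.storm.mongodb.common.MongoUtils;
    import org.apache.storm.mongodb.common.QueryFilterCreator;
    import org.apache.storm.tuple.ITuple;
    import org.bson.conversions.Bson;

    public class CompositeQueryFilterCreator implements QueryFilterCreator {

        @Override
        public Bson createFilter(ITuple tuple) {
            // match on two tuple fields instead of one (field names are illustrative)
            return Filters.and(
                    Filters.eq("userId", tuple.getValueByField("userId")),
                    Filters.eq("eventType", tuple.getValueByField("eventType")));
        }

        @Override
        public Bson createFilterByKeys(List<Object> keys) {
            // keep the same "_id" convention used by SimpleQueryFilterCreator
            return Filters.eq("_id", MongoUtils.getId(keys));
        }
    }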

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java
index 1f5774a..16a71ec 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java
@@ -1,47 +1,47 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.common;
-
-import com.mongodb.client.model.Filters;
-
-import java.util.List;
-
-import org.apache.storm.tuple.ITuple;
-import org.bson.conversions.Bson;
-
-public class SimpleQueryFilterCreator implements QueryFilterCreator {
-
-    private String field;
-    
-    @Override
-    public Bson createFilter(ITuple tuple) {
-        return Filters.eq(field, tuple.getValueByField(field));
-    }
-
-    @Override
-    public Bson createFilterByKeys(List<Object> keys) {
-        return Filters.eq("_id", MongoUtils.getId(keys));
-    }
-
-    public SimpleQueryFilterCreator withField(String field) {
-        this.field = field;
-        return this;
-    }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.common;
+
+import com.mongodb.client.model.Filters;
+
+import java.util.List;
+
+import org.apache.storm.tuple.ITuple;
+import org.bson.conversions.Bson;
+
+public class SimpleQueryFilterCreator implements QueryFilterCreator {
+
+    private String field;
+    
+    @Override
+    public Bson createFilter(ITuple tuple) {
+        return Filters.eq(field, tuple.getValueByField(field));
+    }
+
+    @Override
+    public Bson createFilterByKeys(List<Object> keys) {
+        return Filters.eq("_id", MongoUtils.getId(keys));
+    }
+
+    public SimpleQueryFilterCreator withField(String field) {
+        this.field = field;
+        return this;
+    }
+
+}
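
Configured with withField("word"), the creator above yields the same filter as Filters.eq("word", tuple.getValueByField("word")); a tiny sketch:

    import org.apache.storm.mongodb.common.QueryFilterCreator;
    import org.apache.storm.mongodb.common.SimpleQueryFilterCreator;
    import org.apache.storm.tuple.ITuple;
    import org.bson.conversions.Bson;

    public class WordFilterSketch {
        // produces the same filter as Filters.eq("word", tuple.getValueByField("word"))
        static Bson wordFilter(ITuple tuple) {
            QueryFilterCreator creator = new SimpleQueryFilterCreator().withField("word");
            return creator.createFilter(tuple);
        }
    }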

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java
index 3ea9c16..ed27c92 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java
@@ -1,47 +1,47 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.common.mapper;
-
-import java.io.Serializable;
-import java.util.List;
-
-import org.apache.storm.tuple.ITuple;
-import org.bson.Document;
-
-/**
- * Given a Tuple/trident keys, converts it to an MongoDB document.
- */
-public interface MongoMapper extends Serializable {
-
-    /**
-     * Converts a Tuple to a Document.
-     * 
-     * @param tuple the incoming tuple
-     * @return the MongoDB document
-     */
-    Document toDocument(ITuple tuple);
-
-    /**
-     * Converts a keys to a Document.
-     *
-     * @param keys the trident keys
-     * @return the MongoDB document
-     */
-    Document toDocumentByKeys(List<Object> keys);
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.common.mapper;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.storm.tuple.ITuple;
+import org.bson.Document;
+
+/**
+ * Given a Tuple or trident keys, converts it to a MongoDB document.
+ */
+public interface MongoMapper extends Serializable {
+
+    /**
+     * Converts a Tuple to a Document.
+     * 
+     * @param tuple the incoming tuple
+     * @return the MongoDB document
+     */
+    Document toDocument(ITuple tuple);
+
+    /**
+     * Converts trident keys to a Document.
+     *
+     * @param keys the trident keys
+     * @return the MongoDB document
+     */
+    Document toDocumentByKeys(List<Object> keys);
+}
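
A custom mapper is just another implementation of the interface above; a sketch (not part of this commit, field names are assumptions) that copies two tuple fields and adds a write timestamp:

    import java.util.List;

    import org.apache.storm.mongodb.common.MongoUtils;
    import org.apache.storm.mongodb.common.mapper.MongoMapper;
    import org.apache.storm.tuple.ITuple;
    import org.bson.Document;

    public class TimestampedMongoMapper implements MongoMapper {

        @Override
        public Document toDocument(ITuple tuple) {
            // copy two illustrative tuple fields and stamp the write time
            return new Document("word", tuple.getStringByField("word"))
                    .append("count", tuple.getValueByField("count"))
                    .append("updatedAt", System.currentTimeMillis());
        }

        @Override
        public Document toDocumentByKeys(List<Object> keys) {
            // same "_id" convention as SimpleMongoMapper
            return new Document("_id", MongoUtils.getId(keys));
        }
    }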

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java
index 7bb0a06..1a38828 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java
@@ -1,55 +1,55 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.common.mapper;
-
-import java.util.List;
-
-import org.apache.storm.mongodb.common.MongoUtils;
-import org.apache.storm.tuple.ITuple;
-import org.bson.Document;
-
-public class SimpleMongoMapper implements MongoMapper {
-
-    private String[] fields;
-
-    public SimpleMongoMapper(String... fields) {
-        this.fields = fields;
-    }
-
-    @Override
-    public Document toDocument(ITuple tuple) {
-        Document document = new Document();
-        for (String field : fields) {
-            document.append(field, tuple.getValueByField(field));
-        }
-        return document;
-    }
-
-    @Override
-    public Document toDocumentByKeys(List<Object> keys) {
-        Document document = new Document();
-        document.append("_id", MongoUtils.getId(keys));
-        return document;
-    }
-
-    public SimpleMongoMapper withFields(String... fields) {
-        this.fields = fields;
-        return this;
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.common.mapper;
+
+import java.util.List;
+
+import org.apache.storm.mongodb.common.MongoUtils;
+import org.apache.storm.tuple.ITuple;
+import org.bson.Document;
+
+public class SimpleMongoMapper implements MongoMapper {
+
+    private String[] fields;
+
+    public SimpleMongoMapper(String... fields) {
+        this.fields = fields;
+    }
+
+    @Override
+    public Document toDocument(ITuple tuple) {
+        Document document = new Document();
+        for (String field : fields) {
+            document.append(field, tuple.getValueByField(field));
+        }
+        return document;
+    }
+
+    @Override
+    public Document toDocumentByKeys(List<Object> keys) {
+        Document document = new Document();
+        document.append("_id", MongoUtils.getId(keys));
+        return document;
+    }
+
+    public SimpleMongoMapper withFields(String... fields) {
+        this.fields = fields;
+        return this;
+    }
+}
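
For reference, the mapper above copies the listed tuple fields one-to-one into the document; for example (field names illustrative):

    // maps the listed tuple fields one-to-one into the document
    MongoMapper mapper = new SimpleMongoMapper("word", "count");
    // a tuple with word="storm" and count=5 becomes: { "word" : "storm", "count" : 5 }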

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java
index 3ed17ec..72328c9 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java
@@ -1,46 +1,46 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.common.mapper;
-
-import org.apache.storm.tuple.ITuple;
-import org.bson.Document;
-
-public class SimpleMongoUpdateMapper extends SimpleMongoMapper implements MongoUpdateMapper {
-
-    private String[] fields;
-
-    public SimpleMongoUpdateMapper(String... fields) {
-        this.fields = fields;
-    }
-
-    @Override
-    public Document toDocument(ITuple tuple) {
-        Document document = new Document();
-        for (String field : fields) {
-            document.append(field, tuple.getValueByField(field));
-        }
-        //$set operator: Sets the value of a field in a document.
-        return new Document("$set", document);
-    }
-
-    public SimpleMongoUpdateMapper withFields(String... fields) {
-        this.fields = fields;
-        return this;
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.common.mapper;
+
+import org.apache.storm.tuple.ITuple;
+import org.bson.Document;
+
+public class SimpleMongoUpdateMapper extends SimpleMongoMapper implements MongoUpdateMapper {
+
+    private String[] fields;
+
+    public SimpleMongoUpdateMapper(String... fields) {
+        this.fields = fields;
+    }
+
+    @Override
+    public Document toDocument(ITuple tuple) {
+        Document document = new Document();
+        for (String field : fields) {
+            document.append(field, tuple.getValueByField(field));
+        }
+        //$set operator: Sets the value of a field in a document.
+        return new Document("$set", document);
+    }
+
+    public SimpleMongoUpdateMapper withFields(String... fields) {
+        this.fields = fields;
+        return this;
+    }
+}
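
The update mapper above differs only in wrapping the mapped fields in a $set document; for example (field names illustrative):

    // same field mapping, but wrapped in a $set update document
    MongoUpdateMapper updateMapper = new SimpleMongoUpdateMapper("word", "count");
    // a tuple with word="storm" and count=5 becomes: { "$set" : { "word" : "storm", "count" : 5 } }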

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
index 77c394c..100f931 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
@@ -1,145 +1,145 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.trident.state;
-
-import com.google.common.collect.Lists;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang.Validate;
-import org.apache.storm.mongodb.common.MongoDbClient;
-import org.apache.storm.mongodb.common.QueryFilterCreator;
-import org.apache.storm.mongodb.common.mapper.MongoLookupMapper;
-import org.apache.storm.mongodb.common.mapper.MongoMapper;
-import org.apache.storm.topology.FailedException;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.apache.storm.tuple.Values;
-import org.bson.Document;
-import org.bson.conversions.Bson;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MongoState implements State {
-
-    private static final Logger LOG = LoggerFactory.getLogger(MongoState.class);
-
-    private Options options;
-    private MongoDbClient mongoClient;
-    private Map<String, Object> map;
-
-    protected MongoState(Map<String, Object> map, Options options) {
-        this.options = options;
-        this.map = map;
-    }
-
-    public static class Options implements Serializable {
-        private String url;
-        private String collectionName;
-        private MongoMapper mapper;
-        private MongoLookupMapper lookupMapper;
-        private QueryFilterCreator queryCreator;
-
-        public Options withUrl(String url) {
-            this.url = url;
-            return this;
-        }
-
-        public Options withCollectionName(String collectionName) {
-            this.collectionName = collectionName;
-            return this;
-        }
-
-        public Options withMapper(MongoMapper mapper) {
-            this.mapper = mapper;
-            return this;
-        }
-
-        public Options withMongoLookupMapper(MongoLookupMapper lookupMapper) {
-            this.lookupMapper = lookupMapper;
-            return this;
-        }
-
-        public Options withQueryFilterCreator(QueryFilterCreator queryCreator) {
-            this.queryCreator = queryCreator;
-            return this;
-        }
-    }
-
-    protected void prepare() {
-        Validate.notEmpty(options.url, "url can not be blank or null");
-        Validate.notEmpty(options.collectionName, "collectionName can not be blank or null");
-
-        this.mongoClient = new MongoDbClient(options.url, options.collectionName);
-    }
-
-    @Override
-    public void beginCommit(Long txid) {
-        LOG.debug("beginCommit is noop.");
-    }
-
-    @Override
-    public void commit(Long txid) {
-        LOG.debug("commit is noop.");
-    }
-
-    /**
-     * Update Mongo state.
-     * @param tuples trident tuples
-     * @param collector trident collector
-     */
-    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
-        List<Document> documents = Lists.newArrayList();
-        for (TridentTuple tuple : tuples) {
-            Document document = options.mapper.toDocument(tuple);
-            documents.add(document);
-        }
-
-        try {
-            this.mongoClient.insert(documents, true);
-        } catch (Exception e) {
-            LOG.warn("Batch write failed but some requests might have succeeded. Triggering replay.", e);
-            throw new FailedException(e);
-        }
-    }
-
-    /**
-     * Batch retrieve values.
-     * @param tridentTuples trident tuples
-     * @return values
-     */
-    public List<List<Values>> batchRetrieve(List<TridentTuple> tridentTuples) {
-        List<List<Values>> batchRetrieveResult = Lists.newArrayList();
-        try {
-            for (TridentTuple tuple : tridentTuples) {
-                Bson filter = options.queryCreator.createFilter(tuple);
-                Document doc = mongoClient.find(filter);
-                List<Values> values = options.lookupMapper.toTuple(tuple, doc);
-                batchRetrieveResult.add(values);
-            }
-        } catch (Exception e) {
-            LOG.warn("Batch get operation failed. Triggering replay.", e);
-            throw new FailedException(e);
-        }
-        return batchRetrieveResult;
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.trident.state;
+
+import com.google.common.collect.Lists;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.Validate;
+import org.apache.storm.mongodb.common.MongoDbClient;
+import org.apache.storm.mongodb.common.QueryFilterCreator;
+import org.apache.storm.mongodb.common.mapper.MongoLookupMapper;
+import org.apache.storm.mongodb.common.mapper.MongoMapper;
+import org.apache.storm.topology.FailedException;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Values;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MongoState implements State {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MongoState.class);
+
+    private Options options;
+    private MongoDbClient mongoClient;
+    private Map<String, Object> map;
+
+    protected MongoState(Map<String, Object> map, Options options) {
+        this.options = options;
+        this.map = map;
+    }
+
+    public static class Options implements Serializable {
+        private String url;
+        private String collectionName;
+        private MongoMapper mapper;
+        private MongoLookupMapper lookupMapper;
+        private QueryFilterCreator queryCreator;
+
+        public Options withUrl(String url) {
+            this.url = url;
+            return this;
+        }
+
+        public Options withCollectionName(String collectionName) {
+            this.collectionName = collectionName;
+            return this;
+        }
+
+        public Options withMapper(MongoMapper mapper) {
+            this.mapper = mapper;
+            return this;
+        }
+
+        public Options withMongoLookupMapper(MongoLookupMapper lookupMapper) {
+            this.lookupMapper = lookupMapper;
+            return this;
+        }
+
+        public Options withQueryFilterCreator(QueryFilterCreator queryCreator) {
+            this.queryCreator = queryCreator;
+            return this;
+        }
+    }
+
+    protected void prepare() {
+        Validate.notEmpty(options.url, "url can not be blank or null");
+        Validate.notEmpty(options.collectionName, "collectionName can not be blank or null");
+
+        this.mongoClient = new MongoDbClient(options.url, options.collectionName);
+    }
+
+    @Override
+    public void beginCommit(Long txid) {
+        LOG.debug("beginCommit is noop.");
+    }
+
+    @Override
+    public void commit(Long txid) {
+        LOG.debug("commit is noop.");
+    }
+
+    /**
+     * Update Mongo state.
+     * @param tuples trident tuples
+     * @param collector trident collector
+     */
+    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
+        List<Document> documents = Lists.newArrayList();
+        for (TridentTuple tuple : tuples) {
+            Document document = options.mapper.toDocument(tuple);
+            documents.add(document);
+        }
+
+        try {
+            this.mongoClient.insert(documents, true);
+        } catch (Exception e) {
+            LOG.warn("Batch write failed but some requests might have succeeded. Triggering replay.", e);
+            throw new FailedException(e);
+        }
+    }
+
+    /**
+     * Batch retrieve values.
+     * @param tridentTuples trident tuples
+     * @return values
+     */
+    public List<List<Values>> batchRetrieve(List<TridentTuple> tridentTuples) {
+        List<List<Values>> batchRetrieveResult = Lists.newArrayList();
+        try {
+            for (TridentTuple tuple : tridentTuples) {
+                Bson filter = options.queryCreator.createFilter(tuple);
+                Document doc = mongoClient.find(filter);
+                List<Values> values = options.lookupMapper.toTuple(tuple, doc);
+                batchRetrieveResult.add(values);
+            }
+        } catch (Exception e) {
+            LOG.warn("Batch get operation failed. Triggering replay.", e);
+            throw new FailedException(e);
+        }
+        return batchRetrieveResult;
+    }
+}
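
A sketch of how the Options builder above is typically filled in (placeholder URL, collection and field names; a MongoLookupMapper would also be set via withMongoLookupMapper when batchRetrieve is used):

    import org.apache.storm.mongodb.common.SimpleQueryFilterCreator;
    import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper;
    import org.apache.storm.mongodb.trident.state.MongoState;

    public class MongoStateOptionsSketch {
        static MongoState.Options buildOptions() {
            return new MongoState.Options()
                    .withUrl("mongodb://127.0.0.1:27017/test")   // placeholder URL
                    .withCollectionName("wordcount")             // placeholder collection
                    .withMapper(new SimpleMongoMapper("word", "count"))
                    // only needed for lookups: the filter creator drives batchRetrieve
                    .withQueryFilterCreator(new SimpleQueryFilterCreator().withField("word"));
        }
    }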

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateFactory.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateFactory.java
index a6797e2..d27d9a9 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateFactory.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateFactory.java
@@ -1,43 +1,43 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.trident.state;
-
-import java.util.Map;
-
-import org.apache.storm.task.IMetricsContext;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.state.StateFactory;
-
-public class MongoStateFactory implements StateFactory {
-
-    private MongoState.Options options;
-
-    public MongoStateFactory(MongoState.Options options) {
-        this.options = options;
-    }
-
-    @Override
-    public State makeState(Map<String, Object> conf, IMetricsContext metrics,
-            int partitionIndex, int numPartitions) {
-        MongoState state = new MongoState(conf, options);
-        state.prepare();
-        return state;
-    }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.trident.state;
+
+import java.util.Map;
+
+import org.apache.storm.task.IMetricsContext;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+
+public class MongoStateFactory implements StateFactory {
+
+    private MongoState.Options options;
+
+    public MongoStateFactory(MongoState.Options options) {
+        this.options = options;
+    }
+
+    @Override
+    public State makeState(Map<String, Object> conf, IMetricsContext metrics,
+            int partitionIndex, int numPartitions) {
+        MongoState state = new MongoState(conf, options);
+        state.prepare();
+        return state;
+    }
+
+}
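
Putting the pieces together, a hedged Trident wiring sketch using this factory and the MongoStateUpdater listed in the diffstat above (the spout and field names are assumptions, not part of this commit):

    import org.apache.storm.generated.StormTopology;
    import org.apache.storm.mongodb.trident.state.MongoState;
    import org.apache.storm.mongodb.trident.state.MongoStateFactory;
    import org.apache.storm.mongodb.trident.state.MongoStateUpdater;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.trident.spout.IBatchSpout;
    import org.apache.storm.tuple.Fields;

    public class MongoTridentSketch {
        // 'spout' is any batch spout emitting "word" and "count"; 'options' as sketched above
        static StormTopology build(IBatchSpout spout, MongoState.Options options) {
            TridentTopology topology = new TridentTopology();
            topology.newStream("mongo-spout", spout)
                    .partitionPersist(new MongoStateFactory(options),
                            new Fields("word", "count"),
                            new MongoStateUpdater(),
                            new Fields());
            return topology.build();
        }
    }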


[4/5] storm git commit: set '* text=auto' in .gitattributes in order to avoid merge work because of line feed changes

Posted by sr...@apache.org.
set '* text=auto' in .gitattributes in order to avoid merge work because of line feed changes


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e9c427cb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e9c427cb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e9c427cb

Branch: refs/heads/master
Commit: e9c427cbe4b32f6103e704c48bc1dbcb936b98cb
Parents: 0fdad2c
Author: Karl-Philipp Richter <kr...@aol.de>
Authored: Fri Mar 23 06:00:07 2018 +0100
Committer: Karl-Philipp Richter <kr...@aol.de>
Committed: Fri Mar 23 06:00:07 2018 +0100

----------------------------------------------------------------------
 .gitattributes                                  |  11 +-
 docs/storm-eventhubs.md                         |  80 +--
 external/storm-eventhubs/README.md              |  90 +--
 .../eventhubs/bolt/DefaultEventDataFormat.java  |  94 +--
 .../storm/eventhubs/bolt/EventHubBolt.java      | 292 ++++-----
 .../eventhubs/bolt/EventHubBoltConfig.java      | 222 +++----
 .../storm/eventhubs/bolt/IEventDataFormat.java  |  56 +-
 .../storm/eventhubs/spout/EventDataScheme.java  | 156 ++---
 .../storm/eventhubs/spout/FieldConstants.java   |  56 +-
 .../storm/eventhubs/spout/IEventDataScheme.java |  88 +--
 .../spout/IEventHubReceiverFactory.java         |  60 +-
 .../eventhubs/spout/IPartitionCoordinator.java  |  54 +-
 .../eventhubs/spout/IPartitionManager.java      |  74 +--
 .../storm/eventhubs/spout/IStateStore.java      |  62 +-
 .../apache/storm/eventhubs/spout/MessageId.java | 112 ++--
 .../eventhubs/spout/ZookeeperStateStore.java    | 190 +++---
 .../storm/eventhubs/trident/Coordinator.java    | 120 ++--
 .../trident/ITridentPartitionManager.java       |  70 +-
 .../ITridentPartitionManagerFactory.java        |  52 +-
 .../trident/OpaqueTridentEventHubEmitter.java   | 136 ++--
 .../trident/OpaqueTridentEventHubSpout.java     | 128 ++--
 .../storm/eventhubs/trident/Partition.java      |  78 +--
 .../storm/eventhubs/trident/Partitions.java     |  82 +--
 .../TransactionalTridentEventHubSpout.java      | 132 ++--
 .../storm/eventhubs/samples/EventHubLoop.java   | 104 +--
 .../samples/OpaqueTridentEventCount.java        | 106 +--
 .../samples/TransactionalTridentEventCount.java | 162 ++---
 .../eventhubs/samples/bolt/GlobalCountBolt.java | 176 ++---
 .../samples/bolt/PartialCountBolt.java          | 136 ++--
 .../spout/SpoutOutputCollectorMock.java         | 142 ++--
 .../storm/eventhubs/spout/StateStoreMock.java   | 108 +--
 .../storm/eventhubs/spout/TestEventData.java    |  94 +--
 .../eventhubs/trident/TridentCollectorMock.java | 114 ++--
 external/storm-mongodb/README.md                | 650 +++++++++----------
 .../storm/mongodb/bolt/AbstractMongoBolt.java   | 124 ++--
 .../storm/mongodb/bolt/MongoInsertBolt.java     | 248 +++----
 .../storm/mongodb/bolt/MongoUpdateBolt.java     | 186 +++---
 .../storm/mongodb/common/MongoDbClient.java     | 218 +++----
 .../mongodb/common/QueryFilterCreator.java      |  94 +--
 .../common/SimpleQueryFilterCreator.java        |  94 +--
 .../mongodb/common/mapper/MongoMapper.java      |  94 +--
 .../common/mapper/SimpleMongoMapper.java        | 110 ++--
 .../common/mapper/SimpleMongoUpdateMapper.java  |  92 +--
 .../storm/mongodb/trident/state/MongoState.java | 290 ++++-----
 .../trident/state/MongoStateFactory.java        |  86 +--
 .../trident/state/MongoStateUpdater.java        |  70 +-
 integration-test/README.md                      | 118 ++--
 .../storm/trident/TestTridentTopology.java      | 152 ++---
 48 files changed, 3136 insertions(+), 3127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/.gitattributes
----------------------------------------------------------------------
diff --git a/.gitattributes b/.gitattributes
index c2dc683..ed9fb85 100644
--- a/.gitattributes
+++ b/.gitattributes
@@ -1,2 +1,11 @@
 # Some storm-webapp logviewer tests require input files to have LF line endings due to byte counting.
-storm-webapp/src/test/resources/*.log.test text eol=lf
\ No newline at end of file
+storm-webapp/src/test/resources/*.log.test text eol=lf
+
+# Convert text on check-in and check-out (the conversion of all files has been done once on master and should be enforced from now on)
+* text=auto
+
+# There are reports of EOL conversion messing up PNG files, but that might have been a bug in git 2.10 only (see https://github.com/git/git/blob/master/Documentation/RelNotes/2.10.0.txt#L248 for details)
+*.png binary
+*.tar.gz binary
+*.zip binary
+*.tgz binary

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/docs/storm-eventhubs.md
----------------------------------------------------------------------
diff --git a/docs/storm-eventhubs.md b/docs/storm-eventhubs.md
index 4af8c43..df46755 100644
--- a/docs/storm-eventhubs.md
+++ b/docs/storm-eventhubs.md
@@ -1,40 +1,40 @@
----
-title: Azue Event Hubs Integration
-layout: documentation
-documentation: true
----
-
-Storm spout and bolt implementation for Microsoft Azure Eventhubs
-
-### build ###
-	mvn clean package
-
-### run sample topology ###
-To run the sample topology, you need to modify the config.properties file with
-the eventhubs configurations. Here is an example:
-
-	eventhubspout.username = [username: policy name in EventHubs Portal]
-	eventhubspout.password = [password: shared access key in EventHubs Portal]
-	eventhubspout.namespace = [namespace]
-	eventhubspout.entitypath = [entitypath]
-	eventhubspout.partitions.count = [partitioncount]
-
-	# if not provided, will use storm's zookeeper settings
-	# zookeeper.connectionstring=zookeeper0:2181,zookeeper1:2181,zookeeper2:2181
-
-	eventhubspout.checkpoint.interval = 10
-	eventhub.receiver.credits = 1024
-
-Then you can use storm.cmd to submit the sample topology:
-	storm jar {jarfile} com.microsoft.eventhubs.samples.EventCount {topologyname} {spoutconffile}
-	where the {jarfile} should be: eventhubs-storm-spout-{version}-jar-with-dependencies.jar
-
-### Run EventHubSendClient ###
-We have included a simple EventHubs send client for testing purpose. You can run the client like this:
-	java -cp .\target\eventhubs-storm-spout-{version}-jar-with-dependencies.jar com.microsoft.eventhubs.client.EventHubSendClient
- 	[username] [password] [entityPath] [partitionId] [messageSize] [messageCount]
-If you want to send messages to all partitions, use "-1" as partitionId.
-
-### Windows Azure Eventhubs ###
-	http://azure.microsoft.com/en-us/services/event-hubs/
-
+---
+title: Azure Event Hubs Integration
+layout: documentation
+documentation: true
+---
+
+Storm spout and bolt implementation for Microsoft Azure Eventhubs
+
+### build ###
+	mvn clean package
+
+### run sample topology ###
+To run the sample topology, you need to modify the config.properties file with
+the eventhubs configurations. Here is an example:
+
+	eventhubspout.username = [username: policy name in EventHubs Portal]
+	eventhubspout.password = [password: shared access key in EventHubs Portal]
+	eventhubspout.namespace = [namespace]
+	eventhubspout.entitypath = [entitypath]
+	eventhubspout.partitions.count = [partitioncount]
+
+	# if not provided, will use storm's zookeeper settings
+	# zookeeper.connectionstring=zookeeper0:2181,zookeeper1:2181,zookeeper2:2181
+
+	eventhubspout.checkpoint.interval = 10
+	eventhub.receiver.credits = 1024
+
+Then you can use storm.cmd to submit the sample topology:
+	storm jar {jarfile} com.microsoft.eventhubs.samples.EventCount {topologyname} {spoutconffile}
+	where the {jarfile} should be: eventhubs-storm-spout-{version}-jar-with-dependencies.jar
+
+### Run EventHubSendClient ###
+We have included a simple EventHubs send client for testing purposes. You can run the client like this:
+	java -cp .\target\eventhubs-storm-spout-{version}-jar-with-dependencies.jar com.microsoft.eventhubs.client.EventHubSendClient
+ 	[username] [password] [entityPath] [partitionId] [messageSize] [messageCount]
+If you want to send messages to all partitions, use "-1" as partitionId.
+
+### Windows Azure Eventhubs ###
+	http://azure.microsoft.com/en-us/services/event-hubs/
+
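
The sample reads these settings from a plain Java properties file; a minimal sketch of loading the keys listed above (illustrative only — the EventCount sample does its own configuration loading):

    import java.io.FileInputStream;
    import java.util.Properties;

    public class EventHubsConfigLoader {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            try (FileInputStream in = new FileInputStream("config.properties")) {
                props.load(in);
            }
            String username = props.getProperty("eventhubspout.username");
            String password = props.getProperty("eventhubspout.password");
            String namespace = props.getProperty("eventhubspout.namespace");
            String entityPath = props.getProperty("eventhubspout.entitypath");
            int partitions = Integer.parseInt(props.getProperty("eventhubspout.partitions.count"));
            // zookeeper.connectionstring is optional; Storm's own ZooKeeper settings apply when it is absent
            String zkConnection = props.getProperty("zookeeper.connectionstring");
            System.out.println(namespace + "/" + entityPath + " with " + partitions + " partitions");
        }
    }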

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/README.md
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/README.md b/external/storm-eventhubs/README.md
index 681ab2d..98b43a5 100755
--- a/external/storm-eventhubs/README.md
+++ b/external/storm-eventhubs/README.md
@@ -1,45 +1,45 @@
-storm-eventhubs
-=====================
-
-Storm spout and bolt implementation for Microsoft Azure Eventhubs
-
-### build ###
-	mvn clean package
-
-### run sample topology ###
-To run the sample topology, you need to modify the config.properties file with
-the eventhubs configurations. Here is an example:
-
-	eventhubspout.username = [username: policy name in EventHubs Portal]
-	eventhubspout.password = [password: shared access key in EventHubs Portal]
-	eventhubspout.namespace = [namespace]
-	eventhubspout.entitypath = [entitypath]
-	eventhubspout.partitions.count = [partitioncount]
-
-	# if not provided, will use storm's zookeeper settings
-	# zookeeper.connectionstring=zookeeper0:2181,zookeeper1:2181,zookeeper2:2181
-
-	eventhubspout.checkpoint.interval = 10
-	eventhub.receiver.credits = 1024
-
-Then you can use storm.cmd to submit the sample topology:
-	storm jar {jarfile} com.microsoft.eventhubs.samples.EventCount {topologyname} {spoutconffile}
-	where the {jarfile} should be: eventhubs-storm-spout-{version}-jar-with-dependencies.jar
-
-### Run EventHubSendClient ###
-We have included a simple EventHubs send client for testing purpose. You can run the client like this:
-	java -cp .\target\eventhubs-storm-spout-{version}-jar-with-dependencies.jar com.microsoft.eventhubs.client.EventHubSendClient
- 	[username] [password] [entityPath] [partitionId] [messageSize] [messageCount]
-If you want to send messages to all partitions, use "-1" as partitionId.
-
-### Serialization Scheme ###
-    By default the serialization scheme is StringEventDataScheme where only the body of the eventdata is being sent.
-    To have a more comprehensive Eventhub metadata exposure look into the BinaryEventDataScheme.
-
-### Windows Azure Eventhubs ###
-	http://azure.microsoft.com/en-us/services/event-hubs/
-
-## Committer Sponsors
-
- * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))
-
+storm-eventhubs
+=====================
+
+Storm spout and bolt implementation for Microsoft Azure Eventhubs
+
+### build ###
+	mvn clean package
+
+### run sample topology ###
+To run the sample topology, you need to modify the config.properties file with
+the eventhubs configurations. Here is an example:
+
+	eventhubspout.username = [username: policy name in EventHubs Portal]
+	eventhubspout.password = [password: shared access key in EventHubs Portal]
+	eventhubspout.namespace = [namespace]
+	eventhubspout.entitypath = [entitypath]
+	eventhubspout.partitions.count = [partitioncount]
+
+	# if not provided, will use storm's zookeeper settings
+	# zookeeper.connectionstring=zookeeper0:2181,zookeeper1:2181,zookeeper2:2181
+
+	eventhubspout.checkpoint.interval = 10
+	eventhub.receiver.credits = 1024
+
+Then you can use storm.cmd to submit the sample topology:
+	storm jar {jarfile} com.microsoft.eventhubs.samples.EventCount {topologyname} {spoutconffile}
+	where the {jarfile} should be: eventhubs-storm-spout-{version}-jar-with-dependencies.jar
+
+### Run EventHubSendClient ###
+We have included a simple EventHubs send client for testing purposes. You can run the client like this:
+	java -cp .\target\eventhubs-storm-spout-{version}-jar-with-dependencies.jar com.microsoft.eventhubs.client.EventHubSendClient
+ 	[username] [password] [entityPath] [partitionId] [messageSize] [messageCount]
+If you want to send messages to all partitions, use "-1" as partitionId.
+
+### Serialization Scheme ###
+    By default the serialization scheme is StringEventDataScheme, which emits only the body of the event data.
+    For more comprehensive EventHub metadata exposure, look into BinaryEventDataScheme.
+
+### Windows Azure Eventhubs ###
+	http://azure.microsoft.com/en-us/services/event-hubs/
+
+## Committer Sponsors
+
+ * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))
+
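
A sketch of picking the scheme on the spout side, assuming EventHubSpoutConfig keeps its (policy name, key, namespace, entity path, partition count) constructor and a setEventDataScheme setter — check those classes for the exact signatures; the credential values are placeholders:

    import org.apache.storm.eventhubs.spout.BinaryEventDataScheme;
    import org.apache.storm.eventhubs.spout.EventHubSpout;
    import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;

    public class SchemeSelectionExample {
        public static EventHubSpout buildSpout() {
            EventHubSpoutConfig config = new EventHubSpoutConfig(
                    "policyName", "sharedAccessKey", "mynamespace", "myeventhub", 4);
            // assumption: the scheme is swapped via a setter; the default emits only the event body
            config.setEventDataScheme(new BinaryEventDataScheme());
            return new EventHubSpout(config);
        }
    }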

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java
index 21940de..d6e1dbc 100644
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java
@@ -1,47 +1,47 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.bolt;
-
-import org.apache.storm.tuple.Tuple;
-
-/**
- * A default implementation of IEventDataFormat that converts the tuple
- * into a delimited string.
- */
-public class DefaultEventDataFormat implements IEventDataFormat {
-  private static final long serialVersionUID = 1L;
-  private String delimiter = ",";
-  
-  public DefaultEventDataFormat withFieldDelimiter(String delimiter) {
-    this.delimiter = delimiter;
-    return this;
-  }
-
-  @Override
-  public byte[] serialize(Tuple tuple) {
-    StringBuilder sb = new StringBuilder();
-    for(Object obj : tuple.getValues()) {
-      if(sb.length() != 0) {
-        sb.append(delimiter);
-      }
-      sb.append(obj.toString());
-    }
-    return sb.toString().getBytes();
-  }
-
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.bolt;
+
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * A default implementation of IEventDataFormat that converts the tuple
+ * into a delimited string.
+ */
+public class DefaultEventDataFormat implements IEventDataFormat {
+  private static final long serialVersionUID = 1L;
+  private String delimiter = ",";
+  
+  public DefaultEventDataFormat withFieldDelimiter(String delimiter) {
+    this.delimiter = delimiter;
+    return this;
+  }
+
+  @Override
+  public byte[] serialize(Tuple tuple) {
+    StringBuilder sb = new StringBuilder();
+    for(Object obj : tuple.getValues()) {
+      if(sb.length() != 0) {
+        sb.append(delimiter);
+      }
+      sb.append(obj.toString());
+    }
+    return sb.toString().getBytes();
+  }
+
+}
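
DefaultEventDataFormat joins the tuple's values with the configured delimiter before handing the bytes to EventHubs; a minimal sketch of plugging a custom delimiter into the bolt configuration defined later in this patch (connection values are placeholders):

    import org.apache.storm.eventhubs.bolt.DefaultEventDataFormat;
    import org.apache.storm.eventhubs.bolt.EventHubBoltConfig;

    public class DelimitedFormatExample {
        public static EventHubBoltConfig buildConfig(String connectionString, String entityPath) {
            // a tuple with values ["a", 1, true] is serialized as the bytes of "a|1|true"
            DefaultEventDataFormat format = new DefaultEventDataFormat().withFieldDelimiter("|");
            return new EventHubBoltConfig(connectionString, entityPath, false, format);
        }
    }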

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
index bede4a3..7d1aeab 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
@@ -1,146 +1,146 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.bolt;
-
-
-import com.microsoft.azure.eventhubs.EventData;
-import com.microsoft.azure.eventhubs.EventHubClient;
-import com.microsoft.azure.eventhubs.PartitionSender;
-import com.microsoft.azure.servicebus.ServiceBusException;
-import org.apache.storm.eventhubs.spout.EventHubException;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.tuple.Tuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-
-/**
- * A bolt that writes event message to EventHub.
- */
-public class EventHubBolt extends BaseRichBolt {
-	private static final long serialVersionUID = 1L;
-	private static final Logger logger = LoggerFactory
-			.getLogger(EventHubBolt.class);
-
-	protected OutputCollector collector;
-	protected PartitionSender sender;
-	protected EventHubClient ehClient;
-	protected EventHubBoltConfig boltConfig;
-
-	public EventHubBolt(String connectionString, String entityPath) {
-		boltConfig = new EventHubBoltConfig(connectionString, entityPath);
-	}
-
-	public EventHubBolt(String userName, String password, String namespace,
-			String entityPath, boolean partitionMode) {
-		boltConfig = new EventHubBoltConfig(userName, password, namespace,
-				entityPath, partitionMode);
-	}
-
-	public EventHubBolt(EventHubBoltConfig config) {
-		boltConfig = config;
-	}
-
-	@Override
-	public void prepare(Map<String, Object> config, TopologyContext context,
-			OutputCollector collector) {
-		this.collector = collector;
-		String myPartitionId = null;
-		if (boltConfig.getPartitionMode()) {
-			// We can use the task index (starting from 0) as the partition ID
-			myPartitionId = "" + context.getThisTaskIndex();
-		}
-		logger.info("creating sender: " + boltConfig.getConnectionString()
-				+ ", " + boltConfig.getEntityPath() + ", " + myPartitionId);
-		try {
-			ehClient = EventHubClient.createFromConnectionStringSync(boltConfig.getConnectionString());
-			if (boltConfig.getPartitionMode()) {
-				sender = ehClient.createPartitionSenderSync(Integer.toString(context.getThisTaskIndex()));
-			}
-		} catch (Exception ex) {
-			collector.reportError(ex);
-			throw new RuntimeException(ex);
-		}
-
-	}
-
-	@Override
-	public void execute(Tuple tuple) {
-		try {
-			EventData sendEvent = new EventData(boltConfig.getEventDataFormat().serialize(tuple));
-			if (boltConfig.getPartitionMode() && sender!=null) {
-				sender.sendSync(sendEvent);
-			}
-			else if (boltConfig.getPartitionMode() && sender==null) {
-				throw new EventHubException("Sender is null");
-			}
-			else if (!boltConfig.getPartitionMode() && ehClient!=null) {
-				ehClient.sendSync(sendEvent);
-			}
-			else if (!boltConfig.getPartitionMode() && ehClient==null) {
-				throw new EventHubException("ehclient is null");
-			}
-			collector.ack(tuple);
-		} catch (EventHubException ex ) {
-			collector.reportError(ex);
-			collector.fail(tuple);
-		} catch (ServiceBusException e) {
-			collector.reportError(e);
-			collector.fail(tuple);
-		}
-	}
-
-	@Override
-	public void cleanup() {
-		if(sender != null) {
-			try {
-				sender.close().whenComplete((voidargs,error)->{
-					try{
-						if(error!=null){
-							logger.error("Exception during sender cleanup phase"+error.toString());
-						}
-						ehClient.closeSync();
-					}catch (Exception e){
-						logger.error("Exception during ehclient cleanup phase"+e.toString());
-					}
-				}).get();
-			} catch (InterruptedException e) {
-				logger.error("Exception occured during cleanup phase"+e.toString());
-			} catch (ExecutionException e) {
-				logger.error("Exception occured during cleanup phase"+e.toString());
-			}
-			logger.info("Eventhub Bolt cleaned up");
-			sender = null;
-			ehClient =  null;
-		}
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
-	}
-
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.bolt;
+
+
+import com.microsoft.azure.eventhubs.EventData;
+import com.microsoft.azure.eventhubs.EventHubClient;
+import com.microsoft.azure.eventhubs.PartitionSender;
+import com.microsoft.azure.servicebus.ServiceBusException;
+import org.apache.storm.eventhubs.spout.EventHubException;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * A bolt that writes event messages to EventHub.
+ */
+public class EventHubBolt extends BaseRichBolt {
+	private static final long serialVersionUID = 1L;
+	private static final Logger logger = LoggerFactory
+			.getLogger(EventHubBolt.class);
+
+	protected OutputCollector collector;
+	protected PartitionSender sender;
+	protected EventHubClient ehClient;
+	protected EventHubBoltConfig boltConfig;
+
+	public EventHubBolt(String connectionString, String entityPath) {
+		boltConfig = new EventHubBoltConfig(connectionString, entityPath);
+	}
+
+	public EventHubBolt(String userName, String password, String namespace,
+			String entityPath, boolean partitionMode) {
+		boltConfig = new EventHubBoltConfig(userName, password, namespace,
+				entityPath, partitionMode);
+	}
+
+	public EventHubBolt(EventHubBoltConfig config) {
+		boltConfig = config;
+	}
+
+	@Override
+	public void prepare(Map<String, Object> config, TopologyContext context,
+			OutputCollector collector) {
+		this.collector = collector;
+		String myPartitionId = null;
+		if (boltConfig.getPartitionMode()) {
+			// We can use the task index (starting from 0) as the partition ID
+			myPartitionId = "" + context.getThisTaskIndex();
+		}
+		logger.info("creating sender: " + boltConfig.getConnectionString()
+				+ ", " + boltConfig.getEntityPath() + ", " + myPartitionId);
+		try {
+			ehClient = EventHubClient.createFromConnectionStringSync(boltConfig.getConnectionString());
+			if (boltConfig.getPartitionMode()) {
+				sender = ehClient.createPartitionSenderSync(Integer.toString(context.getThisTaskIndex()));
+			}
+		} catch (Exception ex) {
+			collector.reportError(ex);
+			throw new RuntimeException(ex);
+		}
+
+	}
+
+	@Override
+	public void execute(Tuple tuple) {
+		try {
+			EventData sendEvent = new EventData(boltConfig.getEventDataFormat().serialize(tuple));
+			if (boltConfig.getPartitionMode() && sender!=null) {
+				sender.sendSync(sendEvent);
+			}
+			else if (boltConfig.getPartitionMode() && sender==null) {
+				throw new EventHubException("Sender is null");
+			}
+			else if (!boltConfig.getPartitionMode() && ehClient!=null) {
+				ehClient.sendSync(sendEvent);
+			}
+			else if (!boltConfig.getPartitionMode() && ehClient==null) {
+				throw new EventHubException("ehclient is null");
+			}
+			collector.ack(tuple);
+		} catch (EventHubException ex ) {
+			collector.reportError(ex);
+			collector.fail(tuple);
+		} catch (ServiceBusException e) {
+			collector.reportError(e);
+			collector.fail(tuple);
+		}
+	}
+
+	@Override
+	public void cleanup() {
+		if(sender != null) {
+			try {
+				sender.close().whenComplete((voidargs,error)->{
+					try{
+						if(error!=null){
+							logger.error("Exception during sender cleanup phase"+error.toString());
+						}
+						ehClient.closeSync();
+					}catch (Exception e){
+						logger.error("Exception during ehclient cleanup phase"+e.toString());
+					}
+				}).get();
+			} catch (InterruptedException e) {
+				logger.error("Exception occured during cleanup phase"+e.toString());
+			} catch (ExecutionException e) {
+				logger.error("Exception occured during cleanup phase"+e.toString());
+			}
+			logger.info("Eventhub Bolt cleaned up");
+			sender = null;
+			ehClient =  null;
+		}
+	}
+
+	@Override
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+	}
+
+}
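
In partition mode the bolt derives its partition id from the task index, so its parallelism should equal the number of EventHubs partitions; a minimal wiring sketch (component ids and the upstream spout id are illustrative):

    import org.apache.storm.eventhubs.bolt.EventHubBolt;
    import org.apache.storm.eventhubs.bolt.EventHubBoltConfig;
    import org.apache.storm.topology.TopologyBuilder;

    public class EventHubBoltWiring {
        public static TopologyBuilder wire(String connectionString, String entityPath, int partitionCount) {
            EventHubBoltConfig boltConfig = new EventHubBoltConfig(connectionString, entityPath, true);
            TopologyBuilder builder = new TopologyBuilder();
            // a spout registered elsewhere under the id "events" is assumed
            builder.setBolt("eventhub-sink", new EventHubBolt(boltConfig), partitionCount)
                   .shuffleGrouping("events");
            return builder;
        }
    }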

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java
index fe2d989..f5e1458 100644
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java
@@ -1,111 +1,111 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.bolt;
-
-import com.microsoft.azure.servicebus.ConnectionStringBuilder;
-import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
-
-import java.io.Serializable;
-
-import java.io.Serializable;
-
-/*
- * EventHubs bolt configurations
- *
- * Partition mode:
- * With partitionMode=true you need to create the same number of tasks as the number of 
- * EventHubs partitions, and each bolt task will only send data to one partition.
- * The partition ID is the task ID of the bolt.
- * 
- * Event format:
- * The formatter to convert tuple to bytes for EventHubs.
- * if null, the default format is common delimited tuple fields.
- */
-public class EventHubBoltConfig implements Serializable {
-  private static final long serialVersionUID = 1L;
-  
-  private String connectionString;
-  private final String entityPath;
-  protected boolean partitionMode;
-  protected IEventDataFormat dataFormat;
-  
-  public EventHubBoltConfig(String connectionString, String entityPath) {
-    this(connectionString, entityPath, false, null);
-  }
-  
-  public EventHubBoltConfig(String connectionString, String entityPath,
-      boolean partitionMode) {
-    this(connectionString, entityPath, partitionMode, null);
-  }
-  
-  public EventHubBoltConfig(String userName, String password, String namespace,
-      String entityPath, boolean partitionMode) {
-    this(userName, password, namespace,
-        EventHubSpoutConfig.EH_SERVICE_FQDN_SUFFIX, entityPath, partitionMode);
-  }
-  
-  public EventHubBoltConfig(String connectionString, String entityPath,
-      boolean partitionMode, IEventDataFormat dataFormat) {
-    this.connectionString = connectionString;
-    this.entityPath = entityPath;
-    this.partitionMode = partitionMode;
-    this.dataFormat = dataFormat;
-    if(this.dataFormat == null) {
-      this.dataFormat = new DefaultEventDataFormat();
-    }
-  }
-  
-  public EventHubBoltConfig(String userName, String password, String namespace,
-      String targetFqnAddress, String entityPath) {
-    this(userName, password, namespace, targetFqnAddress, entityPath, false, null);
-  }
-  
-  public EventHubBoltConfig(String userName, String password, String namespace,
-      String targetFqnAddress, String entityPath, boolean partitionMode) {
-    this(userName, password, namespace, targetFqnAddress, entityPath, partitionMode, null);
-  }
-  
-  public EventHubBoltConfig(String userName, String password, String namespace,
-      String targetFqnAddress, String entityPath, boolean partitionMode,
-      IEventDataFormat dataFormat) {
-    this.connectionString = new ConnectionStringBuilder(namespace,entityPath,
-            userName,password).toString();
-    this.entityPath = entityPath;
-    this.partitionMode = partitionMode;
-    this.dataFormat = dataFormat;
-    if(this.dataFormat == null) {
-      this.dataFormat = new DefaultEventDataFormat();
-    }
-  }
-  
-  public String getConnectionString() {
-    return connectionString;
-  }
-  
-  public String getEntityPath() {
-    return entityPath;
-  }
-  
-  public boolean getPartitionMode() {
-    return partitionMode;
-  }
-  
-  public IEventDataFormat getEventDataFormat() {
-    return dataFormat;
-  }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.bolt;
+
+import com.microsoft.azure.servicebus.ConnectionStringBuilder;
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+
+import java.io.Serializable;
+
+/*
+ * EventHubs bolt configurations
+ *
+ * Partition mode:
+ * With partitionMode=true you need to create the same number of tasks as the number of 
+ * EventHubs partitions, and each bolt task will only send data to one partition.
+ * The partition ID is the task ID of the bolt.
+ * 
+ * Event format:
+ * The formatter to convert tuple to bytes for EventHubs.
+ * if null, the default format is common delimited tuple fields.
+ */
+public class EventHubBoltConfig implements Serializable {
+  private static final long serialVersionUID = 1L;
+  
+  private String connectionString;
+  private final String entityPath;
+  protected boolean partitionMode;
+  protected IEventDataFormat dataFormat;
+  
+  public EventHubBoltConfig(String connectionString, String entityPath) {
+    this(connectionString, entityPath, false, null);
+  }
+  
+  public EventHubBoltConfig(String connectionString, String entityPath,
+      boolean partitionMode) {
+    this(connectionString, entityPath, partitionMode, null);
+  }
+  
+  public EventHubBoltConfig(String userName, String password, String namespace,
+      String entityPath, boolean partitionMode) {
+    this(userName, password, namespace,
+        EventHubSpoutConfig.EH_SERVICE_FQDN_SUFFIX, entityPath, partitionMode);
+  }
+  
+  public EventHubBoltConfig(String connectionString, String entityPath,
+      boolean partitionMode, IEventDataFormat dataFormat) {
+    this.connectionString = connectionString;
+    this.entityPath = entityPath;
+    this.partitionMode = partitionMode;
+    this.dataFormat = dataFormat;
+    if(this.dataFormat == null) {
+      this.dataFormat = new DefaultEventDataFormat();
+    }
+  }
+  
+  public EventHubBoltConfig(String userName, String password, String namespace,
+      String targetFqnAddress, String entityPath) {
+    this(userName, password, namespace, targetFqnAddress, entityPath, false, null);
+  }
+  
+  public EventHubBoltConfig(String userName, String password, String namespace,
+      String targetFqnAddress, String entityPath, boolean partitionMode) {
+    this(userName, password, namespace, targetFqnAddress, entityPath, partitionMode, null);
+  }
+  
+  public EventHubBoltConfig(String userName, String password, String namespace,
+      String targetFqnAddress, String entityPath, boolean partitionMode,
+      IEventDataFormat dataFormat) {
+    this.connectionString = new ConnectionStringBuilder(namespace,entityPath,
+            userName,password).toString();
+    this.entityPath = entityPath;
+    this.partitionMode = partitionMode;
+    this.dataFormat = dataFormat;
+    if(this.dataFormat == null) {
+      this.dataFormat = new DefaultEventDataFormat();
+    }
+  }
+  
+  public String getConnectionString() {
+    return connectionString;
+  }
+  
+  public String getEntityPath() {
+    return entityPath;
+  }
+  
+  public boolean getPartitionMode() {
+    return partitionMode;
+  }
+  
+  public IEventDataFormat getEventDataFormat() {
+    return dataFormat;
+  }
+}
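
The config accepts either a full connection string or policy-name credentials; with the latter, the service FQDN suffix defaults to EventHubSpoutConfig.EH_SERVICE_FQDN_SUFFIX. Both forms below use placeholder values:

    import org.apache.storm.eventhubs.bolt.EventHubBoltConfig;

    public class BoltConfigExamples {
        public static EventHubBoltConfig fromConnectionString() {
            return new EventHubBoltConfig(
                    "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=policy;SharedAccessKey=key",
                    "myeventhub");
        }

        public static EventHubBoltConfig fromCredentials() {
            // username = policy name, password = shared access key, partitionMode = false
            return new EventHubBoltConfig("policy", "key", "mynamespace", "myeventhub", false);
        }
    }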

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java
index 743d5bb..d2aacb7 100644
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java
@@ -1,28 +1,28 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.bolt;
-
-import java.io.Serializable;
-import org.apache.storm.tuple.Tuple;
-
-/**
- * Serialize a tuple to a byte array to be sent to EventHubs
- */
-public interface IEventDataFormat extends Serializable {
-  public byte[] serialize(Tuple tuple);
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.bolt;
+
+import java.io.Serializable;
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * Serialize a tuple to a byte array to be sent to EventHubs
+ */
+public interface IEventDataFormat extends Serializable {
+  public byte[] serialize(Tuple tuple);
+}
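
Any Serializable implementation can replace the default delimited format; a minimal sketch that sends a single named field (the field name "body" is illustrative and must match what the upstream component declares):

    import java.nio.charset.StandardCharsets;
    import org.apache.storm.eventhubs.bolt.IEventDataFormat;
    import org.apache.storm.tuple.Tuple;

    public class SingleFieldEventDataFormat implements IEventDataFormat {
        private static final long serialVersionUID = 1L;

        @Override
        public byte[] serialize(Tuple tuple) {
            // send only the "body" field rather than all tuple values
            return tuple.getStringByField("body").getBytes(StandardCharsets.UTF_8);
        }
    }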

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
index f82ffe6..9fbcecf 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
@@ -1,78 +1,78 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.spout;
-
-import com.microsoft.azure.eventhubs.EventData;
-import org.apache.storm.tuple.Fields;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * An Event Data Scheme which deserializes message payload into the Strings. No
- * encoding is assumed. The receiver will need to handle parsing of the string
- * data in appropriate encoding.
- *
- * The resulting tuple would contain two items: the the message string, and a
- * map of properties that include metadata, which can be used to determine who
- * processes the message, and how it is processed.
- * 
- * For passing the raw bytes of a messsage to Bolts, refer to
- * {@link BinaryEventDataScheme}.
- */
-public class EventDataScheme implements IEventDataScheme {
-
-	private static final long serialVersionUID = 1L;
-	private static final Logger logger = LoggerFactory.getLogger(EventDataScheme.class);
-	@Override
-	public List<Object> deserialize(EventData eventData) {
-		final List<Object> fieldContents = new ArrayList<Object>();
-		String messageData = "";
-		if (eventData.getBytes()!=null) {
-			messageData = new String(eventData.getBytes());
-		}
-		/*Will only serialize AMQPValue type*/
-		else if (eventData.getObject()!=null) {
-			try {
-				if (!(eventData.getObject() instanceof List)) {
-					messageData = eventData.getObject().toString();
-				} else {
-					throw new RuntimeException("Cannot serialize the given AMQP type");
-				}
-			} catch (RuntimeException e) {
-				logger.error("Failed to serialize EventData payload class"
-						+ eventData.getObject().getClass());
-				logger.error("Exception encountered while serializing EventData payload is"
-						+ e.toString());
-				throw e;
-			}
-		}
-		Map<String, Object> metaDataMap = eventData.getProperties();
-                fieldContents.add(messageData);
-		fieldContents.add(metaDataMap);
-		return fieldContents;
-	}
-
-	@Override
-	public Fields getOutputFields() {
-		return new Fields(FieldConstants.Message, FieldConstants.META_DATA);
-	}
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import com.microsoft.azure.eventhubs.EventData;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An Event Data Scheme which deserializes the message payload into Strings. No
+ * encoding is assumed. The receiver will need to handle parsing of the string
+ * data in the appropriate encoding.
+ *
+ * The resulting tuple contains two items: the message string, and a
+ * map of properties that include metadata, which can be used to determine who
+ * processes the message, and how it is processed.
+ *
+ * For passing the raw bytes of a message to Bolts, refer to
+ * {@link BinaryEventDataScheme}.
+ */
+public class EventDataScheme implements IEventDataScheme {
+
+	private static final long serialVersionUID = 1L;
+	private static final Logger logger = LoggerFactory.getLogger(EventDataScheme.class);
+	@Override
+	public List<Object> deserialize(EventData eventData) {
+		final List<Object> fieldContents = new ArrayList<Object>();
+		String messageData = "";
+		if (eventData.getBytes()!=null) {
+			messageData = new String(eventData.getBytes());
+		}
+		/*Will only serialize AMQPValue type*/
+		else if (eventData.getObject()!=null) {
+			try {
+				if (!(eventData.getObject() instanceof List)) {
+					messageData = eventData.getObject().toString();
+				} else {
+					throw new RuntimeException("Cannot serialize the given AMQP type");
+				}
+			} catch (RuntimeException e) {
+				logger.error("Failed to serialize EventData payload class"
+						+ eventData.getObject().getClass());
+				logger.error("Exception encountered while serializing EventData payload is"
+						+ e.toString());
+				throw e;
+			}
+		}
+		Map<String, Object> metaDataMap = eventData.getProperties();
+                fieldContents.add(messageData);
+		fieldContents.add(metaDataMap);
+		return fieldContents;
+	}
+
+	@Override
+	public Fields getOutputFields() {
+		return new Fields(FieldConstants.Message, FieldConstants.META_DATA);
+	}
+}
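
Tuples produced by this scheme carry the message string followed by the application-properties map, under the field names from FieldConstants; a minimal consuming bolt (the printing is illustrative):

    import java.util.Map;
    import org.apache.storm.eventhubs.spout.FieldConstants;
    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.tuple.Tuple;

    public class PrintEventBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String message = tuple.getStringByField(FieldConstants.Message);
            @SuppressWarnings("unchecked")
            Map<String, Object> metadata =
                    (Map<String, Object>) tuple.getValueByField(FieldConstants.META_DATA);
            System.out.println(message + " " + metadata);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // terminal bolt, no output fields
        }
    }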

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
index 4c1c3e7..88855eb 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
@@ -1,28 +1,28 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.spout;
-
-public class FieldConstants {
-
-  public static final String PartitionKey = "partitionKey";
-  public static final String Offset = "offset";
-  public static final String Message = "message";
-  public static final String META_DATA = "metadata";
-  public static final String SYSTEM_META_DATA = "eventdata_system_properties";
-  public static final String DefaultStartingOffset = "-1";
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+public class FieldConstants {
+
+  public static final String PartitionKey = "partitionKey";
+  public static final String Offset = "offset";
+  public static final String Message = "message";
+  public static final String META_DATA = "metadata";
+  public static final String SYSTEM_META_DATA = "eventdata_system_properties";
+  public static final String DefaultStartingOffset = "-1";
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
index 6c78524..854da6f 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
@@ -1,44 +1,44 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.spout;
-
-import com.microsoft.azure.eventhubs.EventData;
-import org.apache.storm.tuple.Fields;
-
-import java.io.Serializable;
-import java.util.List;
-
-public interface IEventDataScheme extends Serializable {
-
-  /**
-   * Deserialize an AMQP Message into a Tuple.
-   *
-   * @see #getOutputFields() for the list of fields the tuple will contain.
-   *
-   * @param eventData The EventData to Deserialize.
-   * @return A tuple containing the deserialized fields of the message.
-   */
-  List<Object> deserialize(EventData eventData);
-
-  /**
-   * Retrieve the Fields that are present on tuples created by this object.
-   *
-   * @return The Fields that are present on tuples created by this object.
-   */
-  Fields getOutputFields();
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import com.microsoft.azure.eventhubs.EventData;
+import org.apache.storm.tuple.Fields;
+
+import java.io.Serializable;
+import java.util.List;
+
+public interface IEventDataScheme extends Serializable {
+
+  /**
+   * Deserialize an AMQP Message into a Tuple.
+   *
+   * @see #getOutputFields() for the list of fields the tuple will contain.
+   *
+   * @param eventData The EventData to Deserialize.
+   * @return A tuple containing the deserialized fields of the message.
+   */
+  List<Object> deserialize(EventData eventData);
+
+  /**
+   * Retrieve the Fields that are present on tuples created by this object.
+   *
+   * @return The Fields that are present on tuples created by this object.
+   */
+  Fields getOutputFields();
+}
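
A custom scheme only has to keep deserialize and getOutputFields consistent with each other; a minimal sketch that emits the body and its length (field names are illustrative):

    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    import com.microsoft.azure.eventhubs.EventData;
    import org.apache.storm.eventhubs.spout.IEventDataScheme;
    import org.apache.storm.tuple.Fields;

    public class BodyLengthEventDataScheme implements IEventDataScheme {
        private static final long serialVersionUID = 1L;

        @Override
        public List<Object> deserialize(EventData eventData) {
            byte[] body = eventData.getBytes() != null ? eventData.getBytes() : new byte[0];
            List<Object> fields = new ArrayList<>();
            fields.add(new String(body, StandardCharsets.UTF_8));
            fields.add(body.length);
            return fields;
        }

        @Override
        public Fields getOutputFields() {
            // must match the order of the values added in deserialize()
            return new Fields("message", "length");
        }
    }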

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFactory.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFactory.java
index fbebdc8..a4901ae 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFactory.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFactory.java
@@ -1,30 +1,30 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-/**
- * 
- */
-package org.apache.storm.eventhubs.spout;
-
-import java.io.Serializable;
-
-/**
- * An abstract factory to generate EventHubReceiver
- */
-public interface IEventHubReceiverFactory extends Serializable {
-  IEventHubReceiver create(EventHubSpoutConfig config, String partitionId);
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+/**
+ * 
+ */
+package org.apache.storm.eventhubs.spout;
+
+import java.io.Serializable;
+
+/**
+ * An abstract factory to generate EventHubReceiver
+ */
+public interface IEventHubReceiverFactory extends Serializable {
+  IEventHubReceiver create(EventHubSpoutConfig config, String partitionId);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionCoordinator.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionCoordinator.java
index a460681..e99f20a 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionCoordinator.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionCoordinator.java
@@ -1,27 +1,27 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.spout;
-
-import java.util.List;
-
-public interface IPartitionCoordinator {
-
-  List<IPartitionManager> getMyPartitionManagers();
-
-  IPartitionManager getPartitionManager(String partitionId);
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import java.util.List;
+
+public interface IPartitionCoordinator {
+
+  List<IPartitionManager> getMyPartitionManagers();
+
+  IPartitionManager getPartitionManager(String partitionId);
+}
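
The coordinator hands each spout task its share of partition managers; a sketch of polling them the way a nextTuple-style loop might (grounded in the interfaces above, with the actual event handling elided):

    import org.apache.storm.eventhubs.spout.EventDataWrap;
    import org.apache.storm.eventhubs.spout.IPartitionCoordinator;
    import org.apache.storm.eventhubs.spout.IPartitionManager;

    public class PartitionPollingExample {
        public static void pollOnce(IPartitionCoordinator coordinator) {
            for (IPartitionManager manager : coordinator.getMyPartitionManagers()) {
                EventDataWrap next = manager.receive();
                if (next != null) {
                    // hand the event to the collector / scheme here
                }
            }
        }
    }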

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java
index 123d1c1..d391edd 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java
@@ -1,37 +1,37 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.spout;
-
-import java.util.Map;
-
-public interface IPartitionManager {
-
-  void open() throws Exception;
-
-  void close();
-
-  EventDataWrap receive();
-
-  void checkpoint();
-
-  void ack(String offset);
-
-  void fail(String offset);
-  
-  Map<String, Object> getMetricsData();
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import java.util.Map;
+
+public interface IPartitionManager {
+
+  void open() throws Exception;
+
+  void close();
+
+  EventDataWrap receive();
+
+  void checkpoint();
+
+  void ack(String offset);
+
+  void fail(String offset);
+  
+  Map<String, Object> getMetricsData();
+}
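
The IPartitionManager interface above defines the per-partition lifecycle (open, receive,
ack/fail, checkpoint, close) that the spout drives through an IPartitionCoordinator. A
minimal sketch of that call order follows; it is not part of this patch, and the partition
id and the elided offset handling are illustrative assumptions:

    // Hypothetical driver for a single partition manager.
    IPartitionManager manager = coordinator.getPartitionManager("0");
    manager.open();                          // connect to the EventHub partition
    EventDataWrap data = manager.receive();  // may return null when nothing is available
    if (data != null) {
      // the spout acks the event's offset on success, or calls fail(offset) on error
      // (extracting the offset from the EventDataWrap is elided here)
    }
    manager.checkpoint();                    // persist progress for restart
    manager.close();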

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IStateStore.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IStateStore.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IStateStore.java
index f0ee2be..03c7ae8 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IStateStore.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IStateStore.java
@@ -1,31 +1,31 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.spout;
-
-import java.io.Serializable;
-
-public interface IStateStore extends Serializable {
-
-  public void open();
-
-  public void close();
-
-  public void saveData(String path, String data);
-
-  public String readData(String path);
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import java.io.Serializable;
+
+public interface IStateStore extends Serializable {
+
+  public void open();
+
+  public void close();
+
+  public void saveData(String path, String data);
+
+  public String readData(String path);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/MessageId.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/MessageId.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/MessageId.java
index 2247f1f..59d5c71 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/MessageId.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/MessageId.java
@@ -1,56 +1,56 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.spout;
-
-public class MessageId {
-
-  private final String partitionId;
-  private final String offset;
-  private final long sequenceNumber;
-
-  public MessageId(
-    String partitionId,
-    String offset,
-    long sequenceNumber) {
-    this.partitionId = partitionId;
-    this.offset = offset;
-    this.sequenceNumber = sequenceNumber;
-  }
-
-  public static MessageId create(String partitionId, String offset, long sequenceNumber) {
-    return new MessageId(partitionId, offset, sequenceNumber);
-  }
-
-  public String getPartitionId() {
-    return this.partitionId;
-  }
-
-  public String getOffset() {
-    return this.offset;
-  }
-
-  public Long getSequenceNumber() {
-    return this.sequenceNumber;
-  }
-  
-  @Override
-  public String toString() {
-    return String.format("PartitionId: %s, Offset: %s, SequenceNumber: %s",
-      this.partitionId, this.offset, this.sequenceNumber);
-  }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+public class MessageId {
+
+  private final String partitionId;
+  private final String offset;
+  private final long sequenceNumber;
+
+  public MessageId(
+    String partitionId,
+    String offset,
+    long sequenceNumber) {
+    this.partitionId = partitionId;
+    this.offset = offset;
+    this.sequenceNumber = sequenceNumber;
+  }
+
+  public static MessageId create(String partitionId, String offset, long sequenceNumber) {
+    return new MessageId(partitionId, offset, sequenceNumber);
+  }
+
+  public String getPartitionId() {
+    return this.partitionId;
+  }
+
+  public String getOffset() {
+    return this.offset;
+  }
+
+  public Long getSequenceNumber() {
+    return this.sequenceNumber;
+  }
+  
+  @Override
+  public String toString() {
+    return String.format("PartitionId: %s, Offset: %s, SequenceNumber: %s",
+      this.partitionId, this.offset, this.sequenceNumber);
+  }
+}
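
MessageId is a simple value object; a short usage sketch (not part of this patch, with
placeholder values) shows how the factory method and accessors defined above fit together:

    // Hypothetical values for partition "0".
    MessageId id = MessageId.create("0", "1024", 42L);
    String offset = id.getOffset();            // "1024"
    Long sequence = id.getSequenceNumber();    // 42
    // id.toString() -> "PartitionId: 0, Offset: 1024, SequenceNumber: 42"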

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/ZookeeperStateStore.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/ZookeeperStateStore.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/ZookeeperStateStore.java
index 6af9df6..063aa4d 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/ZookeeperStateStore.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/ZookeeperStateStore.java
@@ -1,95 +1,95 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.spout;
-
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryNTimes;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ZookeeperStateStore implements IStateStore {
-  private static final long serialVersionUID = 1L;
-  private static final Logger logger = LoggerFactory.getLogger(ZookeeperStateStore.class);
-
-  private final String zookeeperConnectionString;
-  private final CuratorFramework curatorFramework;
-  
-  public ZookeeperStateStore(String zookeeperConnectionString) {
-    this(zookeeperConnectionString, 3, 100);
-  }
-
-  public ZookeeperStateStore(String connectionString, int retries, int retryInterval) {
-    if (connectionString == null) {
-      zookeeperConnectionString = "localhost:2181";
-    } else {
-      zookeeperConnectionString = connectionString;
-    }
-
-    RetryPolicy retryPolicy = new RetryNTimes(retries, retryInterval);
-    curatorFramework = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
-  }
-
-  @Override
-  public void open() {
-    curatorFramework.start();
-  }
-
-  @Override
-  public void close() {
-    curatorFramework.close();
-  }
-
-  @Override
-  public void saveData(String statePath, String data) {
-    data = data == null ? "" : data;
-    byte[] bytes = data.getBytes();
-
-    try {
-      if (curatorFramework.checkExists().forPath(statePath) == null) {
-        curatorFramework.create().creatingParentsIfNeeded().forPath(statePath, bytes);
-      } else {
-        curatorFramework.setData().forPath(statePath, bytes);
-      }
-
-      logger.info(String.format("data was saved. path: %s, data: %s.", statePath, data));
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public String readData(String statePath) {
-    try {
-      if (curatorFramework.checkExists().forPath(statePath) == null) {
-        // do we want to throw an exception if path doesn't exist??
-        return null;
-      } else {
-        byte[] bytes = curatorFramework.getData().forPath(statePath);
-        String data = new String(bytes);
-
-        logger.info(String.format("data was retrieved. path: %s, data: %s.", statePath, data));
-
-        return data;
-      }
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZookeeperStateStore implements IStateStore {
+  private static final long serialVersionUID = 1L;
+  private static final Logger logger = LoggerFactory.getLogger(ZookeeperStateStore.class);
+
+  private final String zookeeperConnectionString;
+  private final CuratorFramework curatorFramework;
+  
+  public ZookeeperStateStore(String zookeeperConnectionString) {
+    this(zookeeperConnectionString, 3, 100);
+  }
+
+  public ZookeeperStateStore(String connectionString, int retries, int retryInterval) {
+    if (connectionString == null) {
+      zookeeperConnectionString = "localhost:2181";
+    } else {
+      zookeeperConnectionString = connectionString;
+    }
+
+    RetryPolicy retryPolicy = new RetryNTimes(retries, retryInterval);
+    curatorFramework = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
+  }
+
+  @Override
+  public void open() {
+    curatorFramework.start();
+  }
+
+  @Override
+  public void close() {
+    curatorFramework.close();
+  }
+
+  @Override
+  public void saveData(String statePath, String data) {
+    data = data == null ? "" : data;
+    byte[] bytes = data.getBytes();
+
+    try {
+      if (curatorFramework.checkExists().forPath(statePath) == null) {
+        curatorFramework.create().creatingParentsIfNeeded().forPath(statePath, bytes);
+      } else {
+        curatorFramework.setData().forPath(statePath, bytes);
+      }
+
+      logger.info(String.format("data was saved. path: %s, data: %s.", statePath, data));
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public String readData(String statePath) {
+    try {
+      if (curatorFramework.checkExists().forPath(statePath) == null) {
+        // The path does not exist; return null rather than throwing an exception.
+        return null;
+      } else {
+        byte[] bytes = curatorFramework.getData().forPath(statePath);
+        String data = new String(bytes);
+
+        logger.info(String.format("data was retrieved. path: %s, data: %s.", statePath, data));
+
+        return data;
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
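
ZookeeperStateStore persists checkpoint strings under ZooKeeper paths, creating parent
nodes on first write. A minimal usage sketch (not part of this patch; the connection
string, path and value are placeholders):

    // Hypothetical checkpoint round-trip against a local ZooKeeper.
    IStateStore store = new ZookeeperStateStore("localhost:2181", 3, 100);
    store.open();
    store.saveData("/eventhubspout/partition-0", "1024");
    String checkpoint = store.readData("/eventhubspout/partition-0");  // "1024", or null if absent
    store.close();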

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Coordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Coordinator.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Coordinator.java
index ad3c75e..253b317 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Coordinator.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Coordinator.java
@@ -1,60 +1,60 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.trident;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
-import org.apache.storm.trident.spout.IPartitionedTridentSpout;
-
-import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
-
-public class Coordinator implements IPartitionedTridentSpout.Coordinator<Partitions>,
-    IOpaquePartitionedTridentSpout.Coordinator<Partitions> {
-  private static final Logger logger = LoggerFactory.getLogger(Coordinator.class);
-  private final EventHubSpoutConfig spoutConfig;
-  Partitions partitions;
-  
-  public Coordinator(EventHubSpoutConfig spoutConfig) {
-    this.spoutConfig = spoutConfig;
-  }
-
-  @Override
-  public void close() {
-  }
-
-  @Override
-  public Partitions getPartitionsForBatch() {
-    if(partitions != null) {
-      return partitions;
-    }
-    
-    partitions = new Partitions();
-    for(int i=0; i<spoutConfig.getPartitionCount(); ++i) {
-      partitions.addPartition(new Partition(spoutConfig, Integer.toString(i)));
-    }
-    logger.info("created partitions, size=" + spoutConfig.getPartitionCount());
-    return partitions;
-  }
-
-  @Override
-  public boolean isReady(long txid) {
-    return true;
-  }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
+import org.apache.storm.trident.spout.IPartitionedTridentSpout;
+
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+
+public class Coordinator implements IPartitionedTridentSpout.Coordinator<Partitions>,
+    IOpaquePartitionedTridentSpout.Coordinator<Partitions> {
+  private static final Logger logger = LoggerFactory.getLogger(Coordinator.class);
+  private final EventHubSpoutConfig spoutConfig;
+  Partitions partitions;
+  
+  public Coordinator(EventHubSpoutConfig spoutConfig) {
+    this.spoutConfig = spoutConfig;
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public Partitions getPartitionsForBatch() {
+    if(partitions != null) {
+      return partitions;
+    }
+    
+    partitions = new Partitions();
+    for(int i=0; i<spoutConfig.getPartitionCount(); ++i) {
+      partitions.addPartition(new Partition(spoutConfig, Integer.toString(i)));
+    }
+    logger.info("created partitions, size=" + spoutConfig.getPartitionCount());
+    return partitions;
+  }
+
+  @Override
+  public boolean isReady(long txid) {
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java
index 069d819..d1e8b9e 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java
@@ -1,35 +1,35 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.trident;
-
-import org.apache.storm.eventhubs.spout.EventDataWrap;
-
-import java.util.List;
-
-public interface ITridentPartitionManager {
-  boolean open(String offset);
-  void close();
-  
-  /**
-   * receive a batch of messages from EvenHub up to "count" messages
-   * @param offset the starting offset
-   * @param count max number of messages in this batch
-   * @return list of EventData, if failed to receive, return empty list
-   */
-  public List<EventDataWrap> receiveBatch(String offset, int count);
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import org.apache.storm.eventhubs.spout.EventDataWrap;
+
+import java.util.List;
+
+public interface ITridentPartitionManager {
+  boolean open(String offset);
+  void close();
+  
+  /**
+   * Receive a batch of messages from EventHub, up to "count" messages.
+   * @param offset the starting offset
+   * @param count maximum number of messages in this batch
+   * @return the list of EventDataWrap; an empty list if receiving failed
+   */
+  public List<EventDataWrap> receiveBatch(String offset, int count);
+}
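
The receiveBatch contract documented above returns an empty list on failure rather than
throwing, so callers can simply retry on the next batch. A tiny sketch (not part of this
patch; `pm`, the offset and the batch size are placeholders):

    // Hypothetical batch read from an already-opened partition manager.
    List<EventDataWrap> batch = pm.receiveBatch("1024", 128);
    if (batch.isEmpty()) {
      // receive failed or no data was available; the emitter retries on the next batch
    }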


[3/5] storm git commit: set '* text=auto' in .gitattributes in order to avoid merge work because of line feed changes

Posted by sr...@apache.org.
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManagerFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManagerFactory.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManagerFactory.java
index 5804e28..701bd46 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManagerFactory.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManagerFactory.java
@@ -1,26 +1,26 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.trident;
-
-import java.io.Serializable;
-
-import org.apache.storm.eventhubs.spout.IEventHubReceiver;
-
-public interface ITridentPartitionManagerFactory extends Serializable {
-  ITridentPartitionManager create(IEventHubReceiver receiver);
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.io.Serializable;
+
+import org.apache.storm.eventhubs.spout.IEventHubReceiver;
+
+public interface ITridentPartitionManagerFactory extends Serializable {
+  ITridentPartitionManager create(IEventHubReceiver receiver);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
index 5b6b642..0da421c 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
@@ -1,68 +1,68 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.trident;
-
-import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
-import org.apache.storm.eventhubs.spout.IEventHubReceiverFactory;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
-import org.apache.storm.trident.topology.TransactionAttempt;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * A thin wrapper of TransactionalTridentEventHubEmitter for OpaqueTridentEventHubSpout
- */
-public class OpaqueTridentEventHubEmitter implements IOpaquePartitionedTridentSpout.Emitter<Partitions, Partition, Map> {
-  private final TransactionalTridentEventHubEmitter transactionalEmitter;
-  public OpaqueTridentEventHubEmitter(EventHubSpoutConfig spoutConfig) {
-    transactionalEmitter = new TransactionalTridentEventHubEmitter(spoutConfig);
-  }
-  
-  public OpaqueTridentEventHubEmitter(EventHubSpoutConfig spoutConfig,
-      int batchSize,
-      ITridentPartitionManagerFactory pmFactory,
-      IEventHubReceiverFactory recvFactory) {
-    transactionalEmitter = new TransactionalTridentEventHubEmitter(spoutConfig,
-        batchSize,
-        pmFactory,
-        recvFactory);
-  }
-
-  @Override
-  public void close() {
-    transactionalEmitter.close();
-  }
-
-  @Override
-  public Map emitPartitionBatch(TransactionAttempt attempt, TridentCollector collector,
-      Partition partition, Map meta) {
-    return transactionalEmitter.emitPartitionBatchNew(attempt, collector, partition, meta);
-  }
-
-  @Override
-  public List<Partition> getOrderedPartitions(Partitions partitions) {
-    return transactionalEmitter.getOrderedPartitions(partitions);
-  }
-
-  @Override
-  public void refreshPartitions(List<Partition> partitionList) {
-    transactionalEmitter.refreshPartitions(partitionList);
-  }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+import org.apache.storm.eventhubs.spout.IEventHubReceiverFactory;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
+import org.apache.storm.trident.topology.TransactionAttempt;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A thin wrapper around TransactionalTridentEventHubEmitter for use by OpaqueTridentEventHubSpout.
+ */
+public class OpaqueTridentEventHubEmitter implements IOpaquePartitionedTridentSpout.Emitter<Partitions, Partition, Map> {
+  private final TransactionalTridentEventHubEmitter transactionalEmitter;
+  public OpaqueTridentEventHubEmitter(EventHubSpoutConfig spoutConfig) {
+    transactionalEmitter = new TransactionalTridentEventHubEmitter(spoutConfig);
+  }
+  
+  public OpaqueTridentEventHubEmitter(EventHubSpoutConfig spoutConfig,
+      int batchSize,
+      ITridentPartitionManagerFactory pmFactory,
+      IEventHubReceiverFactory recvFactory) {
+    transactionalEmitter = new TransactionalTridentEventHubEmitter(spoutConfig,
+        batchSize,
+        pmFactory,
+        recvFactory);
+  }
+
+  @Override
+  public void close() {
+    transactionalEmitter.close();
+  }
+
+  @Override
+  public Map emitPartitionBatch(TransactionAttempt attempt, TridentCollector collector,
+      Partition partition, Map meta) {
+    return transactionalEmitter.emitPartitionBatchNew(attempt, collector, partition, meta);
+  }
+
+  @Override
+  public List<Partition> getOrderedPartitions(Partitions partitions) {
+    return transactionalEmitter.getOrderedPartitions(partitions);
+  }
+
+  @Override
+  public void refreshPartitions(List<Partition> partitionList) {
+    transactionalEmitter.refreshPartitions(partitionList);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubSpout.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubSpout.java
index f50ffb4..7123304 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubSpout.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubSpout.java
@@ -1,64 +1,64 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.trident;
-
-import java.util.Map;
-
-import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
-import org.apache.storm.eventhubs.spout.IEventDataScheme;
-
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
-
-/**
- * Opaque Trident EventHubs Spout
- */
-public class OpaqueTridentEventHubSpout implements IOpaquePartitionedTridentSpout<Partitions, Partition, Map> {
-  private static final long serialVersionUID = 1L;
-  private final IEventDataScheme scheme;
-  private final EventHubSpoutConfig spoutConfig;
-
-  public OpaqueTridentEventHubSpout(EventHubSpoutConfig config) {
-    spoutConfig = config;
-    scheme = spoutConfig.getEventDataScheme();
-  }
-
-  @Override
-  public Map<String, Object> getComponentConfiguration() {
-    return null;
-  }
-
-  @Override
-  public IOpaquePartitionedTridentSpout.Coordinator<Partitions> getCoordinator(
-      Map<String, Object> conf, TopologyContext context) {
-    return new org.apache.storm.eventhubs.trident.Coordinator(spoutConfig);
-  }
-
-  @Override
-  public IOpaquePartitionedTridentSpout.Emitter<Partitions, Partition, Map> getEmitter(
-      Map<String, Object> conf, TopologyContext context) {
-    return new OpaqueTridentEventHubEmitter(spoutConfig);
-  }
-
-  @Override
-  public Fields getOutputFields() {
-    return scheme.getOutputFields();
-  }
-
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.util.Map;
+
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+import org.apache.storm.eventhubs.spout.IEventDataScheme;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
+
+/**
+ * Opaque Trident EventHubs Spout
+ */
+public class OpaqueTridentEventHubSpout implements IOpaquePartitionedTridentSpout<Partitions, Partition, Map> {
+  private static final long serialVersionUID = 1L;
+  private final IEventDataScheme scheme;
+  private final EventHubSpoutConfig spoutConfig;
+
+  public OpaqueTridentEventHubSpout(EventHubSpoutConfig config) {
+    spoutConfig = config;
+    scheme = spoutConfig.getEventDataScheme();
+  }
+
+  @Override
+  public Map<String, Object> getComponentConfiguration() {
+    return null;
+  }
+
+  @Override
+  public IOpaquePartitionedTridentSpout.Coordinator<Partitions> getCoordinator(
+      Map<String, Object> conf, TopologyContext context) {
+    return new org.apache.storm.eventhubs.trident.Coordinator(spoutConfig);
+  }
+
+  @Override
+  public IOpaquePartitionedTridentSpout.Emitter<Partitions, Partition, Map> getEmitter(
+      Map<String, Object> conf, TopologyContext context) {
+    return new OpaqueTridentEventHubEmitter(spoutConfig);
+  }
+
+  @Override
+  public Fields getOutputFields() {
+    return scheme.getOutputFields();
+  }
+
+}
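
OpaqueTridentEventHubSpout plugs into a Trident topology like any other opaque
partitioned spout. A wiring sketch (not part of this patch; `spoutConfig` is assumed to be
a fully built EventHubSpoutConfig, and Debug is just a stand-in for real processing):

    TridentTopology topology = new TridentTopology();
    OpaqueTridentEventHubSpout spout = new OpaqueTridentEventHubSpout(spoutConfig);
    topology.newStream("EventHubsTridentSpout", spout)
            .each(spout.getOutputFields(), new Debug());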

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partition.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partition.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partition.java
index b726e7f..8857eec 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partition.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partition.java
@@ -1,39 +1,39 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.trident;
-
-import java.io.Serializable;
-import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
-import org.apache.storm.trident.spout.ISpoutPartition;
-
-/**
- * Represents an EventHub partition
- */
-public class Partition implements ISpoutPartition, Serializable {
-  private static final long serialVersionUID = 1L;
-  String partitionId;
-  
-  public Partition(EventHubSpoutConfig config, String partitionId) {
-    this.partitionId = partitionId;
-  }
-  
-  @Override
-  public String getId() {
-    return partitionId;
-  }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.io.Serializable;
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+import org.apache.storm.trident.spout.ISpoutPartition;
+
+/**
+ * Represents an EventHub partition
+ */
+public class Partition implements ISpoutPartition, Serializable {
+  private static final long serialVersionUID = 1L;
+  String partitionId;
+  
+  public Partition(EventHubSpoutConfig config, String partitionId) {
+    this.partitionId = partitionId;
+  }
+  
+  @Override
+  public String getId() {
+    return partitionId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partitions.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partitions.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partitions.java
index 235f5b6..c3317d9 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partitions.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partitions.java
@@ -1,41 +1,41 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.trident;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Represents all EventHub partitions a spout is receiving messages from.
- */
-public class Partitions implements Serializable {
-  private static final long serialVersionUID = 1L;
-  private List<Partition> partitionList;
-  public Partitions() {
-    partitionList = new ArrayList<Partition>();
-  }
-  
-  public void addPartition(Partition partition) {
-    partitionList.add(partition);
-  }
-  
-  public List<Partition> getPartitions() {
-    return partitionList;
-  }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Represents all EventHub partitions a spout is receiving messages from.
+ */
+public class Partitions implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private List<Partition> partitionList;
+  public Partitions() {
+    partitionList = new ArrayList<Partition>();
+  }
+  
+  public void addPartition(Partition partition) {
+    partitionList.add(partition);
+  }
+  
+  public List<Partition> getPartitions() {
+    return partitionList;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java
index 4d4de16..ee3242a 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java
@@ -1,66 +1,66 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.trident;
-
-import java.util.Map;
-
-import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
-import org.apache.storm.eventhubs.spout.IEventDataScheme;
-
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.trident.spout.IPartitionedTridentSpout;
-import org.apache.storm.eventhubs.trident.Partition;
-
-/**
- * Transactional Trident EventHub Spout
- */
-public class TransactionalTridentEventHubSpout implements 
-  IPartitionedTridentSpout<Partitions, Partition, Map<String, Object>> {
-  private static final long serialVersionUID = 1L;
-  private final IEventDataScheme scheme;
-  private final EventHubSpoutConfig spoutConfig;
-  
-  public TransactionalTridentEventHubSpout(EventHubSpoutConfig config) {
-    spoutConfig = config;
-    scheme = spoutConfig.getEventDataScheme();
-  }
-  
-  @Override
-  public Map<String, Object> getComponentConfiguration() {
-    return null;
-  }
-
-  @Override
-  public IPartitionedTridentSpout.Coordinator<Partitions> getCoordinator(
-      Map<String, Object> conf, TopologyContext context) {
-    return new org.apache.storm.eventhubs.trident.Coordinator(spoutConfig);
-  }
-
-  @Override
-  public IPartitionedTridentSpout.Emitter<Partitions, Partition, Map<String, Object>> getEmitter(
-      Map<String, Object> conf, TopologyContext context) {
-    return new TransactionalTridentEventHubEmitter(spoutConfig);
-  }
-
-  @Override
-  public Fields getOutputFields() {
-    return scheme.getOutputFields();
-  }
-
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.util.Map;
+
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+import org.apache.storm.eventhubs.spout.IEventDataScheme;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.trident.spout.IPartitionedTridentSpout;
+import org.apache.storm.eventhubs.trident.Partition;
+
+/**
+ * Transactional Trident EventHub Spout
+ */
+public class TransactionalTridentEventHubSpout implements 
+  IPartitionedTridentSpout<Partitions, Partition, Map<String, Object>> {
+  private static final long serialVersionUID = 1L;
+  private final IEventDataScheme scheme;
+  private final EventHubSpoutConfig spoutConfig;
+  
+  public TransactionalTridentEventHubSpout(EventHubSpoutConfig config) {
+    spoutConfig = config;
+    scheme = spoutConfig.getEventDataScheme();
+  }
+  
+  @Override
+  public Map<String, Object> getComponentConfiguration() {
+    return null;
+  }
+
+  @Override
+  public IPartitionedTridentSpout.Coordinator<Partitions> getCoordinator(
+      Map<String, Object> conf, TopologyContext context) {
+    return new org.apache.storm.eventhubs.trident.Coordinator(spoutConfig);
+  }
+
+  @Override
+  public IPartitionedTridentSpout.Emitter<Partitions, Partition, Map<String, Object>> getEmitter(
+      Map<String, Object> conf, TopologyContext context) {
+    return new TransactionalTridentEventHubEmitter(spoutConfig);
+  }
+
+  @Override
+  public Fields getOutputFields() {
+    return scheme.getOutputFields();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventHubLoop.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
index 665fef9..32dc0d5 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
@@ -1,52 +1,52 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.samples;
-
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.topology.TopologyBuilder;
-
-import org.apache.storm.eventhubs.bolt.EventHubBolt;
-import org.apache.storm.eventhubs.bolt.EventHubBoltConfig;
-import org.apache.storm.eventhubs.spout.EventHubSpout;
-
-/**
- * A sample topology that loops message back to EventHub
- */
-public class EventHubLoop extends EventCount {
-
-  @Override
-  protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
-    TopologyBuilder topologyBuilder = new TopologyBuilder();
-
-    topologyBuilder.setSpout("EventHubsSpout", eventHubSpout, spoutConfig.getPartitionCount())
-      .setNumTasks(spoutConfig.getPartitionCount());
-    EventHubBoltConfig boltConfig = new EventHubBoltConfig(spoutConfig.getConnectionString(),
-        spoutConfig.getEntityPath(), true);
-    
-    EventHubBolt eventHubBolt = new EventHubBolt(boltConfig);
-    int boltTasks = spoutConfig.getPartitionCount();
-    topologyBuilder.setBolt("EventHubsBolt", eventHubBolt, boltTasks)
-      .localOrShuffleGrouping("EventHubsSpout").setNumTasks(boltTasks);
-    return topologyBuilder.createTopology();
-  }
-  
-  public static void main(String[] args) throws Exception {
-    EventHubLoop scenario = new EventHubLoop();
-    scenario.runScenario(args);
-  }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples;
+
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.TopologyBuilder;
+
+import org.apache.storm.eventhubs.bolt.EventHubBolt;
+import org.apache.storm.eventhubs.bolt.EventHubBoltConfig;
+import org.apache.storm.eventhubs.spout.EventHubSpout;
+
+/**
+ * A sample topology that loops messages back to EventHub
+ */
+public class EventHubLoop extends EventCount {
+
+  @Override
+  protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
+    TopologyBuilder topologyBuilder = new TopologyBuilder();
+
+    topologyBuilder.setSpout("EventHubsSpout", eventHubSpout, spoutConfig.getPartitionCount())
+      .setNumTasks(spoutConfig.getPartitionCount());
+    EventHubBoltConfig boltConfig = new EventHubBoltConfig(spoutConfig.getConnectionString(),
+        spoutConfig.getEntityPath(), true);
+    
+    EventHubBolt eventHubBolt = new EventHubBolt(boltConfig);
+    int boltTasks = spoutConfig.getPartitionCount();
+    topologyBuilder.setBolt("EventHubsBolt", eventHubBolt, boltTasks)
+      .localOrShuffleGrouping("EventHubsSpout").setNumTasks(boltTasks);
+    return topologyBuilder.createTopology();
+  }
+  
+  public static void main(String[] args) throws Exception {
+    EventHubLoop scenario = new EventHubLoop();
+    scenario.runScenario(args);
+  }
+}
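
The loopback above only needs a connection string and an entity path on the write side. A stripped-down sketch of just that part, with placeholder values; the boolean flag simply mirrors the call in buildTopology():

    // Placeholder connection values.
    EventHubBoltConfig boltConfig = new EventHubBoltConfig(
        "event-hub-connection-string", "event-hub-name", true);
    EventHubBolt eventHubBolt = new EventHubBolt(boltConfig);
    TopologyBuilder builder = new TopologyBuilder();
    builder.setBolt("EventHubsBolt", eventHubBolt, 4)
        .localOrShuffleGrouping("EventHubsSpout");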

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java
index e8538c1..a433b8b 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java
@@ -1,53 +1,53 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.samples;
-
-import org.apache.storm.trident.TridentState;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.operation.builtin.Count;
-import org.apache.storm.trident.operation.builtin.Sum;
-import org.apache.storm.trident.testing.MemoryMapState;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.tuple.Fields;
-
-import org.apache.storm.eventhubs.samples.TransactionalTridentEventCount.LoggingFilter;
-import org.apache.storm.eventhubs.spout.EventHubSpout;
-import org.apache.storm.eventhubs.trident.OpaqueTridentEventHubSpout;
-
-/**
- * A simple Trident topology uses OpaqueTridentEventHubSpout
- */
-public class OpaqueTridentEventCount extends EventCount {
-  @Override
-  protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
-    TridentTopology topology = new TridentTopology();
-    
-    OpaqueTridentEventHubSpout spout = new OpaqueTridentEventHubSpout(spoutConfig);
-    TridentState state = topology.newStream("stream-" + spoutConfig.getTopologyName(), spout)
-        .parallelismHint(spoutConfig.getPartitionCount())
-        .aggregate(new Count(), new Fields("partial-count"))
-        .persistentAggregate(new MemoryMapState.Factory(), new Fields("partial-count"), new Sum(), new Fields("count"));
-    state.newValuesStream().each(new Fields("count"), new LoggingFilter("got count: ", 10000));
-    return topology.build();
-  }
-  
-  public static void main(String[] args) throws Exception {
-    OpaqueTridentEventCount scenario = new OpaqueTridentEventCount();
-    scenario.runScenario(args);
-  }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples;
+
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.builtin.Count;
+import org.apache.storm.trident.operation.builtin.Sum;
+import org.apache.storm.trident.testing.MemoryMapState;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
+
+import org.apache.storm.eventhubs.samples.TransactionalTridentEventCount.LoggingFilter;
+import org.apache.storm.eventhubs.spout.EventHubSpout;
+import org.apache.storm.eventhubs.trident.OpaqueTridentEventHubSpout;
+
+/**
+ * A simple Trident topology that uses OpaqueTridentEventHubSpout
+ */
+public class OpaqueTridentEventCount extends EventCount {
+  @Override
+  protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
+    TridentTopology topology = new TridentTopology();
+    
+    OpaqueTridentEventHubSpout spout = new OpaqueTridentEventHubSpout(spoutConfig);
+    TridentState state = topology.newStream("stream-" + spoutConfig.getTopologyName(), spout)
+        .parallelismHint(spoutConfig.getPartitionCount())
+        .aggregate(new Count(), new Fields("partial-count"))
+        .persistentAggregate(new MemoryMapState.Factory(), new Fields("partial-count"), new Sum(), new Fields("count"));
+    state.newValuesStream().each(new Fields("count"), new LoggingFilter("got count: ", 10000));
+    return topology.build();
+  }
+  
+  public static void main(String[] args) throws Exception {
+    OpaqueTridentEventCount scenario = new OpaqueTridentEventCount();
+    scenario.runScenario(args);
+  }
+}
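
The stream above counts each batch into "partial-count" and then folds those partial counts into a persistent "count" via Sum. A sketch of the same chain driven by Trident's FixedBatchSpout test utility instead of an Event Hub (the test spout and its field name are assumptions used for illustration):

    FixedBatchSpout testSpout = new FixedBatchSpout(new Fields("message"), 3,
        new Values("a"), new Values("b"), new Values("c"));
    testSpout.setCycle(true);
    TridentTopology topology = new TridentTopology();
    TridentState counts = topology.newStream("test-stream", testSpout)
        .aggregate(new Count(), new Fields("partial-count"))
        .persistentAggregate(new MemoryMapState.Factory(),
            new Fields("partial-count"), new Sum(), new Fields("count"));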

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java
index 0a5295f..718c229 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java
@@ -1,81 +1,81 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.samples;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.tuple.Fields;
-
-import org.apache.storm.eventhubs.spout.EventHubSpout;
-import org.apache.storm.eventhubs.trident.TransactionalTridentEventHubSpout;
-
-import org.apache.storm.trident.TridentState;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.operation.BaseFilter;
-import org.apache.storm.trident.operation.builtin.Count;
-import org.apache.storm.trident.operation.builtin.Sum;
-import org.apache.storm.trident.testing.MemoryMapState;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-/**
- * A simple Trident topology uses TransactionalTridentEventHubSpout
- */
-public class TransactionalTridentEventCount extends EventCount {
-  public static class LoggingFilter extends BaseFilter {
-    private static final long serialVersionUID = 1L;
-    private static final Logger logger = LoggerFactory.getLogger(LoggingFilter.class);
-    private final String prefix;
-    private final long logIntervalMs;
-    private long lastTime;
-    public LoggingFilter(String prefix, int logIntervalMs) {
-      this.prefix = prefix;
-      this.logIntervalMs = logIntervalMs;
-      lastTime = System.nanoTime();
-    }
-
-    @Override
-    public boolean isKeep(TridentTuple tuple) {
-      long now = System.nanoTime();
-      if(logIntervalMs < (now - lastTime) / 1000000) {
-        logger.info(prefix + tuple.toString());
-        lastTime = now;
-      }
-      return false;
-    }
-  }
-  
-  @Override
-  protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
-    TridentTopology topology = new TridentTopology();
-    
-    TransactionalTridentEventHubSpout spout = new TransactionalTridentEventHubSpout(spoutConfig);
-    TridentState state = topology.newStream("stream-" + spoutConfig.getTopologyName(), spout)
-        .parallelismHint(spoutConfig.getPartitionCount())
-        .aggregate(new Count(), new Fields("partial-count"))
-        .persistentAggregate(new MemoryMapState.Factory(), new Fields("partial-count"), new Sum(), new Fields("count"));
-    state.newValuesStream().each(new Fields("count"), new LoggingFilter("got count: ", 10000));
-    return topology.build();
-  }
-  
-  public static void main(String[] args) throws Exception {
-    TransactionalTridentEventCount scenario = new TransactionalTridentEventCount();
-    scenario.runScenario(args);
-  }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
+
+import org.apache.storm.eventhubs.spout.EventHubSpout;
+import org.apache.storm.eventhubs.trident.TransactionalTridentEventHubSpout;
+
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseFilter;
+import org.apache.storm.trident.operation.builtin.Count;
+import org.apache.storm.trident.operation.builtin.Sum;
+import org.apache.storm.trident.testing.MemoryMapState;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+/**
+ * A simple Trident topology that uses TransactionalTridentEventHubSpout
+ */
+public class TransactionalTridentEventCount extends EventCount {
+  public static class LoggingFilter extends BaseFilter {
+    private static final long serialVersionUID = 1L;
+    private static final Logger logger = LoggerFactory.getLogger(LoggingFilter.class);
+    private final String prefix;
+    private final long logIntervalMs;
+    private long lastTime;
+    public LoggingFilter(String prefix, int logIntervalMs) {
+      this.prefix = prefix;
+      this.logIntervalMs = logIntervalMs;
+      lastTime = System.nanoTime();
+    }
+
+    @Override
+    public boolean isKeep(TridentTuple tuple) {
+      long now = System.nanoTime();
+      if(logIntervalMs < (now - lastTime) / 1000000) {
+        logger.info(prefix + tuple.toString());
+        lastTime = now;
+      }
+      return false;
+    }
+  }
+  
+  @Override
+  protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
+    TridentTopology topology = new TridentTopology();
+    
+    TransactionalTridentEventHubSpout spout = new TransactionalTridentEventHubSpout(spoutConfig);
+    TridentState state = topology.newStream("stream-" + spoutConfig.getTopologyName(), spout)
+        .parallelismHint(spoutConfig.getPartitionCount())
+        .aggregate(new Count(), new Fields("partial-count"))
+        .persistentAggregate(new MemoryMapState.Factory(), new Fields("partial-count"), new Sum(), new Fields("count"));
+    state.newValuesStream().each(new Fields("count"), new LoggingFilter("got count: ", 10000));
+    return topology.build();
+  }
+  
+  public static void main(String[] args) throws Exception {
+    TransactionalTridentEventCount scenario = new TransactionalTridentEventCount();
+    scenario.runScenario(args);
+  }
+}
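
LoggingFilter drops every tuple (isKeep always returns false) and logs only the first tuple that arrives after the interval has elapsed; the interval parameter is in milliseconds even though the clock is System.nanoTime(). The throttling check in isolation:

    long logIntervalMs = 10000;
    long lastTime = System.nanoTime();
    // ... some tuples later ...
    long now = System.nanoTime();
    boolean shouldLog = logIntervalMs < (now - lastTime) / 1000000;  // nanoseconds -> milliseconds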

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
index 4bed2e3..9d94f5f 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
@@ -1,88 +1,88 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.samples.bolt;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.storm.utils.TupleUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.storm.Config;
-import org.apache.storm.metric.api.IMetric;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Tuple;
-
-/**
- * Globally count number of messages
- */
-public class GlobalCountBolt extends BaseBasicBolt {
-  private static final long serialVersionUID = 1L;
-  private static final Logger logger = LoggerFactory
-      .getLogger(GlobalCountBolt.class);
-  private long globalCount;
-  private long globalCountDiff;
-  private long lastMetricsTime;
-  private long throughput;
-  
-  @Override
-  public void prepare(Map<String, Object> config, TopologyContext context) {
-    globalCount = 0;
-    globalCountDiff = 0;
-    lastMetricsTime = System.nanoTime();
-    context.registerMetric("GlobalMessageCount", new IMetric() {
-      @Override
-      public Object getValueAndReset() {
-        long now = System.nanoTime();
-        long millis = (now - lastMetricsTime) / 1000000;
-        throughput = globalCountDiff / millis * 1000;
-        Map<String, Object> values = new HashMap<>();
-        values.put("global_count", globalCount);
-        values.put("throughput", throughput);
-        lastMetricsTime = now;
-        globalCountDiff = 0;
-        return values;
-      }
-  }, (Integer)config.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));
-  }
-
-  @Override
-  public void execute(Tuple tuple, BasicOutputCollector collector) {
-    if (TupleUtils.isTick(tuple)) {
-      return;
-    }
-
-    int partial = (Integer)tuple.getValueByField("partial_count");
-    globalCount += partial;
-    globalCountDiff += partial;
-    if((globalCountDiff == partial) && (globalCount != globalCountDiff)) {
-      //metrics has just been collected, let's also log it
-      logger.info("Current throughput (messages/second): " + throughput);
-    }
-  }
-
-  @Override
-  public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    
-  }
-
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples.bolt;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.storm.utils.TupleUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.Config;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * Globally counts the number of messages
+ */
+public class GlobalCountBolt extends BaseBasicBolt {
+  private static final long serialVersionUID = 1L;
+  private static final Logger logger = LoggerFactory
+      .getLogger(GlobalCountBolt.class);
+  private long globalCount;
+  private long globalCountDiff;
+  private long lastMetricsTime;
+  private long throughput;
+  
+  @Override
+  public void prepare(Map<String, Object> config, TopologyContext context) {
+    globalCount = 0;
+    globalCountDiff = 0;
+    lastMetricsTime = System.nanoTime();
+    context.registerMetric("GlobalMessageCount", new IMetric() {
+      @Override
+      public Object getValueAndReset() {
+        long now = System.nanoTime();
+        long millis = (now - lastMetricsTime) / 1000000;
+        throughput = globalCountDiff / millis * 1000;
+        Map<String, Object> values = new HashMap<>();
+        values.put("global_count", globalCount);
+        values.put("throughput", throughput);
+        lastMetricsTime = now;
+        globalCountDiff = 0;
+        return values;
+      }
+  }, (Integer)config.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));
+  }
+
+  @Override
+  public void execute(Tuple tuple, BasicOutputCollector collector) {
+    if (TupleUtils.isTick(tuple)) {
+      return;
+    }
+
+    int partial = (Integer)tuple.getValueByField("partial_count");
+    globalCount += partial;
+    globalCountDiff += partial;
+    if((globalCountDiff == partial) && (globalCount != globalCountDiff)) {
+      //metrics has just been collected, let's also log it
+      logger.info("Current throughput (messages/second): " + throughput);
+    }
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    
+  }
+
+}
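
Worth noting about the metric above: globalCountDiff / millis * 1000 is integer arithmetic, so a window that saw fewer messages than elapsed milliseconds reports a throughput of 0 (and a window shorter than 1 ms would divide by zero). A floating-point variant of the same calculation, shown only as an alternative sketch:

    long now = System.nanoTime();
    double seconds = (now - lastMetricsTime) / 1000000000.0;
    double messagesPerSecond = seconds > 0 ? globalCountDiff / seconds : 0.0;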

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
index 544b6c8..3763c69 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
@@ -1,68 +1,68 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.samples.bolt;
-
-import java.util.Map;
-
-import org.apache.storm.utils.TupleUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-/**
- * Partially count number of messages from EventHubs
- */
-public class PartialCountBolt extends BaseBasicBolt {
-  private static final long serialVersionUID = 1L;
-  private static final Logger logger = LoggerFactory
-      .getLogger(PartialCountBolt.class);
-  private static final int PartialCountBatchSize = 1000; 
-  
-  private int partialCount;
-  
-  @Override
-  public void prepare(Map<String, Object> topoConf, TopologyContext context) {
-    partialCount = 0;
-  }
-  
-  @Override
-  public void execute(Tuple tuple, BasicOutputCollector collector) {
-    if (TupleUtils.isTick(tuple)) {
-      return;
-    }
-
-    partialCount++;
-    if(partialCount == PartialCountBatchSize) {
-      collector.emit(new Values(PartialCountBatchSize));
-      partialCount = 0;
-    }
-  }
-
-  @Override
-  public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    declarer.declare(new Fields("partial_count"));
-  }
-
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples.bolt;
+
+import java.util.Map;
+
+import org.apache.storm.utils.TupleUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+/**
+ * Partially counts the number of messages from EventHubs
+ */
+public class PartialCountBolt extends BaseBasicBolt {
+  private static final long serialVersionUID = 1L;
+  private static final Logger logger = LoggerFactory
+      .getLogger(PartialCountBolt.class);
+  private static final int PartialCountBatchSize = 1000; 
+  
+  private int partialCount;
+  
+  @Override
+  public void prepare(Map<String, Object> topoConf, TopologyContext context) {
+    partialCount = 0;
+  }
+  
+  @Override
+  public void execute(Tuple tuple, BasicOutputCollector collector) {
+    if (TupleUtils.isTick(tuple)) {
+      return;
+    }
+
+    partialCount++;
+    if(partialCount == PartialCountBatchSize) {
+      collector.emit(new Values(PartialCountBatchSize));
+      partialCount = 0;
+    }
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(new Fields("partial_count"));
+  }
+
+}
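
A hedged sketch of how the two counting bolts could be wired together: partial counts stay close to the spout tasks, and a single global counter is fed through a global grouping on the "partial_count" stream. Component names and parallelism are illustrative, and eventHubSpout/partitionCount are assumed to be in scope:

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("EventHubsSpout", eventHubSpout, partitionCount);
    builder.setBolt("PartialCountBolt", new PartialCountBolt(), partitionCount)
        .localOrShuffleGrouping("EventHubsSpout");
    builder.setBolt("GlobalCountBolt", new GlobalCountBolt(), 1)
        .globalGrouping("PartialCountBolt");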

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
index ac724e8..621d6a8 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
@@ -1,71 +1,71 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.spout;
-
-import java.util.List;
-
-import org.apache.storm.spout.ISpoutOutputCollector;
-
-/**
- * Mock of ISpoutOutputCollector
- */
-public class SpoutOutputCollectorMock implements ISpoutOutputCollector {
-  //comma separated offsets
-  StringBuilder emittedOffset;
-  
-  public SpoutOutputCollectorMock() {
-    emittedOffset = new StringBuilder();
-  }
-  
-  public String getOffsetSequenceAndReset() {
-    String ret = null;
-    if(emittedOffset.length() > 0) {
-      emittedOffset.setLength(emittedOffset.length()-1);
-      ret = emittedOffset.toString();
-      emittedOffset.setLength(0);
-    }
-    return ret;
-  }
-
-  @Override
-  public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
-    MessageId mid = (MessageId)messageId;
-    String pid = mid.getPartitionId();
-    String offset = mid.getOffset();
-    emittedOffset.append(pid+"_"+offset+",");
-    return null;
-  }
-
-  @Override
-  public void emitDirect(int arg0, String arg1, List<Object> arg2, Object arg3) {
-  }
-
-  @Override
-  public void flush() {
-    // NO-OP
-  }
-
-  @Override
-  public void reportError(Throwable arg0) {
-  }
-
-  @Override
-  public long getPendingCount() {
-    return 0;
-  }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import java.util.List;
+
+import org.apache.storm.spout.ISpoutOutputCollector;
+
+/**
+ * Mock of ISpoutOutputCollector
+ */
+public class SpoutOutputCollectorMock implements ISpoutOutputCollector {
+  //comma separated offsets
+  StringBuilder emittedOffset;
+  
+  public SpoutOutputCollectorMock() {
+    emittedOffset = new StringBuilder();
+  }
+  
+  public String getOffsetSequenceAndReset() {
+    String ret = null;
+    if(emittedOffset.length() > 0) {
+      emittedOffset.setLength(emittedOffset.length()-1);
+      ret = emittedOffset.toString();
+      emittedOffset.setLength(0);
+    }
+    return ret;
+  }
+
+  @Override
+  public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
+    MessageId mid = (MessageId)messageId;
+    String pid = mid.getPartitionId();
+    String offset = mid.getOffset();
+    emittedOffset.append(pid+"_"+offset+",");
+    return null;
+  }
+
+  @Override
+  public void emitDirect(int arg0, String arg1, List<Object> arg2, Object arg3) {
+  }
+
+  @Override
+  public void flush() {
+    // NO-OP
+  }
+
+  @Override
+  public void reportError(Throwable arg0) {
+  }
+
+  @Override
+  public long getPendingCount() {
+    return 0;
+  }
+}
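
A sketch of exercising the mock in a test. MessageId.create is assumed to take (partitionId, offset, sequenceNumber), following the usage in TestEventData further down:

    SpoutOutputCollectorMock collector = new SpoutOutputCollectorMock();
    collector.emit("default", new ArrayList<Object>(), MessageId.create("0", "100", 1));
    collector.emit("default", new ArrayList<Object>(), MessageId.create("0", "200", 2));
    assertEquals("0_100,0_200", collector.getOffsetSequenceAndReset());
    assertNull(collector.getOffsetSequenceAndReset());  // sequence buffer was reset above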

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/StateStoreMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/StateStoreMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/StateStoreMock.java
index cd6e13e..0abe3a4 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/StateStoreMock.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/StateStoreMock.java
@@ -1,54 +1,54 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.spout;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.storm.eventhubs.spout.IStateStore;
-
-/**
- * A state store mocker
- */
-public class StateStoreMock implements IStateStore {
-  Map<String, String> myDataMap;
-  @Override
-  public void open() {
-    myDataMap = new HashMap<String, String>();
-  }
-
-  @Override
-  public void close() {
-    myDataMap = null;
-  }
-
-  @Override
-  public void saveData(String path, String data) {
-    if(myDataMap != null) {
-      myDataMap.put(path, data);
-    }
-  }
-
-  @Override
-  public String readData(String path) {
-    if(myDataMap != null) {
-      return myDataMap.get(path);
-    }
-    return null;
-  }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.storm.eventhubs.spout.IStateStore;
+
+/**
+ * A mock of IStateStore
+ */
+public class StateStoreMock implements IStateStore {
+  Map<String, String> myDataMap;
+  @Override
+  public void open() {
+    myDataMap = new HashMap<String, String>();
+  }
+
+  @Override
+  public void close() {
+    myDataMap = null;
+  }
+
+  @Override
+  public void saveData(String path, String data) {
+    if(myDataMap != null) {
+      myDataMap.put(path, data);
+    }
+  }
+
+  @Override
+  public String readData(String path) {
+    if(myDataMap != null) {
+      return myDataMap.get(path);
+    }
+    return null;
+  }
+}
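
A quick round trip through the mock (the ZooKeeper-style path is just an example value):

    StateStoreMock store = new StateStoreMock();
    store.open();
    store.saveData("/eventhubspout/partition-0", "offset-100");
    assertEquals("offset-100", store.readData("/eventhubspout/partition-0"));
    store.close();
    assertNull(store.readData("/eventhubspout/partition-0"));  // map is discarded on close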

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java
index f260dea..926337b 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java
@@ -1,47 +1,47 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.spout;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
-
-public class TestEventData {
-
-  @Before
-  public void setUp() throws Exception {
-  }
-
-  @After
-  public void tearDown() throws Exception {
-  }
-
-  @Test
-  public void testEventDataComparision() {
-
-	MessageId messageId1 = MessageId.create(null, "3", 1);
-	EventDataWrap eventData1 = EventDataWrap.create(null, messageId1);
-
-	MessageId messageId2 = MessageId.create(null, "13", 2);
-	EventDataWrap eventData2 = EventDataWrap.create(null, messageId2);
-
-	assertTrue(eventData2.compareTo(eventData1) > 0);
-  }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestEventData {
+
+  @Before
+  public void setUp() throws Exception {
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  @Test
+  public void testEventDataComparision() {
+
+	MessageId messageId1 = MessageId.create(null, "3", 1);
+	EventDataWrap eventData1 = EventDataWrap.create(null, messageId1);
+
+	MessageId messageId2 = MessageId.create(null, "13", 2);
+	EventDataWrap eventData2 = EventDataWrap.create(null, messageId2);
+
+	assertTrue(eventData2.compareTo(eventData1) > 0);
+  }
+}
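
Assuming EventDataWrap honors the Comparable contract, the symmetric assertion could sit alongside the one above:

    assertTrue(eventData1.compareTo(eventData2) < 0);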

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java
index c42f769..bd5b07f 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java
@@ -1,57 +1,57 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.trident;
-
-import java.util.List;
-
-import org.apache.storm.trident.operation.TridentCollector;
-
-/**
- * A mock of TridentCollector
- */
-public class TridentCollectorMock implements TridentCollector {
-  StringBuilder buffer;
-  
-  public TridentCollectorMock() {
-    buffer = new StringBuilder();
-  }
-  
-  @Override
-  public void emit(List<Object> tuples) {
-    for(Object o: tuples) {
-      buffer.append(o.toString());
-    }
-  }
-
-  @Override
-  public void flush() {
-    // NO-OP
-  }
-
-  @Override
-  public void reportError(Throwable arg0) {
-  }
-
-  public void clear() {
-    buffer.setLength(0);
-  }
-  
-  public String getBuffer() {
-    return buffer.toString();
-  }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.util.List;
+
+import org.apache.storm.trident.operation.TridentCollector;
+
+/**
+ * A mock of TridentCollector
+ */
+public class TridentCollectorMock implements TridentCollector {
+  StringBuilder buffer;
+  
+  public TridentCollectorMock() {
+    buffer = new StringBuilder();
+  }
+  
+  @Override
+  public void emit(List<Object> tuples) {
+    for(Object o: tuples) {
+      buffer.append(o.toString());
+    }
+  }
+
+  @Override
+  public void flush() {
+    // NO-OP
+  }
+
+  @Override
+  public void reportError(Throwable arg0) {
+  }
+
+  public void clear() {
+    buffer.setLength(0);
+  }
+  
+  public String getBuffer() {
+    return buffer.toString();
+  }
+}
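
A sketch of verifying emitted tuples through the mock:

    TridentCollectorMock collector = new TridentCollectorMock();
    collector.emit(Arrays.<Object>asList("a", "b"));
    assertEquals("ab", collector.getBuffer());  // emit concatenates toString() of each value
    collector.clear();
    assertEquals("", collector.getBuffer());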