You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2018/03/23 17:10:04 UTC
[1/5] storm git commit: set '* text=auto' in .gitattributes in order
to avoid merge work because of line feed changes
Repository: storm
Updated Branches:
refs/heads/master 0fdad2c0f -> adf4efb71
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateUpdater.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateUpdater.java
index 77e00ed..41311b3 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateUpdater.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateUpdater.java
@@ -1,35 +1,35 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.trident.state;
-
-import java.util.List;
-
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.state.BaseStateUpdater;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-public class MongoStateUpdater extends BaseStateUpdater<MongoState> {
-
- @Override
- public void updateState(MongoState state, List<TridentTuple> tuples,
- TridentCollector collector) {
- state.updateState(tuples, collector);
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.trident.state;
+
+import java.util.List;
+
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.BaseStateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+public class MongoStateUpdater extends BaseStateUpdater<MongoState> {
+
+ @Override
+ public void updateState(MongoState state, List<TridentTuple> tuples,
+ TridentCollector collector) {
+ state.updateState(tuples, collector);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/integration-test/README.md
----------------------------------------------------------------------
diff --git a/integration-test/README.md b/integration-test/README.md
index 8805f33..1668e66 100644
--- a/integration-test/README.md
+++ b/integration-test/README.md
@@ -1,59 +1,59 @@
-End to end storm integration tests
-==================================
-
-Running tests end-to-end
-------------------------
-Assumption:
-A single version of storm binary zip such as `storm-dist/binary/target/apache-storm-2.0.0-SNAPSHOT.zip` is present
-The following command will bring up a vagrant cluster.
-```sh
-cd integration-test/config
-vagrant up
-```
-This automatically will run `integration-test/run-it.sh`.
-This brings up a vagrant machine, with storm and zookeeper daemons.
-And runs all the tests against it.
-
-Running tests for development & debugging
-=========================================
-```vagrant up``` command is setup as a complete auto-pilot.
-Following describes how we can run individual tests against this vagrant cluster or any other cluster.
-
-Configs for running
--------------------
-The supplied configuration will run tests against vagrant setup. However, it can be changed to use a different cluster.
-Change `integration-test/src/test/resources/storm.yaml` as necessary.
-
-Running all tests manually
---------------------------
-To run all tests:
-```sh
-mvn clean package -DskipTests && mvn test
-```
-
-To run a single test:
-```sh
-mvn clean package -DskipTests && mvn test -Dtest=SlidingWindowCountTest
-```
-
-Running tests from IDE
-----------------------
-You might have to enable intellij profile to make your IDE happy.
-Make sure that the following is run before tests are launched.
-```sh
-mvn package -DskipTests
-```
-
-Running tests with custom storm version
----------------------------------------
-You can supply custom storm version using `-Dstorm.version=<storm-version>` property to all the maven commands.
-```sh
-mvn clean package -DskipTests -Dstorm.version=<storm-version>
-mvn test -Dtest=DemoTest -Dstorm.version=<storm-version>
-```
-
-To find version of the storm that you are running run `storm version` command.
-
-Code
-----
-Start off by looking at file [DemoTest.java](https://github.com/apache/storm/blob/master/integration-test/src/test/java/org/apache/storm/st/DemoTest.java).
+End to end storm integration tests
+==================================
+
+Running tests end-to-end
+------------------------
+Assumption:
+A single version of storm binary zip such as `storm-dist/binary/target/apache-storm-2.0.0-SNAPSHOT.zip` is present
+The following command will bring up a vagrant cluster.
+```sh
+cd integration-test/config
+vagrant up
+```
+This automatically will run `integration-test/run-it.sh`.
+This brings up a vagrant machine, with storm and zookeeper daemons.
+And runs all the tests against it.
+
+Running tests for development & debugging
+=========================================
+```vagrant up``` command is setup as a complete auto-pilot.
+Following describes how we can run individual tests against this vagrant cluster or any other cluster.
+
+Configs for running
+-------------------
+The supplied configuration will run tests against vagrant setup. However, it can be changed to use a different cluster.
+Change `integration-test/src/test/resources/storm.yaml` as necessary.
+
+Running all tests manually
+--------------------------
+To run all tests:
+```sh
+mvn clean package -DskipTests && mvn test
+```
+
+To run a single test:
+```sh
+mvn clean package -DskipTests && mvn test -Dtest=SlidingWindowCountTest
+```
+
+Running tests from IDE
+----------------------
+You might have to enable intellij profile to make your IDE happy.
+Make sure that the following is run before tests are launched.
+```sh
+mvn package -DskipTests
+```
+
+Running tests with custom storm version
+---------------------------------------
+You can supply custom storm version using `-Dstorm.version=<storm-version>` property to all the maven commands.
+```sh
+mvn clean package -DskipTests -Dstorm.version=<storm-version>
+mvn test -Dtest=DemoTest -Dstorm.version=<storm-version>
+```
+
+To find version of the storm that you are running run `storm version` command.
+
+Code
+----
+Start off by looking at file [DemoTest.java](https://github.com/apache/storm/blob/master/integration-test/src/test/java/org/apache/storm/st/DemoTest.java).
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/storm-client/test/jvm/org/apache/storm/trident/TestTridentTopology.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/trident/TestTridentTopology.java b/storm-client/test/jvm/org/apache/storm/trident/TestTridentTopology.java
index 7adc8a0..b0303dc 100644
--- a/storm-client/test/jvm/org/apache/storm/trident/TestTridentTopology.java
+++ b/storm-client/test/jvm/org/apache/storm/trident/TestTridentTopology.java
@@ -1,76 +1,76 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.trident;
-
-import org.apache.storm.generated.Bolt;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.storm.trident.operation.builtin.Count;
-import org.apache.storm.trident.operation.builtin.Sum;
-import org.apache.storm.trident.testing.FixedBatchSpout;
-import org.apache.storm.trident.testing.Split;
-import org.apache.storm.trident.testing.StringLength;
-
-import java.util.Map;
-import java.util.Set;
-
-public class TestTridentTopology {
-
- private StormTopology buildTopology() {
- FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
- new Values("the cow jumped over the moon"),
- new Values("the man went to the store and bought some candy"),
- new Values("four score and seven years ago"),
- new Values("how many apples can you eat"));
- spout.setCycle(true);
-
- TridentTopology topology = new TridentTopology();
- topology.newStream("spout", spout)
- //no name
- .each(new Fields("sentence"), new Split(), new Fields("word"))
- .partitionBy(new Fields("word"))
- .name("abc")
- .each(new Fields("word"), new StringLength(), new Fields("length"))
- .partitionBy(new Fields("length"))
- .name("def")
- .aggregate(new Fields("length"), new Count(), new Fields("count"))
- .partitionBy(new Fields("count"))
- .name("ghi")
- .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
- return topology.build();
- }
-
- @Test
- public void testGenBoltId() {
- Set<String> pre = null;
- for (int i = 0; i < 100; i++) {
- StormTopology topology = buildTopology();
- Map<String, Bolt> cur = topology.get_bolts();
- System.out.println(cur.keySet());
- if (pre != null) {
- Assert.assertTrue("bold id not consistent with group name", pre.equals(cur.keySet()));
- }
- pre = cur.keySet();
- }
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.trident;
+
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.storm.trident.operation.builtin.Count;
+import org.apache.storm.trident.operation.builtin.Sum;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.testing.Split;
+import org.apache.storm.trident.testing.StringLength;
+
+import java.util.Map;
+import java.util.Set;
+
+public class TestTridentTopology {
+
+ private StormTopology buildTopology() {
+ FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
+ new Values("the cow jumped over the moon"),
+ new Values("the man went to the store and bought some candy"),
+ new Values("four score and seven years ago"),
+ new Values("how many apples can you eat"));
+ spout.setCycle(true);
+
+ TridentTopology topology = new TridentTopology();
+ topology.newStream("spout", spout)
+ //no name
+ .each(new Fields("sentence"), new Split(), new Fields("word"))
+ .partitionBy(new Fields("word"))
+ .name("abc")
+ .each(new Fields("word"), new StringLength(), new Fields("length"))
+ .partitionBy(new Fields("length"))
+ .name("def")
+ .aggregate(new Fields("length"), new Count(), new Fields("count"))
+ .partitionBy(new Fields("count"))
+ .name("ghi")
+ .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
+ return topology.build();
+ }
+
+ @Test
+ public void testGenBoltId() {
+ Set<String> pre = null;
+ for (int i = 0; i < 100; i++) {
+ StormTopology topology = buildTopology();
+ Map<String, Bolt> cur = topology.get_bolts();
+ System.out.println(cur.keySet());
+ if (pre != null) {
+ Assert.assertTrue("bold id not consistent with group name", pre.equals(cur.keySet()));
+ }
+ pre = cur.keySet();
+ }
+ }
+
+}
[5/5] storm git commit: Merge branch 'gitattributes' of
https://github.com/krichter722/storm into fix-eol-to-auto
Posted by sr...@apache.org.
Merge branch 'gitattributes' of https://github.com/krichter722/storm into fix-eol-to-auto
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/adf4efb7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/adf4efb7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/adf4efb7
Branch: refs/heads/master
Commit: adf4efb715b33abe7107175dc893a9036232a166
Parents: 0fdad2c e9c427c
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Fri Mar 23 18:08:23 2018 +0100
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Fri Mar 23 18:08:23 2018 +0100
----------------------------------------------------------------------
.gitattributes | 11 +-
docs/storm-eventhubs.md | 80 +--
external/storm-eventhubs/README.md | 90 +--
.../eventhubs/bolt/DefaultEventDataFormat.java | 94 +--
.../storm/eventhubs/bolt/EventHubBolt.java | 292 ++++-----
.../eventhubs/bolt/EventHubBoltConfig.java | 222 +++----
.../storm/eventhubs/bolt/IEventDataFormat.java | 56 +-
.../storm/eventhubs/spout/EventDataScheme.java | 156 ++---
.../storm/eventhubs/spout/FieldConstants.java | 56 +-
.../storm/eventhubs/spout/IEventDataScheme.java | 88 +--
.../spout/IEventHubReceiverFactory.java | 60 +-
.../eventhubs/spout/IPartitionCoordinator.java | 54 +-
.../eventhubs/spout/IPartitionManager.java | 74 +--
.../storm/eventhubs/spout/IStateStore.java | 62 +-
.../apache/storm/eventhubs/spout/MessageId.java | 112 ++--
.../eventhubs/spout/ZookeeperStateStore.java | 190 +++---
.../storm/eventhubs/trident/Coordinator.java | 120 ++--
.../trident/ITridentPartitionManager.java | 70 +-
.../ITridentPartitionManagerFactory.java | 52 +-
.../trident/OpaqueTridentEventHubEmitter.java | 136 ++--
.../trident/OpaqueTridentEventHubSpout.java | 128 ++--
.../storm/eventhubs/trident/Partition.java | 78 +--
.../storm/eventhubs/trident/Partitions.java | 82 +--
.../TransactionalTridentEventHubSpout.java | 132 ++--
.../storm/eventhubs/samples/EventHubLoop.java | 104 +--
.../samples/OpaqueTridentEventCount.java | 106 +--
.../samples/TransactionalTridentEventCount.java | 162 ++---
.../eventhubs/samples/bolt/GlobalCountBolt.java | 176 ++---
.../samples/bolt/PartialCountBolt.java | 136 ++--
.../spout/SpoutOutputCollectorMock.java | 142 ++--
.../storm/eventhubs/spout/StateStoreMock.java | 108 +--
.../storm/eventhubs/spout/TestEventData.java | 94 +--
.../eventhubs/trident/TridentCollectorMock.java | 114 ++--
external/storm-mongodb/README.md | 650 +++++++++----------
.../storm/mongodb/bolt/AbstractMongoBolt.java | 124 ++--
.../storm/mongodb/bolt/MongoInsertBolt.java | 248 +++----
.../storm/mongodb/bolt/MongoUpdateBolt.java | 186 +++---
.../storm/mongodb/common/MongoDbClient.java | 218 +++----
.../mongodb/common/QueryFilterCreator.java | 94 +--
.../common/SimpleQueryFilterCreator.java | 94 +--
.../mongodb/common/mapper/MongoMapper.java | 94 +--
.../common/mapper/SimpleMongoMapper.java | 110 ++--
.../common/mapper/SimpleMongoUpdateMapper.java | 92 +--
.../storm/mongodb/trident/state/MongoState.java | 290 ++++-----
.../trident/state/MongoStateFactory.java | 86 +--
.../trident/state/MongoStateUpdater.java | 70 +-
integration-test/README.md | 118 ++--
.../storm/trident/TestTridentTopology.java | 152 ++---
48 files changed, 3136 insertions(+), 3127 deletions(-)
----------------------------------------------------------------------
[2/5] storm git commit: set '* text=auto' in .gitattributes in order
to avoid merge work because of line feed changes
Posted by sr...@apache.org.
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/README.md
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/README.md b/external/storm-mongodb/README.md
index b5b9e96..5c81710 100644
--- a/external/storm-mongodb/README.md
+++ b/external/storm-mongodb/README.md
@@ -1,325 +1,325 @@
-#Storm MongoDB
-
-Storm/Trident integration for [MongoDB](https://www.mongodb.org/). This package includes the core bolts and trident states that allows a storm topology to either insert storm tuples in a database collection or to execute select/update queries against a database collection in a storm topology.
-
-## Insert into Database
-The bolt and trident state included in this package for inserting data into a database collection.
-
-### MongoMapper
-The main API for inserting data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoMapper` interface:
-
-```java
-public interface MongoMapper extends Serializable {
- Document toDocument(ITuple tuple);
- Document toDocumentByKeys(List<Object> keys);
-}
-```
-
-### SimpleMongoMapper
-`storm-mongodb` includes a general purpose `MongoMapper` implementation called `SimpleMongoMapper` that can map Storm tuple to a Database document. `SimpleMongoMapper` assumes that the storm tuple has fields with same name as the document field name in the database collection that you intend to write to.
-
-```java
-public class SimpleMongoMapper implements MongoMapper {
- private String[] fields;
-
- @Override
- public Document toDocument(ITuple tuple) {
- Document document = new Document();
- for(String field : fields){
- document.append(field, tuple.getValueByField(field));
- }
- return document;
- }
-
- @Override
- public Document toDocumentByKeys(List<Object> keys) {
- Document document = new Document();
- document.append("_id", MongoUtils.getID(keys));
- return document;
- }
-
- public SimpleMongoMapper withFields(String... fields) {
- this.fields = fields;
- return this;
- }
-}
-```
-
-### MongoInsertBolt
-To use the `MongoInsertBolt`, you construct an instance of it by specifying url, collectionName and a `MongoMapper` implementation that converts storm tuple to DB document. The following is the standard URI connection scheme:
- `mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]`
-
-More options information(eg: Write Concern Options) about Mongo URI, you can visit https://docs.mongodb.org/manual/reference/connection-string/#connections-connection-options
-
- ```java
-String url = "mongodb://127.0.0.1:27017/test";
-String collectionName = "wordcount";
-
-MongoMapper mapper = new SimpleMongoMapper()
- .withFields("word", "count");
-
-MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper);
- ```
-
-
-## Update from Database
-The bolt included in this package for updating data from a database collection.
-
-### MongoUpdateMapper
-The main API for updating data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoUpdateMapper` interface:
-
-```java
-public interface MongoUpdateMapper extends MongoMapper { }
-```
-
-### SimpleMongoUpdateMapper
-`storm-mongodb` includes a general purpose `MongoUpdateMapper` implementation called `SimpleMongoUpdateMapper` that can map Storm tuple to a Database document. `SimpleMongoUpdateMapper` assumes that the storm tuple has fields with same name as the document field name in the database collection that you intend to write to.
-`SimpleMongoUpdateMapper` uses `$set` operator for setting the value of a field in a document. More information about update operator, you can visit
-https://docs.mongodb.org/manual/reference/operator/update/
-
-```java
-public class SimpleMongoUpdateMapper extends SimpleMongoMapper implements MongoUpdateMapper {
-
- private String[] fields;
-
- @Override
- public Document toDocument(ITuple tuple) {
- Document document = new Document();
- for(String field : fields){
- document.append(field, tuple.getValueByField(field));
- }
- //$set operator: Sets the value of a field in a document.
- return new Document("$set", document);
- }
-
- public SimpleMongoUpdateMapper withFields(String... fields) {
- this.fields = fields;
- return this;
- }
-}
-```
-
-
-### QueryFilterCreator
-The main API for creating a MongoDB query Filter is the `org.apache.storm.mongodb.common.QueryFilterCreator` interface:
-
- ```java
-public interface QueryFilterCreator extends Serializable {
- Bson createFilter(ITuple tuple);
- Bson createFilterByKeys(List<Object> keys);
-}
- ```
-
-### SimpleQueryFilterCreator
-`storm-mongodb` includes a general purpose `QueryFilterCreator` implementation called `SimpleQueryFilterCreator` that can create a MongoDB query Filter by given Tuple. `QueryFilterCreator` uses `$eq` operator for matching values that are equal to a specified value. More information about query operator, you can visit
-https://docs.mongodb.org/manual/reference/operator/query/
-
- ```java
-public class SimpleQueryFilterCreator implements QueryFilterCreator {
-
- private String field;
-
- @Override
- public Bson createFilter(ITuple tuple) {
- return Filters.eq(field, tuple.getValueByField(field));
- }
-
- @Override
- public Bson createFilterByKeys(List<Object> keys) {
- return Filters.eq("_id", MongoUtils.getID(keys));
- }
-
- public SimpleQueryFilterCreator withField(String field) {
- this.field = field;
- return this;
- }
-
-}
- ```
-
-### MongoUpdateBolt
-To use the `MongoUpdateBolt`, you construct an instance of it by specifying Mongo url, collectionName, a `QueryFilterCreator` implementation and a `MongoUpdateMapper` implementation that converts storm tuple to DB document.
-
- ```java
- MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
- .withFields("word", "count");
-
- QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator()
- .withField("word");
-
- MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper);
-
- //if a new document should be inserted if there are no matches to the query filter
- //updateBolt.withUpsert(true);
-
- //whether find all documents according to the query filter
- //updateBolt.withMany(true);
- ```
-
- Or use a anonymous inner class implementation for `QueryFilterCreator`:
-
- ```java
- MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
- .withFields("word", "count");
-
- QueryFilterCreator updateQueryCreator = new QueryFilterCreator() {
- @Override
- public Bson createFilter(ITuple tuple) {
- return Filters.gt("count", 3);
- }
- };
-
- MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper);
-
- //if a new document should be inserted if there are no matches to the query filter
- //updateBolt.withUpsert(true);
- ```
-
-
-## Lookup from Database
-The bolt included in this package for selecting data from a database collection.
-
-### MongoLookupMapper
-The main API for selecting data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoLookupMapper` interface:
-
-```java
-public interface MongoLookupMapper extends Serializable {
-
- List<Values> toTuple(ITuple input, Document doc);
-
- void declareOutputFields(OutputFieldsDeclarer declarer);
-}
-```
-
-### SimpleMongoLookupMapper
-`storm-mongodb` includes a general purpose `MongoLookupMapper` implementation called `SimpleMongoLookupMapper` that can converts a Mongo document to a list of storm values.
-
-```java
-public class SimpleMongoLookupMapper implements MongoLookupMapper {
-
- private String[] fields;
-
- @Override
- public List<Values> toTuple(ITuple input, Document doc) {
- Values values = new Values();
-
- for(String field : fields) {
- if(input.contains(field)) {
- values.add(input.getValueByField(field));
- } else {
- values.add(doc.get(field));
- }
- }
- List<Values> result = new ArrayList<Values>();
- result.add(values);
- return result;
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields(fields));
- }
-
- public SimpleMongoLookupMapper withFields(String... fields) {
- this.fields = fields;
- return this;
- }
-
-}
-```
-
-### MongoLookupBolt
-To use the `MongoLookupBolt`, you construct an instance of it by specifying Mongo url, collectionName, a `QueryFilterCreator` implementation and a `MongoLookupMapper` implementation that converts a Mongo document to a list of storm values.
-
- ```java
- MongoLookupMapper mapper = new SimpleMongoLookupMapper()
- .withFields("word", "count");
-
- QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
- .withField("word");
-
- MongoLookupBolt lookupBolt = new MongoLookupBolt(url, collectionName, filterCreator, mapper);
- ```
-
-## Mongo Trident State&MapState
-### Trident State
-We support trident persistent state that can be used with trident topologies. To create a Mongo persistent trident state you need to initialize it with the url, collectionName, the `MongoMapper` instance. See the example below:
-
- ```java
- MongoMapper mapper = new SimpleMongoMapper()
- .withFields("word", "count");
-
- MongoState.Options options = new MongoState.Options()
- .withUrl(url)
- .withCollectionName(collectionName)
- .withMapper(mapper);
-
- StateFactory factory = new MongoStateFactory(options);
-
- TridentTopology topology = new TridentTopology();
- Stream stream = topology.newStream("spout1", spout);
-
- stream.partitionPersist(factory, fields,
- new MongoStateUpdater(), new Fields());
-
- TridentState state = topology.newStaticState(factory);
- stream = stream.stateQuery(state, new Fields("word"),
- new MongoStateQuery(), new Fields("columnName", "columnValue"));
- stream.each(new Fields("word", "columnValue"), new PrintFunction(), new Fields());
- ```
- **NOTE**:
- >If there is no unique index provided, trident state inserts in the case of failures may result in duplicate documents.
-
-### Trident MapState
-We also support trident `MapState`. To create a Mongo trident `MapState` you need to initialize it with the url, collectionName, the `MongoMapper` and `QueryFilterCreator` instance. See the example below:
-
- ```java
- MongoMapper mapper = new SimpleMongoMapper()
- .withFields("word", "count");
-
- QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
- .withField("word");
-
- MongoMapState.Options options = new MongoMapState.Options();
- options.url = url;
- options.collectionName = collectionName;
- options.mapper = mapper;
- options.queryCreator = filterCreator;
-
- StateFactory factory = MongoMapState.transactional(options);
-
- TridentTopology topology = new TridentTopology();
- Stream stream = topology.newStream("spout1", spout);
-
- TridentState state = stream.groupBy(new Fields("word"))
- .persistentAggregate(factory, new Fields("count"), new Sum(), new Fields("sum"));
-
- stream.stateQuery(state, new Fields("word"), new MapGet(), new Fields("sum"))
- .each(new Fields("word", "sum"), new PrintFunction(), new Fields());
- ```
-
-
-## License
-
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-
-## Committer Sponsors
-
- * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))
- * Xin Wang ([xinwang@apache.org](mailto:xinwang@apache.org))
-
+#Storm MongoDB
+
+Storm/Trident integration for [MongoDB](https://www.mongodb.org/). This package includes the core bolts and trident states that allows a storm topology to either insert storm tuples in a database collection or to execute select/update queries against a database collection in a storm topology.
+
+## Insert into Database
+The bolt and trident state included in this package for inserting data into a database collection.
+
+### MongoMapper
+The main API for inserting data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoMapper` interface:
+
+```java
+public interface MongoMapper extends Serializable {
+ Document toDocument(ITuple tuple);
+ Document toDocumentByKeys(List<Object> keys);
+}
+```
+
+### SimpleMongoMapper
+`storm-mongodb` includes a general purpose `MongoMapper` implementation called `SimpleMongoMapper` that can map Storm tuple to a Database document. `SimpleMongoMapper` assumes that the storm tuple has fields with same name as the document field name in the database collection that you intend to write to.
+
+```java
+public class SimpleMongoMapper implements MongoMapper {
+ private String[] fields;
+
+ @Override
+ public Document toDocument(ITuple tuple) {
+ Document document = new Document();
+ for(String field : fields){
+ document.append(field, tuple.getValueByField(field));
+ }
+ return document;
+ }
+
+ @Override
+ public Document toDocumentByKeys(List<Object> keys) {
+ Document document = new Document();
+ document.append("_id", MongoUtils.getID(keys));
+ return document;
+ }
+
+ public SimpleMongoMapper withFields(String... fields) {
+ this.fields = fields;
+ return this;
+ }
+}
+```
+
+### MongoInsertBolt
+To use the `MongoInsertBolt`, you construct an instance of it by specifying url, collectionName and a `MongoMapper` implementation that converts storm tuple to DB document. The following is the standard URI connection scheme:
+ `mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]`
+
+More options information(eg: Write Concern Options) about Mongo URI, you can visit https://docs.mongodb.org/manual/reference/connection-string/#connections-connection-options
+
+ ```java
+String url = "mongodb://127.0.0.1:27017/test";
+String collectionName = "wordcount";
+
+MongoMapper mapper = new SimpleMongoMapper()
+ .withFields("word", "count");
+
+MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper);
+ ```
+
+
+## Update from Database
+The bolt included in this package for updating data from a database collection.
+
+### MongoUpdateMapper
+The main API for updating data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoUpdateMapper` interface:
+
+```java
+public interface MongoUpdateMapper extends MongoMapper { }
+```
+
+### SimpleMongoUpdateMapper
+`storm-mongodb` includes a general purpose `MongoUpdateMapper` implementation called `SimpleMongoUpdateMapper` that can map Storm tuple to a Database document. `SimpleMongoUpdateMapper` assumes that the storm tuple has fields with same name as the document field name in the database collection that you intend to write to.
+`SimpleMongoUpdateMapper` uses `$set` operator for setting the value of a field in a document. More information about update operator, you can visit
+https://docs.mongodb.org/manual/reference/operator/update/
+
+```java
+public class SimpleMongoUpdateMapper extends SimpleMongoMapper implements MongoUpdateMapper {
+
+ private String[] fields;
+
+ @Override
+ public Document toDocument(ITuple tuple) {
+ Document document = new Document();
+ for(String field : fields){
+ document.append(field, tuple.getValueByField(field));
+ }
+ //$set operator: Sets the value of a field in a document.
+ return new Document("$set", document);
+ }
+
+ public SimpleMongoUpdateMapper withFields(String... fields) {
+ this.fields = fields;
+ return this;
+ }
+}
+```
+
+
+### QueryFilterCreator
+The main API for creating a MongoDB query Filter is the `org.apache.storm.mongodb.common.QueryFilterCreator` interface:
+
+ ```java
+public interface QueryFilterCreator extends Serializable {
+ Bson createFilter(ITuple tuple);
+ Bson createFilterByKeys(List<Object> keys);
+}
+ ```
+
+### SimpleQueryFilterCreator
+`storm-mongodb` includes a general purpose `QueryFilterCreator` implementation called `SimpleQueryFilterCreator` that can create a MongoDB query Filter by given Tuple. `QueryFilterCreator` uses `$eq` operator for matching values that are equal to a specified value. More information about query operator, you can visit
+https://docs.mongodb.org/manual/reference/operator/query/
+
+ ```java
+public class SimpleQueryFilterCreator implements QueryFilterCreator {
+
+ private String field;
+
+ @Override
+ public Bson createFilter(ITuple tuple) {
+ return Filters.eq(field, tuple.getValueByField(field));
+ }
+
+ @Override
+ public Bson createFilterByKeys(List<Object> keys) {
+ return Filters.eq("_id", MongoUtils.getID(keys));
+ }
+
+ public SimpleQueryFilterCreator withField(String field) {
+ this.field = field;
+ return this;
+ }
+
+}
+ ```
+
+### MongoUpdateBolt
+To use the `MongoUpdateBolt`, you construct an instance of it by specifying Mongo url, collectionName, a `QueryFilterCreator` implementation and a `MongoUpdateMapper` implementation that converts storm tuple to DB document.
+
+ ```java
+ MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
+ .withFields("word", "count");
+
+ QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator()
+ .withField("word");
+
+ MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper);
+
+ //if a new document should be inserted if there are no matches to the query filter
+ //updateBolt.withUpsert(true);
+
+ //whether find all documents according to the query filter
+ //updateBolt.withMany(true);
+ ```
+
+ Or use a anonymous inner class implementation for `QueryFilterCreator`:
+
+ ```java
+ MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
+ .withFields("word", "count");
+
+ QueryFilterCreator updateQueryCreator = new QueryFilterCreator() {
+ @Override
+ public Bson createFilter(ITuple tuple) {
+ return Filters.gt("count", 3);
+ }
+ };
+
+ MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper);
+
+ //if a new document should be inserted if there are no matches to the query filter
+ //updateBolt.withUpsert(true);
+ ```
+
+
+## Lookup from Database
+The bolt included in this package for selecting data from a database collection.
+
+### MongoLookupMapper
+The main API for selecting data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoLookupMapper` interface:
+
+```java
+public interface MongoLookupMapper extends Serializable {
+
+ List<Values> toTuple(ITuple input, Document doc);
+
+ void declareOutputFields(OutputFieldsDeclarer declarer);
+}
+```
+
+### SimpleMongoLookupMapper
+`storm-mongodb` includes a general purpose `MongoLookupMapper` implementation called `SimpleMongoLookupMapper` that can converts a Mongo document to a list of storm values.
+
+```java
+public class SimpleMongoLookupMapper implements MongoLookupMapper {
+
+ private String[] fields;
+
+ @Override
+ public List<Values> toTuple(ITuple input, Document doc) {
+ Values values = new Values();
+
+ for(String field : fields) {
+ if(input.contains(field)) {
+ values.add(input.getValueByField(field));
+ } else {
+ values.add(doc.get(field));
+ }
+ }
+ List<Values> result = new ArrayList<Values>();
+ result.add(values);
+ return result;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields(fields));
+ }
+
+ public SimpleMongoLookupMapper withFields(String... fields) {
+ this.fields = fields;
+ return this;
+ }
+
+}
+```
+
+### MongoLookupBolt
+To use the `MongoLookupBolt`, you construct an instance of it by specifying Mongo url, collectionName, a `QueryFilterCreator` implementation and a `MongoLookupMapper` implementation that converts a Mongo document to a list of storm values.
+
+ ```java
+ MongoLookupMapper mapper = new SimpleMongoLookupMapper()
+ .withFields("word", "count");
+
+ QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
+ .withField("word");
+
+ MongoLookupBolt lookupBolt = new MongoLookupBolt(url, collectionName, filterCreator, mapper);
+ ```
+
+## Mongo Trident State&MapState
+### Trident State
+We support trident persistent state that can be used with trident topologies. To create a Mongo persistent trident state you need to initialize it with the url, collectionName, the `MongoMapper` instance. See the example below:
+
+ ```java
+ MongoMapper mapper = new SimpleMongoMapper()
+ .withFields("word", "count");
+
+ MongoState.Options options = new MongoState.Options()
+ .withUrl(url)
+ .withCollectionName(collectionName)
+ .withMapper(mapper);
+
+ StateFactory factory = new MongoStateFactory(options);
+
+ TridentTopology topology = new TridentTopology();
+ Stream stream = topology.newStream("spout1", spout);
+
+ stream.partitionPersist(factory, fields,
+ new MongoStateUpdater(), new Fields());
+
+ TridentState state = topology.newStaticState(factory);
+ stream = stream.stateQuery(state, new Fields("word"),
+ new MongoStateQuery(), new Fields("columnName", "columnValue"));
+ stream.each(new Fields("word", "columnValue"), new PrintFunction(), new Fields());
+ ```
+ **NOTE**:
+ >If there is no unique index provided, trident state inserts in the case of failures may result in duplicate documents.
+
+### Trident MapState
+We also support trident `MapState`. To create a Mongo trident `MapState` you need to initialize it with the url, collectionName, the `MongoMapper` and `QueryFilterCreator` instance. See the example below:
+
+ ```java
+ MongoMapper mapper = new SimpleMongoMapper()
+ .withFields("word", "count");
+
+ QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
+ .withField("word");
+
+ MongoMapState.Options options = new MongoMapState.Options();
+ options.url = url;
+ options.collectionName = collectionName;
+ options.mapper = mapper;
+ options.queryCreator = filterCreator;
+
+ StateFactory factory = MongoMapState.transactional(options);
+
+ TridentTopology topology = new TridentTopology();
+ Stream stream = topology.newStream("spout1", spout);
+
+ TridentState state = stream.groupBy(new Fields("word"))
+ .persistentAggregate(factory, new Fields("count"), new Sum(), new Fields("sum"));
+
+ stream.stateQuery(state, new Fields("word"), new MapGet(), new Fields("sum"))
+ .each(new Fields("word", "sum"), new PrintFunction(), new Fields());
+ ```
+
+
+## License
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+
+## Committer Sponsors
+
+ * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))
+ * Xin Wang ([xinwang@apache.org](mailto:xinwang@apache.org))
+
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java
index ee66811..13049de 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java
@@ -1,62 +1,62 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.bolt;
-
-import java.util.Map;
-
-import org.apache.commons.lang.Validate;
-import org.apache.storm.mongodb.common.MongoDbClient;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.base.BaseRichBolt;
-
-public abstract class AbstractMongoBolt extends BaseRichBolt {
-
- private String url;
- private String collectionName;
-
- protected OutputCollector collector;
- protected MongoDbClient mongoClient;
-
- /**
- * AbstractMongoBolt Constructor.
- * @param url The MongoDB server url
- * @param collectionName The collection where reading/writing data
- */
- public AbstractMongoBolt(String url, String collectionName) {
- Validate.notEmpty(url, "url can not be blank or null");
- Validate.notEmpty(collectionName, "collectionName can not be blank or null");
-
- this.url = url;
- this.collectionName = collectionName;
- }
-
- @Override
- public void prepare(Map<String, Object> topoConf, TopologyContext context,
- OutputCollector collector) {
- this.collector = collector;
- this.mongoClient = new MongoDbClient(url, collectionName);
- }
-
- @Override
- public void cleanup() {
- this.mongoClient.close();
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.bolt;
+
+import java.util.Map;
+
+import org.apache.commons.lang.Validate;
+import org.apache.storm.mongodb.common.MongoDbClient;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.base.BaseRichBolt;
+
+public abstract class AbstractMongoBolt extends BaseRichBolt {
+
+ private String url;
+ private String collectionName;
+
+ protected OutputCollector collector;
+ protected MongoDbClient mongoClient;
+
+ /**
+ * AbstractMongoBolt Constructor.
+ * @param url The MongoDB server url
+ * @param collectionName The collection where reading/writing data
+ */
+ public AbstractMongoBolt(String url, String collectionName) {
+ Validate.notEmpty(url, "url can not be blank or null");
+ Validate.notEmpty(collectionName, "collectionName can not be blank or null");
+
+ this.url = url;
+ this.collectionName = collectionName;
+ }
+
+ @Override
+ public void prepare(Map<String, Object> topoConf, TopologyContext context,
+ OutputCollector collector) {
+ this.collector = collector;
+ this.mongoClient = new MongoDbClient(url, collectionName);
+ }
+
+ @Override
+ public void cleanup() {
+ this.mongoClient.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java
index afd1142..5d233da 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java
@@ -1,124 +1,124 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.bolt;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang.Validate;
-import org.apache.storm.mongodb.common.mapper.MongoMapper;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.BatchHelper;
-import org.apache.storm.utils.TupleUtils;
-import org.bson.Document;
-
-/**
- * Basic bolt for writing to MongoDB.
- * Note: Each MongoInsertBolt defined in a topology is tied to a specific collection.
- */
-public class MongoInsertBolt extends AbstractMongoBolt {
-
- private static final int DEFAULT_FLUSH_INTERVAL_SECS = 1;
-
- private MongoMapper mapper;
-
- private boolean ordered = true; //default is ordered.
-
- private int batchSize;
-
- private BatchHelper batchHelper;
-
- private int flushIntervalSecs = DEFAULT_FLUSH_INTERVAL_SECS;
-
- /**
- * MongoInsertBolt Constructor.
- * @param url The MongoDB server url
- * @param collectionName The collection where reading/writing data
- * @param mapper MongoMapper converting tuple to an MongoDB document
- */
- public MongoInsertBolt(String url, String collectionName, MongoMapper mapper) {
- super(url, collectionName);
-
- Validate.notNull(mapper, "MongoMapper can not be null");
-
- this.mapper = mapper;
- }
-
- @Override
- public void execute(Tuple tuple) {
- try {
- if (batchHelper.shouldHandle(tuple)) {
- batchHelper.addBatch(tuple);
- }
-
- if (batchHelper.shouldFlush()) {
- flushTuples();
- batchHelper.ack();
- }
- } catch (Exception e) {
- batchHelper.fail(e);
- }
- }
-
- private void flushTuples() {
- List<Document> docs = new LinkedList<>();
- for (Tuple t : batchHelper.getBatchTuples()) {
- Document doc = mapper.toDocument(t);
- docs.add(doc);
- }
- mongoClient.insert(docs, ordered);
- }
-
- public MongoInsertBolt withBatchSize(int batchSize) {
- this.batchSize = batchSize;
- return this;
- }
-
- public MongoInsertBolt withOrdered(boolean ordered) {
- this.ordered = ordered;
- return this;
- }
-
- public MongoInsertBolt withFlushIntervalSecs(int flushIntervalSecs) {
- this.flushIntervalSecs = flushIntervalSecs;
- return this;
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(), flushIntervalSecs);
- }
-
- @Override
- public void prepare(Map<String, Object> topoConf, TopologyContext context,
- OutputCollector collector) {
- super.prepare(topoConf, context, collector);
- this.batchHelper = new BatchHelper(batchSize, collector);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.bolt;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.Validate;
+import org.apache.storm.mongodb.common.mapper.MongoMapper;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.BatchHelper;
+import org.apache.storm.utils.TupleUtils;
+import org.bson.Document;
+
+/**
+ * Basic bolt for writing to MongoDB.
+ * Note: Each MongoInsertBolt defined in a topology is tied to a specific collection.
+ */
+public class MongoInsertBolt extends AbstractMongoBolt {
+
+ private static final int DEFAULT_FLUSH_INTERVAL_SECS = 1;
+
+ private MongoMapper mapper;
+
+ private boolean ordered = true; //default is ordered.
+
+ private int batchSize;
+
+ private BatchHelper batchHelper;
+
+ private int flushIntervalSecs = DEFAULT_FLUSH_INTERVAL_SECS;
+
+ /**
+ * MongoInsertBolt Constructor.
+ * @param url The MongoDB server url
+ * @param collectionName The collection where reading/writing data
+ * @param mapper MongoMapper converting tuple to an MongoDB document
+ */
+ public MongoInsertBolt(String url, String collectionName, MongoMapper mapper) {
+ super(url, collectionName);
+
+ Validate.notNull(mapper, "MongoMapper can not be null");
+
+ this.mapper = mapper;
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ try {
+ if (batchHelper.shouldHandle(tuple)) {
+ batchHelper.addBatch(tuple);
+ }
+
+ if (batchHelper.shouldFlush()) {
+ flushTuples();
+ batchHelper.ack();
+ }
+ } catch (Exception e) {
+ batchHelper.fail(e);
+ }
+ }
+
+ private void flushTuples() {
+ List<Document> docs = new LinkedList<>();
+ for (Tuple t : batchHelper.getBatchTuples()) {
+ Document doc = mapper.toDocument(t);
+ docs.add(doc);
+ }
+ mongoClient.insert(docs, ordered);
+ }
+
+ public MongoInsertBolt withBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ return this;
+ }
+
+ public MongoInsertBolt withOrdered(boolean ordered) {
+ this.ordered = ordered;
+ return this;
+ }
+
+ public MongoInsertBolt withFlushIntervalSecs(int flushIntervalSecs) {
+ this.flushIntervalSecs = flushIntervalSecs;
+ return this;
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(), flushIntervalSecs);
+ }
+
+ @Override
+ public void prepare(Map<String, Object> topoConf, TopologyContext context,
+ OutputCollector collector) {
+ super.prepare(topoConf, context, collector);
+ this.batchHelper = new BatchHelper(batchSize, collector);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
index 5579b8e..4f92b32 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
@@ -1,93 +1,93 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.bolt;
-
-import org.apache.commons.lang.Validate;
-import org.apache.storm.mongodb.common.QueryFilterCreator;
-import org.apache.storm.mongodb.common.mapper.MongoUpdateMapper;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.TupleUtils;
-import org.bson.Document;
-import org.bson.conversions.Bson;
-
-/**
- * Basic bolt for updating from MongoDB.
- * Note: Each MongoUpdateBolt defined in a topology is tied to a specific collection.
- */
-public class MongoUpdateBolt extends AbstractMongoBolt {
-
- private QueryFilterCreator queryCreator;
- private MongoUpdateMapper mapper;
-
- private boolean upsert; //the default is false.
- private boolean many; //the default is false.
-
- /**
- * MongoUpdateBolt Constructor.
- * @param url The MongoDB server url
- * @param collectionName The collection where reading/writing data
- * @param queryCreator QueryFilterCreator
- * @param mapper MongoMapper converting tuple to an MongoDB document
- */
- public MongoUpdateBolt(String url, String collectionName, QueryFilterCreator queryCreator, MongoUpdateMapper mapper) {
- super(url, collectionName);
-
- Validate.notNull(queryCreator, "QueryFilterCreator can not be null");
- Validate.notNull(mapper, "MongoUpdateMapper can not be null");
-
- this.queryCreator = queryCreator;
- this.mapper = mapper;
- }
-
- @Override
- public void execute(Tuple tuple) {
- if (TupleUtils.isTick(tuple)) {
- return;
- }
-
- try {
- //get document
- Document doc = mapper.toDocument(tuple);
- //get query filter
- Bson filter = queryCreator.createFilter(tuple);
- mongoClient.update(filter, doc, upsert, many);
- this.collector.ack(tuple);
- } catch (Exception e) {
- this.collector.reportError(e);
- this.collector.fail(tuple);
- }
- }
-
- public MongoUpdateBolt withUpsert(boolean upsert) {
- this.upsert = upsert;
- return this;
- }
-
- public MongoUpdateBolt withMany(boolean many) {
- this.many = many;
- return this;
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.bolt;
+
+import org.apache.commons.lang.Validate;
+import org.apache.storm.mongodb.common.QueryFilterCreator;
+import org.apache.storm.mongodb.common.mapper.MongoUpdateMapper;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+/**
+ * Basic bolt for updating from MongoDB.
+ * Note: Each MongoUpdateBolt defined in a topology is tied to a specific collection.
+ */
+public class MongoUpdateBolt extends AbstractMongoBolt {
+
+ private QueryFilterCreator queryCreator;
+ private MongoUpdateMapper mapper;
+
+ private boolean upsert; //the default is false.
+ private boolean many; //the default is false.
+
+ /**
+ * MongoUpdateBolt Constructor.
+ * @param url The MongoDB server url
+ * @param collectionName The collection where reading/writing data
+ * @param queryCreator QueryFilterCreator
+ * @param mapper MongoMapper converting tuple to an MongoDB document
+ */
+ public MongoUpdateBolt(String url, String collectionName, QueryFilterCreator queryCreator, MongoUpdateMapper mapper) {
+ super(url, collectionName);
+
+ Validate.notNull(queryCreator, "QueryFilterCreator can not be null");
+ Validate.notNull(mapper, "MongoUpdateMapper can not be null");
+
+ this.queryCreator = queryCreator;
+ this.mapper = mapper;
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ if (TupleUtils.isTick(tuple)) {
+ return;
+ }
+
+ try {
+ //get document
+ Document doc = mapper.toDocument(tuple);
+ //get query filter
+ Bson filter = queryCreator.createFilter(tuple);
+ mongoClient.update(filter, doc, upsert, many);
+ this.collector.ack(tuple);
+ } catch (Exception e) {
+ this.collector.reportError(e);
+ this.collector.fail(tuple);
+ }
+ }
+
+ public MongoUpdateBolt withUpsert(boolean upsert) {
+ this.upsert = upsert;
+ return this;
+ }
+
+ public MongoUpdateBolt withMany(boolean many) {
+ this.many = many;
+ return this;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDbClient.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDbClient.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDbClient.java
index c264f03..52ed237 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDbClient.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDbClient.java
@@ -1,109 +1,109 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.common;
-
-import com.mongodb.MongoClient;
-import com.mongodb.MongoClientURI;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoDatabase;
-import com.mongodb.client.model.InsertManyOptions;
-import com.mongodb.client.model.UpdateOptions;
-
-import java.util.List;
-
-import org.bson.Document;
-import org.bson.conversions.Bson;
-
-public class MongoDbClient {
-
- private MongoClient client;
- private MongoCollection<Document> collection;
-
- /**
- * The MongoDbClient constructor.
- * @param url The Mongo server url
- * @param collectionName The Mongo collection to read/write data
- */
- public MongoDbClient(String url, String collectionName) {
- //Creates a MongoURI from the given string.
- MongoClientURI uri = new MongoClientURI(url);
- //Creates a MongoClient described by a URI.
- this.client = new MongoClient(uri);
- //Gets a Database.
- MongoDatabase db = client.getDatabase(uri.getDatabase());
- //Gets a collection.
- this.collection = db.getCollection(collectionName);
- }
-
- /**
- * Inserts one or more documents.
- * This method is equivalent to a call to the bulkWrite method.
- * The documents will be inserted in the order provided,
- * stopping on the first failed insertion.
- *
- * @param documents documents
- */
- public void insert(List<Document> documents, boolean ordered) {
- InsertManyOptions options = new InsertManyOptions();
- if (!ordered) {
- options.ordered(false);
- }
- collection.insertMany(documents, options);
- }
-
- /**
- * Update a single or all documents in the collection according to the specified arguments.
- * When upsert set to true, the new document will be inserted if there are no matches to the query filter.
- *
- * @param filter Bson filter
- * @param document Bson document
- * @param upsert a new document should be inserted if there are no matches to the query filter
- * @param many whether find all documents according to the query filter
- */
- public void update(Bson filter, Bson document, boolean upsert, boolean many) {
- //TODO batch updating
- UpdateOptions options = new UpdateOptions();
- if (upsert) {
- options.upsert(true);
- }
- if (many) {
- collection.updateMany(filter, document, options);
- } else {
- collection.updateOne(filter, document, options);
- }
- }
-
- /**
- * Finds a single document in the collection according to the specified arguments.
- *
- * @param filter Bson filter
- */
- public Document find(Bson filter) {
- //TODO batch finding
- return collection.find(filter).first();
- }
-
- /**
- * Closes all resources associated with this instance.
- */
- public void close() {
- client.close();
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.common;
+
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientURI;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.InsertManyOptions;
+import com.mongodb.client.model.UpdateOptions;
+
+import java.util.List;
+
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+public class MongoDbClient {
+
+ private MongoClient client;
+ private MongoCollection<Document> collection;
+
+ /**
+ * The MongoDbClient constructor.
+ * @param url The Mongo server url
+ * @param collectionName The Mongo collection to read/write data
+ */
+ public MongoDbClient(String url, String collectionName) {
+ //Creates a MongoURI from the given string.
+ MongoClientURI uri = new MongoClientURI(url);
+ //Creates a MongoClient described by a URI.
+ this.client = new MongoClient(uri);
+ //Gets a Database.
+ MongoDatabase db = client.getDatabase(uri.getDatabase());
+ //Gets a collection.
+ this.collection = db.getCollection(collectionName);
+ }
+
+ /**
+ * Inserts one or more documents.
+ * This method is equivalent to a call to the bulkWrite method.
+ * The documents will be inserted in the order provided,
+ * stopping on the first failed insertion.
+ *
+ * @param documents documents
+ */
+ public void insert(List<Document> documents, boolean ordered) {
+ InsertManyOptions options = new InsertManyOptions();
+ if (!ordered) {
+ options.ordered(false);
+ }
+ collection.insertMany(documents, options);
+ }
+
+ /**
+ * Update a single or all documents in the collection according to the specified arguments.
+ * When upsert set to true, the new document will be inserted if there are no matches to the query filter.
+ *
+ * @param filter Bson filter
+ * @param document Bson document
+ * @param upsert a new document should be inserted if there are no matches to the query filter
+ * @param many whether find all documents according to the query filter
+ */
+ public void update(Bson filter, Bson document, boolean upsert, boolean many) {
+ //TODO batch updating
+ UpdateOptions options = new UpdateOptions();
+ if (upsert) {
+ options.upsert(true);
+ }
+ if (many) {
+ collection.updateMany(filter, document, options);
+ } else {
+ collection.updateOne(filter, document, options);
+ }
+ }
+
+ /**
+ * Finds a single document in the collection according to the specified arguments.
+ *
+ * @param filter Bson filter
+ */
+ public Document find(Bson filter) {
+ //TODO batch finding
+ return collection.find(filter).first();
+ }
+
+ /**
+ * Closes all resources associated with this instance.
+ */
+ public void close() {
+ client.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java
index 371b1ed..521678e 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java
@@ -1,47 +1,47 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.common;
-
-import java.io.Serializable;
-import java.util.List;
-
-import org.apache.storm.tuple.ITuple;
-import org.bson.conversions.Bson;
-
-/**
- * Create a MongoDB query Filter by given Tuple/trident keys.
- */
-public interface QueryFilterCreator extends Serializable {
-
- /**
- * Create a query Filter by given Tuple.
- *
- * @param tuple ITuple tuple
- * @return query Filter
- */
- Bson createFilter(ITuple tuple);
-
- /**
- * Create a query Filter by given trident keys.
- *
- * @param keys keys
- * @return query Filter
- */
- Bson createFilterByKeys(List<Object> keys);
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.common;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.storm.tuple.ITuple;
+import org.bson.conversions.Bson;
+
+/**
+ * Create a MongoDB query Filter by given Tuple/trident keys.
+ */
+public interface QueryFilterCreator extends Serializable {
+
+ /**
+ * Create a query Filter by given Tuple.
+ *
+ * @param tuple ITuple tuple
+ * @return query Filter
+ */
+ Bson createFilter(ITuple tuple);
+
+ /**
+ * Create a query Filter by given trident keys.
+ *
+ * @param keys keys
+ * @return query Filter
+ */
+ Bson createFilterByKeys(List<Object> keys);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java
index 1f5774a..16a71ec 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java
@@ -1,47 +1,47 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.common;
-
-import com.mongodb.client.model.Filters;
-
-import java.util.List;
-
-import org.apache.storm.tuple.ITuple;
-import org.bson.conversions.Bson;
-
-public class SimpleQueryFilterCreator implements QueryFilterCreator {
-
- private String field;
-
- @Override
- public Bson createFilter(ITuple tuple) {
- return Filters.eq(field, tuple.getValueByField(field));
- }
-
- @Override
- public Bson createFilterByKeys(List<Object> keys) {
- return Filters.eq("_id", MongoUtils.getId(keys));
- }
-
- public SimpleQueryFilterCreator withField(String field) {
- this.field = field;
- return this;
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.common;
+
+import com.mongodb.client.model.Filters;
+
+import java.util.List;
+
+import org.apache.storm.tuple.ITuple;
+import org.bson.conversions.Bson;
+
+public class SimpleQueryFilterCreator implements QueryFilterCreator {
+
+ private String field;
+
+ @Override
+ public Bson createFilter(ITuple tuple) {
+ return Filters.eq(field, tuple.getValueByField(field));
+ }
+
+ @Override
+ public Bson createFilterByKeys(List<Object> keys) {
+ return Filters.eq("_id", MongoUtils.getId(keys));
+ }
+
+ public SimpleQueryFilterCreator withField(String field) {
+ this.field = field;
+ return this;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java
index 3ea9c16..ed27c92 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java
@@ -1,47 +1,47 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.common.mapper;
-
-import java.io.Serializable;
-import java.util.List;
-
-import org.apache.storm.tuple.ITuple;
-import org.bson.Document;
-
-/**
- * Given a Tuple/trident keys, converts it to an MongoDB document.
- */
-public interface MongoMapper extends Serializable {
-
- /**
- * Converts a Tuple to a Document.
- *
- * @param tuple the incoming tuple
- * @return the MongoDB document
- */
- Document toDocument(ITuple tuple);
-
- /**
- * Converts a keys to a Document.
- *
- * @param keys the trident keys
- * @return the MongoDB document
- */
- Document toDocumentByKeys(List<Object> keys);
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.common.mapper;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.storm.tuple.ITuple;
+import org.bson.Document;
+
+/**
+ * Given a Tuple/trident keys, converts it to an MongoDB document.
+ */
+public interface MongoMapper extends Serializable {
+
+ /**
+ * Converts a Tuple to a Document.
+ *
+ * @param tuple the incoming tuple
+ * @return the MongoDB document
+ */
+ Document toDocument(ITuple tuple);
+
+ /**
+ * Converts a keys to a Document.
+ *
+ * @param keys the trident keys
+ * @return the MongoDB document
+ */
+ Document toDocumentByKeys(List<Object> keys);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java
index 7bb0a06..1a38828 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java
@@ -1,55 +1,55 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.common.mapper;
-
-import java.util.List;
-
-import org.apache.storm.mongodb.common.MongoUtils;
-import org.apache.storm.tuple.ITuple;
-import org.bson.Document;
-
-public class SimpleMongoMapper implements MongoMapper {
-
- private String[] fields;
-
- public SimpleMongoMapper(String... fields) {
- this.fields = fields;
- }
-
- @Override
- public Document toDocument(ITuple tuple) {
- Document document = new Document();
- for (String field : fields) {
- document.append(field, tuple.getValueByField(field));
- }
- return document;
- }
-
- @Override
- public Document toDocumentByKeys(List<Object> keys) {
- Document document = new Document();
- document.append("_id", MongoUtils.getId(keys));
- return document;
- }
-
- public SimpleMongoMapper withFields(String... fields) {
- this.fields = fields;
- return this;
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.common.mapper;
+
+import java.util.List;
+
+import org.apache.storm.mongodb.common.MongoUtils;
+import org.apache.storm.tuple.ITuple;
+import org.bson.Document;
+
+public class SimpleMongoMapper implements MongoMapper {
+
+ private String[] fields;
+
+ public SimpleMongoMapper(String... fields) {
+ this.fields = fields;
+ }
+
+ @Override
+ public Document toDocument(ITuple tuple) {
+ Document document = new Document();
+ for (String field : fields) {
+ document.append(field, tuple.getValueByField(field));
+ }
+ return document;
+ }
+
+ @Override
+ public Document toDocumentByKeys(List<Object> keys) {
+ Document document = new Document();
+ document.append("_id", MongoUtils.getId(keys));
+ return document;
+ }
+
+ public SimpleMongoMapper withFields(String... fields) {
+ this.fields = fields;
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java
index 3ed17ec..72328c9 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java
@@ -1,46 +1,46 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.common.mapper;
-
-import org.apache.storm.tuple.ITuple;
-import org.bson.Document;
-
-public class SimpleMongoUpdateMapper extends SimpleMongoMapper implements MongoUpdateMapper {
-
- private String[] fields;
-
- public SimpleMongoUpdateMapper(String... fields) {
- this.fields = fields;
- }
-
- @Override
- public Document toDocument(ITuple tuple) {
- Document document = new Document();
- for (String field : fields) {
- document.append(field, tuple.getValueByField(field));
- }
- //$set operator: Sets the value of a field in a document.
- return new Document("$set", document);
- }
-
- public SimpleMongoUpdateMapper withFields(String... fields) {
- this.fields = fields;
- return this;
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.common.mapper;
+
+import org.apache.storm.tuple.ITuple;
+import org.bson.Document;
+
+public class SimpleMongoUpdateMapper extends SimpleMongoMapper implements MongoUpdateMapper {
+
+ private String[] fields;
+
+ public SimpleMongoUpdateMapper(String... fields) {
+ this.fields = fields;
+ }
+
+ @Override
+ public Document toDocument(ITuple tuple) {
+ Document document = new Document();
+ for (String field : fields) {
+ document.append(field, tuple.getValueByField(field));
+ }
+ //$set operator: Sets the value of a field in a document.
+ return new Document("$set", document);
+ }
+
+ public SimpleMongoUpdateMapper withFields(String... fields) {
+ this.fields = fields;
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
index 77c394c..100f931 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
@@ -1,145 +1,145 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.trident.state;
-
-import com.google.common.collect.Lists;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang.Validate;
-import org.apache.storm.mongodb.common.MongoDbClient;
-import org.apache.storm.mongodb.common.QueryFilterCreator;
-import org.apache.storm.mongodb.common.mapper.MongoLookupMapper;
-import org.apache.storm.mongodb.common.mapper.MongoMapper;
-import org.apache.storm.topology.FailedException;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.apache.storm.tuple.Values;
-import org.bson.Document;
-import org.bson.conversions.Bson;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MongoState implements State {
-
- private static final Logger LOG = LoggerFactory.getLogger(MongoState.class);
-
- private Options options;
- private MongoDbClient mongoClient;
- private Map<String, Object> map;
-
- protected MongoState(Map<String, Object> map, Options options) {
- this.options = options;
- this.map = map;
- }
-
- public static class Options implements Serializable {
- private String url;
- private String collectionName;
- private MongoMapper mapper;
- private MongoLookupMapper lookupMapper;
- private QueryFilterCreator queryCreator;
-
- public Options withUrl(String url) {
- this.url = url;
- return this;
- }
-
- public Options withCollectionName(String collectionName) {
- this.collectionName = collectionName;
- return this;
- }
-
- public Options withMapper(MongoMapper mapper) {
- this.mapper = mapper;
- return this;
- }
-
- public Options withMongoLookupMapper(MongoLookupMapper lookupMapper) {
- this.lookupMapper = lookupMapper;
- return this;
- }
-
- public Options withQueryFilterCreator(QueryFilterCreator queryCreator) {
- this.queryCreator = queryCreator;
- return this;
- }
- }
-
- protected void prepare() {
- Validate.notEmpty(options.url, "url can not be blank or null");
- Validate.notEmpty(options.collectionName, "collectionName can not be blank or null");
-
- this.mongoClient = new MongoDbClient(options.url, options.collectionName);
- }
-
- @Override
- public void beginCommit(Long txid) {
- LOG.debug("beginCommit is noop.");
- }
-
- @Override
- public void commit(Long txid) {
- LOG.debug("commit is noop.");
- }
-
- /**
- * Update Mongo state.
- * @param tuples trident tuples
- * @param collector trident collector
- */
- public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
- List<Document> documents = Lists.newArrayList();
- for (TridentTuple tuple : tuples) {
- Document document = options.mapper.toDocument(tuple);
- documents.add(document);
- }
-
- try {
- this.mongoClient.insert(documents, true);
- } catch (Exception e) {
- LOG.warn("Batch write failed but some requests might have succeeded. Triggering replay.", e);
- throw new FailedException(e);
- }
- }
-
- /**
- * Batch retrieve values.
- * @param tridentTuples trident tuples
- * @return values
- */
- public List<List<Values>> batchRetrieve(List<TridentTuple> tridentTuples) {
- List<List<Values>> batchRetrieveResult = Lists.newArrayList();
- try {
- for (TridentTuple tuple : tridentTuples) {
- Bson filter = options.queryCreator.createFilter(tuple);
- Document doc = mongoClient.find(filter);
- List<Values> values = options.lookupMapper.toTuple(tuple, doc);
- batchRetrieveResult.add(values);
- }
- } catch (Exception e) {
- LOG.warn("Batch get operation failed. Triggering replay.", e);
- throw new FailedException(e);
- }
- return batchRetrieveResult;
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.trident.state;
+
+import com.google.common.collect.Lists;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.Validate;
+import org.apache.storm.mongodb.common.MongoDbClient;
+import org.apache.storm.mongodb.common.QueryFilterCreator;
+import org.apache.storm.mongodb.common.mapper.MongoLookupMapper;
+import org.apache.storm.mongodb.common.mapper.MongoMapper;
+import org.apache.storm.topology.FailedException;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Values;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MongoState implements State {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MongoState.class);
+
+ private Options options;
+ private MongoDbClient mongoClient;
+ private Map<String, Object> map;
+
+ protected MongoState(Map<String, Object> map, Options options) {
+ this.options = options;
+ this.map = map;
+ }
+
+ public static class Options implements Serializable {
+ private String url;
+ private String collectionName;
+ private MongoMapper mapper;
+ private MongoLookupMapper lookupMapper;
+ private QueryFilterCreator queryCreator;
+
+ public Options withUrl(String url) {
+ this.url = url;
+ return this;
+ }
+
+ public Options withCollectionName(String collectionName) {
+ this.collectionName = collectionName;
+ return this;
+ }
+
+ public Options withMapper(MongoMapper mapper) {
+ this.mapper = mapper;
+ return this;
+ }
+
+ public Options withMongoLookupMapper(MongoLookupMapper lookupMapper) {
+ this.lookupMapper = lookupMapper;
+ return this;
+ }
+
+ public Options withQueryFilterCreator(QueryFilterCreator queryCreator) {
+ this.queryCreator = queryCreator;
+ return this;
+ }
+ }
+
+ protected void prepare() {
+ Validate.notEmpty(options.url, "url can not be blank or null");
+ Validate.notEmpty(options.collectionName, "collectionName can not be blank or null");
+
+ this.mongoClient = new MongoDbClient(options.url, options.collectionName);
+ }
+
+ @Override
+ public void beginCommit(Long txid) {
+ LOG.debug("beginCommit is noop.");
+ }
+
+ @Override
+ public void commit(Long txid) {
+ LOG.debug("commit is noop.");
+ }
+
+ /**
+ * Update Mongo state.
+ * @param tuples trident tuples
+ * @param collector trident collector
+ */
+ public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
+ List<Document> documents = Lists.newArrayList();
+ for (TridentTuple tuple : tuples) {
+ Document document = options.mapper.toDocument(tuple);
+ documents.add(document);
+ }
+
+ try {
+ this.mongoClient.insert(documents, true);
+ } catch (Exception e) {
+ LOG.warn("Batch write failed but some requests might have succeeded. Triggering replay.", e);
+ throw new FailedException(e);
+ }
+ }
+
+ /**
+ * Batch retrieve values.
+ * @param tridentTuples trident tuples
+ * @return values
+ */
+ public List<List<Values>> batchRetrieve(List<TridentTuple> tridentTuples) {
+ List<List<Values>> batchRetrieveResult = Lists.newArrayList();
+ try {
+ for (TridentTuple tuple : tridentTuples) {
+ Bson filter = options.queryCreator.createFilter(tuple);
+ Document doc = mongoClient.find(filter);
+ List<Values> values = options.lookupMapper.toTuple(tuple, doc);
+ batchRetrieveResult.add(values);
+ }
+ } catch (Exception e) {
+ LOG.warn("Batch get operation failed. Triggering replay.", e);
+ throw new FailedException(e);
+ }
+ return batchRetrieveResult;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateFactory.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateFactory.java
index a6797e2..d27d9a9 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateFactory.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateFactory.java
@@ -1,43 +1,43 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.trident.state;
-
-import java.util.Map;
-
-import org.apache.storm.task.IMetricsContext;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.state.StateFactory;
-
-public class MongoStateFactory implements StateFactory {
-
- private MongoState.Options options;
-
- public MongoStateFactory(MongoState.Options options) {
- this.options = options;
- }
-
- @Override
- public State makeState(Map<String, Object> conf, IMetricsContext metrics,
- int partitionIndex, int numPartitions) {
- MongoState state = new MongoState(conf, options);
- state.prepare();
- return state;
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.trident.state;
+
+import java.util.Map;
+
+import org.apache.storm.task.IMetricsContext;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+
+public class MongoStateFactory implements StateFactory {
+
+ private MongoState.Options options;
+
+ public MongoStateFactory(MongoState.Options options) {
+ this.options = options;
+ }
+
+ @Override
+ public State makeState(Map<String, Object> conf, IMetricsContext metrics,
+ int partitionIndex, int numPartitions) {
+ MongoState state = new MongoState(conf, options);
+ state.prepare();
+ return state;
+ }
+
+}
[4/5] storm git commit: set '* text=auto' in .gitattributes in order
to avoid merge work because of line feed changes
Posted by sr...@apache.org.
set '* text=auto' in .gitattributes in order to avoid merge work because of line feed changes
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e9c427cb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e9c427cb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e9c427cb
Branch: refs/heads/master
Commit: e9c427cbe4b32f6103e704c48bc1dbcb936b98cb
Parents: 0fdad2c
Author: Karl-Philipp Richter <kr...@aol.de>
Authored: Fri Mar 23 06:00:07 2018 +0100
Committer: Karl-Philipp Richter <kr...@aol.de>
Committed: Fri Mar 23 06:00:07 2018 +0100
----------------------------------------------------------------------
.gitattributes | 11 +-
docs/storm-eventhubs.md | 80 +--
external/storm-eventhubs/README.md | 90 +--
.../eventhubs/bolt/DefaultEventDataFormat.java | 94 +--
.../storm/eventhubs/bolt/EventHubBolt.java | 292 ++++-----
.../eventhubs/bolt/EventHubBoltConfig.java | 222 +++----
.../storm/eventhubs/bolt/IEventDataFormat.java | 56 +-
.../storm/eventhubs/spout/EventDataScheme.java | 156 ++---
.../storm/eventhubs/spout/FieldConstants.java | 56 +-
.../storm/eventhubs/spout/IEventDataScheme.java | 88 +--
.../spout/IEventHubReceiverFactory.java | 60 +-
.../eventhubs/spout/IPartitionCoordinator.java | 54 +-
.../eventhubs/spout/IPartitionManager.java | 74 +--
.../storm/eventhubs/spout/IStateStore.java | 62 +-
.../apache/storm/eventhubs/spout/MessageId.java | 112 ++--
.../eventhubs/spout/ZookeeperStateStore.java | 190 +++---
.../storm/eventhubs/trident/Coordinator.java | 120 ++--
.../trident/ITridentPartitionManager.java | 70 +-
.../ITridentPartitionManagerFactory.java | 52 +-
.../trident/OpaqueTridentEventHubEmitter.java | 136 ++--
.../trident/OpaqueTridentEventHubSpout.java | 128 ++--
.../storm/eventhubs/trident/Partition.java | 78 +--
.../storm/eventhubs/trident/Partitions.java | 82 +--
.../TransactionalTridentEventHubSpout.java | 132 ++--
.../storm/eventhubs/samples/EventHubLoop.java | 104 +--
.../samples/OpaqueTridentEventCount.java | 106 +--
.../samples/TransactionalTridentEventCount.java | 162 ++---
.../eventhubs/samples/bolt/GlobalCountBolt.java | 176 ++---
.../samples/bolt/PartialCountBolt.java | 136 ++--
.../spout/SpoutOutputCollectorMock.java | 142 ++--
.../storm/eventhubs/spout/StateStoreMock.java | 108 +--
.../storm/eventhubs/spout/TestEventData.java | 94 +--
.../eventhubs/trident/TridentCollectorMock.java | 114 ++--
external/storm-mongodb/README.md | 650 +++++++++----------
.../storm/mongodb/bolt/AbstractMongoBolt.java | 124 ++--
.../storm/mongodb/bolt/MongoInsertBolt.java | 248 +++----
.../storm/mongodb/bolt/MongoUpdateBolt.java | 186 +++---
.../storm/mongodb/common/MongoDbClient.java | 218 +++----
.../mongodb/common/QueryFilterCreator.java | 94 +--
.../common/SimpleQueryFilterCreator.java | 94 +--
.../mongodb/common/mapper/MongoMapper.java | 94 +--
.../common/mapper/SimpleMongoMapper.java | 110 ++--
.../common/mapper/SimpleMongoUpdateMapper.java | 92 +--
.../storm/mongodb/trident/state/MongoState.java | 290 ++++-----
.../trident/state/MongoStateFactory.java | 86 +--
.../trident/state/MongoStateUpdater.java | 70 +-
integration-test/README.md | 118 ++--
.../storm/trident/TestTridentTopology.java | 152 ++---
48 files changed, 3136 insertions(+), 3127 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/.gitattributes
----------------------------------------------------------------------
diff --git a/.gitattributes b/.gitattributes
index c2dc683..ed9fb85 100644
--- a/.gitattributes
+++ b/.gitattributes
@@ -1,2 +1,11 @@
# Some storm-webapp logviewer tests require input files to have LF line endings due to byte counting.
-storm-webapp/src/test/resources/*.log.test text eol=lf
\ No newline at end of file
+storm-webapp/src/test/resources/*.log.test text eol=lf
+
+# Convert the test on check-in and check-out (the conversion of all files has been done once on master and should be enforced from now on)
+* text=auto
+
+# There're reports of EOL conversion messing up PNG files, but that might have been a bug in git 2.10 only (see https://github.com/git/git/blob/master/Documentation/RelNotes/2.10.0.txt#L248 for details)
+*.png binary
+*.tar.gz binary
+*.zip binary
+*.tgz binary
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/docs/storm-eventhubs.md
----------------------------------------------------------------------
diff --git a/docs/storm-eventhubs.md b/docs/storm-eventhubs.md
index 4af8c43..df46755 100644
--- a/docs/storm-eventhubs.md
+++ b/docs/storm-eventhubs.md
@@ -1,40 +1,40 @@
----
-title: Azue Event Hubs Integration
-layout: documentation
-documentation: true
----
-
-Storm spout and bolt implementation for Microsoft Azure Eventhubs
-
-### build ###
- mvn clean package
-
-### run sample topology ###
-To run the sample topology, you need to modify the config.properties file with
-the eventhubs configurations. Here is an example:
-
- eventhubspout.username = [username: policy name in EventHubs Portal]
- eventhubspout.password = [password: shared access key in EventHubs Portal]
- eventhubspout.namespace = [namespace]
- eventhubspout.entitypath = [entitypath]
- eventhubspout.partitions.count = [partitioncount]
-
- # if not provided, will use storm's zookeeper settings
- # zookeeper.connectionstring=zookeeper0:2181,zookeeper1:2181,zookeeper2:2181
-
- eventhubspout.checkpoint.interval = 10
- eventhub.receiver.credits = 1024
-
-Then you can use storm.cmd to submit the sample topology:
- storm jar {jarfile} com.microsoft.eventhubs.samples.EventCount {topologyname} {spoutconffile}
- where the {jarfile} should be: eventhubs-storm-spout-{version}-jar-with-dependencies.jar
-
-### Run EventHubSendClient ###
-We have included a simple EventHubs send client for testing purpose. You can run the client like this:
- java -cp .\target\eventhubs-storm-spout-{version}-jar-with-dependencies.jar com.microsoft.eventhubs.client.EventHubSendClient
- [username] [password] [entityPath] [partitionId] [messageSize] [messageCount]
-If you want to send messages to all partitions, use "-1" as partitionId.
-
-### Windows Azure Eventhubs ###
- http://azure.microsoft.com/en-us/services/event-hubs/
-
+---
+title: Azue Event Hubs Integration
+layout: documentation
+documentation: true
+---
+
+Storm spout and bolt implementation for Microsoft Azure Eventhubs
+
+### build ###
+ mvn clean package
+
+### run sample topology ###
+To run the sample topology, you need to modify the config.properties file with
+the eventhubs configurations. Here is an example:
+
+ eventhubspout.username = [username: policy name in EventHubs Portal]
+ eventhubspout.password = [password: shared access key in EventHubs Portal]
+ eventhubspout.namespace = [namespace]
+ eventhubspout.entitypath = [entitypath]
+ eventhubspout.partitions.count = [partitioncount]
+
+ # if not provided, will use storm's zookeeper settings
+ # zookeeper.connectionstring=zookeeper0:2181,zookeeper1:2181,zookeeper2:2181
+
+ eventhubspout.checkpoint.interval = 10
+ eventhub.receiver.credits = 1024
+
+Then you can use storm.cmd to submit the sample topology:
+ storm jar {jarfile} com.microsoft.eventhubs.samples.EventCount {topologyname} {spoutconffile}
+ where the {jarfile} should be: eventhubs-storm-spout-{version}-jar-with-dependencies.jar
+
+### Run EventHubSendClient ###
+We have included a simple EventHubs send client for testing purpose. You can run the client like this:
+ java -cp .\target\eventhubs-storm-spout-{version}-jar-with-dependencies.jar com.microsoft.eventhubs.client.EventHubSendClient
+ [username] [password] [entityPath] [partitionId] [messageSize] [messageCount]
+If you want to send messages to all partitions, use "-1" as partitionId.
+
+### Windows Azure Eventhubs ###
+ http://azure.microsoft.com/en-us/services/event-hubs/
+
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/README.md
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/README.md b/external/storm-eventhubs/README.md
index 681ab2d..98b43a5 100755
--- a/external/storm-eventhubs/README.md
+++ b/external/storm-eventhubs/README.md
@@ -1,45 +1,45 @@
-storm-eventhubs
-=====================
-
-Storm spout and bolt implementation for Microsoft Azure Eventhubs
-
-### build ###
- mvn clean package
-
-### run sample topology ###
-To run the sample topology, you need to modify the config.properties file with
-the eventhubs configurations. Here is an example:
-
- eventhubspout.username = [username: policy name in EventHubs Portal]
- eventhubspout.password = [password: shared access key in EventHubs Portal]
- eventhubspout.namespace = [namespace]
- eventhubspout.entitypath = [entitypath]
- eventhubspout.partitions.count = [partitioncount]
-
- # if not provided, will use storm's zookeeper settings
- # zookeeper.connectionstring=zookeeper0:2181,zookeeper1:2181,zookeeper2:2181
-
- eventhubspout.checkpoint.interval = 10
- eventhub.receiver.credits = 1024
-
-Then you can use storm.cmd to submit the sample topology:
- storm jar {jarfile} com.microsoft.eventhubs.samples.EventCount {topologyname} {spoutconffile}
- where the {jarfile} should be: eventhubs-storm-spout-{version}-jar-with-dependencies.jar
-
-### Run EventHubSendClient ###
-We have included a simple EventHubs send client for testing purpose. You can run the client like this:
- java -cp .\target\eventhubs-storm-spout-{version}-jar-with-dependencies.jar com.microsoft.eventhubs.client.EventHubSendClient
- [username] [password] [entityPath] [partitionId] [messageSize] [messageCount]
-If you want to send messages to all partitions, use "-1" as partitionId.
-
-### Serialization Scheme ###
- By default the serialization scheme is StringEventDataScheme where only the body of the eventdata is being sent.
- To have a more comprehensive Eventhub metadata exposure look into the BinaryEventDataScheme.
-
-### Windows Azure Eventhubs ###
- http://azure.microsoft.com/en-us/services/event-hubs/
-
-## Committer Sponsors
-
- * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))
-
+storm-eventhubs
+=====================
+
+Storm spout and bolt implementation for Microsoft Azure Eventhubs
+
+### build ###
+ mvn clean package
+
+### run sample topology ###
+To run the sample topology, you need to modify the config.properties file with
+the eventhubs configurations. Here is an example:
+
+ eventhubspout.username = [username: policy name in EventHubs Portal]
+ eventhubspout.password = [password: shared access key in EventHubs Portal]
+ eventhubspout.namespace = [namespace]
+ eventhubspout.entitypath = [entitypath]
+ eventhubspout.partitions.count = [partitioncount]
+
+ # if not provided, will use storm's zookeeper settings
+ # zookeeper.connectionstring=zookeeper0:2181,zookeeper1:2181,zookeeper2:2181
+
+ eventhubspout.checkpoint.interval = 10
+ eventhub.receiver.credits = 1024
+
+Then you can use storm.cmd to submit the sample topology:
+ storm jar {jarfile} com.microsoft.eventhubs.samples.EventCount {topologyname} {spoutconffile}
+ where the {jarfile} should be: eventhubs-storm-spout-{version}-jar-with-dependencies.jar
+
+### Run EventHubSendClient ###
+We have included a simple EventHubs send client for testing purpose. You can run the client like this:
+ java -cp .\target\eventhubs-storm-spout-{version}-jar-with-dependencies.jar com.microsoft.eventhubs.client.EventHubSendClient
+ [username] [password] [entityPath] [partitionId] [messageSize] [messageCount]
+If you want to send messages to all partitions, use "-1" as partitionId.
+
+### Serialization Scheme ###
+ By default the serialization scheme is StringEventDataScheme where only the body of the eventdata is being sent.
+ To have a more comprehensive Eventhub metadata exposure look into the BinaryEventDataScheme.
+
+### Windows Azure Eventhubs ###
+ http://azure.microsoft.com/en-us/services/event-hubs/
+
+## Committer Sponsors
+
+ * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))
+
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java
index 21940de..d6e1dbc 100644
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java
@@ -1,47 +1,47 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.bolt;
-
-import org.apache.storm.tuple.Tuple;
-
-/**
- * A default implementation of IEventDataFormat that converts the tuple
- * into a delimited string.
- */
-public class DefaultEventDataFormat implements IEventDataFormat {
- private static final long serialVersionUID = 1L;
- private String delimiter = ",";
-
- public DefaultEventDataFormat withFieldDelimiter(String delimiter) {
- this.delimiter = delimiter;
- return this;
- }
-
- @Override
- public byte[] serialize(Tuple tuple) {
- StringBuilder sb = new StringBuilder();
- for(Object obj : tuple.getValues()) {
- if(sb.length() != 0) {
- sb.append(delimiter);
- }
- sb.append(obj.toString());
- }
- return sb.toString().getBytes();
- }
-
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.bolt;
+
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * A default implementation of IEventDataFormat that converts the tuple
+ * into a delimited string.
+ */
+public class DefaultEventDataFormat implements IEventDataFormat {
+ private static final long serialVersionUID = 1L;
+ private String delimiter = ",";
+
+ public DefaultEventDataFormat withFieldDelimiter(String delimiter) {
+ this.delimiter = delimiter;
+ return this;
+ }
+
+ @Override
+ public byte[] serialize(Tuple tuple) {
+ StringBuilder sb = new StringBuilder();
+ for(Object obj : tuple.getValues()) {
+ if(sb.length() != 0) {
+ sb.append(delimiter);
+ }
+ sb.append(obj.toString());
+ }
+ return sb.toString().getBytes();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
index bede4a3..7d1aeab 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
@@ -1,146 +1,146 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.bolt;
-
-
-import com.microsoft.azure.eventhubs.EventData;
-import com.microsoft.azure.eventhubs.EventHubClient;
-import com.microsoft.azure.eventhubs.PartitionSender;
-import com.microsoft.azure.servicebus.ServiceBusException;
-import org.apache.storm.eventhubs.spout.EventHubException;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.tuple.Tuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-
-/**
- * A bolt that writes event message to EventHub.
- */
-public class EventHubBolt extends BaseRichBolt {
- private static final long serialVersionUID = 1L;
- private static final Logger logger = LoggerFactory
- .getLogger(EventHubBolt.class);
-
- protected OutputCollector collector;
- protected PartitionSender sender;
- protected EventHubClient ehClient;
- protected EventHubBoltConfig boltConfig;
-
- public EventHubBolt(String connectionString, String entityPath) {
- boltConfig = new EventHubBoltConfig(connectionString, entityPath);
- }
-
- public EventHubBolt(String userName, String password, String namespace,
- String entityPath, boolean partitionMode) {
- boltConfig = new EventHubBoltConfig(userName, password, namespace,
- entityPath, partitionMode);
- }
-
- public EventHubBolt(EventHubBoltConfig config) {
- boltConfig = config;
- }
-
- @Override
- public void prepare(Map<String, Object> config, TopologyContext context,
- OutputCollector collector) {
- this.collector = collector;
- String myPartitionId = null;
- if (boltConfig.getPartitionMode()) {
- // We can use the task index (starting from 0) as the partition ID
- myPartitionId = "" + context.getThisTaskIndex();
- }
- logger.info("creating sender: " + boltConfig.getConnectionString()
- + ", " + boltConfig.getEntityPath() + ", " + myPartitionId);
- try {
- ehClient = EventHubClient.createFromConnectionStringSync(boltConfig.getConnectionString());
- if (boltConfig.getPartitionMode()) {
- sender = ehClient.createPartitionSenderSync(Integer.toString(context.getThisTaskIndex()));
- }
- } catch (Exception ex) {
- collector.reportError(ex);
- throw new RuntimeException(ex);
- }
-
- }
-
- @Override
- public void execute(Tuple tuple) {
- try {
- EventData sendEvent = new EventData(boltConfig.getEventDataFormat().serialize(tuple));
- if (boltConfig.getPartitionMode() && sender!=null) {
- sender.sendSync(sendEvent);
- }
- else if (boltConfig.getPartitionMode() && sender==null) {
- throw new EventHubException("Sender is null");
- }
- else if (!boltConfig.getPartitionMode() && ehClient!=null) {
- ehClient.sendSync(sendEvent);
- }
- else if (!boltConfig.getPartitionMode() && ehClient==null) {
- throw new EventHubException("ehclient is null");
- }
- collector.ack(tuple);
- } catch (EventHubException ex ) {
- collector.reportError(ex);
- collector.fail(tuple);
- } catch (ServiceBusException e) {
- collector.reportError(e);
- collector.fail(tuple);
- }
- }
-
- @Override
- public void cleanup() {
- if(sender != null) {
- try {
- sender.close().whenComplete((voidargs,error)->{
- try{
- if(error!=null){
- logger.error("Exception during sender cleanup phase"+error.toString());
- }
- ehClient.closeSync();
- }catch (Exception e){
- logger.error("Exception during ehclient cleanup phase"+e.toString());
- }
- }).get();
- } catch (InterruptedException e) {
- logger.error("Exception occured during cleanup phase"+e.toString());
- } catch (ExecutionException e) {
- logger.error("Exception occured during cleanup phase"+e.toString());
- }
- logger.info("Eventhub Bolt cleaned up");
- sender = null;
- ehClient = null;
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
- }
-
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.bolt;
+
+
+import com.microsoft.azure.eventhubs.EventData;
+import com.microsoft.azure.eventhubs.EventHubClient;
+import com.microsoft.azure.eventhubs.PartitionSender;
+import com.microsoft.azure.servicebus.ServiceBusException;
+import org.apache.storm.eventhubs.spout.EventHubException;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * A bolt that writes event message to EventHub.
+ */
+public class EventHubBolt extends BaseRichBolt {
+ private static final long serialVersionUID = 1L;
+ private static final Logger logger = LoggerFactory
+ .getLogger(EventHubBolt.class);
+
+ protected OutputCollector collector;
+ protected PartitionSender sender;
+ protected EventHubClient ehClient;
+ protected EventHubBoltConfig boltConfig;
+
+ public EventHubBolt(String connectionString, String entityPath) {
+ boltConfig = new EventHubBoltConfig(connectionString, entityPath);
+ }
+
+ public EventHubBolt(String userName, String password, String namespace,
+ String entityPath, boolean partitionMode) {
+ boltConfig = new EventHubBoltConfig(userName, password, namespace,
+ entityPath, partitionMode);
+ }
+
+ public EventHubBolt(EventHubBoltConfig config) {
+ boltConfig = config;
+ }
+
+ @Override
+ public void prepare(Map<String, Object> config, TopologyContext context,
+ OutputCollector collector) {
+ this.collector = collector;
+ String myPartitionId = null;
+ if (boltConfig.getPartitionMode()) {
+ // We can use the task index (starting from 0) as the partition ID
+ myPartitionId = "" + context.getThisTaskIndex();
+ }
+ logger.info("creating sender: " + boltConfig.getConnectionString()
+ + ", " + boltConfig.getEntityPath() + ", " + myPartitionId);
+ try {
+ ehClient = EventHubClient.createFromConnectionStringSync(boltConfig.getConnectionString());
+ if (boltConfig.getPartitionMode()) {
+ sender = ehClient.createPartitionSenderSync(Integer.toString(context.getThisTaskIndex()));
+ }
+ } catch (Exception ex) {
+ collector.reportError(ex);
+ throw new RuntimeException(ex);
+ }
+
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ try {
+ EventData sendEvent = new EventData(boltConfig.getEventDataFormat().serialize(tuple));
+ if (boltConfig.getPartitionMode() && sender!=null) {
+ sender.sendSync(sendEvent);
+ }
+ else if (boltConfig.getPartitionMode() && sender==null) {
+ throw new EventHubException("Sender is null");
+ }
+ else if (!boltConfig.getPartitionMode() && ehClient!=null) {
+ ehClient.sendSync(sendEvent);
+ }
+ else if (!boltConfig.getPartitionMode() && ehClient==null) {
+ throw new EventHubException("ehclient is null");
+ }
+ collector.ack(tuple);
+ } catch (EventHubException ex ) {
+ collector.reportError(ex);
+ collector.fail(tuple);
+ } catch (ServiceBusException e) {
+ collector.reportError(e);
+ collector.fail(tuple);
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ if(sender != null) {
+ try {
+ sender.close().whenComplete((voidargs,error)->{
+ try{
+ if(error!=null){
+ logger.error("Exception during sender cleanup phase"+error.toString());
+ }
+ ehClient.closeSync();
+ }catch (Exception e){
+ logger.error("Exception during ehclient cleanup phase"+e.toString());
+ }
+ }).get();
+ } catch (InterruptedException e) {
+ logger.error("Exception occured during cleanup phase"+e.toString());
+ } catch (ExecutionException e) {
+ logger.error("Exception occured during cleanup phase"+e.toString());
+ }
+ logger.info("Eventhub Bolt cleaned up");
+ sender = null;
+ ehClient = null;
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java
index fe2d989..f5e1458 100644
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java
@@ -1,111 +1,111 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.bolt;
-
-import com.microsoft.azure.servicebus.ConnectionStringBuilder;
-import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
-
-import java.io.Serializable;
-
-import java.io.Serializable;
-
-/*
- * EventHubs bolt configurations
- *
- * Partition mode:
- * With partitionMode=true you need to create the same number of tasks as the number of
- * EventHubs partitions, and each bolt task will only send data to one partition.
- * The partition ID is the task ID of the bolt.
- *
- * Event format:
- * The formatter to convert tuple to bytes for EventHubs.
- * if null, the default format is common delimited tuple fields.
- */
-public class EventHubBoltConfig implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private String connectionString;
- private final String entityPath;
- protected boolean partitionMode;
- protected IEventDataFormat dataFormat;
-
- public EventHubBoltConfig(String connectionString, String entityPath) {
- this(connectionString, entityPath, false, null);
- }
-
- public EventHubBoltConfig(String connectionString, String entityPath,
- boolean partitionMode) {
- this(connectionString, entityPath, partitionMode, null);
- }
-
- public EventHubBoltConfig(String userName, String password, String namespace,
- String entityPath, boolean partitionMode) {
- this(userName, password, namespace,
- EventHubSpoutConfig.EH_SERVICE_FQDN_SUFFIX, entityPath, partitionMode);
- }
-
- public EventHubBoltConfig(String connectionString, String entityPath,
- boolean partitionMode, IEventDataFormat dataFormat) {
- this.connectionString = connectionString;
- this.entityPath = entityPath;
- this.partitionMode = partitionMode;
- this.dataFormat = dataFormat;
- if(this.dataFormat == null) {
- this.dataFormat = new DefaultEventDataFormat();
- }
- }
-
- public EventHubBoltConfig(String userName, String password, String namespace,
- String targetFqnAddress, String entityPath) {
- this(userName, password, namespace, targetFqnAddress, entityPath, false, null);
- }
-
- public EventHubBoltConfig(String userName, String password, String namespace,
- String targetFqnAddress, String entityPath, boolean partitionMode) {
- this(userName, password, namespace, targetFqnAddress, entityPath, partitionMode, null);
- }
-
- public EventHubBoltConfig(String userName, String password, String namespace,
- String targetFqnAddress, String entityPath, boolean partitionMode,
- IEventDataFormat dataFormat) {
- this.connectionString = new ConnectionStringBuilder(namespace,entityPath,
- userName,password).toString();
- this.entityPath = entityPath;
- this.partitionMode = partitionMode;
- this.dataFormat = dataFormat;
- if(this.dataFormat == null) {
- this.dataFormat = new DefaultEventDataFormat();
- }
- }
-
- public String getConnectionString() {
- return connectionString;
- }
-
- public String getEntityPath() {
- return entityPath;
- }
-
- public boolean getPartitionMode() {
- return partitionMode;
- }
-
- public IEventDataFormat getEventDataFormat() {
- return dataFormat;
- }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.bolt;
+
+import com.microsoft.azure.servicebus.ConnectionStringBuilder;
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+
+import java.io.Serializable;
+
+import java.io.Serializable;
+
+/*
+ * EventHubs bolt configurations
+ *
+ * Partition mode:
+ * With partitionMode=true you need to create the same number of tasks as the number of
+ * EventHubs partitions, and each bolt task will only send data to one partition.
+ * The partition ID is the task ID of the bolt.
+ *
+ * Event format:
+ * The formatter to convert tuple to bytes for EventHubs.
+ * if null, the default format is common delimited tuple fields.
+ */
+public class EventHubBoltConfig implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private String connectionString;
+ private final String entityPath;
+ protected boolean partitionMode;
+ protected IEventDataFormat dataFormat;
+
+ public EventHubBoltConfig(String connectionString, String entityPath) {
+ this(connectionString, entityPath, false, null);
+ }
+
+ public EventHubBoltConfig(String connectionString, String entityPath,
+ boolean partitionMode) {
+ this(connectionString, entityPath, partitionMode, null);
+ }
+
+ public EventHubBoltConfig(String userName, String password, String namespace,
+ String entityPath, boolean partitionMode) {
+ this(userName, password, namespace,
+ EventHubSpoutConfig.EH_SERVICE_FQDN_SUFFIX, entityPath, partitionMode);
+ }
+
+ public EventHubBoltConfig(String connectionString, String entityPath,
+ boolean partitionMode, IEventDataFormat dataFormat) {
+ this.connectionString = connectionString;
+ this.entityPath = entityPath;
+ this.partitionMode = partitionMode;
+ this.dataFormat = dataFormat;
+ if(this.dataFormat == null) {
+ this.dataFormat = new DefaultEventDataFormat();
+ }
+ }
+
+ public EventHubBoltConfig(String userName, String password, String namespace,
+ String targetFqnAddress, String entityPath) {
+ this(userName, password, namespace, targetFqnAddress, entityPath, false, null);
+ }
+
+ public EventHubBoltConfig(String userName, String password, String namespace,
+ String targetFqnAddress, String entityPath, boolean partitionMode) {
+ this(userName, password, namespace, targetFqnAddress, entityPath, partitionMode, null);
+ }
+
+ public EventHubBoltConfig(String userName, String password, String namespace,
+ String targetFqnAddress, String entityPath, boolean partitionMode,
+ IEventDataFormat dataFormat) {
+ this.connectionString = new ConnectionStringBuilder(namespace,entityPath,
+ userName,password).toString();
+ this.entityPath = entityPath;
+ this.partitionMode = partitionMode;
+ this.dataFormat = dataFormat;
+ if(this.dataFormat == null) {
+ this.dataFormat = new DefaultEventDataFormat();
+ }
+ }
+
+ public String getConnectionString() {
+ return connectionString;
+ }
+
+ public String getEntityPath() {
+ return entityPath;
+ }
+
+ public boolean getPartitionMode() {
+ return partitionMode;
+ }
+
+ public IEventDataFormat getEventDataFormat() {
+ return dataFormat;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java
index 743d5bb..d2aacb7 100644
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java
@@ -1,28 +1,28 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.bolt;
-
-import java.io.Serializable;
-import org.apache.storm.tuple.Tuple;
-
-/**
- * Serialize a tuple to a byte array to be sent to EventHubs
- */
-public interface IEventDataFormat extends Serializable {
- public byte[] serialize(Tuple tuple);
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.bolt;
+
+import java.io.Serializable;
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * Serialize a tuple to a byte array to be sent to EventHubs
+ */
+public interface IEventDataFormat extends Serializable {
+ public byte[] serialize(Tuple tuple);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
index f82ffe6..9fbcecf 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
@@ -1,78 +1,78 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.spout;
-
-import com.microsoft.azure.eventhubs.EventData;
-import org.apache.storm.tuple.Fields;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * An Event Data Scheme which deserializes message payload into the Strings. No
- * encoding is assumed. The receiver will need to handle parsing of the string
- * data in appropriate encoding.
- *
- * The resulting tuple would contain two items: the the message string, and a
- * map of properties that include metadata, which can be used to determine who
- * processes the message, and how it is processed.
- *
- * For passing the raw bytes of a messsage to Bolts, refer to
- * {@link BinaryEventDataScheme}.
- */
-public class EventDataScheme implements IEventDataScheme {
-
- private static final long serialVersionUID = 1L;
- private static final Logger logger = LoggerFactory.getLogger(EventDataScheme.class);
- @Override
- public List<Object> deserialize(EventData eventData) {
- final List<Object> fieldContents = new ArrayList<Object>();
- String messageData = "";
- if (eventData.getBytes()!=null) {
- messageData = new String(eventData.getBytes());
- }
- /*Will only serialize AMQPValue type*/
- else if (eventData.getObject()!=null) {
- try {
- if (!(eventData.getObject() instanceof List)) {
- messageData = eventData.getObject().toString();
- } else {
- throw new RuntimeException("Cannot serialize the given AMQP type");
- }
- } catch (RuntimeException e) {
- logger.error("Failed to serialize EventData payload class"
- + eventData.getObject().getClass());
- logger.error("Exception encountered while serializing EventData payload is"
- + e.toString());
- throw e;
- }
- }
- Map<String, Object> metaDataMap = eventData.getProperties();
- fieldContents.add(messageData);
- fieldContents.add(metaDataMap);
- return fieldContents;
- }
-
- @Override
- public Fields getOutputFields() {
- return new Fields(FieldConstants.Message, FieldConstants.META_DATA);
- }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import com.microsoft.azure.eventhubs.EventData;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An Event Data Scheme which deserializes message payload into the Strings. No
+ * encoding is assumed. The receiver will need to handle parsing of the string
+ * data in appropriate encoding.
+ *
+ * The resulting tuple would contain two items: the the message string, and a
+ * map of properties that include metadata, which can be used to determine who
+ * processes the message, and how it is processed.
+ *
+ * For passing the raw bytes of a messsage to Bolts, refer to
+ * {@link BinaryEventDataScheme}.
+ */
+public class EventDataScheme implements IEventDataScheme {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger logger = LoggerFactory.getLogger(EventDataScheme.class);
+ @Override
+ public List<Object> deserialize(EventData eventData) {
+ final List<Object> fieldContents = new ArrayList<Object>();
+ String messageData = "";
+ if (eventData.getBytes()!=null) {
+ messageData = new String(eventData.getBytes());
+ }
+ /*Will only serialize AMQPValue type*/
+ else if (eventData.getObject()!=null) {
+ try {
+ if (!(eventData.getObject() instanceof List)) {
+ messageData = eventData.getObject().toString();
+ } else {
+ throw new RuntimeException("Cannot serialize the given AMQP type");
+ }
+ } catch (RuntimeException e) {
+ logger.error("Failed to serialize EventData payload class"
+ + eventData.getObject().getClass());
+ logger.error("Exception encountered while serializing EventData payload is"
+ + e.toString());
+ throw e;
+ }
+ }
+ Map<String, Object> metaDataMap = eventData.getProperties();
+ fieldContents.add(messageData);
+ fieldContents.add(metaDataMap);
+ return fieldContents;
+ }
+
+ @Override
+ public Fields getOutputFields() {
+ return new Fields(FieldConstants.Message, FieldConstants.META_DATA);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
index 4c1c3e7..88855eb 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
@@ -1,28 +1,28 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.spout;
-
-public class FieldConstants {
-
- public static final String PartitionKey = "partitionKey";
- public static final String Offset = "offset";
- public static final String Message = "message";
- public static final String META_DATA = "metadata";
- public static final String SYSTEM_META_DATA = "eventdata_system_properties";
- public static final String DefaultStartingOffset = "-1";
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+public class FieldConstants {
+
+ public static final String PartitionKey = "partitionKey";
+ public static final String Offset = "offset";
+ public static final String Message = "message";
+ public static final String META_DATA = "metadata";
+ public static final String SYSTEM_META_DATA = "eventdata_system_properties";
+ public static final String DefaultStartingOffset = "-1";
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
index 6c78524..854da6f 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
@@ -1,44 +1,44 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.spout;
-
-import com.microsoft.azure.eventhubs.EventData;
-import org.apache.storm.tuple.Fields;
-
-import java.io.Serializable;
-import java.util.List;
-
-public interface IEventDataScheme extends Serializable {
-
- /**
- * Deserialize an AMQP Message into a Tuple.
- *
- * @see #getOutputFields() for the list of fields the tuple will contain.
- *
- * @param eventData The EventData to Deserialize.
- * @return A tuple containing the deserialized fields of the message.
- */
- List<Object> deserialize(EventData eventData);
-
- /**
- * Retrieve the Fields that are present on tuples created by this object.
- *
- * @return The Fields that are present on tuples created by this object.
- */
- Fields getOutputFields();
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import com.microsoft.azure.eventhubs.EventData;
+import org.apache.storm.tuple.Fields;
+
+import java.io.Serializable;
+import java.util.List;
+
+public interface IEventDataScheme extends Serializable {
+
+ /**
+ * Deserialize an AMQP Message into a Tuple.
+ *
+ * @see #getOutputFields() for the list of fields the tuple will contain.
+ *
+ * @param eventData The EventData to Deserialize.
+ * @return A tuple containing the deserialized fields of the message.
+ */
+ List<Object> deserialize(EventData eventData);
+
+ /**
+ * Retrieve the Fields that are present on tuples created by this object.
+ *
+ * @return The Fields that are present on tuples created by this object.
+ */
+ Fields getOutputFields();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFactory.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFactory.java
index fbebdc8..a4901ae 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFactory.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFactory.java
@@ -1,30 +1,30 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-/**
- *
- */
-package org.apache.storm.eventhubs.spout;
-
-import java.io.Serializable;
-
-/**
- * An abstract factory to generate EventHubReceiver
- */
-public interface IEventHubReceiverFactory extends Serializable {
- IEventHubReceiver create(EventHubSpoutConfig config, String partitionId);
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+/**
+ *
+ */
+package org.apache.storm.eventhubs.spout;
+
+import java.io.Serializable;
+
+/**
+ * An abstract factory to generate EventHubReceiver
+ */
+public interface IEventHubReceiverFactory extends Serializable {
+ IEventHubReceiver create(EventHubSpoutConfig config, String partitionId);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionCoordinator.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionCoordinator.java
index a460681..e99f20a 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionCoordinator.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionCoordinator.java
@@ -1,27 +1,27 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.spout;
-
-import java.util.List;
-
-public interface IPartitionCoordinator {
-
- List<IPartitionManager> getMyPartitionManagers();
-
- IPartitionManager getPartitionManager(String partitionId);
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import java.util.List;
+
+public interface IPartitionCoordinator {
+
+ List<IPartitionManager> getMyPartitionManagers();
+
+ IPartitionManager getPartitionManager(String partitionId);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java
index 123d1c1..d391edd 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java
@@ -1,37 +1,37 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.spout;
-
-import java.util.Map;
-
-public interface IPartitionManager {
-
- void open() throws Exception;
-
- void close();
-
- EventDataWrap receive();
-
- void checkpoint();
-
- void ack(String offset);
-
- void fail(String offset);
-
- Map<String, Object> getMetricsData();
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import java.util.Map;
+
+public interface IPartitionManager {
+
+ void open() throws Exception;
+
+ void close();
+
+ EventDataWrap receive();
+
+ void checkpoint();
+
+ void ack(String offset);
+
+ void fail(String offset);
+
+ Map<String, Object> getMetricsData();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IStateStore.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IStateStore.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IStateStore.java
index f0ee2be..03c7ae8 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IStateStore.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IStateStore.java
@@ -1,31 +1,31 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.spout;
-
-import java.io.Serializable;
-
-public interface IStateStore extends Serializable {
-
- public void open();
-
- public void close();
-
- public void saveData(String path, String data);
-
- public String readData(String path);
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import java.io.Serializable;
+
+public interface IStateStore extends Serializable {
+
+ public void open();
+
+ public void close();
+
+ public void saveData(String path, String data);
+
+ public String readData(String path);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/MessageId.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/MessageId.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/MessageId.java
index 2247f1f..59d5c71 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/MessageId.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/MessageId.java
@@ -1,56 +1,56 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.spout;
-
-public class MessageId {
-
- private final String partitionId;
- private final String offset;
- private final long sequenceNumber;
-
- public MessageId(
- String partitionId,
- String offset,
- long sequenceNumber) {
- this.partitionId = partitionId;
- this.offset = offset;
- this.sequenceNumber = sequenceNumber;
- }
-
- public static MessageId create(String partitionId, String offset, long sequenceNumber) {
- return new MessageId(partitionId, offset, sequenceNumber);
- }
-
- public String getPartitionId() {
- return this.partitionId;
- }
-
- public String getOffset() {
- return this.offset;
- }
-
- public Long getSequenceNumber() {
- return this.sequenceNumber;
- }
-
- @Override
- public String toString() {
- return String.format("PartitionId: %s, Offset: %s, SequenceNumber: %s",
- this.partitionId, this.offset, this.sequenceNumber);
- }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+public class MessageId {
+
+ private final String partitionId;
+ private final String offset;
+ private final long sequenceNumber;
+
+ public MessageId(
+ String partitionId,
+ String offset,
+ long sequenceNumber) {
+ this.partitionId = partitionId;
+ this.offset = offset;
+ this.sequenceNumber = sequenceNumber;
+ }
+
+ public static MessageId create(String partitionId, String offset, long sequenceNumber) {
+ return new MessageId(partitionId, offset, sequenceNumber);
+ }
+
+ public String getPartitionId() {
+ return this.partitionId;
+ }
+
+ public String getOffset() {
+ return this.offset;
+ }
+
+ public Long getSequenceNumber() {
+ return this.sequenceNumber;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("PartitionId: %s, Offset: %s, SequenceNumber: %s",
+ this.partitionId, this.offset, this.sequenceNumber);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/ZookeeperStateStore.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/ZookeeperStateStore.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/ZookeeperStateStore.java
index 6af9df6..063aa4d 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/ZookeeperStateStore.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/ZookeeperStateStore.java
@@ -1,95 +1,95 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.spout;
-
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryNTimes;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ZookeeperStateStore implements IStateStore {
- private static final long serialVersionUID = 1L;
- private static final Logger logger = LoggerFactory.getLogger(ZookeeperStateStore.class);
-
- private final String zookeeperConnectionString;
- private final CuratorFramework curatorFramework;
-
- public ZookeeperStateStore(String zookeeperConnectionString) {
- this(zookeeperConnectionString, 3, 100);
- }
-
- public ZookeeperStateStore(String connectionString, int retries, int retryInterval) {
- if (connectionString == null) {
- zookeeperConnectionString = "localhost:2181";
- } else {
- zookeeperConnectionString = connectionString;
- }
-
- RetryPolicy retryPolicy = new RetryNTimes(retries, retryInterval);
- curatorFramework = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
- }
-
- @Override
- public void open() {
- curatorFramework.start();
- }
-
- @Override
- public void close() {
- curatorFramework.close();
- }
-
- @Override
- public void saveData(String statePath, String data) {
- data = data == null ? "" : data;
- byte[] bytes = data.getBytes();
-
- try {
- if (curatorFramework.checkExists().forPath(statePath) == null) {
- curatorFramework.create().creatingParentsIfNeeded().forPath(statePath, bytes);
- } else {
- curatorFramework.setData().forPath(statePath, bytes);
- }
-
- logger.info(String.format("data was saved. path: %s, data: %s.", statePath, data));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public String readData(String statePath) {
- try {
- if (curatorFramework.checkExists().forPath(statePath) == null) {
- // do we want to throw an exception if path doesn't exist??
- return null;
- } else {
- byte[] bytes = curatorFramework.getData().forPath(statePath);
- String data = new String(bytes);
-
- logger.info(String.format("data was retrieved. path: %s, data: %s.", statePath, data));
-
- return data;
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZookeeperStateStore implements IStateStore {
+ private static final long serialVersionUID = 1L;
+ private static final Logger logger = LoggerFactory.getLogger(ZookeeperStateStore.class);
+
+ private final String zookeeperConnectionString;
+ private final CuratorFramework curatorFramework;
+
+ public ZookeeperStateStore(String zookeeperConnectionString) {
+ this(zookeeperConnectionString, 3, 100);
+ }
+
+ public ZookeeperStateStore(String connectionString, int retries, int retryInterval) {
+ if (connectionString == null) {
+ zookeeperConnectionString = "localhost:2181";
+ } else {
+ zookeeperConnectionString = connectionString;
+ }
+
+ RetryPolicy retryPolicy = new RetryNTimes(retries, retryInterval);
+ curatorFramework = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
+ }
+
+ @Override
+ public void open() {
+ curatorFramework.start();
+ }
+
+ @Override
+ public void close() {
+ curatorFramework.close();
+ }
+
+ @Override
+ public void saveData(String statePath, String data) {
+ data = data == null ? "" : data;
+ byte[] bytes = data.getBytes();
+
+ try {
+ if (curatorFramework.checkExists().forPath(statePath) == null) {
+ curatorFramework.create().creatingParentsIfNeeded().forPath(statePath, bytes);
+ } else {
+ curatorFramework.setData().forPath(statePath, bytes);
+ }
+
+ logger.info(String.format("data was saved. path: %s, data: %s.", statePath, data));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public String readData(String statePath) {
+ try {
+ if (curatorFramework.checkExists().forPath(statePath) == null) {
+ // do we want to throw an exception if path doesn't exist??
+ return null;
+ } else {
+ byte[] bytes = curatorFramework.getData().forPath(statePath);
+ String data = new String(bytes);
+
+ logger.info(String.format("data was retrieved. path: %s, data: %s.", statePath, data));
+
+ return data;
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Coordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Coordinator.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Coordinator.java
index ad3c75e..253b317 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Coordinator.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Coordinator.java
@@ -1,60 +1,60 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.trident;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
-import org.apache.storm.trident.spout.IPartitionedTridentSpout;
-
-import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
-
-public class Coordinator implements IPartitionedTridentSpout.Coordinator<Partitions>,
- IOpaquePartitionedTridentSpout.Coordinator<Partitions> {
- private static final Logger logger = LoggerFactory.getLogger(Coordinator.class);
- private final EventHubSpoutConfig spoutConfig;
- Partitions partitions;
-
- public Coordinator(EventHubSpoutConfig spoutConfig) {
- this.spoutConfig = spoutConfig;
- }
-
- @Override
- public void close() {
- }
-
- @Override
- public Partitions getPartitionsForBatch() {
- if(partitions != null) {
- return partitions;
- }
-
- partitions = new Partitions();
- for(int i=0; i<spoutConfig.getPartitionCount(); ++i) {
- partitions.addPartition(new Partition(spoutConfig, Integer.toString(i)));
- }
- logger.info("created partitions, size=" + spoutConfig.getPartitionCount());
- return partitions;
- }
-
- @Override
- public boolean isReady(long txid) {
- return true;
- }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
+import org.apache.storm.trident.spout.IPartitionedTridentSpout;
+
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+
+public class Coordinator implements IPartitionedTridentSpout.Coordinator<Partitions>,
+ IOpaquePartitionedTridentSpout.Coordinator<Partitions> {
+ private static final Logger logger = LoggerFactory.getLogger(Coordinator.class);
+ private final EventHubSpoutConfig spoutConfig;
+ Partitions partitions;
+
+ public Coordinator(EventHubSpoutConfig spoutConfig) {
+ this.spoutConfig = spoutConfig;
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public Partitions getPartitionsForBatch() {
+ if(partitions != null) {
+ return partitions;
+ }
+
+ partitions = new Partitions();
+ for(int i=0; i<spoutConfig.getPartitionCount(); ++i) {
+ partitions.addPartition(new Partition(spoutConfig, Integer.toString(i)));
+ }
+ logger.info("created partitions, size=" + spoutConfig.getPartitionCount());
+ return partitions;
+ }
+
+ @Override
+ public boolean isReady(long txid) {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java
index 069d819..d1e8b9e 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java
@@ -1,35 +1,35 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.trident;
-
-import org.apache.storm.eventhubs.spout.EventDataWrap;
-
-import java.util.List;
-
-public interface ITridentPartitionManager {
- boolean open(String offset);
- void close();
-
- /**
- * receive a batch of messages from EvenHub up to "count" messages
- * @param offset the starting offset
- * @param count max number of messages in this batch
- * @return list of EventData, if failed to receive, return empty list
- */
- public List<EventDataWrap> receiveBatch(String offset, int count);
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import org.apache.storm.eventhubs.spout.EventDataWrap;
+
+import java.util.List;
+
+public interface ITridentPartitionManager {
+ boolean open(String offset);
+ void close();
+
+ /**
+ * receive a batch of messages from EvenHub up to "count" messages
+ * @param offset the starting offset
+ * @param count max number of messages in this batch
+ * @return list of EventData, if failed to receive, return empty list
+ */
+ public List<EventDataWrap> receiveBatch(String offset, int count);
+}
[3/5] storm git commit: set '* text=auto' in .gitattributes in order
to avoid merge work because of line feed changes
Posted by sr...@apache.org.
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManagerFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManagerFactory.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManagerFactory.java
index 5804e28..701bd46 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManagerFactory.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManagerFactory.java
@@ -1,26 +1,26 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.trident;
-
-import java.io.Serializable;
-
-import org.apache.storm.eventhubs.spout.IEventHubReceiver;
-
-public interface ITridentPartitionManagerFactory extends Serializable {
- ITridentPartitionManager create(IEventHubReceiver receiver);
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.io.Serializable;
+
+import org.apache.storm.eventhubs.spout.IEventHubReceiver;
+
+public interface ITridentPartitionManagerFactory extends Serializable {
+ ITridentPartitionManager create(IEventHubReceiver receiver);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
index 5b6b642..0da421c 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
@@ -1,68 +1,68 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.trident;
-
-import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
-import org.apache.storm.eventhubs.spout.IEventHubReceiverFactory;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
-import org.apache.storm.trident.topology.TransactionAttempt;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * A thin wrapper of TransactionalTridentEventHubEmitter for OpaqueTridentEventHubSpout
- */
-public class OpaqueTridentEventHubEmitter implements IOpaquePartitionedTridentSpout.Emitter<Partitions, Partition, Map> {
- private final TransactionalTridentEventHubEmitter transactionalEmitter;
- public OpaqueTridentEventHubEmitter(EventHubSpoutConfig spoutConfig) {
- transactionalEmitter = new TransactionalTridentEventHubEmitter(spoutConfig);
- }
-
- public OpaqueTridentEventHubEmitter(EventHubSpoutConfig spoutConfig,
- int batchSize,
- ITridentPartitionManagerFactory pmFactory,
- IEventHubReceiverFactory recvFactory) {
- transactionalEmitter = new TransactionalTridentEventHubEmitter(spoutConfig,
- batchSize,
- pmFactory,
- recvFactory);
- }
-
- @Override
- public void close() {
- transactionalEmitter.close();
- }
-
- @Override
- public Map emitPartitionBatch(TransactionAttempt attempt, TridentCollector collector,
- Partition partition, Map meta) {
- return transactionalEmitter.emitPartitionBatchNew(attempt, collector, partition, meta);
- }
-
- @Override
- public List<Partition> getOrderedPartitions(Partitions partitions) {
- return transactionalEmitter.getOrderedPartitions(partitions);
- }
-
- @Override
- public void refreshPartitions(List<Partition> partitionList) {
- transactionalEmitter.refreshPartitions(partitionList);
- }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+import org.apache.storm.eventhubs.spout.IEventHubReceiverFactory;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
+import org.apache.storm.trident.topology.TransactionAttempt;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A thin wrapper of TransactionalTridentEventHubEmitter for OpaqueTridentEventHubSpout
+ */
+public class OpaqueTridentEventHubEmitter implements IOpaquePartitionedTridentSpout.Emitter<Partitions, Partition, Map> {
+ private final TransactionalTridentEventHubEmitter transactionalEmitter;
+ public OpaqueTridentEventHubEmitter(EventHubSpoutConfig spoutConfig) {
+ transactionalEmitter = new TransactionalTridentEventHubEmitter(spoutConfig);
+ }
+
+ public OpaqueTridentEventHubEmitter(EventHubSpoutConfig spoutConfig,
+ int batchSize,
+ ITridentPartitionManagerFactory pmFactory,
+ IEventHubReceiverFactory recvFactory) {
+ transactionalEmitter = new TransactionalTridentEventHubEmitter(spoutConfig,
+ batchSize,
+ pmFactory,
+ recvFactory);
+ }
+
+ @Override
+ public void close() {
+ transactionalEmitter.close();
+ }
+
+ @Override
+ public Map emitPartitionBatch(TransactionAttempt attempt, TridentCollector collector,
+ Partition partition, Map meta) {
+ return transactionalEmitter.emitPartitionBatchNew(attempt, collector, partition, meta);
+ }
+
+ @Override
+ public List<Partition> getOrderedPartitions(Partitions partitions) {
+ return transactionalEmitter.getOrderedPartitions(partitions);
+ }
+
+ @Override
+ public void refreshPartitions(List<Partition> partitionList) {
+ transactionalEmitter.refreshPartitions(partitionList);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubSpout.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubSpout.java
index f50ffb4..7123304 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubSpout.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubSpout.java
@@ -1,64 +1,64 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.trident;
-
-import java.util.Map;
-
-import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
-import org.apache.storm.eventhubs.spout.IEventDataScheme;
-
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
-
-/**
- * Opaque Trident EventHubs Spout
- */
-public class OpaqueTridentEventHubSpout implements IOpaquePartitionedTridentSpout<Partitions, Partition, Map> {
- private static final long serialVersionUID = 1L;
- private final IEventDataScheme scheme;
- private final EventHubSpoutConfig spoutConfig;
-
- public OpaqueTridentEventHubSpout(EventHubSpoutConfig config) {
- spoutConfig = config;
- scheme = spoutConfig.getEventDataScheme();
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-
- @Override
- public IOpaquePartitionedTridentSpout.Coordinator<Partitions> getCoordinator(
- Map<String, Object> conf, TopologyContext context) {
- return new org.apache.storm.eventhubs.trident.Coordinator(spoutConfig);
- }
-
- @Override
- public IOpaquePartitionedTridentSpout.Emitter<Partitions, Partition, Map> getEmitter(
- Map<String, Object> conf, TopologyContext context) {
- return new OpaqueTridentEventHubEmitter(spoutConfig);
- }
-
- @Override
- public Fields getOutputFields() {
- return scheme.getOutputFields();
- }
-
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.util.Map;
+
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+import org.apache.storm.eventhubs.spout.IEventDataScheme;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
+
+/**
+ * Opaque Trident EventHubs Spout
+ */
+public class OpaqueTridentEventHubSpout implements IOpaquePartitionedTridentSpout<Partitions, Partition, Map> {
+ private static final long serialVersionUID = 1L;
+ private final IEventDataScheme scheme;
+ private final EventHubSpoutConfig spoutConfig;
+
+ public OpaqueTridentEventHubSpout(EventHubSpoutConfig config) {
+ spoutConfig = config;
+ scheme = spoutConfig.getEventDataScheme();
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+
+ @Override
+ public IOpaquePartitionedTridentSpout.Coordinator<Partitions> getCoordinator(
+ Map<String, Object> conf, TopologyContext context) {
+ return new org.apache.storm.eventhubs.trident.Coordinator(spoutConfig);
+ }
+
+ @Override
+ public IOpaquePartitionedTridentSpout.Emitter<Partitions, Partition, Map> getEmitter(
+ Map<String, Object> conf, TopologyContext context) {
+ return new OpaqueTridentEventHubEmitter(spoutConfig);
+ }
+
+ @Override
+ public Fields getOutputFields() {
+ return scheme.getOutputFields();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partition.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partition.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partition.java
index b726e7f..8857eec 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partition.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partition.java
@@ -1,39 +1,39 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.trident;
-
-import java.io.Serializable;
-import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
-import org.apache.storm.trident.spout.ISpoutPartition;
-
-/**
- * Represents an EventHub partition
- */
-public class Partition implements ISpoutPartition, Serializable {
- private static final long serialVersionUID = 1L;
- String partitionId;
-
- public Partition(EventHubSpoutConfig config, String partitionId) {
- this.partitionId = partitionId;
- }
-
- @Override
- public String getId() {
- return partitionId;
- }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.io.Serializable;
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+import org.apache.storm.trident.spout.ISpoutPartition;
+
+/**
+ * Represents an EventHub partition
+ */
+public class Partition implements ISpoutPartition, Serializable {
+ private static final long serialVersionUID = 1L;
+ String partitionId;
+
+ public Partition(EventHubSpoutConfig config, String partitionId) {
+ this.partitionId = partitionId;
+ }
+
+ @Override
+ public String getId() {
+ return partitionId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partitions.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partitions.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partitions.java
index 235f5b6..c3317d9 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partitions.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partitions.java
@@ -1,41 +1,41 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.trident;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Represents all EventHub partitions a spout is receiving messages from.
- */
-public class Partitions implements Serializable {
- private static final long serialVersionUID = 1L;
- private List<Partition> partitionList;
- public Partitions() {
- partitionList = new ArrayList<Partition>();
- }
-
- public void addPartition(Partition partition) {
- partitionList.add(partition);
- }
-
- public List<Partition> getPartitions() {
- return partitionList;
- }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Represents all EventHub partitions a spout is receiving messages from.
+ */
+public class Partitions implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private List<Partition> partitionList;
+ public Partitions() {
+ partitionList = new ArrayList<Partition>();
+ }
+
+ public void addPartition(Partition partition) {
+ partitionList.add(partition);
+ }
+
+ public List<Partition> getPartitions() {
+ return partitionList;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java
index 4d4de16..ee3242a 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java
@@ -1,66 +1,66 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.trident;
-
-import java.util.Map;
-
-import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
-import org.apache.storm.eventhubs.spout.IEventDataScheme;
-
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.trident.spout.IPartitionedTridentSpout;
-import org.apache.storm.eventhubs.trident.Partition;
-
-/**
- * Transactional Trident EventHub Spout
- */
-public class TransactionalTridentEventHubSpout implements
- IPartitionedTridentSpout<Partitions, Partition, Map<String, Object>> {
- private static final long serialVersionUID = 1L;
- private final IEventDataScheme scheme;
- private final EventHubSpoutConfig spoutConfig;
-
- public TransactionalTridentEventHubSpout(EventHubSpoutConfig config) {
- spoutConfig = config;
- scheme = spoutConfig.getEventDataScheme();
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-
- @Override
- public IPartitionedTridentSpout.Coordinator<Partitions> getCoordinator(
- Map<String, Object> conf, TopologyContext context) {
- return new org.apache.storm.eventhubs.trident.Coordinator(spoutConfig);
- }
-
- @Override
- public IPartitionedTridentSpout.Emitter<Partitions, Partition, Map<String, Object>> getEmitter(
- Map<String, Object> conf, TopologyContext context) {
- return new TransactionalTridentEventHubEmitter(spoutConfig);
- }
-
- @Override
- public Fields getOutputFields() {
- return scheme.getOutputFields();
- }
-
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.util.Map;
+
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+import org.apache.storm.eventhubs.spout.IEventDataScheme;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.trident.spout.IPartitionedTridentSpout;
+import org.apache.storm.eventhubs.trident.Partition;
+
+/**
+ * Transactional Trident EventHub Spout
+ */
+public class TransactionalTridentEventHubSpout implements
+ IPartitionedTridentSpout<Partitions, Partition, Map<String, Object>> {
+ private static final long serialVersionUID = 1L;
+ private final IEventDataScheme scheme;
+ private final EventHubSpoutConfig spoutConfig;
+
+ public TransactionalTridentEventHubSpout(EventHubSpoutConfig config) {
+ spoutConfig = config;
+ scheme = spoutConfig.getEventDataScheme();
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+
+ @Override
+ public IPartitionedTridentSpout.Coordinator<Partitions> getCoordinator(
+ Map<String, Object> conf, TopologyContext context) {
+ return new org.apache.storm.eventhubs.trident.Coordinator(spoutConfig);
+ }
+
+ @Override
+ public IPartitionedTridentSpout.Emitter<Partitions, Partition, Map<String, Object>> getEmitter(
+ Map<String, Object> conf, TopologyContext context) {
+ return new TransactionalTridentEventHubEmitter(spoutConfig);
+ }
+
+ @Override
+ public Fields getOutputFields() {
+ return scheme.getOutputFields();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventHubLoop.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
index 665fef9..32dc0d5 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
@@ -1,52 +1,52 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.samples;
-
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.topology.TopologyBuilder;
-
-import org.apache.storm.eventhubs.bolt.EventHubBolt;
-import org.apache.storm.eventhubs.bolt.EventHubBoltConfig;
-import org.apache.storm.eventhubs.spout.EventHubSpout;
-
-/**
- * A sample topology that loops message back to EventHub
- */
-public class EventHubLoop extends EventCount {
-
- @Override
- protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
- TopologyBuilder topologyBuilder = new TopologyBuilder();
-
- topologyBuilder.setSpout("EventHubsSpout", eventHubSpout, spoutConfig.getPartitionCount())
- .setNumTasks(spoutConfig.getPartitionCount());
- EventHubBoltConfig boltConfig = new EventHubBoltConfig(spoutConfig.getConnectionString(),
- spoutConfig.getEntityPath(), true);
-
- EventHubBolt eventHubBolt = new EventHubBolt(boltConfig);
- int boltTasks = spoutConfig.getPartitionCount();
- topologyBuilder.setBolt("EventHubsBolt", eventHubBolt, boltTasks)
- .localOrShuffleGrouping("EventHubsSpout").setNumTasks(boltTasks);
- return topologyBuilder.createTopology();
- }
-
- public static void main(String[] args) throws Exception {
- EventHubLoop scenario = new EventHubLoop();
- scenario.runScenario(args);
- }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples;
+
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.TopologyBuilder;
+
+import org.apache.storm.eventhubs.bolt.EventHubBolt;
+import org.apache.storm.eventhubs.bolt.EventHubBoltConfig;
+import org.apache.storm.eventhubs.spout.EventHubSpout;
+
+/**
+ * A sample topology that loops message back to EventHub
+ */
+public class EventHubLoop extends EventCount {
+
+ @Override
+ protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
+ TopologyBuilder topologyBuilder = new TopologyBuilder();
+
+ topologyBuilder.setSpout("EventHubsSpout", eventHubSpout, spoutConfig.getPartitionCount())
+ .setNumTasks(spoutConfig.getPartitionCount());
+ EventHubBoltConfig boltConfig = new EventHubBoltConfig(spoutConfig.getConnectionString(),
+ spoutConfig.getEntityPath(), true);
+
+ EventHubBolt eventHubBolt = new EventHubBolt(boltConfig);
+ int boltTasks = spoutConfig.getPartitionCount();
+ topologyBuilder.setBolt("EventHubsBolt", eventHubBolt, boltTasks)
+ .localOrShuffleGrouping("EventHubsSpout").setNumTasks(boltTasks);
+ return topologyBuilder.createTopology();
+ }
+
+ public static void main(String[] args) throws Exception {
+ EventHubLoop scenario = new EventHubLoop();
+ scenario.runScenario(args);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java
index e8538c1..a433b8b 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java
@@ -1,53 +1,53 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.samples;
-
-import org.apache.storm.trident.TridentState;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.operation.builtin.Count;
-import org.apache.storm.trident.operation.builtin.Sum;
-import org.apache.storm.trident.testing.MemoryMapState;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.tuple.Fields;
-
-import org.apache.storm.eventhubs.samples.TransactionalTridentEventCount.LoggingFilter;
-import org.apache.storm.eventhubs.spout.EventHubSpout;
-import org.apache.storm.eventhubs.trident.OpaqueTridentEventHubSpout;
-
-/**
- * A simple Trident topology uses OpaqueTridentEventHubSpout
- */
-public class OpaqueTridentEventCount extends EventCount {
- @Override
- protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
- TridentTopology topology = new TridentTopology();
-
- OpaqueTridentEventHubSpout spout = new OpaqueTridentEventHubSpout(spoutConfig);
- TridentState state = topology.newStream("stream-" + spoutConfig.getTopologyName(), spout)
- .parallelismHint(spoutConfig.getPartitionCount())
- .aggregate(new Count(), new Fields("partial-count"))
- .persistentAggregate(new MemoryMapState.Factory(), new Fields("partial-count"), new Sum(), new Fields("count"));
- state.newValuesStream().each(new Fields("count"), new LoggingFilter("got count: ", 10000));
- return topology.build();
- }
-
- public static void main(String[] args) throws Exception {
- OpaqueTridentEventCount scenario = new OpaqueTridentEventCount();
- scenario.runScenario(args);
- }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples;
+
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.builtin.Count;
+import org.apache.storm.trident.operation.builtin.Sum;
+import org.apache.storm.trident.testing.MemoryMapState;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
+
+import org.apache.storm.eventhubs.samples.TransactionalTridentEventCount.LoggingFilter;
+import org.apache.storm.eventhubs.spout.EventHubSpout;
+import org.apache.storm.eventhubs.trident.OpaqueTridentEventHubSpout;
+
+/**
+ * A simple Trident topology uses OpaqueTridentEventHubSpout
+ */
+public class OpaqueTridentEventCount extends EventCount {
+ @Override
+ protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
+ TridentTopology topology = new TridentTopology();
+
+ OpaqueTridentEventHubSpout spout = new OpaqueTridentEventHubSpout(spoutConfig);
+ TridentState state = topology.newStream("stream-" + spoutConfig.getTopologyName(), spout)
+ .parallelismHint(spoutConfig.getPartitionCount())
+ .aggregate(new Count(), new Fields("partial-count"))
+ .persistentAggregate(new MemoryMapState.Factory(), new Fields("partial-count"), new Sum(), new Fields("count"));
+ state.newValuesStream().each(new Fields("count"), new LoggingFilter("got count: ", 10000));
+ return topology.build();
+ }
+
+ public static void main(String[] args) throws Exception {
+ OpaqueTridentEventCount scenario = new OpaqueTridentEventCount();
+ scenario.runScenario(args);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java
index 0a5295f..718c229 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java
@@ -1,81 +1,81 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.samples;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.tuple.Fields;
-
-import org.apache.storm.eventhubs.spout.EventHubSpout;
-import org.apache.storm.eventhubs.trident.TransactionalTridentEventHubSpout;
-
-import org.apache.storm.trident.TridentState;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.operation.BaseFilter;
-import org.apache.storm.trident.operation.builtin.Count;
-import org.apache.storm.trident.operation.builtin.Sum;
-import org.apache.storm.trident.testing.MemoryMapState;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-/**
- * A simple Trident topology uses TransactionalTridentEventHubSpout
- */
-public class TransactionalTridentEventCount extends EventCount {
- public static class LoggingFilter extends BaseFilter {
- private static final long serialVersionUID = 1L;
- private static final Logger logger = LoggerFactory.getLogger(LoggingFilter.class);
- private final String prefix;
- private final long logIntervalMs;
- private long lastTime;
- public LoggingFilter(String prefix, int logIntervalMs) {
- this.prefix = prefix;
- this.logIntervalMs = logIntervalMs;
- lastTime = System.nanoTime();
- }
-
- @Override
- public boolean isKeep(TridentTuple tuple) {
- long now = System.nanoTime();
- if(logIntervalMs < (now - lastTime) / 1000000) {
- logger.info(prefix + tuple.toString());
- lastTime = now;
- }
- return false;
- }
- }
-
- @Override
- protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
- TridentTopology topology = new TridentTopology();
-
- TransactionalTridentEventHubSpout spout = new TransactionalTridentEventHubSpout(spoutConfig);
- TridentState state = topology.newStream("stream-" + spoutConfig.getTopologyName(), spout)
- .parallelismHint(spoutConfig.getPartitionCount())
- .aggregate(new Count(), new Fields("partial-count"))
- .persistentAggregate(new MemoryMapState.Factory(), new Fields("partial-count"), new Sum(), new Fields("count"));
- state.newValuesStream().each(new Fields("count"), new LoggingFilter("got count: ", 10000));
- return topology.build();
- }
-
- public static void main(String[] args) throws Exception {
- TransactionalTridentEventCount scenario = new TransactionalTridentEventCount();
- scenario.runScenario(args);
- }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
+
+import org.apache.storm.eventhubs.spout.EventHubSpout;
+import org.apache.storm.eventhubs.trident.TransactionalTridentEventHubSpout;
+
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseFilter;
+import org.apache.storm.trident.operation.builtin.Count;
+import org.apache.storm.trident.operation.builtin.Sum;
+import org.apache.storm.trident.testing.MemoryMapState;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+/**
+ * A simple Trident topology uses TransactionalTridentEventHubSpout
+ */
+public class TransactionalTridentEventCount extends EventCount {
+ public static class LoggingFilter extends BaseFilter {
+ private static final long serialVersionUID = 1L;
+ private static final Logger logger = LoggerFactory.getLogger(LoggingFilter.class);
+ private final String prefix;
+ private final long logIntervalMs;
+ private long lastTime;
+ public LoggingFilter(String prefix, int logIntervalMs) {
+ this.prefix = prefix;
+ this.logIntervalMs = logIntervalMs;
+ lastTime = System.nanoTime();
+ }
+
+ @Override
+ public boolean isKeep(TridentTuple tuple) {
+ long now = System.nanoTime();
+ if(logIntervalMs < (now - lastTime) / 1000000) {
+ logger.info(prefix + tuple.toString());
+ lastTime = now;
+ }
+ return false;
+ }
+ }
+
+ @Override
+ protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
+ TridentTopology topology = new TridentTopology();
+
+ TransactionalTridentEventHubSpout spout = new TransactionalTridentEventHubSpout(spoutConfig);
+ TridentState state = topology.newStream("stream-" + spoutConfig.getTopologyName(), spout)
+ .parallelismHint(spoutConfig.getPartitionCount())
+ .aggregate(new Count(), new Fields("partial-count"))
+ .persistentAggregate(new MemoryMapState.Factory(), new Fields("partial-count"), new Sum(), new Fields("count"));
+ state.newValuesStream().each(new Fields("count"), new LoggingFilter("got count: ", 10000));
+ return topology.build();
+ }
+
+ public static void main(String[] args) throws Exception {
+ TransactionalTridentEventCount scenario = new TransactionalTridentEventCount();
+ scenario.runScenario(args);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
index 4bed2e3..9d94f5f 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
@@ -1,88 +1,88 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.samples.bolt;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.storm.utils.TupleUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.storm.Config;
-import org.apache.storm.metric.api.IMetric;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Tuple;
-
-/**
- * Globally count number of messages
- */
-public class GlobalCountBolt extends BaseBasicBolt {
- private static final long serialVersionUID = 1L;
- private static final Logger logger = LoggerFactory
- .getLogger(GlobalCountBolt.class);
- private long globalCount;
- private long globalCountDiff;
- private long lastMetricsTime;
- private long throughput;
-
- @Override
- public void prepare(Map<String, Object> config, TopologyContext context) {
- globalCount = 0;
- globalCountDiff = 0;
- lastMetricsTime = System.nanoTime();
- context.registerMetric("GlobalMessageCount", new IMetric() {
- @Override
- public Object getValueAndReset() {
- long now = System.nanoTime();
- long millis = (now - lastMetricsTime) / 1000000;
- throughput = globalCountDiff / millis * 1000;
- Map<String, Object> values = new HashMap<>();
- values.put("global_count", globalCount);
- values.put("throughput", throughput);
- lastMetricsTime = now;
- globalCountDiff = 0;
- return values;
- }
- }, (Integer)config.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));
- }
-
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- if (TupleUtils.isTick(tuple)) {
- return;
- }
-
- int partial = (Integer)tuple.getValueByField("partial_count");
- globalCount += partial;
- globalCountDiff += partial;
- if((globalCountDiff == partial) && (globalCount != globalCountDiff)) {
- //metrics has just been collected, let's also log it
- logger.info("Current throughput (messages/second): " + throughput);
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
- }
-
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples.bolt;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.storm.utils.TupleUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.Config;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * Globally count number of messages
+ */
+public class GlobalCountBolt extends BaseBasicBolt {
+ private static final long serialVersionUID = 1L;
+ private static final Logger logger = LoggerFactory
+ .getLogger(GlobalCountBolt.class);
+ private long globalCount;
+ private long globalCountDiff;
+ private long lastMetricsTime;
+ private long throughput;
+
+ @Override
+ public void prepare(Map<String, Object> config, TopologyContext context) {
+ globalCount = 0;
+ globalCountDiff = 0;
+ lastMetricsTime = System.nanoTime();
+ context.registerMetric("GlobalMessageCount", new IMetric() {
+ @Override
+ public Object getValueAndReset() {
+ long now = System.nanoTime();
+ long millis = (now - lastMetricsTime) / 1000000;
+ throughput = globalCountDiff / millis * 1000;
+ Map<String, Object> values = new HashMap<>();
+ values.put("global_count", globalCount);
+ values.put("throughput", throughput);
+ lastMetricsTime = now;
+ globalCountDiff = 0;
+ return values;
+ }
+ }, (Integer)config.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));
+ }
+
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ if (TupleUtils.isTick(tuple)) {
+ return;
+ }
+
+ int partial = (Integer)tuple.getValueByField("partial_count");
+ globalCount += partial;
+ globalCountDiff += partial;
+ if((globalCountDiff == partial) && (globalCount != globalCountDiff)) {
+ //metrics has just been collected, let's also log it
+ logger.info("Current throughput (messages/second): " + throughput);
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
index 544b6c8..3763c69 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
@@ -1,68 +1,68 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.samples.bolt;
-
-import java.util.Map;
-
-import org.apache.storm.utils.TupleUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-/**
- * Partially count number of messages from EventHubs
- */
-public class PartialCountBolt extends BaseBasicBolt {
- private static final long serialVersionUID = 1L;
- private static final Logger logger = LoggerFactory
- .getLogger(PartialCountBolt.class);
- private static final int PartialCountBatchSize = 1000;
-
- private int partialCount;
-
- @Override
- public void prepare(Map<String, Object> topoConf, TopologyContext context) {
- partialCount = 0;
- }
-
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- if (TupleUtils.isTick(tuple)) {
- return;
- }
-
- partialCount++;
- if(partialCount == PartialCountBatchSize) {
- collector.emit(new Values(PartialCountBatchSize));
- partialCount = 0;
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("partial_count"));
- }
-
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples.bolt;
+
+import java.util.Map;
+
+import org.apache.storm.utils.TupleUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+/**
+ * Partially count number of messages from EventHubs
+ */
+public class PartialCountBolt extends BaseBasicBolt {
+ private static final long serialVersionUID = 1L;
+ private static final Logger logger = LoggerFactory
+ .getLogger(PartialCountBolt.class);
+ private static final int PartialCountBatchSize = 1000;
+
+ private int partialCount;
+
+ @Override
+ public void prepare(Map<String, Object> topoConf, TopologyContext context) {
+ partialCount = 0;
+ }
+
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ if (TupleUtils.isTick(tuple)) {
+ return;
+ }
+
+ partialCount++;
+ if(partialCount == PartialCountBatchSize) {
+ collector.emit(new Values(PartialCountBatchSize));
+ partialCount = 0;
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("partial_count"));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
index ac724e8..621d6a8 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
@@ -1,71 +1,71 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.spout;
-
-import java.util.List;
-
-import org.apache.storm.spout.ISpoutOutputCollector;
-
-/**
- * Mock of ISpoutOutputCollector
- */
-public class SpoutOutputCollectorMock implements ISpoutOutputCollector {
- //comma separated offsets
- StringBuilder emittedOffset;
-
- public SpoutOutputCollectorMock() {
- emittedOffset = new StringBuilder();
- }
-
- public String getOffsetSequenceAndReset() {
- String ret = null;
- if(emittedOffset.length() > 0) {
- emittedOffset.setLength(emittedOffset.length()-1);
- ret = emittedOffset.toString();
- emittedOffset.setLength(0);
- }
- return ret;
- }
-
- @Override
- public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
- MessageId mid = (MessageId)messageId;
- String pid = mid.getPartitionId();
- String offset = mid.getOffset();
- emittedOffset.append(pid+"_"+offset+",");
- return null;
- }
-
- @Override
- public void emitDirect(int arg0, String arg1, List<Object> arg2, Object arg3) {
- }
-
- @Override
- public void flush() {
- // NO-OP
- }
-
- @Override
- public void reportError(Throwable arg0) {
- }
-
- @Override
- public long getPendingCount() {
- return 0;
- }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import java.util.List;
+
+import org.apache.storm.spout.ISpoutOutputCollector;
+
+/**
+ * Mock of ISpoutOutputCollector
+ */
+public class SpoutOutputCollectorMock implements ISpoutOutputCollector {
+ //comma separated offsets
+ StringBuilder emittedOffset;
+
+ public SpoutOutputCollectorMock() {
+ emittedOffset = new StringBuilder();
+ }
+
+ public String getOffsetSequenceAndReset() {
+ String ret = null;
+ if(emittedOffset.length() > 0) {
+ emittedOffset.setLength(emittedOffset.length()-1);
+ ret = emittedOffset.toString();
+ emittedOffset.setLength(0);
+ }
+ return ret;
+ }
+
+ @Override
+ public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
+ MessageId mid = (MessageId)messageId;
+ String pid = mid.getPartitionId();
+ String offset = mid.getOffset();
+ emittedOffset.append(pid+"_"+offset+",");
+ return null;
+ }
+
+ @Override
+ public void emitDirect(int arg0, String arg1, List<Object> arg2, Object arg3) {
+ }
+
+ @Override
+ public void flush() {
+ // NO-OP
+ }
+
+ @Override
+ public void reportError(Throwable arg0) {
+ }
+
+ @Override
+ public long getPendingCount() {
+ return 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/StateStoreMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/StateStoreMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/StateStoreMock.java
index cd6e13e..0abe3a4 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/StateStoreMock.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/StateStoreMock.java
@@ -1,54 +1,54 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.spout;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.storm.eventhubs.spout.IStateStore;
-
-/**
- * A state store mocker
- */
-public class StateStoreMock implements IStateStore {
- Map<String, String> myDataMap;
- @Override
- public void open() {
- myDataMap = new HashMap<String, String>();
- }
-
- @Override
- public void close() {
- myDataMap = null;
- }
-
- @Override
- public void saveData(String path, String data) {
- if(myDataMap != null) {
- myDataMap.put(path, data);
- }
- }
-
- @Override
- public String readData(String path) {
- if(myDataMap != null) {
- return myDataMap.get(path);
- }
- return null;
- }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.storm.eventhubs.spout.IStateStore;
+
+/**
+ * A state store mocker
+ */
+public class StateStoreMock implements IStateStore {
+ Map<String, String> myDataMap;
+ @Override
+ public void open() {
+ myDataMap = new HashMap<String, String>();
+ }
+
+ @Override
+ public void close() {
+ myDataMap = null;
+ }
+
+ @Override
+ public void saveData(String path, String data) {
+ if(myDataMap != null) {
+ myDataMap.put(path, data);
+ }
+ }
+
+ @Override
+ public String readData(String path) {
+ if(myDataMap != null) {
+ return myDataMap.get(path);
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java
index f260dea..926337b 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java
@@ -1,47 +1,47 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.spout;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
-
-public class TestEventData {
-
- @Before
- public void setUp() throws Exception {
- }
-
- @After
- public void tearDown() throws Exception {
- }
-
- @Test
- public void testEventDataComparision() {
-
- MessageId messageId1 = MessageId.create(null, "3", 1);
- EventDataWrap eventData1 = EventDataWrap.create(null, messageId1);
-
- MessageId messageId2 = MessageId.create(null, "13", 2);
- EventDataWrap eventData2 = EventDataWrap.create(null, messageId2);
-
- assertTrue(eventData2.compareTo(eventData1) > 0);
- }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestEventData {
+
+ @Before
+ public void setUp() throws Exception {
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public void testEventDataComparision() {
+
+ MessageId messageId1 = MessageId.create(null, "3", 1);
+ EventDataWrap eventData1 = EventDataWrap.create(null, messageId1);
+
+ MessageId messageId2 = MessageId.create(null, "13", 2);
+ EventDataWrap eventData2 = EventDataWrap.create(null, messageId2);
+
+ assertTrue(eventData2.compareTo(eventData1) > 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java
index c42f769..bd5b07f 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java
@@ -1,57 +1,57 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.trident;
-
-import java.util.List;
-
-import org.apache.storm.trident.operation.TridentCollector;
-
-/**
- * A mock of TridentCollector
- */
-public class TridentCollectorMock implements TridentCollector {
- StringBuilder buffer;
-
- public TridentCollectorMock() {
- buffer = new StringBuilder();
- }
-
- @Override
- public void emit(List<Object> tuples) {
- for(Object o: tuples) {
- buffer.append(o.toString());
- }
- }
-
- @Override
- public void flush() {
- // NO-OP
- }
-
- @Override
- public void reportError(Throwable arg0) {
- }
-
- public void clear() {
- buffer.setLength(0);
- }
-
- public String getBuffer() {
- return buffer.toString();
- }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.util.List;
+
+import org.apache.storm.trident.operation.TridentCollector;
+
+/**
+ * A mock of TridentCollector
+ */
+public class TridentCollectorMock implements TridentCollector {
+ StringBuilder buffer;
+
+ public TridentCollectorMock() {
+ buffer = new StringBuilder();
+ }
+
+ @Override
+ public void emit(List<Object> tuples) {
+ for(Object o: tuples) {
+ buffer.append(o.toString());
+ }
+ }
+
+ @Override
+ public void flush() {
+ // NO-OP
+ }
+
+ @Override
+ public void reportError(Throwable arg0) {
+ }
+
+ public void clear() {
+ buffer.setLength(0);
+ }
+
+ public String getBuffer() {
+ return buffer.toString();
+ }
+}