You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2018/03/23 17:10:06 UTC
[3/5] storm git commit: set '* text=auto' in .gitattributes in order
to avoid merge work because of line feed changes
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManagerFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManagerFactory.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManagerFactory.java
index 5804e28..701bd46 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManagerFactory.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManagerFactory.java
@@ -1,26 +1,26 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.trident;
-
-import java.io.Serializable;
-
-import org.apache.storm.eventhubs.spout.IEventHubReceiver;
-
-public interface ITridentPartitionManagerFactory extends Serializable {
- ITridentPartitionManager create(IEventHubReceiver receiver);
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.io.Serializable;
+
+import org.apache.storm.eventhubs.spout.IEventHubReceiver;
+
+public interface ITridentPartitionManagerFactory extends Serializable {
+ ITridentPartitionManager create(IEventHubReceiver receiver);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
index 5b6b642..0da421c 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
@@ -1,68 +1,68 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.trident;
-
-import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
-import org.apache.storm.eventhubs.spout.IEventHubReceiverFactory;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
-import org.apache.storm.trident.topology.TransactionAttempt;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * A thin wrapper of TransactionalTridentEventHubEmitter for OpaqueTridentEventHubSpout
- */
-public class OpaqueTridentEventHubEmitter implements IOpaquePartitionedTridentSpout.Emitter<Partitions, Partition, Map> {
- private final TransactionalTridentEventHubEmitter transactionalEmitter;
- public OpaqueTridentEventHubEmitter(EventHubSpoutConfig spoutConfig) {
- transactionalEmitter = new TransactionalTridentEventHubEmitter(spoutConfig);
- }
-
- public OpaqueTridentEventHubEmitter(EventHubSpoutConfig spoutConfig,
- int batchSize,
- ITridentPartitionManagerFactory pmFactory,
- IEventHubReceiverFactory recvFactory) {
- transactionalEmitter = new TransactionalTridentEventHubEmitter(spoutConfig,
- batchSize,
- pmFactory,
- recvFactory);
- }
-
- @Override
- public void close() {
- transactionalEmitter.close();
- }
-
- @Override
- public Map emitPartitionBatch(TransactionAttempt attempt, TridentCollector collector,
- Partition partition, Map meta) {
- return transactionalEmitter.emitPartitionBatchNew(attempt, collector, partition, meta);
- }
-
- @Override
- public List<Partition> getOrderedPartitions(Partitions partitions) {
- return transactionalEmitter.getOrderedPartitions(partitions);
- }
-
- @Override
- public void refreshPartitions(List<Partition> partitionList) {
- transactionalEmitter.refreshPartitions(partitionList);
- }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+import org.apache.storm.eventhubs.spout.IEventHubReceiverFactory;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
+import org.apache.storm.trident.topology.TransactionAttempt;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A thin wrapper of TransactionalTridentEventHubEmitter for OpaqueTridentEventHubSpout
+ */
+public class OpaqueTridentEventHubEmitter implements IOpaquePartitionedTridentSpout.Emitter<Partitions, Partition, Map> {
+ private final TransactionalTridentEventHubEmitter transactionalEmitter;
+ public OpaqueTridentEventHubEmitter(EventHubSpoutConfig spoutConfig) {
+ transactionalEmitter = new TransactionalTridentEventHubEmitter(spoutConfig);
+ }
+
+ public OpaqueTridentEventHubEmitter(EventHubSpoutConfig spoutConfig,
+ int batchSize,
+ ITridentPartitionManagerFactory pmFactory,
+ IEventHubReceiverFactory recvFactory) {
+ transactionalEmitter = new TransactionalTridentEventHubEmitter(spoutConfig,
+ batchSize,
+ pmFactory,
+ recvFactory);
+ }
+
+ @Override
+ public void close() {
+ transactionalEmitter.close();
+ }
+
+ @Override
+ public Map emitPartitionBatch(TransactionAttempt attempt, TridentCollector collector,
+ Partition partition, Map meta) {
+ return transactionalEmitter.emitPartitionBatchNew(attempt, collector, partition, meta);
+ }
+
+ @Override
+ public List<Partition> getOrderedPartitions(Partitions partitions) {
+ return transactionalEmitter.getOrderedPartitions(partitions);
+ }
+
+ @Override
+ public void refreshPartitions(List<Partition> partitionList) {
+ transactionalEmitter.refreshPartitions(partitionList);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubSpout.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubSpout.java
index f50ffb4..7123304 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubSpout.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubSpout.java
@@ -1,64 +1,64 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.trident;
-
-import java.util.Map;
-
-import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
-import org.apache.storm.eventhubs.spout.IEventDataScheme;
-
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
-
-/**
- * Opaque Trident EventHubs Spout
- */
-public class OpaqueTridentEventHubSpout implements IOpaquePartitionedTridentSpout<Partitions, Partition, Map> {
- private static final long serialVersionUID = 1L;
- private final IEventDataScheme scheme;
- private final EventHubSpoutConfig spoutConfig;
-
- public OpaqueTridentEventHubSpout(EventHubSpoutConfig config) {
- spoutConfig = config;
- scheme = spoutConfig.getEventDataScheme();
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-
- @Override
- public IOpaquePartitionedTridentSpout.Coordinator<Partitions> getCoordinator(
- Map<String, Object> conf, TopologyContext context) {
- return new org.apache.storm.eventhubs.trident.Coordinator(spoutConfig);
- }
-
- @Override
- public IOpaquePartitionedTridentSpout.Emitter<Partitions, Partition, Map> getEmitter(
- Map<String, Object> conf, TopologyContext context) {
- return new OpaqueTridentEventHubEmitter(spoutConfig);
- }
-
- @Override
- public Fields getOutputFields() {
- return scheme.getOutputFields();
- }
-
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.util.Map;
+
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+import org.apache.storm.eventhubs.spout.IEventDataScheme;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
+
+/**
+ * Opaque Trident EventHubs Spout
+ */
+public class OpaqueTridentEventHubSpout implements IOpaquePartitionedTridentSpout<Partitions, Partition, Map> {
+ private static final long serialVersionUID = 1L;
+ private final IEventDataScheme scheme;
+ private final EventHubSpoutConfig spoutConfig;
+
+ public OpaqueTridentEventHubSpout(EventHubSpoutConfig config) {
+ spoutConfig = config;
+ scheme = spoutConfig.getEventDataScheme();
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+
+ @Override
+ public IOpaquePartitionedTridentSpout.Coordinator<Partitions> getCoordinator(
+ Map<String, Object> conf, TopologyContext context) {
+ return new org.apache.storm.eventhubs.trident.Coordinator(spoutConfig);
+ }
+
+ @Override
+ public IOpaquePartitionedTridentSpout.Emitter<Partitions, Partition, Map> getEmitter(
+ Map<String, Object> conf, TopologyContext context) {
+ return new OpaqueTridentEventHubEmitter(spoutConfig);
+ }
+
+ @Override
+ public Fields getOutputFields() {
+ return scheme.getOutputFields();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partition.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partition.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partition.java
index b726e7f..8857eec 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partition.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partition.java
@@ -1,39 +1,39 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.trident;
-
-import java.io.Serializable;
-import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
-import org.apache.storm.trident.spout.ISpoutPartition;
-
-/**
- * Represents an EventHub partition
- */
-public class Partition implements ISpoutPartition, Serializable {
- private static final long serialVersionUID = 1L;
- String partitionId;
-
- public Partition(EventHubSpoutConfig config, String partitionId) {
- this.partitionId = partitionId;
- }
-
- @Override
- public String getId() {
- return partitionId;
- }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.io.Serializable;
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+import org.apache.storm.trident.spout.ISpoutPartition;
+
+/**
+ * Represents an EventHub partition
+ */
+public class Partition implements ISpoutPartition, Serializable {
+ private static final long serialVersionUID = 1L;
+ String partitionId;
+
+ public Partition(EventHubSpoutConfig config, String partitionId) {
+ this.partitionId = partitionId;
+ }
+
+ @Override
+ public String getId() {
+ return partitionId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partitions.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partitions.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partitions.java
index 235f5b6..c3317d9 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partitions.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partitions.java
@@ -1,41 +1,41 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.trident;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Represents all EventHub partitions a spout is receiving messages from.
- */
-public class Partitions implements Serializable {
- private static final long serialVersionUID = 1L;
- private List<Partition> partitionList;
- public Partitions() {
- partitionList = new ArrayList<Partition>();
- }
-
- public void addPartition(Partition partition) {
- partitionList.add(partition);
- }
-
- public List<Partition> getPartitions() {
- return partitionList;
- }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Represents all EventHub partitions a spout is receiving messages from.
+ */
+public class Partitions implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private List<Partition> partitionList;
+ public Partitions() {
+ partitionList = new ArrayList<Partition>();
+ }
+
+ public void addPartition(Partition partition) {
+ partitionList.add(partition);
+ }
+
+ public List<Partition> getPartitions() {
+ return partitionList;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java
index 4d4de16..ee3242a 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java
@@ -1,66 +1,66 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.trident;
-
-import java.util.Map;
-
-import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
-import org.apache.storm.eventhubs.spout.IEventDataScheme;
-
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.trident.spout.IPartitionedTridentSpout;
-import org.apache.storm.eventhubs.trident.Partition;
-
-/**
- * Transactional Trident EventHub Spout
- */
-public class TransactionalTridentEventHubSpout implements
- IPartitionedTridentSpout<Partitions, Partition, Map<String, Object>> {
- private static final long serialVersionUID = 1L;
- private final IEventDataScheme scheme;
- private final EventHubSpoutConfig spoutConfig;
-
- public TransactionalTridentEventHubSpout(EventHubSpoutConfig config) {
- spoutConfig = config;
- scheme = spoutConfig.getEventDataScheme();
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-
- @Override
- public IPartitionedTridentSpout.Coordinator<Partitions> getCoordinator(
- Map<String, Object> conf, TopologyContext context) {
- return new org.apache.storm.eventhubs.trident.Coordinator(spoutConfig);
- }
-
- @Override
- public IPartitionedTridentSpout.Emitter<Partitions, Partition, Map<String, Object>> getEmitter(
- Map<String, Object> conf, TopologyContext context) {
- return new TransactionalTridentEventHubEmitter(spoutConfig);
- }
-
- @Override
- public Fields getOutputFields() {
- return scheme.getOutputFields();
- }
-
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.util.Map;
+
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+import org.apache.storm.eventhubs.spout.IEventDataScheme;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.trident.spout.IPartitionedTridentSpout;
+import org.apache.storm.eventhubs.trident.Partition;
+
+/**
+ * Transactional Trident EventHub Spout
+ */
+public class TransactionalTridentEventHubSpout implements
+ IPartitionedTridentSpout<Partitions, Partition, Map<String, Object>> {
+ private static final long serialVersionUID = 1L;
+ private final IEventDataScheme scheme;
+ private final EventHubSpoutConfig spoutConfig;
+
+ public TransactionalTridentEventHubSpout(EventHubSpoutConfig config) {
+ spoutConfig = config;
+ scheme = spoutConfig.getEventDataScheme();
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+
+ @Override
+ public IPartitionedTridentSpout.Coordinator<Partitions> getCoordinator(
+ Map<String, Object> conf, TopologyContext context) {
+ return new org.apache.storm.eventhubs.trident.Coordinator(spoutConfig);
+ }
+
+ @Override
+ public IPartitionedTridentSpout.Emitter<Partitions, Partition, Map<String, Object>> getEmitter(
+ Map<String, Object> conf, TopologyContext context) {
+ return new TransactionalTridentEventHubEmitter(spoutConfig);
+ }
+
+ @Override
+ public Fields getOutputFields() {
+ return scheme.getOutputFields();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventHubLoop.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
index 665fef9..32dc0d5 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
@@ -1,52 +1,52 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.samples;
-
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.topology.TopologyBuilder;
-
-import org.apache.storm.eventhubs.bolt.EventHubBolt;
-import org.apache.storm.eventhubs.bolt.EventHubBoltConfig;
-import org.apache.storm.eventhubs.spout.EventHubSpout;
-
-/**
- * A sample topology that loops message back to EventHub
- */
-public class EventHubLoop extends EventCount {
-
- @Override
- protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
- TopologyBuilder topologyBuilder = new TopologyBuilder();
-
- topologyBuilder.setSpout("EventHubsSpout", eventHubSpout, spoutConfig.getPartitionCount())
- .setNumTasks(spoutConfig.getPartitionCount());
- EventHubBoltConfig boltConfig = new EventHubBoltConfig(spoutConfig.getConnectionString(),
- spoutConfig.getEntityPath(), true);
-
- EventHubBolt eventHubBolt = new EventHubBolt(boltConfig);
- int boltTasks = spoutConfig.getPartitionCount();
- topologyBuilder.setBolt("EventHubsBolt", eventHubBolt, boltTasks)
- .localOrShuffleGrouping("EventHubsSpout").setNumTasks(boltTasks);
- return topologyBuilder.createTopology();
- }
-
- public static void main(String[] args) throws Exception {
- EventHubLoop scenario = new EventHubLoop();
- scenario.runScenario(args);
- }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples;
+
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.TopologyBuilder;
+
+import org.apache.storm.eventhubs.bolt.EventHubBolt;
+import org.apache.storm.eventhubs.bolt.EventHubBoltConfig;
+import org.apache.storm.eventhubs.spout.EventHubSpout;
+
+/**
+ * A sample topology that loops message back to EventHub
+ */
+public class EventHubLoop extends EventCount {
+
+ @Override
+ protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
+ TopologyBuilder topologyBuilder = new TopologyBuilder();
+
+ topologyBuilder.setSpout("EventHubsSpout", eventHubSpout, spoutConfig.getPartitionCount())
+ .setNumTasks(spoutConfig.getPartitionCount());
+ EventHubBoltConfig boltConfig = new EventHubBoltConfig(spoutConfig.getConnectionString(),
+ spoutConfig.getEntityPath(), true);
+
+ EventHubBolt eventHubBolt = new EventHubBolt(boltConfig);
+ int boltTasks = spoutConfig.getPartitionCount();
+ topologyBuilder.setBolt("EventHubsBolt", eventHubBolt, boltTasks)
+ .localOrShuffleGrouping("EventHubsSpout").setNumTasks(boltTasks);
+ return topologyBuilder.createTopology();
+ }
+
+ public static void main(String[] args) throws Exception {
+ EventHubLoop scenario = new EventHubLoop();
+ scenario.runScenario(args);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java
index e8538c1..a433b8b 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java
@@ -1,53 +1,53 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.samples;
-
-import org.apache.storm.trident.TridentState;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.operation.builtin.Count;
-import org.apache.storm.trident.operation.builtin.Sum;
-import org.apache.storm.trident.testing.MemoryMapState;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.tuple.Fields;
-
-import org.apache.storm.eventhubs.samples.TransactionalTridentEventCount.LoggingFilter;
-import org.apache.storm.eventhubs.spout.EventHubSpout;
-import org.apache.storm.eventhubs.trident.OpaqueTridentEventHubSpout;
-
-/**
- * A simple Trident topology uses OpaqueTridentEventHubSpout
- */
-public class OpaqueTridentEventCount extends EventCount {
- @Override
- protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
- TridentTopology topology = new TridentTopology();
-
- OpaqueTridentEventHubSpout spout = new OpaqueTridentEventHubSpout(spoutConfig);
- TridentState state = topology.newStream("stream-" + spoutConfig.getTopologyName(), spout)
- .parallelismHint(spoutConfig.getPartitionCount())
- .aggregate(new Count(), new Fields("partial-count"))
- .persistentAggregate(new MemoryMapState.Factory(), new Fields("partial-count"), new Sum(), new Fields("count"));
- state.newValuesStream().each(new Fields("count"), new LoggingFilter("got count: ", 10000));
- return topology.build();
- }
-
- public static void main(String[] args) throws Exception {
- OpaqueTridentEventCount scenario = new OpaqueTridentEventCount();
- scenario.runScenario(args);
- }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples;
+
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.builtin.Count;
+import org.apache.storm.trident.operation.builtin.Sum;
+import org.apache.storm.trident.testing.MemoryMapState;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
+
+import org.apache.storm.eventhubs.samples.TransactionalTridentEventCount.LoggingFilter;
+import org.apache.storm.eventhubs.spout.EventHubSpout;
+import org.apache.storm.eventhubs.trident.OpaqueTridentEventHubSpout;
+
+/**
+ * A simple Trident topology uses OpaqueTridentEventHubSpout
+ */
+public class OpaqueTridentEventCount extends EventCount {
+ @Override
+ protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
+ TridentTopology topology = new TridentTopology();
+
+ OpaqueTridentEventHubSpout spout = new OpaqueTridentEventHubSpout(spoutConfig);
+ TridentState state = topology.newStream("stream-" + spoutConfig.getTopologyName(), spout)
+ .parallelismHint(spoutConfig.getPartitionCount())
+ .aggregate(new Count(), new Fields("partial-count"))
+ .persistentAggregate(new MemoryMapState.Factory(), new Fields("partial-count"), new Sum(), new Fields("count"));
+ state.newValuesStream().each(new Fields("count"), new LoggingFilter("got count: ", 10000));
+ return topology.build();
+ }
+
+ public static void main(String[] args) throws Exception {
+ OpaqueTridentEventCount scenario = new OpaqueTridentEventCount();
+ scenario.runScenario(args);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java
index 0a5295f..718c229 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java
@@ -1,81 +1,81 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.samples;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.tuple.Fields;
-
-import org.apache.storm.eventhubs.spout.EventHubSpout;
-import org.apache.storm.eventhubs.trident.TransactionalTridentEventHubSpout;
-
-import org.apache.storm.trident.TridentState;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.operation.BaseFilter;
-import org.apache.storm.trident.operation.builtin.Count;
-import org.apache.storm.trident.operation.builtin.Sum;
-import org.apache.storm.trident.testing.MemoryMapState;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-/**
- * A simple Trident topology uses TransactionalTridentEventHubSpout
- */
-public class TransactionalTridentEventCount extends EventCount {
- public static class LoggingFilter extends BaseFilter {
- private static final long serialVersionUID = 1L;
- private static final Logger logger = LoggerFactory.getLogger(LoggingFilter.class);
- private final String prefix;
- private final long logIntervalMs;
- private long lastTime;
- public LoggingFilter(String prefix, int logIntervalMs) {
- this.prefix = prefix;
- this.logIntervalMs = logIntervalMs;
- lastTime = System.nanoTime();
- }
-
- @Override
- public boolean isKeep(TridentTuple tuple) {
- long now = System.nanoTime();
- if(logIntervalMs < (now - lastTime) / 1000000) {
- logger.info(prefix + tuple.toString());
- lastTime = now;
- }
- return false;
- }
- }
-
- @Override
- protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
- TridentTopology topology = new TridentTopology();
-
- TransactionalTridentEventHubSpout spout = new TransactionalTridentEventHubSpout(spoutConfig);
- TridentState state = topology.newStream("stream-" + spoutConfig.getTopologyName(), spout)
- .parallelismHint(spoutConfig.getPartitionCount())
- .aggregate(new Count(), new Fields("partial-count"))
- .persistentAggregate(new MemoryMapState.Factory(), new Fields("partial-count"), new Sum(), new Fields("count"));
- state.newValuesStream().each(new Fields("count"), new LoggingFilter("got count: ", 10000));
- return topology.build();
- }
-
- public static void main(String[] args) throws Exception {
- TransactionalTridentEventCount scenario = new TransactionalTridentEventCount();
- scenario.runScenario(args);
- }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
+
+import org.apache.storm.eventhubs.spout.EventHubSpout;
+import org.apache.storm.eventhubs.trident.TransactionalTridentEventHubSpout;
+
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseFilter;
+import org.apache.storm.trident.operation.builtin.Count;
+import org.apache.storm.trident.operation.builtin.Sum;
+import org.apache.storm.trident.testing.MemoryMapState;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+/**
+ * A simple Trident topology uses TransactionalTridentEventHubSpout
+ */
+public class TransactionalTridentEventCount extends EventCount {
+ public static class LoggingFilter extends BaseFilter {
+ private static final long serialVersionUID = 1L;
+ private static final Logger logger = LoggerFactory.getLogger(LoggingFilter.class);
+ private final String prefix;
+ private final long logIntervalMs;
+ private long lastTime;
+ public LoggingFilter(String prefix, int logIntervalMs) {
+ this.prefix = prefix;
+ this.logIntervalMs = logIntervalMs;
+ lastTime = System.nanoTime();
+ }
+
+ @Override
+ public boolean isKeep(TridentTuple tuple) {
+ long now = System.nanoTime();
+ if(logIntervalMs < (now - lastTime) / 1000000) {
+ logger.info(prefix + tuple.toString());
+ lastTime = now;
+ }
+ return false;
+ }
+ }
+
+ @Override
+ protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
+ TridentTopology topology = new TridentTopology();
+
+ TransactionalTridentEventHubSpout spout = new TransactionalTridentEventHubSpout(spoutConfig);
+ TridentState state = topology.newStream("stream-" + spoutConfig.getTopologyName(), spout)
+ .parallelismHint(spoutConfig.getPartitionCount())
+ .aggregate(new Count(), new Fields("partial-count"))
+ .persistentAggregate(new MemoryMapState.Factory(), new Fields("partial-count"), new Sum(), new Fields("count"));
+ state.newValuesStream().each(new Fields("count"), new LoggingFilter("got count: ", 10000));
+ return topology.build();
+ }
+
+ public static void main(String[] args) throws Exception {
+ TransactionalTridentEventCount scenario = new TransactionalTridentEventCount();
+ scenario.runScenario(args);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
index 4bed2e3..9d94f5f 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
@@ -1,88 +1,88 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.samples.bolt;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.storm.utils.TupleUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.storm.Config;
-import org.apache.storm.metric.api.IMetric;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Tuple;
-
-/**
- * Globally count number of messages
- */
-public class GlobalCountBolt extends BaseBasicBolt {
- private static final long serialVersionUID = 1L;
- private static final Logger logger = LoggerFactory
- .getLogger(GlobalCountBolt.class);
- private long globalCount;
- private long globalCountDiff;
- private long lastMetricsTime;
- private long throughput;
-
- @Override
- public void prepare(Map<String, Object> config, TopologyContext context) {
- globalCount = 0;
- globalCountDiff = 0;
- lastMetricsTime = System.nanoTime();
- context.registerMetric("GlobalMessageCount", new IMetric() {
- @Override
- public Object getValueAndReset() {
- long now = System.nanoTime();
- long millis = (now - lastMetricsTime) / 1000000;
- throughput = globalCountDiff / millis * 1000;
- Map<String, Object> values = new HashMap<>();
- values.put("global_count", globalCount);
- values.put("throughput", throughput);
- lastMetricsTime = now;
- globalCountDiff = 0;
- return values;
- }
- }, (Integer)config.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));
- }
-
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- if (TupleUtils.isTick(tuple)) {
- return;
- }
-
- int partial = (Integer)tuple.getValueByField("partial_count");
- globalCount += partial;
- globalCountDiff += partial;
- if((globalCountDiff == partial) && (globalCount != globalCountDiff)) {
- //metrics has just been collected, let's also log it
- logger.info("Current throughput (messages/second): " + throughput);
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
- }
-
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples.bolt;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.storm.utils.TupleUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.Config;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * Globally count number of messages
+ */
+public class GlobalCountBolt extends BaseBasicBolt {
+ private static final long serialVersionUID = 1L;
+ private static final Logger logger = LoggerFactory
+ .getLogger(GlobalCountBolt.class);
+ private long globalCount;
+ private long globalCountDiff;
+ private long lastMetricsTime;
+ private long throughput;
+
+ @Override
+ public void prepare(Map<String, Object> config, TopologyContext context) {
+ globalCount = 0;
+ globalCountDiff = 0;
+ lastMetricsTime = System.nanoTime();
+ context.registerMetric("GlobalMessageCount", new IMetric() {
+ @Override
+ public Object getValueAndReset() {
+ long now = System.nanoTime();
+ long millis = (now - lastMetricsTime) / 1000000;
+ throughput = globalCountDiff / millis * 1000;
+ Map<String, Object> values = new HashMap<>();
+ values.put("global_count", globalCount);
+ values.put("throughput", throughput);
+ lastMetricsTime = now;
+ globalCountDiff = 0;
+ return values;
+ }
+ }, (Integer)config.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));
+ }
+
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ if (TupleUtils.isTick(tuple)) {
+ return;
+ }
+
+ int partial = (Integer)tuple.getValueByField("partial_count");
+ globalCount += partial;
+ globalCountDiff += partial;
+ if((globalCountDiff == partial) && (globalCount != globalCountDiff)) {
+ //metrics has just been collected, let's also log it
+ logger.info("Current throughput (messages/second): " + throughput);
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
index 544b6c8..3763c69 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
@@ -1,68 +1,68 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.samples.bolt;
-
-import java.util.Map;
-
-import org.apache.storm.utils.TupleUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-/**
- * Partially count number of messages from EventHubs
- */
-public class PartialCountBolt extends BaseBasicBolt {
- private static final long serialVersionUID = 1L;
- private static final Logger logger = LoggerFactory
- .getLogger(PartialCountBolt.class);
- private static final int PartialCountBatchSize = 1000;
-
- private int partialCount;
-
- @Override
- public void prepare(Map<String, Object> topoConf, TopologyContext context) {
- partialCount = 0;
- }
-
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- if (TupleUtils.isTick(tuple)) {
- return;
- }
-
- partialCount++;
- if(partialCount == PartialCountBatchSize) {
- collector.emit(new Values(PartialCountBatchSize));
- partialCount = 0;
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("partial_count"));
- }
-
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples.bolt;
+
+import java.util.Map;
+
+import org.apache.storm.utils.TupleUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+/**
+ * Partially count number of messages from EventHubs
+ */
+public class PartialCountBolt extends BaseBasicBolt {
+ private static final long serialVersionUID = 1L;
+ private static final Logger logger = LoggerFactory
+ .getLogger(PartialCountBolt.class);
+ private static final int PartialCountBatchSize = 1000;
+
+ private int partialCount;
+
+ @Override
+ public void prepare(Map<String, Object> topoConf, TopologyContext context) {
+ partialCount = 0;
+ }
+
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ if (TupleUtils.isTick(tuple)) {
+ return;
+ }
+
+ partialCount++;
+ if(partialCount == PartialCountBatchSize) {
+ collector.emit(new Values(PartialCountBatchSize));
+ partialCount = 0;
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("partial_count"));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
index ac724e8..621d6a8 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
@@ -1,71 +1,71 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.spout;
-
-import java.util.List;
-
-import org.apache.storm.spout.ISpoutOutputCollector;
-
-/**
- * Mock of ISpoutOutputCollector
- */
-public class SpoutOutputCollectorMock implements ISpoutOutputCollector {
- //comma separated offsets
- StringBuilder emittedOffset;
-
- public SpoutOutputCollectorMock() {
- emittedOffset = new StringBuilder();
- }
-
- public String getOffsetSequenceAndReset() {
- String ret = null;
- if(emittedOffset.length() > 0) {
- emittedOffset.setLength(emittedOffset.length()-1);
- ret = emittedOffset.toString();
- emittedOffset.setLength(0);
- }
- return ret;
- }
-
- @Override
- public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
- MessageId mid = (MessageId)messageId;
- String pid = mid.getPartitionId();
- String offset = mid.getOffset();
- emittedOffset.append(pid+"_"+offset+",");
- return null;
- }
-
- @Override
- public void emitDirect(int arg0, String arg1, List<Object> arg2, Object arg3) {
- }
-
- @Override
- public void flush() {
- // NO-OP
- }
-
- @Override
- public void reportError(Throwable arg0) {
- }
-
- @Override
- public long getPendingCount() {
- return 0;
- }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import java.util.List;
+
+import org.apache.storm.spout.ISpoutOutputCollector;
+
+/**
+ * Mock of ISpoutOutputCollector
+ */
+public class SpoutOutputCollectorMock implements ISpoutOutputCollector {
+ //comma separated offsets
+ StringBuilder emittedOffset;
+
+ public SpoutOutputCollectorMock() {
+ emittedOffset = new StringBuilder();
+ }
+
+ public String getOffsetSequenceAndReset() {
+ String ret = null;
+ if(emittedOffset.length() > 0) {
+ emittedOffset.setLength(emittedOffset.length()-1);
+ ret = emittedOffset.toString();
+ emittedOffset.setLength(0);
+ }
+ return ret;
+ }
+
+ @Override
+ public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
+ MessageId mid = (MessageId)messageId;
+ String pid = mid.getPartitionId();
+ String offset = mid.getOffset();
+ emittedOffset.append(pid+"_"+offset+",");
+ return null;
+ }
+
+ @Override
+ public void emitDirect(int arg0, String arg1, List<Object> arg2, Object arg3) {
+ }
+
+ @Override
+ public void flush() {
+ // NO-OP
+ }
+
+ @Override
+ public void reportError(Throwable arg0) {
+ }
+
+ @Override
+ public long getPendingCount() {
+ return 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/StateStoreMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/StateStoreMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/StateStoreMock.java
index cd6e13e..0abe3a4 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/StateStoreMock.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/StateStoreMock.java
@@ -1,54 +1,54 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.spout;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.storm.eventhubs.spout.IStateStore;
-
-/**
- * A state store mocker
- */
-public class StateStoreMock implements IStateStore {
- Map<String, String> myDataMap;
- @Override
- public void open() {
- myDataMap = new HashMap<String, String>();
- }
-
- @Override
- public void close() {
- myDataMap = null;
- }
-
- @Override
- public void saveData(String path, String data) {
- if(myDataMap != null) {
- myDataMap.put(path, data);
- }
- }
-
- @Override
- public String readData(String path) {
- if(myDataMap != null) {
- return myDataMap.get(path);
- }
- return null;
- }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.storm.eventhubs.spout.IStateStore;
+
+/**
+ * A state store mocker
+ */
+public class StateStoreMock implements IStateStore {
+ Map<String, String> myDataMap;
+ @Override
+ public void open() {
+ myDataMap = new HashMap<String, String>();
+ }
+
+ @Override
+ public void close() {
+ myDataMap = null;
+ }
+
+ @Override
+ public void saveData(String path, String data) {
+ if(myDataMap != null) {
+ myDataMap.put(path, data);
+ }
+ }
+
+ @Override
+ public String readData(String path) {
+ if(myDataMap != null) {
+ return myDataMap.get(path);
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java
index f260dea..926337b 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java
@@ -1,47 +1,47 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.spout;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
-
-public class TestEventData {
-
- @Before
- public void setUp() throws Exception {
- }
-
- @After
- public void tearDown() throws Exception {
- }
-
- @Test
- public void testEventDataComparision() {
-
- MessageId messageId1 = MessageId.create(null, "3", 1);
- EventDataWrap eventData1 = EventDataWrap.create(null, messageId1);
-
- MessageId messageId2 = MessageId.create(null, "13", 2);
- EventDataWrap eventData2 = EventDataWrap.create(null, messageId2);
-
- assertTrue(eventData2.compareTo(eventData1) > 0);
- }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestEventData {
+
+ @Before
+ public void setUp() throws Exception {
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public void testEventDataComparision() {
+
+ MessageId messageId1 = MessageId.create(null, "3", 1);
+ EventDataWrap eventData1 = EventDataWrap.create(null, messageId1);
+
+ MessageId messageId2 = MessageId.create(null, "13", 2);
+ EventDataWrap eventData2 = EventDataWrap.create(null, messageId2);
+
+ assertTrue(eventData2.compareTo(eventData1) > 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java
index c42f769..bd5b07f 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java
@@ -1,57 +1,57 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.trident;
-
-import java.util.List;
-
-import org.apache.storm.trident.operation.TridentCollector;
-
-/**
- * A mock of TridentCollector
- */
-public class TridentCollectorMock implements TridentCollector {
- StringBuilder buffer;
-
- public TridentCollectorMock() {
- buffer = new StringBuilder();
- }
-
- @Override
- public void emit(List<Object> tuples) {
- for(Object o: tuples) {
- buffer.append(o.toString());
- }
- }
-
- @Override
- public void flush() {
- // NO-OP
- }
-
- @Override
- public void reportError(Throwable arg0) {
- }
-
- public void clear() {
- buffer.setLength(0);
- }
-
- public String getBuffer() {
- return buffer.toString();
- }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.util.List;
+
+import org.apache.storm.trident.operation.TridentCollector;
+
+/**
+ * A mock of TridentCollector
+ */
+public class TridentCollectorMock implements TridentCollector {
+ StringBuilder buffer;
+
+ public TridentCollectorMock() {
+ buffer = new StringBuilder();
+ }
+
+ @Override
+ public void emit(List<Object> tuples) {
+ for(Object o: tuples) {
+ buffer.append(o.toString());
+ }
+ }
+
+ @Override
+ public void flush() {
+ // NO-OP
+ }
+
+ @Override
+ public void reportError(Throwable arg0) {
+ }
+
+ public void clear() {
+ buffer.setLength(0);
+ }
+
+ public String getBuffer() {
+ return buffer.toString();
+ }
+}