You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/04/24 22:32:39 UTC
[1/4] storm git commit: STORM-583: initial import of donated code
Repository: storm
Updated Branches:
refs/heads/master b4351ede1 -> 38638474e
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java
new file mode 100755
index 0000000..66ad425
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java
@@ -0,0 +1,66 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.util.Map;
+
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+import org.apache.storm.eventhubs.spout.IEventDataScheme;
+
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Fields;
+import storm.trident.spout.IPartitionedTridentSpout;
+import org.apache.storm.eventhubs.trident.Partition;
+
+/**
+ * Transactional Trident EventHub Spout
+ */
+public class TransactionalTridentEventHubSpout implements
+ IPartitionedTridentSpout<Partitions, Partition, Map> {
+ private static final long serialVersionUID = 1L;
+ private final IEventDataScheme scheme;
+ private final EventHubSpoutConfig spoutConfig;
+
+ public TransactionalTridentEventHubSpout(EventHubSpoutConfig config) {
+ spoutConfig = config;
+ scheme = spoutConfig.getEventDataScheme();
+ }
+
+ @Override
+ public Map getComponentConfiguration() {
+ return null;
+ }
+
+ @Override
+ public IPartitionedTridentSpout.Coordinator<Partitions> getCoordinator(
+ Map conf, TopologyContext context) {
+ return new org.apache.storm.eventhubs.trident.Coordinator(spoutConfig);
+ }
+
+ @Override
+ public IPartitionedTridentSpout.Emitter<Partitions, Partition, Map> getEmitter(
+ Map conf, TopologyContext context) {
+ return new TransactionalTridentEventHubEmitter(spoutConfig);
+ }
+
+ @Override
+ public Fields getOutputFields() {
+ return scheme.getOutputFields();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java
new file mode 100755
index 0000000..60391c3
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java
@@ -0,0 +1,91 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.eventhubs.client.Constants;
+import org.apache.storm.eventhubs.client.EventHubException;
+import org.apache.storm.eventhubs.spout.EventData;
+import org.apache.storm.eventhubs.spout.EventHubReceiverFilter;
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+import org.apache.storm.eventhubs.spout.IEventHubReceiver;
+
+public class TridentPartitionManager implements ITridentPartitionManager {
+ private static final Logger logger = LoggerFactory.getLogger(TridentPartitionManager.class);
+ private final int receiveTimeoutMs = 5000;
+ private final IEventHubReceiver receiver;
+ private final EventHubSpoutConfig spoutConfig;
+ private String lastOffset = Constants.DefaultStartingOffset;
+
+ public TridentPartitionManager(EventHubSpoutConfig spoutConfig, IEventHubReceiver receiver) {
+ this.receiver = receiver;
+ this.spoutConfig = spoutConfig;
+ }
+
+ @Override
+ public boolean open(String offset) {
+ try {
+ if((offset == null || offset.equals(Constants.DefaultStartingOffset))
+ && spoutConfig.getEnqueueTimeFilter() != 0) {
+ receiver.open(new EventHubReceiverFilter(spoutConfig.getEnqueueTimeFilter()));
+ }
+ else {
+ receiver.open(new EventHubReceiverFilter(offset));
+ }
+ lastOffset = offset;
+ return true;
+ }
+ catch(EventHubException ex) {
+ logger.error("failed to open eventhub receiver: " + ex.getMessage());
+ return false;
+ }
+ }
+
+ @Override
+ public void close() {
+ receiver.close();
+ }
+
+ @Override
+ public List<EventData> receiveBatch(String offset, int count) {
+ List<EventData> batch = new ArrayList<EventData>(count);
+ if(!offset.equals(lastOffset) || !receiver.isOpen()) {
+ //re-establish connection to eventhub servers using the right offset
+ //TBD: might be optimized with cache.
+ close();
+ if(!open(offset)) {
+ return batch;
+ }
+ }
+
+ for(int i=0; i<count; ++i) {
+ EventData ed = receiver.receive(receiveTimeoutMs);
+ if(ed == null) {
+ break;
+ }
+ batch.add(ed);
+ lastOffset = ed.getMessageId().getOffset();
+ }
+ return batch;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/resources/Config.properties
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/resources/Config.properties b/external/storm-eventhubs/src/main/resources/Config.properties
new file mode 100755
index 0000000..82abb48
--- /dev/null
+++ b/external/storm-eventhubs/src/main/resources/Config.properties
@@ -0,0 +1,27 @@
+eventhubspout.username = [username]
+
+eventhubspout.password = [password]
+
+eventhubspout.namespace = [namespace]
+
+eventhubspout.entitypath = [entitypath]
+
+eventhubspout.partitions.count = [partitioncount]
+
+# if not provided, will use storm's zookeeper settings
+# zookeeper.connectionstring=localhost:2181
+
+eventhubspout.checkpoint.interval = 10
+
+eventhub.receiver.credits = 1024
+
+# Defines how many messages can be pending (sent but not acked by storm)
+# per partition in eventhubspout. If set too large, your spout may throw
+# OutOfMemoryError because all pending messages are cached in the spout.
+eventhubspout.max.pending.messages.per.partition = 1024
+
+# Defines how many seconds in the past the spout uses to filter events in
+# the EventHubs entity when we first create the Storm topology. If offsets
+# have been saved in Zookeeper, we'll ignore this configuration.
+# A value of 0 means disable time based filtering when creating the receiver.
+eventhub.receiver.filter.timediff = 0
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java
new file mode 100755
index 0000000..740ef63
--- /dev/null
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java
@@ -0,0 +1,105 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.eventhubs.spout.MessageId;
+import org.apache.storm.eventhubs.spout.EventData;
+import org.apache.storm.eventhubs.spout.IEventHubReceiver;
+
+import org.apache.qpid.amqp_1_0.client.Message;
+import org.apache.qpid.amqp_1_0.jms.impl.TextMessageImpl;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.messaging.Data;
+
+import org.apache.storm.eventhubs.client.EventHubException;
+
+/**
+ * A mock receiver that emits fake data with offset starting from given offset
+ * and increase by 1 each time.
+ */
+public class EventHubReceiverMock implements IEventHubReceiver {
+ private static boolean isPaused = false;
+ private final String partitionId;
+ private long currentOffset;
+ private boolean isOpen;
+
+ public EventHubReceiverMock(String pid) {
+ partitionId = pid;
+ isPaused = false;
+ }
+
+ /**
+ * Use this method to pause/resume all the receivers.
+ * If paused all receiver will return null.
+ * @param val
+ */
+ public static void setPause(boolean val) {
+ isPaused = val;
+ }
+
+ @Override
+ public void open(IEventHubReceiverFilter filter) throws EventHubException {
+ if(filter.getOffset() != null) {
+ currentOffset = Long.parseLong(filter.getOffset());
+ }
+ else if(filter.getEnqueueTime() != 0) {
+ //assume if it's time based filter the offset matches the enqueue time.
+ currentOffset = filter.getEnqueueTime();
+ }
+ else {
+ throw new EventHubException("Invalid IEventHubReceiverFilter");
+ }
+ isOpen = true;
+ }
+
+ @Override
+ public void close() {
+ isOpen = false;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return isOpen;
+ }
+
+ @Override
+ public EventData receive(long timeoutInMilliseconds) {
+ if(isPaused) {
+ return null;
+ }
+
+ currentOffset++;
+ List<Section> body = new ArrayList<Section>();
+ //the body of the message is "message" + currentOffset, e.g. "message123"
+ body.add(new Data(new Binary(("message" + currentOffset).getBytes())));
+ Message m = new Message(body);
+ MessageId mid = new MessageId(partitionId, "" + currentOffset, currentOffset);
+ EventData ed = new EventData(m, mid);
+ return ed;
+ }
+
+ @Override
+ public Map getMetricsData() {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubSpoutCallerMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubSpoutCallerMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubSpoutCallerMock.java
new file mode 100755
index 0000000..d5ba90a
--- /dev/null
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubSpoutCallerMock.java
@@ -0,0 +1,96 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import backtype.storm.spout.SpoutOutputCollector;
+
+/**
+ * Mocks EventHubSpout's caller (storm framework)
+ */
+public class EventHubSpoutCallerMock {
+ public static final String statePathPrefix = "/eventhubspout/TestTopo/namespace/entityname/partitions/";
+ EventHubSpout spout;
+ private IStateStore stateStore;
+ private SpoutOutputCollectorMock collector;
+
+ public EventHubSpoutCallerMock(int totalPartitions,
+ int totalTasks, int taskIndex, int checkpointInterval) {
+ stateStore = new StateStoreMock();
+ EventHubSpoutConfig conf = new EventHubSpoutConfig("username", "password",
+ "namespace", "entityname", totalPartitions, "zookeeper", checkpointInterval, 1024);
+ conf.setTopologyName("TestTopo");
+
+ IEventHubReceiverFactory recvFactory = new IEventHubReceiverFactory() {
+ @Override
+ public IEventHubReceiver create(EventHubSpoutConfig config,
+ String partitionId) {
+ return new EventHubReceiverMock(partitionId);
+ }
+ };
+ //mock state store and receiver
+ spout = new EventHubSpout(conf, stateStore, null, recvFactory);
+
+ collector = new SpoutOutputCollectorMock();
+
+ try {
+ spout.preparePartitions(null, totalTasks, taskIndex, new SpoutOutputCollector(collector));
+ }
+ catch(Exception ex) {
+ }
+ }
+
+ /**
+ * Execute a sequence of calls to EventHubSpout.
+ *
+ * @param callSequence: is represented as a string of commands,
+ * e.g. "r,r,r,r,a1,f2,...". The commands are:
+ * r[N]: receive() called N times
+ * aP_X: ack(P_X), partition: P, offset: X
+ * fP_Y: fail(P_Y), partition: P, offset: Y
+ */
+ public String execute(String callSequence) {
+ String[] cmds = callSequence.split(",");
+ for(String cmd : cmds) {
+ if(cmd.startsWith("r")) {
+ int count = 1;
+ if(cmd.length() > 1) {
+ count = Integer.parseInt(cmd.substring(1));
+ }
+ for(int i=0; i<count; ++i) {
+ spout.nextTuple();
+ }
+ }
+ else if(cmd.startsWith("a")) {
+ String[] midStrs = cmd.substring(1).split("_");
+ MessageId msgId = new MessageId(midStrs[0], midStrs[1], Long.parseLong(midStrs[1]));
+ spout.ack(msgId);
+ }
+ else if(cmd.startsWith("f")) {
+ String[] midStrs = cmd.substring(1).split("_");
+ MessageId msgId = new MessageId(midStrs[0], midStrs[1], Long.parseLong(midStrs[1]));
+ spout.fail(msgId);
+ }
+ }
+ return collector.getOffsetSequenceAndReset();
+ }
+
+ public String getCheckpoint(int partitionIndex) {
+ String statePath = statePathPrefix + partitionIndex;
+ return stateStore.readData(statePath);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java
new file mode 100755
index 0000000..dd63d5d
--- /dev/null
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java
@@ -0,0 +1,105 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import org.apache.storm.eventhubs.spout.PartitionManager;
+import org.apache.storm.eventhubs.spout.EventData;
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+
+/**
+ * This mock exercises PartitionManager.
+ * <p>
+ * It wires a {@link PartitionManager} to an {@link EventHubReceiverMock} and a
+ * {@link StateStoreMock} and drives it from a textual command sequence.
+ */
+public class PartitionManagerCallerMock {
+  public static final String statePath = "/eventhubspout/TestTopo/namespace/entityname/partitions/1";
+  private IPartitionManager pm;
+  private IStateStore stateStore;
+
+  /**
+   * Creates the mock with an enqueue-time filter of 0.
+   *
+   * @param partitionId id of the partition handled by the partition manager
+   */
+  public PartitionManagerCallerMock(String partitionId) {
+    this(partitionId, 0);
+  }
+
+  /**
+   * Creates the partition manager, opens the state store and then opens the
+   * partition manager.
+   *
+   * @param partitionId       id of the partition handled by the partition manager
+   * @param enqueueTimeFilter enqueue-time filter passed to the spout configuration
+   */
+  public PartitionManagerCallerMock(String partitionId, long enqueueTimeFilter) {
+    EventHubReceiverMock mockReceiver = new EventHubReceiverMock(partitionId);
+    EventHubSpoutConfig spoutConfig = new EventHubSpoutConfig("username", "password",
+        "namespace", "entityname", 16, "zookeeper", 10, 1024, 1024, enqueueTimeFilter);
+    spoutConfig.setTopologyName("TestTopo");
+
+    this.stateStore = new StateStoreMock();
+    this.pm = new PartitionManager(spoutConfig, partitionId, this.stateStore, mockReceiver);
+
+    this.stateStore.open();
+    try {
+      this.pm.open();
+    }
+    catch (Exception ignored) {
+      // Any failure while opening the partition manager is swallowed here.
+      // NOTE(review): this can hide setup errors from the tests - confirm it is intended.
+    }
+  }
+
+  /**
+   * Execute a sequence of calls to Partition Manager.
+   *
+   * @param callSequence: is represented as a string of commands,
+   * e.g. "r,r,r,r,a1,f2,...". The commands are:
+   * r[N]: receive() called N times
+   * aX: ack(X)
+   * fY: fail(Y)
+   *
+   * @return the comma separated offsets of the messages the receive calls
+   * returned, with "null" standing in for a receive that returned nothing
+   */
+  public String execute(String callSequence) {
+    StringBuilder received = new StringBuilder();
+    // tracks whether a separator is needed before the next entry
+    boolean first = true;
+
+    for (String command : callSequence.split(",")) {
+      if (command.startsWith("r")) {
+        int times = command.length() > 1 ? Integer.parseInt(command.substring(1)) : 1;
+        for (int call = 0; call < times; ++call) {
+          EventData event = pm.receive();
+          if (!first) {
+            received.append(",");
+          }
+          first = false;
+          received.append(event == null ? "null" : event.getMessageId().getOffset());
+        }
+      }
+      else if (command.startsWith("a")) {
+        pm.ack(command.substring(1));
+      }
+      else if (command.startsWith("f")) {
+        pm.fail(command.substring(1));
+      }
+    }
+    return received.toString();
+  }
+
+  /**
+   * Exercise the IPartitionManager.checkpoint() method
+   * @return the data found in the state store under {@link #statePath} afterwards
+   */
+  public String checkpoint() {
+    pm.checkpoint();
+    String stored = stateStore.readData(statePath);
+    return stored;
+  }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
new file mode 100755
index 0000000..02e6830
--- /dev/null
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
@@ -0,0 +1,61 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import java.util.List;
+
+import backtype.storm.spout.ISpoutOutputCollector;
+
+/**
+ * Mock of ISpoutOutputCollector
+ */
+public class SpoutOutputCollectorMock implements ISpoutOutputCollector {
+ //comma separated offsets
+ StringBuilder emittedOffset;
+
+ public SpoutOutputCollectorMock() {
+ emittedOffset = new StringBuilder();
+ }
+
+ public String getOffsetSequenceAndReset() {
+ String ret = null;
+ if(emittedOffset.length() > 0) {
+ emittedOffset.setLength(emittedOffset.length()-1);
+ ret = emittedOffset.toString();
+ emittedOffset.setLength(0);
+ }
+ return ret;
+ }
+
+ @Override
+ public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
+ MessageId mid = (MessageId)messageId;
+ String pid = mid.getPartitionId();
+ String offset = mid.getOffset();
+ emittedOffset.append(pid+"_"+offset+",");
+ return null;
+ }
+
+ @Override
+ public void emitDirect(int arg0, String arg1, List<Object> arg2, Object arg3) {
+ }
+
+ @Override
+ public void reportError(Throwable arg0) {
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/StateStoreMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/StateStoreMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/StateStoreMock.java
new file mode 100755
index 0000000..cd6e13e
--- /dev/null
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/StateStoreMock.java
@@ -0,0 +1,54 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.storm.eventhubs.spout.IStateStore;
+
+/**
+ * In-memory mock of {@link IStateStore} backed by a {@link HashMap}.
+ * <p>
+ * The map only exists between {@link #open()} and {@link #close()}; outside that
+ * window writes are dropped and reads return {@code null}.
+ */
+public class StateStoreMock implements IStateStore {
+  Map<String, String> myDataMap;
+
+  /** Starts with a fresh, empty map (any previous content is discarded). */
+  @Override
+  public void open() {
+    this.myDataMap = new HashMap<String, String>();
+  }
+
+  /** Discards the map. */
+  @Override
+  public void close() {
+    this.myDataMap = null;
+  }
+
+  /**
+   * Stores {@code data} under {@code path}; does nothing when the store is not open.
+   */
+  @Override
+  public void saveData(String path, String data) {
+    if (myDataMap == null) {
+      return;
+    }
+    myDataMap.put(path, data);
+  }
+
+  /**
+   * @return the data stored under {@code path}, or {@code null} when there is
+   *         none or the store is not open
+   */
+  @Override
+  public String readData(String path) {
+    return myDataMap == null ? null : myDataMap.get(path);
+  }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java
new file mode 100755
index 0000000..aa8d097
--- /dev/null
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java
@@ -0,0 +1,47 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestEventData {
+
+ @Before
+ public void setUp() throws Exception {
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public void testEventDataComparision() {
+
+ MessageId messageId1 = MessageId.create(null, "3", 1);
+ EventData eventData1 = EventData.create(null, messageId1);
+
+ MessageId messageId2 = MessageId.create(null, "13", 2);
+ EventData eventData2 = EventData.create(null, messageId2);
+
+ assertTrue(eventData2.compareTo(eventData1) > 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java
new file mode 100755
index 0000000..6a0d163
--- /dev/null
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java
@@ -0,0 +1,70 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+/**
+ * Unit tests for {@link EventHubSpout} and {@link EventHubSpoutConfig}, driven
+ * through {@link EventHubSpoutCallerMock}.
+ */
+public class TestEventHubSpout {
+
+  @Before
+  public void setUp() throws Exception {
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  /**
+   * The connection string must contain the password in URL-encoded form.
+   */
+  @Test
+  public void testSpoutConfig() {
+    EventHubSpoutConfig conf = new EventHubSpoutConfig("username", "pas\\s+w/ord",
+        "namespace", "entityname", 16, "zookeeper");
+    // expected value first, so that a failure message reads the right way round
+    assertEquals("amqps://username:pas%5Cs%2Bw%2Ford@namespace.servicebus.windows.net",
+        conf.getConnectionString());
+  }
+
+  /**
+   * Checks the sequence of emitted "partition_offset" entries for a mix of
+   * receive, ack and fail commands.
+   */
+  @Test
+  public void testSpoutBasic() {
+    //This spout owns 2 partitions: 6 and 14
+    EventHubSpoutCallerMock mock = new EventHubSpoutCallerMock(16, 8, 6, 10);
+    String result = mock.execute("r6,f6_0,a6_1,a6_2,a14_0,a14_2,r4,f14_1,r2");
+    assertEquals("6_0,14_0,6_1,14_1,6_2,14_2,6_0,14_3,6_3,14_4,6_4,14_1", result);
+  }
+
+  /**
+   * Make sure that even though nextTuple() doesn't receive valid data,
+   * the offset will be checkpointed after checkpointInterval seconds.
+   */
+  @Test
+  public void testSpoutCheckpoint() {
+    //This spout owns 1 partitions: 6
+    EventHubSpoutCallerMock mock = new EventHubSpoutCallerMock(8, 8, 6, 1);
+    mock.execute("r6,a6_0,a6_1,a6_2");
+    try {
+      // wait longer than the 1 second checkpoint interval configured above
+      Thread.sleep(2000);
+    }
+    catch (InterruptedException ex) {
+      // keep the interrupt visible to the test runner instead of dropping it
+      Thread.currentThread().interrupt();
+    }
+    EventHubReceiverMock.setPause(true);
+    try {
+      mock.execute("r3");
+    }
+    finally {
+      // the pause flag is static: always reset it so other tests are unaffected
+      EventHubReceiverMock.setPause(false);
+    }
+    assertEquals("3", mock.getCheckpoint(6));
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestPartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestPartitionManager.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestPartitionManager.java
new file mode 100755
index 0000000..5b28f1d
--- /dev/null
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestPartitionManager.java
@@ -0,0 +1,117 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the partition manager, exercised through PartitionManagerCallerMock.
+ * Each test feeds a comma separated command script to the mock and checks either
+ * the emitted offsets or the offset that would be checkpointed.
+ * NOTE(review): judging from the scripts below, "r" looks like a receive,
+ * "aN" an ack of offset N and "fN" a fail of offset N - confirm against the mock.
+ */
+public class TestPartitionManager {
+
+  @Before
+  public void setUp() throws Exception {
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  @Test
+  public void testPartitionManagerNoFail() {
+    PartitionManagerCallerMock caller = new PartitionManagerCallerMock("1");
+    String emitted = caller.execute("r,r,r,a0,a1,a2,r");
+    assertEquals("0,1,2,3", emitted);
+  }
+
+  @Test
+  public void testPartitionManagerResend() {
+    PartitionManagerCallerMock caller = new PartitionManagerCallerMock("1");
+    String emitted = caller.execute("r,a0,r,r,r,f3,r,f2,f1,r,r,a1,a2,a3,r");
+    assertEquals("0,1,2,3,3,1,2,4", emitted);
+  }
+
+  @Test
+  public void testPMCheckpointWithPending() {
+    PartitionManagerCallerMock caller = new PartitionManagerCallerMock("1");
+    caller.execute("r,r,r");
+    //nothing acked yet: the head of the pending list is the checkpoint
+    assertEquals("0", caller.checkpoint());
+    caller.execute("a0,a2");
+    //offset 1 is still pending, so it is the checkpoint now
+    assertEquals("1", caller.checkpoint());
+  }
+
+  @Test
+  public void testPMCheckpointWithResend() {
+    PartitionManagerCallerMock caller = new PartitionManagerCallerMock("1");
+    caller.execute("r,r,r,f2,f1,f0");
+    //pending is empty: the smallest offset waiting for resend is the checkpoint
+    assertEquals("0", caller.checkpoint());
+    caller.execute("r,a0");
+    //pending is empty again after the ack
+    assertEquals("1", caller.checkpoint());
+  }
+
+  @Test
+  public void testPMCheckpointWithPendingAndResend() {
+    PartitionManagerCallerMock caller = new PartitionManagerCallerMock("1");
+    caller.execute("r,r,r,f2,f1");
+    //the smaller of the pending and toResend offsets wins
+    assertEquals("0", caller.checkpoint());
+    caller.execute("a0,r");
+    //at this point pending: [3], toResend: [1,2]
+    assertEquals("1", caller.checkpoint());
+  }
+
+  @Test
+  public void testPMCheckpointWithNoPendingAndNoResend() {
+    PartitionManagerCallerMock caller = new PartitionManagerCallerMock("1");
+    //nothing was ever sent, so there is nothing to checkpoint
+    assertEquals(null, caller.checkpoint());
+    caller.execute("r,r,r,f2,f1,r,r,a2,a1,a0");
+    //everything was delivered: the last sent offset is the checkpoint
+    assertEquals("2", caller.checkpoint());
+  }
+
+  @Test
+  public void testPartitionManagerMaxPendingMessages() {
+    PartitionManagerCallerMock caller = new PartitionManagerCallerMock("1");
+    caller.execute("r1024");
+    //once max pending messages is exceeded every further receive yields null
+    assertEquals("null,null", caller.execute("r2"));
+    //acking two messages frees room for two more receives
+    assertEquals("1024,1025", caller.execute("a0,a1,r2"));
+  }
+
+  @Test
+  public void testPartitionManagerEnqueueTimeFilter() {
+    PartitionManagerCallerMock caller = new PartitionManagerCallerMock("1", 123456);
+    String emitted = caller.execute("r2");
+    assertEquals("123457,123458", emitted);
+  }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TestTransactionalTridentEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TestTransactionalTridentEmitter.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TestTransactionalTridentEmitter.java
new file mode 100755
index 0000000..03696a2
--- /dev/null
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TestTransactionalTridentEmitter.java
@@ -0,0 +1,93 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import static org.junit.Assert.*;
+
+import java.util.Map;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.storm.eventhubs.spout.EventHubReceiverMock;
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+import org.apache.storm.eventhubs.spout.IEventHubReceiver;
+import org.apache.storm.eventhubs.spout.IEventHubReceiverFactory;
+
+/**
+ * Tests for TransactionalTridentEventHubEmitter. The emitter is wired to a
+ * receiver factory that hands out EventHubReceiverMock instances and to a
+ * TridentCollectorMock that records what was emitted.
+ */
+public class TestTransactionalTridentEmitter {
+  private TransactionalTridentEventHubEmitter emitter;
+  private Partition partition;
+  private TridentCollectorMock collectorMock;
+  private final int batchSize = 32;
+
+  @Before
+  public void setUp() throws Exception {
+    EventHubSpoutConfig conf = new EventHubSpoutConfig("username", "password",
+        "namespace", "entityname", 16, "zookeeper");
+    conf.setTopologyName("TestTopo");
+    IEventHubReceiverFactory recvFactory = new IEventHubReceiverFactory() {
+      @Override
+      public IEventHubReceiver create(EventHubSpoutConfig config,
+          String partitionId) {
+        return new EventHubReceiverMock(partitionId);
+      }
+    };
+    partition = new Partition(conf, "0");
+    emitter = new TransactionalTridentEventHubEmitter(conf, batchSize, null, recvFactory);
+    collectorMock = new TridentCollectorMock();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    //setUp may have failed before the emitter was created
+    if (emitter != null) {
+      emitter.close();
+      emitter = null;
+    }
+  }
+
+  @Test
+  public void testEmitInSequence() {
+    //test the happy path, emit batches in sequence
+    Map meta = emitter.emitPartitionBatchNew(null, collectorMock, partition, null);
+    String collected = collectorMock.getBuffer();
+    assertTrue(collected.startsWith("message"+0));
+    collectorMock.clear();
+
+    //the second batch is expected to start right after the first one
+    emitter.emitPartitionBatchNew(null, collectorMock, partition, meta);
+    collected = collectorMock.getBuffer();
+    assertTrue(collected.startsWith("message"+batchSize));
+  }
+
+  @Test
+  public void testReEmit() {
+    //test we can re-emit the second batch
+    Map meta = emitter.emitPartitionBatchNew(null, collectorMock, partition, null);
+    collectorMock.clear();
+
+    //emit second batch
+    Map meta1 = emitter.emitPartitionBatchNew(null, collectorMock, partition, meta);
+    String collected0 = collectorMock.getBuffer();
+    collectorMock.clear();
+
+    //re-emit second batch
+    emitter.emitPartitionBatch(null, collectorMock, partition, meta1);
+    String collected1 = collectorMock.getBuffer();
+    //assertEquals reports both values when the re-emitted batch differs
+    assertEquals(collected0, collected1);
+  }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java
new file mode 100755
index 0000000..cc4686e
--- /dev/null
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java
@@ -0,0 +1,52 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.util.List;
+
+import storm.trident.operation.TridentCollector;
+
+/**
+ * A mock of TridentCollector
+ */
+public class TridentCollectorMock implements TridentCollector {
+ StringBuilder buffer;
+
+ public TridentCollectorMock() {
+ buffer = new StringBuilder();
+ }
+
+ @Override
+ public void emit(List<Object> tuples) {
+ for(Object o: tuples) {
+ buffer.append(o.toString());
+ }
+ }
+
+ @Override
+ public void reportError(Throwable arg0) {
+ }
+
+ public void clear() {
+ buffer.setLength(0);
+ }
+
+ public String getBuffer() {
+ return buffer.toString();
+ }
+}
[2/4] storm git commit: STORM-583: initial import of donated code
Posted by pt...@apache.org.
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
new file mode 100755
index 0000000..5600873
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
@@ -0,0 +1,150 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import org.apache.qpid.amqp_1_0.client.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.metric.api.CountMetric;
+import backtype.storm.metric.api.MeanReducer;
+import backtype.storm.metric.api.ReducedMetric;
+
+import org.apache.storm.eventhubs.client.Constants;
+import org.apache.storm.eventhubs.client.EventHubClient;
+import org.apache.storm.eventhubs.client.EventHubException;
+import org.apache.storm.eventhubs.client.EventHubReceiver;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations;
+
+/**
+ * IEventHubReceiver implementation that reads one partition through an
+ * EventHubReceiver created from EventHubClient, and keeps simple metrics
+ * about the receive calls.
+ */
+public class EventHubReceiverImpl implements IEventHubReceiver {
+  private static final Logger logger = LoggerFactory.getLogger(EventHubReceiverImpl.class);
+  private static final Symbol OffsetKey = Symbol.valueOf("x-opt-offset");
+  private static final Symbol SequenceNumberKey = Symbol.valueOf("x-opt-sequence-number");
+
+  private final String connectionString;
+  private final String entityName;
+  private final String partitionId;
+  private final int defaultCredits;
+
+  private final ReducedMetric receiveApiLatencyMean;
+  private final CountMetric receiveApiCallCount;
+  private final CountMetric receiveMessageCount;
+
+  //null while the receiver is closed
+  private EventHubReceiver receiver;
+
+  /**
+   * @param config supplies connection string, entity path and receiver credits
+   * @param partitionId id of the partition this receiver reads
+   */
+  public EventHubReceiverImpl(EventHubSpoutConfig config, String partitionId) {
+    this.connectionString = config.getConnectionString();
+    this.entityName = config.getEntityPath();
+    this.defaultCredits = config.getReceiverCredits();
+    this.partitionId = partitionId;
+    receiveApiLatencyMean = new ReducedMetric(new MeanReducer());
+    receiveApiCallCount = new CountMetric();
+    receiveMessageCount = new CountMetric();
+  }
+
+  /**
+   * Creates the underlying receiver. The offset of the filter is used when it
+   * is set, otherwise a non-zero enqueue time, otherwise the default starting
+   * offset.
+   *
+   * @param filter describes where to start reading
+   * @throws EventHubException if the client or the receiver cannot be created
+   */
+  @Override
+  public void open(IEventHubReceiverFilter filter) throws EventHubException {
+    logger.info("creating eventhub receiver: partitionId=" + partitionId + ", offset=" + filter.getOffset()
+        + ", enqueueTime=" + filter.getEnqueueTime());
+    long start = System.currentTimeMillis();
+    EventHubClient eventHubClient = EventHubClient.create(connectionString, entityName);
+    if(filter.getOffset() != null) {
+      receiver = eventHubClient.getDefaultConsumerGroup().createReceiver(partitionId, filter.getOffset(), defaultCredits);
+    }
+    else if(filter.getEnqueueTime() != 0) {
+      receiver = eventHubClient.getDefaultConsumerGroup().createReceiver(partitionId, filter.getEnqueueTime(), defaultCredits);
+    }
+    else {
+      logger.error("Invalid IEventHubReceiverFilter, use default offset as filter");
+      receiver = eventHubClient.getDefaultConsumerGroup().createReceiver(partitionId, Constants.DefaultStartingOffset, defaultCredits);
+    }
+    long end = System.currentTimeMillis();
+    logger.info("created eventhub receiver, time taken(ms): " + (end-start));
+  }
+
+  /**
+   * Closes the underlying receiver; does nothing when it is not open.
+   */
+  @Override
+  public void close() {
+    if(receiver != null) {
+      receiver.close();
+      logger.info("closed eventhub receiver: partitionId=" + partitionId );
+      receiver = null;
+    }
+  }
+
+  @Override
+  public boolean isOpen() {
+    return (receiver != null);
+  }
+
+  /**
+   * Receives one message and records latency and call/message counts.
+   *
+   * @param timeoutInMilliseconds passed through to the underlying receiver
+   * @return the received event, or null when the underlying receiver returned no message
+   */
+  @Override
+  public EventData receive(long timeoutInMilliseconds) {
+    long start = System.currentTimeMillis();
+    Message message = receiver.receive(timeoutInMilliseconds);
+    long end = System.currentTimeMillis();
+    long millis = (end - start);
+    receiveApiLatencyMean.update(millis);
+    receiveApiCallCount.incr();
+
+    if (message == null) {
+      return null;
+    }
+    receiveMessageCount.incr();
+
+    MessageId messageId = createMessageId(message);
+
+    return EventData.create(message, messageId);
+  }
+
+  /**
+   * Builds the message id from the offset and sequence number found in the
+   * message annotations. A missing offset stays null and a missing or
+   * non-numeric sequence number stays 0.
+   */
+  private MessageId createMessageId(Message message) {
+    String offset = null;
+    long sequenceNumber = 0;
+
+    for (Section section : message.getPayload()) {
+      if (!(section instanceof MessageAnnotations)) {
+        continue;
+      }
+      //only the Map contract is needed, do not depend on the concrete map class
+      Map annotationMap = (Map) ((MessageAnnotations) section).getValue();
+
+      if (annotationMap.containsKey(OffsetKey)) {
+        offset = (String) annotationMap.get(OffsetKey);
+      }
+
+      //tolerate a null value and any numeric type instead of failing on unboxing or cast
+      Object sequenceValue = annotationMap.get(SequenceNumberKey);
+      if (sequenceValue instanceof Number) {
+        sequenceNumber = ((Number) sequenceValue).longValue();
+      }
+    }
+
+    return MessageId.create(partitionId, offset, sequenceNumber);
+  }
+
+  /**
+   * @return the current metric values keyed by "partitionId/metricName";
+   *         reading them resets the metrics
+   */
+  @Override
+  public Map getMetricsData() {
+    Map<String, Object> ret = new HashMap<String, Object>();
+    ret.put(partitionId + "/receiveApiLatencyMean", receiveApiLatencyMean.getValueAndReset());
+    ret.put(partitionId + "/receiveApiCallCount", receiveApiCallCount.getValueAndReset());
+    ret.put(partitionId + "/receiveMessageCount", receiveMessageCount.getValueAndReset());
+    return ret;
+  }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
new file mode 100755
index 0000000..9290e6e
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
@@ -0,0 +1,258 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import backtype.storm.Config;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.amqp_1_0.client.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Storm spout that reads events from the EventHub partitions assigned to this
+ * task, emits them as tuples anchored with their MessageId, forwards ack/fail
+ * to the owning partition manager and checkpoints periodically.
+ */
+public class EventHubSpout extends BaseRichSpout {
+
+  private static final Logger logger = LoggerFactory.getLogger(EventHubSpout.class);
+
+  private final UUID instanceId;
+  private final EventHubSpoutConfig eventHubConfig;
+  private final IEventDataScheme scheme;
+  private final int checkpointIntervalInSeconds;
+
+  private IStateStore stateStore;
+  private IPartitionCoordinator partitionCoordinator;
+  private IPartitionManagerFactory pmFactory;
+  private IEventHubReceiverFactory recvFactory;
+  private SpoutOutputCollector collector;
+  private long lastCheckpointTime;
+  private int currentPartitionIndex = -1;
+
+  /**
+   * Creates a spout that uses the default state store, partition manager
+   * factory and receiver factory.
+   */
+  public EventHubSpout(EventHubSpoutConfig spoutConfig) {
+    this(spoutConfig, null, null, null);
+  }
+
+  /**
+   * @param spoutConfig spout configuration
+   * @param store state store to use; when null a ZookeeperStateStore is created in preparePartitions
+   * @param pmFactory partition manager factory; when null PartitionManager instances are created
+   * @param recvFactory receiver factory; when null EventHubReceiverImpl instances are created
+   */
+  public EventHubSpout(EventHubSpoutConfig spoutConfig,
+      IStateStore store,
+      IPartitionManagerFactory pmFactory,
+      IEventHubReceiverFactory recvFactory) {
+    this.eventHubConfig = spoutConfig;
+    this.scheme = spoutConfig.getEventDataScheme();
+    this.instanceId = UUID.randomUUID();
+    this.checkpointIntervalInSeconds = spoutConfig.getCheckpointIntervalInSeconds();
+    this.lastCheckpointTime = System.currentTimeMillis();
+    stateStore = store;
+    this.pmFactory = pmFactory;
+    if(this.pmFactory == null) {
+      this.pmFactory = new IPartitionManagerFactory() {
+        @Override
+        public IPartitionManager create(EventHubSpoutConfig spoutConfig,
+            String partitionId, IStateStore stateStore,
+            IEventHubReceiver receiver) {
+          return new PartitionManager(spoutConfig, partitionId,
+              stateStore, receiver);
+        }
+      };
+    }
+    this.recvFactory = recvFactory;
+    if(this.recvFactory == null) {
+      this.recvFactory = new IEventHubReceiverFactory() {
+        @Override
+        public IEventHubReceiver create(EventHubSpoutConfig spoutConfig,
+            String partitionId) {
+          return new EventHubReceiverImpl(spoutConfig, partitionId);
+        }
+      };
+    }
+
+  }
+
+  /**
+   * Opens the state store, creates the partition coordinator and opens the
+   * partition managers of this task. Extracted from open() so that it is easy
+   * to test.
+   *
+   * @param config the storm configuration
+   * @param totalTasks number of tasks of this spout
+   * @param taskIndex index of this task
+   * @param collector collector used to emit tuples
+   * @throws Exception if the state store or a partition manager cannot be opened
+   */
+  public void preparePartitions(Map config, int totalTasks, int taskIndex, SpoutOutputCollector collector) throws Exception {
+    this.collector = collector;
+    if(stateStore == null) {
+      String zkEndpointAddress = eventHubConfig.getZkConnectionString();
+      if (zkEndpointAddress == null || zkEndpointAddress.length() == 0) {
+        //use storm's zookeeper servers if not specified.
+        @SuppressWarnings("unchecked")
+        List<String> zkServers = (List<String>) config.get(Config.STORM_ZOOKEEPER_SERVERS);
+        int zkPort = intValue(config, Config.STORM_ZOOKEEPER_PORT);
+        StringBuilder sb = new StringBuilder();
+        for (String zk : zkServers) {
+          if (sb.length() > 0) {
+            sb.append(',');
+          }
+          sb.append(zk).append(':').append(zkPort);
+        }
+        zkEndpointAddress = sb.toString();
+      }
+      stateStore = new ZookeeperStateStore(zkEndpointAddress,
+          intValue(config, Config.STORM_ZOOKEEPER_RETRY_TIMES),
+          intValue(config, Config.STORM_ZOOKEEPER_RETRY_INTERVAL));
+    }
+    stateStore.open();
+
+    partitionCoordinator = new StaticPartitionCoordinator(
+        eventHubConfig, taskIndex, totalTasks, stateStore, pmFactory, recvFactory);
+
+    for (IPartitionManager partitionManager :
+        partitionCoordinator.getMyPartitionManagers()) {
+      partitionManager.open();
+    }
+  }
+
+  /**
+   * Reads a numeric configuration value as int. Going through Number accepts
+   * any boxed numeric type, where a direct cast to Integer would fail on e.g. a Long.
+   */
+  private static int intValue(Map config, String key) {
+    return ((Number) config.get(key)).intValue();
+  }
+
+  /**
+   * Prepares the partitions of this task and registers the "EventHubReceiver" metric.
+   *
+   * @throws RuntimeException if there are more spout tasks than partitions, or
+   *         if preparing the partitions fails
+   */
+  @Override
+  public void open(Map config, TopologyContext context, SpoutOutputCollector collector) {
+    logger.info("begin: open()");
+    String topologyName = (String) config.get(Config.TOPOLOGY_NAME);
+    eventHubConfig.setTopologyName(topologyName);
+
+    int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
+    int taskIndex = context.getThisTaskIndex();
+    if (totalTasks > eventHubConfig.getPartitionCount()) {
+      throw new RuntimeException("total tasks of EventHubSpout is greater than partition count.");
+    }
+
+    logger.info(String.format("topologyName: %s, totalTasks: %d, taskIndex: %d", topologyName, totalTasks, taskIndex));
+
+    try {
+      preparePartitions(config, totalTasks, taskIndex, collector);
+    } catch (Exception e) {
+      //pass the exception itself so that the stack trace ends up in the log
+      logger.error(e.getMessage(), e);
+      throw new RuntimeException(e);
+    }
+
+    //register metrics
+    context.registerMetric("EventHubReceiver", new IMetric() {
+      @Override
+      public Object getValueAndReset() {
+        Map concatMetricsDataMaps = new HashMap();
+        for (IPartitionManager partitionManager :
+            partitionCoordinator.getMyPartitionManagers()) {
+          concatMetricsDataMaps.putAll(partitionManager.getMetricsData());
+        }
+        return concatMetricsDataMaps;
+      }
+    }, intValue(config, Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));
+    logger.info("end open()");
+  }
+
+  /**
+   * Asks the partition managers in round-robin order for an event, emits the
+   * first one found (if any) and checkpoints when the interval has elapsed.
+   */
+  @Override
+  public void nextTuple() {
+    EventData eventData = null;
+
+    List<IPartitionManager> partitionManagers = partitionCoordinator.getMyPartitionManagers();
+    for (int i = 0; i < partitionManagers.size(); i++) {
+      //continue after the partition that was served by the previous call
+      currentPartitionIndex = (currentPartitionIndex + 1) % partitionManagers.size();
+      IPartitionManager partitionManager = partitionManagers.get(currentPartitionIndex);
+
+      if (partitionManager == null) {
+        throw new RuntimeException("partitionManager doesn't exist.");
+      }
+
+      eventData = partitionManager.receive();
+
+      if (eventData != null) {
+        break;
+      }
+    }
+
+
+    if (eventData != null) {
+      MessageId messageId = eventData.getMessageId();
+      Message message = eventData.getMessage();
+
+      List<Object> tuples = scheme.deserialize(message);
+
+      if (tuples != null) {
+        collector.emit(tuples, messageId);
+      }
+    }
+
+    checkpointIfNeeded();
+
+    // We don't need to sleep here because the IPartitionManager.receive() is
+    // a blocked call so it's fine to call this function in a tight loop.
+  }
+
+  /**
+   * Forwards the ack to the partition manager that owns the message.
+   */
+  @Override
+  public void ack(Object msgId) {
+    MessageId messageId = (MessageId) msgId;
+    IPartitionManager partitionManager = partitionCoordinator.getPartitionManager(messageId.getPartitionId());
+    String offset = messageId.getOffset();
+    partitionManager.ack(offset);
+  }
+
+  /**
+   * Forwards the failure to the partition manager that owns the message.
+   */
+  @Override
+  public void fail(Object msgId) {
+    MessageId messageId = (MessageId) msgId;
+    IPartitionManager partitionManager = partitionCoordinator.getPartitionManager(messageId.getPartitionId());
+    String offset = messageId.getOffset();
+    partitionManager.fail(offset);
+  }
+
+  @Override
+  public void deactivate() {
+    // let's checkpoint so that we can get the last checkpoint when restarting.
+    checkpoint();
+  }
+
+  /**
+   * Closes the partition managers and the state store. Safe to call when
+   * open() did not complete.
+   */
+  @Override
+  public void close() {
+    if (partitionCoordinator != null) {
+      for (IPartitionManager partitionManager :
+          partitionCoordinator.getMyPartitionManagers()) {
+        partitionManager.close();
+      }
+    }
+    if (stateStore != null) {
+      stateStore.close();
+    }
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(scheme.getOutputFields());
+  }
+
+  /**
+   * Checkpoints when more than checkpointIntervalInSeconds have passed since
+   * the last checkpoint.
+   */
+  private void checkpointIfNeeded() {
+    //1000L forces long arithmetic so that a large interval cannot overflow int
+    long nextCheckpointTime = lastCheckpointTime + checkpointIntervalInSeconds * 1000L;
+    if (nextCheckpointTime < System.currentTimeMillis()) {
+
+      checkpoint();
+      lastCheckpointTime = System.currentTimeMillis();
+    }
+  }
+
+  /**
+   * Asks every partition manager of this task to checkpoint.
+   */
+  private void checkpoint() {
+    for (IPartitionManager partitionManager :
+        partitionCoordinator.getMyPartitionManagers()) {
+      partitionManager.checkpoint();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
new file mode 100755
index 0000000..ae11680
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
@@ -0,0 +1,165 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Serializable configuration of the EventHub spout: credentials, namespace and
+ * entity, partition count, zookeeper connection string, tuning values and the
+ * scheme used to turn events into tuples.
+ */
+public class EventHubSpoutConfig implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+  private final String userName;
+  private final String password;
+  private final String namespace;
+  private final String entityPath;
+  private final String zkConnectionString;
+  private final int partitionCount;
+  private final int checkpointIntervalInSeconds;
+  private final int receiverCredits;
+  private final int maxPendingMsgsPerPartition;
+  private final long enqueueTimeFilter; //timestamp in millisecond
+
+  private String connectionString;
+  private String targetFqnAddress;
+  private String topologyName;
+  private IEventDataScheme scheme;
+
+  /**
+   * Uses a checkpoint interval of 10 seconds, 1024 receiver credits, 1024 max
+   * pending messages per partition and no enqueue time filter (0).
+   */
+  public EventHubSpoutConfig(String username, String password, String namespace,
+      String entityPath, int partitionCount, String zkConnectionString) {
+    this(username, password, namespace, entityPath, partitionCount,
+        zkConnectionString, 10, 1024, 1024, 0);
+  }
+
+  //Keep this constructor for backward compatibility
+  public EventHubSpoutConfig(String username, String password, String namespace,
+      String entityPath, int partitionCount, String zkConnectionString,
+      int checkpointIntervalInSeconds, int receiverCredits) {
+    this(username, password, namespace, entityPath, partitionCount,
+        zkConnectionString, checkpointIntervalInSeconds, receiverCredits, 1024, 0);
+  }
+
+  /**
+   * Full constructor. The connection string is built immediately from user
+   * name, password and namespace, and the scheme defaults to EventDataScheme.
+   */
+  public EventHubSpoutConfig(String username, String password, String namespace,
+      String entityPath, int partitionCount, String zkConnectionString,
+      int checkpointIntervalInSeconds, int receiverCredits, int maxPendingMsgsPerPartition, long enqueueTimeFilter) {
+    this.userName = username;
+    this.password = password;
+    this.connectionString = buildConnectionString(username, password, namespace);
+    this.namespace = namespace;
+    this.entityPath = entityPath;
+    this.partitionCount = partitionCount;
+    this.zkConnectionString = zkConnectionString;
+    this.checkpointIntervalInSeconds = checkpointIntervalInSeconds;
+    this.receiverCredits = receiverCredits;
+    this.maxPendingMsgsPerPartition = maxPendingMsgsPerPartition;
+    this.enqueueTimeFilter = enqueueTimeFilter;
+    this.scheme = new EventDataScheme();
+  }
+
+  public String getConnectionString() {
+    return connectionString;
+  }
+
+  public String getNamespace() {
+    return namespace;
+  }
+
+  public String getEntityPath() {
+    return entityPath;
+  }
+
+  public String getZkConnectionString() {
+    return zkConnectionString;
+  }
+
+  public int getCheckpointIntervalInSeconds() {
+    return checkpointIntervalInSeconds;
+  }
+
+  public int getPartitionCount() {
+    return partitionCount;
+  }
+
+  public int getReceiverCredits() {
+    return receiverCredits;
+  }
+
+  public int getMaxPendingMsgsPerPartition() {
+    return maxPendingMsgsPerPartition;
+  }
+
+  public long getEnqueueTimeFilter() {
+    return enqueueTimeFilter;
+  }
+
+  public String getTopologyName() {
+    return topologyName;
+  }
+
+  public void setTopologyName(String value) {
+    topologyName = value;
+  }
+
+  public IEventDataScheme getEventDataScheme() {
+    return scheme;
+  }
+
+  public void setEventDataScheme(IEventDataScheme scheme) {
+    this.scheme = scheme;
+  }
+
+  /**
+   * @return the partition ids "0" .. "partitionCount - 1" as a new list
+   */
+  public List<String> getPartitionList() {
+    List<String> partitionList = new ArrayList<String>();
+
+    for (int i = 0; i < this.partitionCount; i++) {
+      partitionList.add(Integer.toString(i));
+    }
+
+    return partitionList;
+  }
+
+  /**
+   * Replaces the default "servicebus.windows.net" host suffix and rebuilds the
+   * connection string accordingly.
+   */
+  public void setTargetAddress(String targetFqnAddress) {
+    this.targetFqnAddress = targetFqnAddress;
+    this.connectionString = buildConnectionString(
+        this.userName, this.password, this.namespace, this.targetFqnAddress);
+  }
+
+  /**
+   * Builds the connection string for the default "servicebus.windows.net" host suffix.
+   */
+  public static String buildConnectionString(String username, String password, String namespace) {
+    String targetFqnAddress = "servicebus.windows.net";
+    return buildConnectionString(username, password, namespace, targetFqnAddress);
+  }
+
+  /**
+   * Builds "amqps://username:password@namespace.targetFqnAddress". Only the
+   * password is URL-encoded; the other parts are used as given.
+   */
+  public static String buildConnectionString(String username, String password,
+      String namespace, String targetFqnAddress) {
+    return "amqps://" + username + ":" + encodeString(password)
+        + "@" + namespace + "." + targetFqnAddress;
+  }
+
+  /**
+   * URL-encodes the input as UTF-8.
+   *
+   * @throws IllegalStateException if the JVM does not know UTF-8
+   */
+  private static String encodeString(String input) {
+    try {
+      return URLEncoder.encode(input, "UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      //Cannot be caused by user input. Fail loudly instead of returning an
+      //empty string, which would silently yield an empty password.
+      throw new IllegalStateException("UTF-8 encoding is not supported", e);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutException.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutException.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutException.java
new file mode 100755
index 0000000..0fd6ac4
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutException.java
@@ -0,0 +1,37 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+/**
+ * Checked exception type of the EventHub spout package.
+ *
+ * <p>It adds no state of its own; every constructor forwards to the matching
+ * {@link Exception} constructor.
+ */
+public class EventHubSpoutException extends Exception {
+
+  // Exception is Serializable; pin the stream version explicitly instead of relying on the
+  // compiler-dependent default.
+  private static final long serialVersionUID = 1L;
+
+  /** Creates an exception with no detail message and no cause. */
+  public EventHubSpoutException() {
+    super();
+  }
+
+  /**
+   * Creates an exception with a detail message.
+   *
+   * @param message the detail message
+   */
+  public EventHubSpoutException(String message) {
+    super(message);
+  }
+
+  /**
+   * Creates an exception that wraps another throwable.
+   *
+   * @param cause the underlying cause
+   */
+  public EventHubSpoutException(Throwable cause) {
+    super(cause);
+  }
+
+  /**
+   * Creates an exception with a detail message and an underlying cause.
+   *
+   * @param message the detail message
+   * @param cause   the underlying cause
+   */
+  public EventHubSpoutException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
new file mode 100755
index 0000000..bd655d6
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
@@ -0,0 +1,25 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+public class FieldConstants {
+
+ public static final String PartitionKey = "partitionKey";
+ public static final String Offset = "offset";
+ public static final String Message = "message";
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
new file mode 100755
index 0000000..c96767d
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
@@ -0,0 +1,30 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import backtype.storm.tuple.Fields;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.qpid.amqp_1_0.client.Message;
+
+/**
+ * Contract for converting an AMQP client {@link Message} into a list of values and for
+ * declaring the corresponding output {@link Fields}.
+ * The interface extends {@link Serializable}, so implementations must be serializable.
+ */
+public interface IEventDataScheme extends Serializable {
+
+ /**
+ * Converts one message into a list of values.
+ * NOTE(review): presumably the list lines up positionally with {@link #getOutputFields()};
+ * confirm against the implementations.
+ *
+ * @param message the message to convert
+ * @return the values extracted from the message
+ */
+ List<Object> deserialize(Message message);
+
+ /**
+ * @return the fields declared by this scheme
+ */
+ Fields getOutputFields();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java
new file mode 100755
index 0000000..45e9e57
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java
@@ -0,0 +1,35 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import java.util.Map;
+
+import org.apache.storm.eventhubs.client.EventHubException;
+
+/**
+ * A source of {@link EventData} that is opened with a filter, polled with a timeout and
+ * closed when no longer needed.
+ */
+public interface IEventHubReceiver {
+
+ /**
+ * Opens the receiver using the given filter.
+ *
+ * @param filter the filter supplied by the caller; {@link IEventHubReceiverFilter} exposes an
+ * offset and an enqueue time
+ * @throws EventHubException declared by this method; the failure conditions depend on the
+ * implementation
+ */
+ void open(IEventHubReceiverFilter filter) throws EventHubException;
+
+ /** Closes the receiver. */
+ void close();
+
+ /**
+ * @return presumably whether the receiver is currently open (TODO confirm against the
+ * implementation)
+ */
+ boolean isOpen();
+
+ /**
+ * Receives one event.
+ * NOTE(review): the partition managers in this package null-check the result, so null
+ * presumably means nothing arrived within the timeout; confirm against the implementation.
+ *
+ * @param timeoutInMilliseconds timeout for the call, in milliseconds
+ * @return the received event, possibly null
+ */
+ EventData receive(long timeoutInMilliseconds);
+
+ /**
+ * @return metrics data as an untyped map; key and value types are not fixed by this interface
+ */
+ Map getMetricsData();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFactory.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFactory.java
new file mode 100755
index 0000000..fbebdc8
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFactory.java
@@ -0,0 +1,30 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+/**
+ *
+ */
+package org.apache.storm.eventhubs.spout;
+
+import java.io.Serializable;
+
+/**
+ * Factory for {@link IEventHubReceiver} instances.
+ * The interface extends {@link Serializable}, so implementations must be serializable.
+ */
+public interface IEventHubReceiverFactory extends Serializable {
+ /**
+ * Creates a receiver for one partition.
+ *
+ * @param config the spout configuration handed to the receiver
+ * @param partitionId the id of the partition the receiver is created for
+ * @return the receiver produced by this factory
+ */
+ IEventHubReceiver create(EventHubSpoutConfig config, String partitionId);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFilter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFilter.java
new file mode 100755
index 0000000..e5b93cf
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFilter.java
@@ -0,0 +1,35 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+/**
+ * The filter passed to {@link IEventHubReceiver#open(IEventHubReceiverFilter)} when an
+ * EventHubs receiver is opened. It carries an offset and an enqueue time.
+ */
+public interface IEventHubReceiverFilter {
+ /**
+ * Gets the offset used to filter events by offset.
+ * @return the offset, or null if no offset is set
+ */
+ String getOffset();
+
+ /**
+ * Gets the timestamp used to filter events by enqueue time.
+ * @return the enqueue time, or 0 if no enqueue time is set
+ */
+ long getEnqueueTime();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionCoordinator.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionCoordinator.java
new file mode 100755
index 0000000..a460681
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionCoordinator.java
@@ -0,0 +1,27 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import java.util.List;
+
+/**
+ * Gives access to a set of {@link IPartitionManager} instances, either all at once or
+ * individually by partition id.
+ */
+public interface IPartitionCoordinator {
+
+ /**
+ * @return the partition managers held by this coordinator
+ */
+ List<IPartitionManager> getMyPartitionManagers();
+
+ /**
+ * Looks up the partition manager for one partition.
+ * NOTE(review): the result for an id this coordinator does not hold is implementation-defined;
+ * a map-backed implementation would return null. Confirm before relying on it.
+ *
+ * @param partitionId the id of the partition
+ * @return the manager registered for {@code partitionId}
+ */
+ IPartitionManager getPartitionManager(String partitionId);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java
new file mode 100755
index 0000000..ac986d3
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java
@@ -0,0 +1,37 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import java.util.Map;
+
+/**
+ * Manages event consumption for one partition: opening and closing, receiving events,
+ * checkpointing, and handling ack/fail notifications that are keyed by event offset.
+ */
+public interface IPartitionManager {
+
+ /**
+ * Opens the manager so that events can be received.
+ *
+ * @throws Exception declared broadly; the concrete failures depend on the implementation
+ */
+ void open() throws Exception;
+
+ /** Closes the manager. */
+ void close();
+
+ /**
+ * Receives the next event.
+ *
+ * @return the next event, or null when there is nothing to hand out
+ */
+ EventData receive();
+
+ /** Persists the current progress; what is stored depends on the implementation. */
+ void checkpoint();
+
+ /**
+ * Notifies the manager that the event with the given offset was acknowledged.
+ *
+ * @param offset the offset of the acknowledged event
+ */
+ void ack(String offset);
+
+ /**
+ * Notifies the manager that the event with the given offset failed.
+ *
+ * @param offset the offset of the failed event
+ */
+ void fail(String offset);
+
+ /**
+ * @return metrics data as an untyped map; key and value types are not fixed by this interface
+ */
+ Map getMetricsData();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManagerFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManagerFactory.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManagerFactory.java
new file mode 100755
index 0000000..dc136eb
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManagerFactory.java
@@ -0,0 +1,33 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+/**
+ *
+ */
+package org.apache.storm.eventhubs.spout;
+
+import java.io.Serializable;
+
+/**
+ * Factory for {@link IPartitionManager} instances.
+ * The interface extends {@link Serializable}, so implementations must be serializable.
+ */
+public interface IPartitionManagerFactory extends Serializable {
+ /**
+ * Creates a partition manager for one partition.
+ *
+ * @param spoutConfig the spout configuration handed to the manager
+ * @param partitionId the id of the partition to manage
+ * @param stateStore the state store handed to the manager
+ * @param receiver the receiver handed to the manager
+ * @return the partition manager produced by this factory
+ */
+ IPartitionManager create(EventHubSpoutConfig spoutConfig,
+ String partitionId,
+ IStateStore stateStore,
+ IEventHubReceiver receiver);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IStateStore.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IStateStore.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IStateStore.java
new file mode 100755
index 0000000..f0ee2be
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IStateStore.java
@@ -0,0 +1,31 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import java.io.Serializable;
+
+/**
+ * Storage contract for string data addressed by a path.
+ * The interface extends {@link Serializable}, so implementations must be serializable.
+ */
+public interface IStateStore extends Serializable {
+
+  /** Prepares the store for use. */
+  void open();
+
+  /** Releases the store. */
+  void close();
+
+  /**
+   * Stores {@code data} under {@code path}.
+   *
+   * @param path the location to write to
+   * @param data the value to store
+   */
+  void saveData(String path, String data);
+
+  /**
+   * Reads the value stored under {@code path}.
+   *
+   * @param path the location to read from
+   * @return the stored value; callers in this package treat null as "nothing stored"
+   */
+  String readData(String path);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/MessageId.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/MessageId.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/MessageId.java
new file mode 100755
index 0000000..2247f1f
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/MessageId.java
@@ -0,0 +1,56 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+/**
+ * Immutable identifier of a single event, made of a partition id, an offset and a
+ * sequence number.
+ */
+public class MessageId {
+
+  private final String partitionId;
+  private final String offset;
+  private final long sequenceNumber;
+
+  /**
+   * Creates an identifier from its three parts; the values are stored as given.
+   *
+   * @param partitionId    id of the partition
+   * @param offset         offset of the event
+   * @param sequenceNumber sequence number of the event
+   */
+  public MessageId(String partitionId, String offset, long sequenceNumber) {
+    this.partitionId = partitionId;
+    this.offset = offset;
+    this.sequenceNumber = sequenceNumber;
+  }
+
+  /**
+   * Static factory equivalent to calling the constructor.
+   *
+   * @return a new identifier holding the given values
+   */
+  public static MessageId create(String partitionId, String offset, long sequenceNumber) {
+    return new MessageId(partitionId, offset, sequenceNumber);
+  }
+
+  /** @return the partition id given at construction */
+  public String getPartitionId() {
+    return partitionId;
+  }
+
+  /** @return the offset given at construction */
+  public String getOffset() {
+    return offset;
+  }
+
+  /** @return the sequence number given at construction, boxed as a {@link Long} */
+  public Long getSequenceNumber() {
+    return Long.valueOf(sequenceNumber);
+  }
+
+  /** @return a readable form listing partition id, offset and sequence number */
+  @Override
+  public String toString() {
+    StringBuilder text = new StringBuilder();
+    text.append("PartitionId: ").append(partitionId);
+    text.append(", Offset: ").append(offset);
+    text.append(", SequenceNumber: ").append(sequenceNumber);
+    return text.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java
new file mode 100755
index 0000000..1054742
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java
@@ -0,0 +1,101 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.TreeSet;
+
+public class PartitionManager extends SimplePartitionManager {
+ private static final Logger logger = LoggerFactory.getLogger(PartitionManager.class);
+ private final int ehReceiveTimeoutMs = 5000;
+
+ //all sent events are stored in pending
+ private final Map<String, EventData> pending;
+ //all failed events are put in toResend, which is sorted by event's offset
+ private final TreeSet<EventData> toResend;
+
+ public PartitionManager(
+ EventHubSpoutConfig spoutConfig,
+ String partitionId,
+ IStateStore stateStore,
+ IEventHubReceiver receiver) {
+
+ super(spoutConfig, partitionId, stateStore, receiver);
+
+ this.pending = new LinkedHashMap<String, EventData>();
+ this.toResend = new TreeSet<EventData>();
+ }
+
+ @Override
+ public EventData receive() {
+ if(pending.size() >= config.getMaxPendingMsgsPerPartition()) {
+ return null;
+ }
+
+ EventData eventData;
+ if (toResend.isEmpty()) {
+ eventData = receiver.receive(ehReceiveTimeoutMs);
+ } else {
+ eventData = toResend.pollFirst();
+ }
+
+ if (eventData != null) {
+ lastOffset = eventData.getMessageId().getOffset();
+ pending.put(lastOffset, eventData);
+ }
+
+ return eventData;
+ }
+
+ @Override
+ public void ack(String offset) {
+ pending.remove(offset);
+ }
+
+ @Override
+ public void fail(String offset) {
+ logger.warn("fail on " + offset);
+ EventData eventData = pending.remove(offset);
+ toResend.add(eventData);
+ }
+
+ @Override
+ protected String getCompletedOffset() {
+ String offset = null;
+
+ if(pending.size() > 0) {
+ //find the smallest offset in pending list
+ offset = pending.keySet().iterator().next();
+ }
+ if(toResend.size() > 0) {
+ //find the smallest offset in toResend list
+ String offset2 = toResend.first().getMessageId().getOffset();
+ if(offset == null || offset2.compareTo(offset) < 0) {
+ offset = offset2;
+ }
+ }
+ if(offset == null) {
+ offset = lastOffset;
+ }
+ return offset;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java
new file mode 100755
index 0000000..bcbcbac
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java
@@ -0,0 +1,136 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.eventhubs.client.Constants;
+
+/**
+ * A simple partition manager that does not re-send failed messages
+ */
+public class SimplePartitionManager implements IPartitionManager {
+ private static final Logger logger = LoggerFactory.getLogger(SimplePartitionManager.class);
+ private static final String statePathPrefix = "/eventhubspout";
+
+ protected final IEventHubReceiver receiver;
+ protected String lastOffset = "-1";
+ protected String committedOffset = "-1";
+
+ protected final EventHubSpoutConfig config;
+ private final String partitionId;
+ private final IStateStore stateStore;
+ private final String statePath;
+
+ public SimplePartitionManager(
+ EventHubSpoutConfig spoutConfig,
+ String partitionId,
+ IStateStore stateStore,
+ IEventHubReceiver receiver) {
+ this.receiver = receiver;
+ this.config = spoutConfig;
+ this.partitionId = partitionId;
+ this.statePath = this.getPartitionStatePath();
+ this.stateStore = stateStore;
+ }
+
+ @Override
+ public void open() throws Exception {
+
+ //read from state store, if not found, use startingOffset
+ String offset = stateStore.readData(statePath);
+ logger.info("read offset from state store: " + offset);
+ if(offset == null) {
+ offset = Constants.DefaultStartingOffset;
+ }
+
+ EventHubReceiverFilter filter = new EventHubReceiverFilter();
+ if (offset.equals(Constants.DefaultStartingOffset)
+ && config.getEnqueueTimeFilter() != 0) {
+ filter.setEnqueueTime(config.getEnqueueTimeFilter());
+ }
+ else {
+ filter.setOffset(offset);
+ }
+
+ receiver.open(filter);
+ }
+
+ @Override
+ public void close() {
+ this.receiver.close();
+ this.checkpoint();
+ }
+
+ @Override
+ public void checkpoint() {
+ String completedOffset = getCompletedOffset();
+ if(!committedOffset.equals(completedOffset)) {
+ logger.info("saving state " + completedOffset);
+ stateStore.saveData(statePath, completedOffset);
+ committedOffset = completedOffset;
+ }
+ }
+
+ protected String getCompletedOffset() {
+ return lastOffset;
+ }
+
+ @Override
+ public EventData receive() {
+ EventData eventData = receiver.receive(5000);
+ if (eventData != null) {
+ lastOffset = eventData.getMessageId().getOffset();
+ }
+ return eventData;
+ }
+
+ @Override
+ public void ack(String offset) {
+ //do nothing
+ }
+
+ @Override
+ public void fail(String offset) {
+ logger.warn("fail on " + offset);
+ //do nothing
+ }
+
+ private String getPartitionStatePath() {
+
+ // Partition state path =
+ // "/{prefix}/{topologyName}/{namespace}/{entityPath}/partitions/{partitionId}/state";
+ String namespace = config.getNamespace();
+ String entityPath = config.getEntityPath();
+ String topologyName = config.getTopologyName();
+
+ String partitionStatePath = statePathPrefix + "/" + topologyName + "/" + namespace + "/" + entityPath + "/partitions/" + this.partitionId;
+
+ logger.info("partition state path: " + partitionStatePath);
+
+ return partitionStatePath;
+ }
+
+ @Override
+ public Map getMetricsData() {
+ return receiver.getMetricsData();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java
new file mode 100755
index 0000000..3f5f156
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java
@@ -0,0 +1,85 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.eventhubs.client.Constants;
+
+public class StaticPartitionCoordinator implements IPartitionCoordinator {
+
+ private static final Logger logger = LoggerFactory.getLogger(StaticPartitionCoordinator.class);
+
+ protected final EventHubSpoutConfig config;
+ protected final int taskIndex;
+ protected final int totalTasks;
+ protected final List<IPartitionManager> partitionManagers;
+ protected final Map<String, IPartitionManager> partitionManagerMap;
+ protected final IStateStore stateStore;
+
+ public StaticPartitionCoordinator(
+ EventHubSpoutConfig spoutConfig,
+ int taskIndex,
+ int totalTasks,
+ IStateStore stateStore,
+ IPartitionManagerFactory pmFactory,
+ IEventHubReceiverFactory recvFactory) {
+
+ this.config = spoutConfig;
+ this.taskIndex = taskIndex;
+ this.totalTasks = totalTasks;
+ this.stateStore = stateStore;
+ List<String> partitionIds = calculateParititionIdsToOwn();
+ partitionManagerMap = new HashMap<String, IPartitionManager>();
+ partitionManagers = new ArrayList<IPartitionManager>();
+
+ for (String partitionId : partitionIds) {
+ IEventHubReceiver receiver = recvFactory.create(config, partitionId);
+ IPartitionManager partitionManager = pmFactory.create(
+ config, partitionId, stateStore, receiver);
+ partitionManagerMap.put(partitionId, partitionManager);
+ partitionManagers.add(partitionManager);
+ }
+ }
+
+ @Override
+ public List<IPartitionManager> getMyPartitionManagers() {
+ return partitionManagers;
+ }
+
+ @Override
+ public IPartitionManager getPartitionManager(String partitionId) {
+ return partitionManagerMap.get(partitionId);
+ }
+
+ protected List<String> calculateParititionIdsToOwn() {
+ List<String> taskPartitions = new ArrayList<String>();
+ for (int i = this.taskIndex; i < config.getPartitionCount(); i += this.totalTasks) {
+ taskPartitions.add(Integer.toString(i));
+ logger.info(String.format("taskIndex %d owns partitionId %d.", this.taskIndex, i));
+ }
+
+ return taskPartitions;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/ZookeeperStateStore.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/ZookeeperStateStore.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/ZookeeperStateStore.java
new file mode 100755
index 0000000..6af9df6
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/ZookeeperStateStore.java
@@ -0,0 +1,95 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * IStateStore implementation that keeps state strings in ZooKeeper znodes through a
+ * Curator client. Strings are always encoded and decoded as UTF-8, so data written by
+ * one JVM can be read back by another regardless of their platform default charsets.
+ */
+public class ZookeeperStateStore implements IStateStore {
+ private static final long serialVersionUID = 1L;
+ private static final Logger logger = LoggerFactory.getLogger(ZookeeperStateStore.class);
+ // Explicit charset for every String <-> byte[] conversion (never the platform default).
+ private static final java.nio.charset.Charset STATE_CHARSET = java.nio.charset.Charset.forName("UTF-8");
+
+ private final String zookeeperConnectionString;
+ private final CuratorFramework curatorFramework;
+
+ /**
+ * Creates a store using 3 retries with a retry interval of 100.
+ *
+ * @param zookeeperConnectionString ZooKeeper connection string; null selects "localhost:2181"
+ */
+ public ZookeeperStateStore(String zookeeperConnectionString) {
+ this(zookeeperConnectionString, 3, 100);
+ }
+
+ /**
+ * Creates a store backed by a new (not yet started) Curator client.
+ *
+ * @param connectionString ZooKeeper connection string; null selects "localhost:2181"
+ * @param retries number of retries handed to the RetryNTimes policy
+ * @param retryInterval sleep between retries handed to RetryNTimes (milliseconds per the Curator API)
+ */
+ public ZookeeperStateStore(String connectionString, int retries, int retryInterval) {
+ if (connectionString == null) {
+ zookeeperConnectionString = "localhost:2181";
+ } else {
+ zookeeperConnectionString = connectionString;
+ }
+
+ RetryPolicy retryPolicy = new RetryNTimes(retries, retryInterval);
+ curatorFramework = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
+ }
+
+ /** Starts the underlying Curator client. */
+ @Override
+ public void open() {
+ curatorFramework.start();
+ }
+
+ /** Closes the underlying Curator client. */
+ @Override
+ public void close() {
+ curatorFramework.close();
+ }
+
+ /**
+ * Writes data to the given znode, creating the node (and any missing parents) when
+ * it does not exist yet and overwriting its content otherwise.
+ *
+ * @param statePath znode path to write to
+ * @param data value to store; null is stored as the empty string
+ * @throws RuntimeException wrapping any exception raised by the Curator calls
+ */
+ @Override
+ public void saveData(String statePath, String data) {
+ data = data == null ? "" : data;
+ byte[] bytes = data.getBytes(STATE_CHARSET);
+
+ try {
+ if (curatorFramework.checkExists().forPath(statePath) == null) {
+ curatorFramework.create().creatingParentsIfNeeded().forPath(statePath, bytes);
+ } else {
+ curatorFramework.setData().forPath(statePath, bytes);
+ }
+
+ logger.info("data was saved. path: {}, data: {}.", statePath, data);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Reads the content of the given znode as a UTF-8 string.
+ *
+ * @param statePath znode path to read from
+ * @return the stored string, or null when the path does not exist
+ * @throws RuntimeException wrapping any exception raised while reading or decoding
+ */
+ @Override
+ public String readData(String statePath) {
+ try {
+ if (curatorFramework.checkExists().forPath(statePath) == null) {
+ // do we want to throw an exception if path doesn't exist??
+ return null;
+ } else {
+ byte[] bytes = curatorFramework.getData().forPath(statePath);
+ String data = new String(bytes, STATE_CHARSET);
+
+ logger.info("data was retrieved. path: {}, data: {}.", statePath, data);
+
+ return data;
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Coordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Coordinator.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Coordinator.java
new file mode 100755
index 0000000..a43193d
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Coordinator.java
@@ -0,0 +1,60 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import storm.trident.spout.IOpaquePartitionedTridentSpout;
+import storm.trident.spout.IPartitionedTridentSpout;
+
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+
+public class Coordinator implements IPartitionedTridentSpout.Coordinator<Partitions>,
+ IOpaquePartitionedTridentSpout.Coordinator<Partitions> {
+ private static final Logger logger = LoggerFactory.getLogger(Coordinator.class);
+ private final EventHubSpoutConfig spoutConfig;
+ Partitions partitions;
+
+ public Coordinator(EventHubSpoutConfig spoutConfig) {
+ this.spoutConfig = spoutConfig;
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public Partitions getPartitionsForBatch() {
+ if(partitions != null) {
+ return partitions;
+ }
+
+ partitions = new Partitions();
+ for(int i=0; i<spoutConfig.getPartitionCount(); ++i) {
+ partitions.addPartition(new Partition(spoutConfig, Integer.toString(i)));
+ }
+ logger.info("created partitions, size=" + spoutConfig.getPartitionCount());
+ return partitions;
+ }
+
+ @Override
+ public boolean isReady(long txid) {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java
new file mode 100755
index 0000000..fbe779d
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java
@@ -0,0 +1,35 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.util.List;
+
+import org.apache.storm.eventhubs.spout.EventData;
+
+/**
+ * Per-partition receive abstraction used by the Trident emitters in this package.
+ */
+public interface ITridentPartitionManager {
+ /**
+ * Opens the manager at the given offset.
+ * NOTE(review): the meaning of the boolean result is not defined here; presumably
+ * true means the open succeeded -- confirm against the implementation.
+ * @param offset the starting offset
+ */
+ boolean open(String offset);
+ /**
+ * Closes the manager.
+ */
+ void close();
+
+ /**
+ * receive a batch of messages from EventHub up to "count" messages
+ * @param offset the starting offset
+ * @param count max number of messages in this batch
+ * @return list of EventData, if failed to receive, return empty list
+ */
+ public List<EventData> receiveBatch(String offset, int count);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManagerFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManagerFactory.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManagerFactory.java
new file mode 100755
index 0000000..5804e28
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManagerFactory.java
@@ -0,0 +1,26 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.io.Serializable;
+
+import org.apache.storm.eventhubs.spout.IEventHubReceiver;
+
+/**
+ * Serializable factory for ITridentPartitionManager instances.
+ */
+public interface ITridentPartitionManagerFactory extends Serializable {
+ /**
+ * Creates a partition manager on top of the given receiver.
+ * @param receiver the EventHub receiver the new manager is built around
+ * @return the created partition manager
+ */
+ ITridentPartitionManager create(IEventHubReceiver receiver);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
new file mode 100755
index 0000000..c7bd8c3
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
@@ -0,0 +1,69 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+import org.apache.storm.eventhubs.spout.IEventHubReceiverFactory;
+
+import storm.trident.operation.TridentCollector;
+import storm.trident.spout.IOpaquePartitionedTridentSpout;
+import storm.trident.topology.TransactionAttempt;
+
+/**
+ * Emitter for OpaqueTridentEventHubSpout. Every operation is forwarded to an
+ * internal TransactionalTridentEventHubEmitter.
+ */
+public class OpaqueTridentEventHubEmitter implements IOpaquePartitionedTridentSpout.Emitter<Partitions, Partition, Map> {
+ private final TransactionalTridentEventHubEmitter delegate;
+
+ /**
+ * Wraps a transactional emitter created with its single-argument constructor.
+ *
+ * @param spoutConfig EventHub spout configuration
+ */
+ public OpaqueTridentEventHubEmitter(EventHubSpoutConfig spoutConfig) {
+ this.delegate = new TransactionalTridentEventHubEmitter(spoutConfig);
+ }
+
+ /**
+ * Wraps a transactional emitter created with an explicit batch size and factories.
+ *
+ * @param spoutConfig EventHub spout configuration
+ * @param batchSize batch size passed to the wrapped emitter
+ * @param pmFactory partition manager factory passed to the wrapped emitter
+ * @param recvFactory receiver factory passed to the wrapped emitter
+ */
+ public OpaqueTridentEventHubEmitter(EventHubSpoutConfig spoutConfig,
+ int batchSize,
+ ITridentPartitionManagerFactory pmFactory,
+ IEventHubReceiverFactory recvFactory) {
+ this.delegate = new TransactionalTridentEventHubEmitter(
+ spoutConfig, batchSize, pmFactory, recvFactory);
+ }
+
+ /** Closes the wrapped emitter. */
+ @Override
+ public void close() {
+ delegate.close();
+ }
+
+ /**
+ * Emits a batch by calling emitPartitionBatchNew on the wrapped emitter.
+ *
+ * @return the metadata map produced by the wrapped emitter
+ */
+ @Override
+ public Map emitPartitionBatch(TransactionAttempt attempt, TridentCollector collector,
+ Partition partition, Map meta) {
+ Map batchMeta = delegate.emitPartitionBatchNew(attempt, collector, partition, meta);
+ return batchMeta;
+ }
+
+ /**
+ * @return the partition ordering reported by the wrapped emitter
+ */
+ @Override
+ public List<Partition> getOrderedPartitions(Partitions partitions) {
+ List<Partition> ordered = delegate.getOrderedPartitions(partitions);
+ return ordered;
+ }
+
+ /** Forwards the refresh request to the wrapped emitter. */
+ @Override
+ public void refreshPartitions(List<Partition> partitionList) {
+ delegate.refreshPartitions(partitionList);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubSpout.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubSpout.java
new file mode 100755
index 0000000..46084ef
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubSpout.java
@@ -0,0 +1,64 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.util.Map;
+
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+import org.apache.storm.eventhubs.spout.IEventDataScheme;
+
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Fields;
+import storm.trident.spout.IOpaquePartitionedTridentSpout;
+
+/**
+ * Opaque Trident EventHubs Spout
+ */
+public class OpaqueTridentEventHubSpout implements IOpaquePartitionedTridentSpout<Partitions, Partition, Map> {
+ private static final long serialVersionUID = 1L;
+ private final IEventDataScheme scheme;
+ private final EventHubSpoutConfig spoutConfig;
+
+ public OpaqueTridentEventHubSpout(EventHubSpoutConfig config) {
+ spoutConfig = config;
+ scheme = spoutConfig.getEventDataScheme();
+ }
+
+ @Override
+ public Map getComponentConfiguration() {
+ return null;
+ }
+
+ @Override
+ public IOpaquePartitionedTridentSpout.Coordinator<Partitions> getCoordinator(
+ Map conf, TopologyContext context) {
+ return new org.apache.storm.eventhubs.trident.Coordinator(spoutConfig);
+ }
+
+ @Override
+ public IOpaquePartitionedTridentSpout.Emitter<Partitions, Partition, Map> getEmitter(
+ Map conf, TopologyContext context) {
+ return new OpaqueTridentEventHubEmitter(spoutConfig);
+ }
+
+ @Override
+ public Fields getOutputFields() {
+ return scheme.getOutputFields();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partition.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partition.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partition.java
new file mode 100755
index 0000000..8b166cd
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partition.java
@@ -0,0 +1,39 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.io.Serializable;
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+import storm.trident.spout.ISpoutPartition;
+
+/**
+ * Represents an EventHub partition, identified by its partition id.
+ */
+public class Partition implements ISpoutPartition, Serializable {
+ private static final long serialVersionUID = 1L;
+ String partitionId;
+
+ /**
+ * @param config spout configuration (accepted but not used by this constructor)
+ * @param partitionId id of the EventHub partition
+ */
+ public Partition(EventHubSpoutConfig config, String partitionId) {
+ this.partitionId = partitionId;
+ }
+
+ /**
+ * @return the partition id given at construction
+ */
+ @Override
+ public String getId() {
+ return this.partitionId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partitions.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partitions.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partitions.java
new file mode 100755
index 0000000..235f5b6
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partitions.java
@@ -0,0 +1,41 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Represents all EventHub partitions a spout is receiving messages from.
+ * Partitions are kept in the order in which they were added.
+ */
+public class Partitions implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private List<Partition> partitionList;
+
+ /** Creates an empty partition set. */
+ public Partitions() {
+ this.partitionList = new ArrayList<Partition>();
+ }
+
+ /**
+ * Appends a partition to the set.
+ *
+ * @param partition the partition to add
+ */
+ public void addPartition(Partition partition) {
+ this.partitionList.add(partition);
+ }
+
+ /**
+ * @return the internal list of partitions (not a copy)
+ */
+ public List<Partition> getPartitions() {
+ return this.partitionList;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java
new file mode 100755
index 0000000..2b92c3c
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java
@@ -0,0 +1,167 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.eventhubs.spout.EventData;
+import org.apache.storm.eventhubs.spout.EventHubReceiverImpl;
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+import org.apache.storm.eventhubs.spout.IEventHubReceiver;
+import org.apache.storm.eventhubs.spout.IEventHubReceiverFactory;
+import org.apache.storm.eventhubs.client.Constants;
+
+import storm.trident.operation.TridentCollector;
+import storm.trident.spout.IOpaquePartitionedTridentSpout;
+import storm.trident.spout.IPartitionedTridentSpout;
+import storm.trident.topology.TransactionAttempt;
+
+
+public class TransactionalTridentEventHubEmitter
+ implements IPartitionedTridentSpout.Emitter<Partitions, Partition, Map> {
+ private static final Logger logger = LoggerFactory.getLogger(TransactionalTridentEventHubEmitter.class);
+ private final int batchSize;
+ private final EventHubSpoutConfig spoutConfig;
+ private Map<String, ITridentPartitionManager> pmMap;
+ private ITridentPartitionManagerFactory pmFactory;
+ private IEventHubReceiverFactory recvFactory;
+
+ public TransactionalTridentEventHubEmitter(EventHubSpoutConfig spoutConfig) {
+ //use batch size that matches the default credit size
+ this(spoutConfig, spoutConfig.getReceiverCredits(), null, null);
+ }
+
+ public TransactionalTridentEventHubEmitter(final EventHubSpoutConfig spoutConfig,
+ int batchSize,
+ ITridentPartitionManagerFactory pmFactory,
+ IEventHubReceiverFactory recvFactory) {
+ this.spoutConfig = spoutConfig;
+ this.batchSize = batchSize;
+ this.pmFactory = pmFactory;
+ this.recvFactory = recvFactory;
+ pmMap = new HashMap<String, ITridentPartitionManager>();
+ if(this.pmFactory == null) {
+ this.pmFactory = new ITridentPartitionManagerFactory() {
+ @Override
+ public ITridentPartitionManager create(IEventHubReceiver receiver) {
+ return new TridentPartitionManager(spoutConfig, receiver);
+ }
+ };
+ }
+ if(this.recvFactory == null) {
+ this.recvFactory = new IEventHubReceiverFactory() {
+ @Override
+ public IEventHubReceiver create(EventHubSpoutConfig config,
+ String partitionId) {
+ return new EventHubReceiverImpl(config, partitionId);
+ }
+ };
+ }
+ }
+
+ @Override
+ public void close() {
+ for(ITridentPartitionManager pm: pmMap.values()) {
+ pm.close();
+ }
+ }
+
+ /**
+ * Check if partition manager for a given partiton is created
+ * if not, create it.
+ * @param partition
+ */
+ private ITridentPartitionManager getOrCreatePartitionManager(Partition partition) {
+ ITridentPartitionManager pm;
+ if(!pmMap.containsKey(partition.getId())) {
+ IEventHubReceiver receiver = recvFactory.create(spoutConfig, partition.getId());
+ pm = pmFactory.create(receiver);
+ pmMap.put(partition.getId(), pm);
+ }
+ else {
+ pm = pmMap.get(partition.getId());
+ }
+ return pm;
+ }
+
+ @Override
+ public void emitPartitionBatch(TransactionAttempt attempt,
+ TridentCollector collector, Partition partition, Map meta) {
+ String offset = (String)meta.get("offset");
+ int count = Integer.parseInt((String)meta.get("count"));
+ logger.info("re-emit for partition " + partition.getId() + ", offset=" + offset + ", count=" + count);
+ ITridentPartitionManager pm = getOrCreatePartitionManager(partition);
+ List<EventData> listEvents = pm.receiveBatch(offset, count);
+ if(listEvents.size() != count) {
+ logger.error("failed to refetch eventhub messages, new count=" + listEvents.size());
+ return;
+ }
+
+ for(EventData ed: listEvents) {
+ List<Object> tuples =
+ spoutConfig.getEventDataScheme().deserialize(ed.getMessage());
+ collector.emit(tuples);
+ }
+ }
+
+ @Override
+ public Map emitPartitionBatchNew(TransactionAttempt attempt,
+ TridentCollector collector, Partition partition, Map meta) {
+ ITridentPartitionManager pm = getOrCreatePartitionManager(partition);
+ String offset = Constants.DefaultStartingOffset;
+ if(meta != null && meta.containsKey("nextOffset")) {
+ offset = (String)meta.get("nextOffset");
+ }
+ //logger.info("emit for partition " + partition.getId() + ", offset=" + offset);
+ String nextOffset = offset;
+
+ List<EventData> listEvents = pm.receiveBatch(offset, batchSize);
+
+ for(EventData ed: listEvents) {
+ //update nextOffset;
+ nextOffset = ed.getMessageId().getOffset();
+ List<Object> tuples =
+ spoutConfig.getEventDataScheme().deserialize(ed.getMessage());
+ collector.emit(tuples);
+ }
+ //logger.info("emitted new batches: " + listEvents.size());
+
+ Map newMeta = new HashMap();
+ newMeta.put("offset", offset);
+ newMeta.put("nextOffset", nextOffset);
+ newMeta.put("count", ""+listEvents.size());
+ return newMeta;
+ }
+
+ @Override
+ public List<Partition> getOrderedPartitions(Partitions partitions) {
+ return partitions.getPartitions();
+ }
+
+ @Override
+ public void refreshPartitions(List<Partition> partitionList) {
+ //partition info does not change in EventHub
+ return;
+ }
+
+}
[3/4] storm git commit: STORM-583: initial import of donated code
Posted by pt...@apache.org.
STORM-583: initial import of donated code
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/eff6cfa4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/eff6cfa4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/eff6cfa4
Branch: refs/heads/master
Commit: eff6cfa4d9bc6d991fb4e94be72040415f676187
Parents: b4351ed
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri Apr 24 15:50:19 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri Apr 24 15:50:19 2015 -0400
----------------------------------------------------------------------
external/storm-eventhubs/README.md | 37 +++
external/storm-eventhubs/pom.xml | 122 +++++++++
.../storm/eventhubs/bolt/EventHubBolt.java | 81 ++++++
.../client/ConnectionStringBuilder.java | 116 +++++++++
.../storm/eventhubs/client/Constants.java | 32 +++
.../storm/eventhubs/client/EventHubClient.java | 92 +++++++
.../eventhubs/client/EventHubConsumerGroup.java | 72 ++++++
.../eventhubs/client/EventHubException.java | 37 +++
.../eventhubs/client/EventHubReceiver.java | 139 ++++++++++
.../eventhubs/client/EventHubSendClient.java | 70 +++++
.../storm/eventhubs/client/EventHubSender.java | 95 +++++++
.../storm/eventhubs/client/SelectorFilter.java | 38 +++
.../eventhubs/client/SelectorFilterWriter.java | 64 +++++
.../eventhubs/samples/AtMostOnceEventCount.java | 54 ++++
.../storm/eventhubs/samples/EventCount.java | 155 +++++++++++
.../storm/eventhubs/samples/EventHubLoop.java | 51 ++++
.../samples/OpaqueTridentEventCount.java | 53 ++++
.../samples/TransactionalTridentEventCount.java | 81 ++++++
.../eventhubs/samples/bolt/GlobalCountBolt.java | 83 ++++++
.../samples/bolt/PartialCountBolt.java | 63 +++++
.../apache/storm/eventhubs/spout/EventData.java | 48 ++++
.../storm/eventhubs/spout/EventDataScheme.java | 55 ++++
.../eventhubs/spout/EventHubReceiverFilter.java | 56 ++++
.../eventhubs/spout/EventHubReceiverImpl.java | 150 +++++++++++
.../storm/eventhubs/spout/EventHubSpout.java | 258 +++++++++++++++++++
.../eventhubs/spout/EventHubSpoutConfig.java | 165 ++++++++++++
.../eventhubs/spout/EventHubSpoutException.java | 37 +++
.../storm/eventhubs/spout/FieldConstants.java | 25 ++
.../storm/eventhubs/spout/IEventDataScheme.java | 30 +++
.../eventhubs/spout/IEventHubReceiver.java | 35 +++
.../spout/IEventHubReceiverFactory.java | 30 +++
.../spout/IEventHubReceiverFilter.java | 35 +++
.../eventhubs/spout/IPartitionCoordinator.java | 27 ++
.../eventhubs/spout/IPartitionManager.java | 37 +++
.../spout/IPartitionManagerFactory.java | 33 +++
.../storm/eventhubs/spout/IStateStore.java | 31 +++
.../apache/storm/eventhubs/spout/MessageId.java | 56 ++++
.../storm/eventhubs/spout/PartitionManager.java | 101 ++++++++
.../eventhubs/spout/SimplePartitionManager.java | 136 ++++++++++
.../spout/StaticPartitionCoordinator.java | 85 ++++++
.../eventhubs/spout/ZookeeperStateStore.java | 95 +++++++
.../storm/eventhubs/trident/Coordinator.java | 60 +++++
.../trident/ITridentPartitionManager.java | 35 +++
.../ITridentPartitionManagerFactory.java | 26 ++
.../trident/OpaqueTridentEventHubEmitter.java | 69 +++++
.../trident/OpaqueTridentEventHubSpout.java | 64 +++++
.../storm/eventhubs/trident/Partition.java | 39 +++
.../storm/eventhubs/trident/Partitions.java | 41 +++
.../TransactionalTridentEventHubEmitter.java | 167 ++++++++++++
.../TransactionalTridentEventHubSpout.java | 66 +++++
.../trident/TridentPartitionManager.java | 91 +++++++
.../src/main/resources/Config.properties | 27 ++
.../eventhubs/spout/EventHubReceiverMock.java | 105 ++++++++
.../spout/EventHubSpoutCallerMock.java | 96 +++++++
.../spout/PartitionManagerCallerMock.java | 105 ++++++++
.../spout/SpoutOutputCollectorMock.java | 61 +++++
.../storm/eventhubs/spout/StateStoreMock.java | 54 ++++
.../storm/eventhubs/spout/TestEventData.java | 47 ++++
.../eventhubs/spout/TestEventHubSpout.java | 70 +++++
.../eventhubs/spout/TestPartitionManager.java | 117 +++++++++
.../TestTransactionalTridentEmitter.java | 93 +++++++
.../eventhubs/trident/TridentCollectorMock.java | 52 ++++
62 files changed, 4545 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/README.md
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/README.md b/external/storm-eventhubs/README.md
new file mode 100755
index 0000000..ea8076b
--- /dev/null
+++ b/external/storm-eventhubs/README.md
@@ -0,0 +1,37 @@
+storm-eventhubs
+=====================
+
+Storm spout and bolt implementation for Microsoft Azure Eventhubs
+
+### build ###
+ mvn clean package
+
+### run sample topology ###
+To run the sample topology, you need to modify the config.properties file with
+the eventhubs configurations. Here is an example:
+
+ eventhubspout.username = [username: policy name in EventHubs Portal]
+ eventhubspout.password = [password: shared access key in EventHubs Portal]
+ eventhubspout.namespace = [namespace]
+ eventhubspout.entitypath = [entitypath]
+ eventhubspout.partitions.count = [partitioncount]
+
+ # if not provided, will use storm's zookeeper settings
+ # zookeeper.connectionstring=zookeeper0:2181,zookeeper1:2181,zookeeper2:2181
+
+ eventhubspout.checkpoint.interval = 10
+ eventhub.receiver.credits = 1024
+
+Then you can use storm.cmd to submit the sample topology:
+ storm jar {jarfile} org.apache.storm.eventhubs.samples.EventCount {topologyname} {spoutconffile}
+ where the {jarfile} should be: storm-eventhubs-{version}-jar-with-dependencies.jar
+
+### Run EventHubSendClient ###
+We have included a simple EventHubs send client for testing purposes. You can run the client like this:
+ java -cp .\target\storm-eventhubs-{version}-jar-with-dependencies.jar org.apache.storm.eventhubs.client.EventHubSendClient
+ [username] [password] [namespace] [entityPath] [partitionId] [messageSize] [messageCount]
+If you want to send messages to all partitions, use "-1" as partitionId.
+
+### Microsoft Azure Eventhubs ###
+ http://azure.microsoft.com/en-us/services/event-hubs/
+
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/pom.xml b/external/storm-eventhubs/pom.xml
new file mode 100755
index 0000000..bc11dd1
--- /dev/null
+++ b/external/storm-eventhubs/pom.xml
@@ -0,0 +1,122 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>0.10.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>storm-eventhubs</artifactId>
+ <version>0.10.0-SNAPSHOT</version>
+ <packaging>jar</packaging>
+ <name>storm-eventhubs</name>
+ <description>EventHubs Storm Spout</description>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <qpid.version>0.28</qpid.version>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.4.1</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>attached</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ <archive>
+ <manifest>
+ <mainClass>org.apache.storm.eventhubs.samples.EventCount</mainClass>
+ </manifest>
+ </archive>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <configuration>
+ <tasks>
+ <copy file="src/main/resources/config.properties" tofile="target/eventhubs-config.properties"/>
+ </tasks>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-client</artifactId>
+ <version>${qpid.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-amqp-1-0-client-jms</artifactId>
+ <version>${qpid.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ <!-- keep storm out of the jar-with-dependencies -->
+ <type>jar</type>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ <version>${curator.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.11</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
new file mode 100755
index 0000000..8016be3
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
@@ -0,0 +1,81 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.bolt;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.eventhubs.client.EventHubClient;
+import org.apache.storm.eventhubs.client.EventHubException;
+import org.apache.storm.eventhubs.client.EventHubSender;
+
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseBasicBolt;
+import backtype.storm.tuple.Tuple;
+
+/**
+ * A bolt that writes messages to EventHub.
+ * We assume the incoming tuple has only one field which is a string.
+ */
+public class EventHubBolt extends BaseBasicBolt {
+  private static final long serialVersionUID = 1L;
+  private static final Logger logger = LoggerFactory
+      .getLogger(EventHubBolt.class);
+
+  // Created in prepare(); execute() uses it to send the first value of each tuple.
+  private EventHubSender sender;
+  private String connectionString;
+  private String entityPath;
+
+  /**
+   * @param connectionString connection string handed to EventHubClient.create()
+   * @param entityPath entity path handed to EventHubClient.create()
+   */
+  public EventHubBolt(String connectionString, String entityPath) {
+    this.connectionString = connectionString;
+    this.entityPath = entityPath;
+  }
+
+  /**
+   * Creates the EventHub client and a sender, passing a null partition id.
+   *
+   * @throws RuntimeException wrapping any failure raised while creating the
+   *         client or the sender
+   */
+  @Override
+  public void prepare(Map config, TopologyContext context) {
+    try {
+      EventHubClient eventHubClient = EventHubClient.create(connectionString, entityPath);
+      sender = eventHubClient.createPartitionSender(null);
+    }
+    catch(Exception ex) {
+      // Hand the exception to the logger so the stack trace and cause are kept,
+      // not only the message text.
+      logger.error("Failed to create EventHub sender for entity " + entityPath, ex);
+      throw new RuntimeException(ex);
+    }
+
+  }
+
+  /**
+   * Sends the first value of the tuple, cast to String, through the sender.
+   * A failed send is logged and not rethrown.
+   */
+  @Override
+  public void execute(Tuple tuple, BasicOutputCollector collector) {
+    try {
+      sender.send((String)tuple.getValue(0));
+    }
+    catch(EventHubException ex) {
+      // NOTE(review): the failure is only logged, so the tuple is not failed
+      // here - confirm that best-effort delivery is intended.
+      logger.error("Failed to send tuple to EventHub", ex);
+    }
+  }
+
+  /** This bolt emits nothing, so no output fields are declared. */
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/ConnectionStringBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/ConnectionStringBuilder.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/ConnectionStringBuilder.java
new file mode 100755
index 0000000..518c88d
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/ConnectionStringBuilder.java
@@ -0,0 +1,116 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.client;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.net.URLDecoder;
+import java.net.URLStreamHandler;
+
+/**
+ * Parses an EventHubs connection string of the form
+ * amqps://[username]:[password]@[namespace].servicebus.windows.net/
+ * into host, port, credentials and an SSL flag. The parsed values can be
+ * overridden afterwards through the setters.
+ */
+public class ConnectionStringBuilder {
+
+  private final String connectionString;
+
+  private String host;
+  private int port;
+  private String userName;
+  private String password;
+  private boolean ssl;
+
+  /**
+   * @param connectionString the connection string to parse
+   * @throws EventHubException if the string cannot be parsed as a URL, or its
+   *         user info is not of the form [username]:[password]
+   */
+  // amqps://[username]:[password]@[namespace].servicebus.windows.net/
+  public ConnectionStringBuilder(String connectionString) throws EventHubException {
+    this.connectionString = connectionString;
+    this.initialize();
+  }
+
+  public String getHost() {
+    return this.host;
+  }
+
+  public void setHost(String value) {
+    this.host = value;
+  }
+
+  public int getPort() {
+    return this.port;
+  }
+
+  public void setPort(int value) {
+    this.port = value;
+  }
+
+  public String getUserName() {
+    return this.userName;
+  }
+
+  public void setUserName(String value) {
+    this.userName = value;
+  }
+
+  public String getPassword() {
+    return this.password;
+  }
+
+  public void setPassword(String value) {
+    this.password = value;
+  }
+
+  public boolean getSsl() {
+    return this.ssl;
+  }
+
+  public void setSsl(boolean value) {
+    this.ssl = value;
+  }
+
+  /**
+   * Splits the connection string into its parts. When the URL carries no port,
+   * the default SSL or plain port from Constants is used depending on the scheme.
+   */
+  private void initialize() throws EventHubException {
+
+    URL url;
+    try {
+      // The stub handler is only there so java.net.URL accepts the amqp/amqps
+      // scheme for parsing; no connection is ever opened through it.
+      url = new URL(null, this.connectionString, new NullURLStreamHandler());
+    } catch (MalformedURLException e) {
+      throw new EventHubException("connectionString is not valid.", e);
+    }
+
+    String protocol = url.getProtocol();
+    this.ssl = protocol.equalsIgnoreCase(Constants.SslScheme);
+    this.host = url.getHost();
+    this.port = url.getPort();
+
+    if (this.port == -1) {
+      this.port = this.ssl ? Constants.DefaultSslPort : Constants.DefaultPort;
+    }
+
+    String userInfo = url.getUserInfo();
+    if (userInfo != null) {
+      // Limit 2 keeps any further ':' characters inside the password part.
+      String[] credentials = userInfo.split(":", 2);
+      if (credentials.length < 2) {
+        // Previously this case surfaced as an ArrayIndexOutOfBoundsException.
+        throw new EventHubException(
+            "connectionString is not valid: user info must be [username]:[password].");
+      }
+      this.userName = decode(credentials[0]);
+      this.password = decode(credentials[1]);
+    }
+  }
+
+  /**
+   * URL-decodes one credential component as UTF-8, independent of the
+   * platform default charset.
+   */
+  private static String decode(String value) throws EventHubException {
+    try {
+      return URLDecoder.decode(value, "UTF-8");
+    } catch (java.io.UnsupportedEncodingException e) {
+      throw new EventHubException("UTF-8 encoding is not supported.", e);
+    } catch (IllegalArgumentException e) {
+      // Raised by URLDecoder for malformed %-escapes.
+      throw new EventHubException(
+          "connectionString is not valid: credentials contain an invalid escape sequence.", e);
+    }
+  }
+
+  /**
+   * Stream handler stub used only for URL parsing. It is static because it
+   * needs no state from the enclosing builder.
+   */
+  static class NullURLStreamHandler extends URLStreamHandler {
+
+    @Override
+    protected URLConnection openConnection(URL u) throws IOException {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/Constants.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/Constants.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/Constants.java
new file mode 100755
index 0000000..d87ad53
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/Constants.java
@@ -0,0 +1,32 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.client;
+
+public class Constants { // Shared defaults and format strings used by the EventHubs client classes.
+
+ public static final String DefaultStartingOffset = "-1"; // used by EventHubConsumerGroup when no starting offset is supplied
+ public static final String SelectorFilterName = "apache.org:selector-filter:string"; // key under which EventHubReceiver puts its selector filter
+ public static final String OffsetFilterFormatString = "amqp.annotation.x-opt-offset > '%s'"; // %s = starting offset
+ public static final String EnqueueTimeFilterFormatString = "amqp.annotation.x-opt-enqueuedtimeutc > %d"; // %d = long time value; NOTE(review): unit presumably epoch milliseconds - confirm
+ public static final String ConsumerAddressFormatString = "%s/ConsumerGroups/%s/Partitions/%s"; // entityPath, consumerGroupName, partitionId
+ public static final String DestinationAddressFormatString = "%s/Partitions/%s"; // presumably entityPath, partitionId on the sender side - confirm against EventHubSender
+
+ public static final String SslScheme = "amqps"; // URL scheme that makes ConnectionStringBuilder turn SSL on
+ public static final int DefaultPort = 5672; // used when the connection string has no port and SSL is off
+ public static final int DefaultSslPort = 5671; // used when the connection string has no port and SSL is on
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java
new file mode 100755
index 0000000..e06091d
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java
@@ -0,0 +1,92 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.client;
+
+import org.apache.qpid.amqp_1_0.client.Connection;
+import org.apache.qpid.amqp_1_0.client.ConnectionErrorException;
+import org.apache.qpid.amqp_1_0.client.ConnectionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EventHubClient {
+
+ private static final String DefaultConsumerGroupName = "$default";
+ private static final Logger logger = LoggerFactory.getLogger(EventHubClient.class);
+ private static final long ConnectionSyncTimeout = 60000L;
+
+ private final String connectionString;
+ private final String entityPath;
+ private final Connection connection;
+
+ private EventHubClient(String connectionString, String entityPath) throws EventHubException {
+ this.connectionString = connectionString;
+ this.entityPath = entityPath;
+ this.connection = this.createConnection();
+ }
+
+ /**
+ * creates a new instance of EventHubClient using the supplied connection string and entity path.
+ *
+ * @param connectionString connection string to the namespace of event hubs. connection string format:
+ * amqps://{userId}:{password}@{namespaceName}.servicebus.windows.net
+ * @param entityPath the name of event hub entity.
+ *
+ * @return EventHubClient
+ * @throws org.apache.storm.eventhubs.client.EventHubException
+ */
+ public static EventHubClient create(String connectionString, String entityPath) throws EventHubException {
+ return new EventHubClient(connectionString, entityPath);
+ }
+
+ public EventHubSender createPartitionSender(String partitionId) throws Exception {
+ return new EventHubSender(this.connection.createSession(), this.entityPath, partitionId);
+ }
+
+ public EventHubConsumerGroup getDefaultConsumerGroup() {
+ return new EventHubConsumerGroup(this.connection, this.entityPath, DefaultConsumerGroupName);
+ }
+
+ public void close() {
+ try {
+ this.connection.close();
+ } catch (ConnectionErrorException e) {
+ logger.error(e.toString());
+ }
+ }
+
+ private Connection createConnection() throws EventHubException {
+ ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder(this.connectionString);
+ Connection clientConnection;
+
+ try {
+ clientConnection = new Connection(
+ connectionStringBuilder.getHost(),
+ connectionStringBuilder.getPort(),
+ connectionStringBuilder.getUserName(),
+ connectionStringBuilder.getPassword(),
+ connectionStringBuilder.getHost(),
+ connectionStringBuilder.getSsl());
+ } catch (ConnectionException e) {
+ logger.error(e.toString());
+ throw new EventHubException(e);
+ }
+ clientConnection.getEndpoint().setSyncTimeout(ConnectionSyncTimeout);
+ SelectorFilterWriter.register(clientConnection.getEndpoint().getDescribedTypeRegistry());
+ return clientConnection;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubConsumerGroup.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubConsumerGroup.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubConsumerGroup.java
new file mode 100755
index 0000000..892ff9c
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubConsumerGroup.java
@@ -0,0 +1,72 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.client;
+
+import org.apache.qpid.amqp_1_0.client.Connection;
+import org.apache.qpid.amqp_1_0.client.ConnectionException;
+import org.apache.qpid.amqp_1_0.client.Session;
+
+/**
+ * A consumer group of one event hub entity. All receivers created from the
+ * same instance share one lazily created session on the given connection.
+ */
+public class EventHubConsumerGroup {
+
+  private final Connection connection;
+  private final String entityPath;
+  private final String consumerGroupName;
+
+  // Created on first use by ensureSessionCreated(); reset to null by close().
+  private Session session;
+
+  public EventHubConsumerGroup(Connection connection, String entityPath, String consumerGroupName) {
+    this.connection = connection;
+    this.entityPath = entityPath;
+    this.consumerGroupName = consumerGroupName;
+  }
+
+  /**
+   * Creates a receiver that filters on message offset.
+   *
+   * @param partitionId partition to receive from
+   * @param startingOffset offset to start after; null or empty falls back to
+   *        Constants.DefaultStartingOffset
+   * @param defaultCredits credit count handed to the receiver
+   * @throws EventHubException if the session or the receiver cannot be created
+   */
+  public EventHubReceiver createReceiver(String partitionId, String startingOffset, int defaultCredits) throws EventHubException {
+    this.ensureSessionCreated();
+
+    if (startingOffset == null || startingOffset.isEmpty()) {
+      startingOffset = Constants.DefaultStartingOffset;
+    }
+
+    String filterStr = String.format(Constants.OffsetFilterFormatString, startingOffset);
+    return new EventHubReceiver(this.session, this.entityPath, this.consumerGroupName, partitionId, filterStr, defaultCredits);
+  }
+
+  /**
+   * Creates a receiver that filters on enqueue time.
+   *
+   * @param partitionId partition to receive from
+   * @param timeAfter value substituted into the enqueue-time filter
+   * @param defaultCredits credit count handed to the receiver
+   * @throws EventHubException if the session or the receiver cannot be created
+   */
+  public EventHubReceiver createReceiver(String partitionId, long timeAfter, int defaultCredits) throws EventHubException {
+    this.ensureSessionCreated();
+
+    String filterStr = String.format(Constants.EnqueueTimeFilterFormatString, timeAfter);
+    return new EventHubReceiver(this.session, this.entityPath, this.consumerGroupName, partitionId, filterStr, defaultCredits);
+  }
+
+  /**
+   * Closes the shared session, if one was created. Synchronized on the same
+   * monitor as ensureSessionCreated(), and the field is cleared so a later
+   * createReceiver() opens a fresh session instead of reusing a closed one.
+   */
+  public synchronized void close() {
+    if (this.session != null) {
+      this.session.close();
+      this.session = null;
+    }
+  }
+
+  /** Creates the shared session on first use. */
+  synchronized void ensureSessionCreated() throws EventHubException {
+
+    try {
+      if (this.session == null) {
+        this.session = this.connection.createSession();
+      }
+    } catch (ConnectionException e) {
+      throw new EventHubException(e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubException.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubException.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubException.java
new file mode 100755
index 0000000..3e94573
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubException.java
@@ -0,0 +1,37 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.client;
+
+/**
+ * Checked exception thrown by the EventHubs client classes; it mirrors the
+ * four standard constructors of java.lang.Exception.
+ */
+public class EventHubException extends Exception {
+
+  // Exception is Serializable, so declare the version id explicitly.
+  private static final long serialVersionUID = 1L;
+
+  public EventHubException() {
+    super();
+  }
+
+  /** @param message detail message */
+  public EventHubException(String message) {
+    super(message);
+  }
+
+  /** @param cause underlying failure being wrapped */
+  public EventHubException(Throwable cause) {
+    super(cause);
+  }
+
+  /**
+   * @param message detail message
+   * @param cause underlying failure being wrapped
+   */
+  public EventHubException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubReceiver.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubReceiver.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubReceiver.java
new file mode 100755
index 0000000..c8900a8
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubReceiver.java
@@ -0,0 +1,139 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.client;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.qpid.amqp_1_0.client.AcknowledgeMode;
+import org.apache.qpid.amqp_1_0.client.ConnectionErrorException;
+import org.apache.qpid.amqp_1_0.client.Message;
+import org.apache.qpid.amqp_1_0.client.Receiver;
+import org.apache.qpid.amqp_1_0.client.Session;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.messaging.Filter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Receives messages from one partition of an event hub consumer group through
+ * an AMQP receiver link created on the supplied session. The link is created
+ * in the constructor with a selector filter built from the given filter string.
+ */
+public final class EventHubReceiver {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(EventHubReceiver.class);
+  private static final String linkName = "eventhubs-receiver-link";
+
+  private final Session session;
+  private final String entityPath;
+  private final String consumerGroupName;
+  private final String partitionId;
+  private final String consumerAddress;
+  private final Map<Symbol, Filter> filters;
+  private final int defaultCredits;
+
+  private Receiver receiver;
+  private boolean isClosed;
+
+  /**
+   * @param session session on which the receiver link is created
+   * @param entityPath event hub entity path
+   * @param consumerGroupName consumer group name
+   * @param partitionId partition to receive from
+   * @param filterStr selector filter expression applied to the link
+   * @param defaultCredits credit count set on the receiver
+   * @throws EventHubException if the receiver link cannot be created
+   */
+  public EventHubReceiver(Session session, String entityPath,
+      String consumerGroupName, String partitionId, String filterStr, int defaultCredits)
+      throws EventHubException {
+
+    this.session = session;
+    this.entityPath = entityPath;
+    this.consumerGroupName = consumerGroupName;
+    this.partitionId = partitionId;
+    this.consumerAddress = this.getConsumerAddress();
+    this.filters = Collections.singletonMap(
+        Symbol.valueOf(Constants.SelectorFilterName),
+        (Filter) new SelectorFilter(filterStr));
+    logger.info("receiver filter string: " + filterStr);
+    this.defaultCredits = defaultCredits;
+
+    this.ensureReceiverCreated();
+  }
+
+  // receive without timeout means wait until a message is delivered.
+  public Message receive() {
+    return this.receive(-1L);
+  }
+
+  /**
+   * Receives one message, waiting up to the given time.
+   *
+   * @param waitTimeInMilliseconds wait time handed to the underlying receiver
+   * @return the acknowledged message, or null if none was delivered
+   * @throws RuntimeException if the receiver was closed, or if the link
+   *         reports an error when no message was delivered
+   */
+  public Message receive(long waitTimeInMilliseconds) {
+
+    this.checkIfClosed();
+
+    Message message = this.receiver.receive(waitTimeInMilliseconds);
+
+    if (message != null) {
+      // Let's acknowledge a message although EH service doesn't need it
+      // to avoid AMQP flow issue.
+      receiver.acknowledge(message);
+
+      return message;
+    } else {
+      this.checkError();
+    }
+
+    return null;
+  }
+
+  /** Closes the receiver link once; further calls do nothing. */
+  public void close() {
+    if (!isClosed) {
+      receiver.close();
+      isClosed = true;
+    }
+  }
+
+  private String getConsumerAddress() {
+    return String.format(Constants.ConsumerAddressFormatString,
+        entityPath, consumerGroupName, partitionId);
+  }
+
+  /** Creates the receiver link with the filters and sets its credit. */
+  private void ensureReceiverCreated() throws EventHubException {
+    try {
+      logger.info("defaultCredits: " + defaultCredits);
+      receiver = session.createReceiver(consumerAddress,
+          AcknowledgeMode.ALO, linkName, false, filters, null);
+      receiver.setCredit(UnsignedInteger.valueOf(defaultCredits), true);
+    } catch (ConnectionErrorException e) {
+      // caller (EventHubSpout) will log the error
+      throw new EventHubException(e);
+    }
+  }
+
+  /**
+   * Called when no message was delivered. If the link reports an error, the
+   * error is logged, the receiver is closed and a RuntimeException is thrown;
+   * otherwise the thread sleeps briefly.
+   */
+  private void checkError() {
+    org.apache.qpid.amqp_1_0.type.transport.Error error = this.receiver.getError();
+    if (error != null) {
+      String errorMessage = error.toString();
+      logger.error(errorMessage);
+      this.close();
+
+      throw new RuntimeException(errorMessage);
+    } else {
+      // adding a sleep here to avoid any potential tight-loop issue.
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException e) {
+        logger.error(e.toString());
+        // Restore the interrupt flag so code further up the stack can still
+        // see that this thread was asked to stop.
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  private void checkIfClosed() {
+    if (this.isClosed) {
+      throw new RuntimeException("receiver was closed.");
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSendClient.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSendClient.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSendClient.java
new file mode 100755
index 0000000..ad31cc1
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSendClient.java
@@ -0,0 +1,70 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.client;
+
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+
+ /**
+  * Command-line utility that sends a number of fixed-size text messages to an
+  * EventHub entity.
+  * <p>
+  * Arguments, in order: username, password, namespace, entityPath, partitionId,
+  * messageSize, messageCount. A partitionId of "-1" is turned into a null
+  * partition id before the sender is created.
+  */
+ public class EventHubSendClient {
+
+   /**
+    * Parses the arguments, builds the payloads and sends them one by one.
+    *
+    * @param args the seven arguments described on the class
+    * @throws IllegalArgumentException if fewer than seven arguments are given or
+    *         messageSize / messageCount is not positive
+    * @throws NumberFormatException if messageSize or messageCount is not an integer
+    */
+   public static void main(String[] args) throws Exception {
+
+     if (args == null || args.length < 7) {
+       throw new IllegalArgumentException(
+           "arguments are missing. [username] [password] [namespace] [entityPath] [partitionId] [messageSize] [messageCount] are required.");
+     }
+
+     String username = args[0];
+     String password = args[1];
+     String namespace = args[2];
+     String entityPath = args[3];
+     String partitionId = args[4];
+     int messageSize = Integer.parseInt(args[5]);
+     int messageCount = Integer.parseInt(args[6]);
+     // Validate explicitly: assert statements are skipped unless the JVM is
+     // started with -ea, so they cannot be relied on for argument checks.
+     if (messageSize <= 0) {
+       throw new IllegalArgumentException(
+           "messageSize must be greater than 0 but was " + messageSize);
+     }
+     if (messageCount <= 0) {
+       throw new IllegalArgumentException(
+           "messageCount must be greater than 0 but was " + messageCount);
+     }
+
+     if (partitionId.equals("-1")) {
+       // -1 means we want to send data to partitions in round-robin fashion.
+       partitionId = null;
+     }
+
+     try {
+       String connectionString = EventHubSpoutConfig.buildConnectionString(username, password, namespace);
+       EventHubClient client = EventHubClient.create(connectionString, entityPath);
+       EventHubSender sender = client.createPartitionSender(partitionId);
+
+       StringBuilder sb = new StringBuilder(messageSize);
+       for(int i=1; i<messageCount+1; ++i) {
+         // Repeat the marker text until the buffer is long enough, then cut it
+         // to exactly messageSize characters.
+         while(sb.length() < messageSize) {
+           sb.append(" current message: " + i);
+         }
+         sb.setLength(messageSize);
+         sender.send(sb.toString());
+         sb.setLength(0);
+         if(i % 1000 == 0) {
+           System.out.println("Number of messages sent: " + i);
+         }
+       }
+       System.out.println("Total Number of messages sent: " + messageCount);
+     } catch (Exception e) {
+       System.out.println("Exception: " + e.getMessage());
+     }
+
+     System.out.println("done");
+   }
+ }
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java
new file mode 100755
index 0000000..41b1d97
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java
@@ -0,0 +1,95 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.client;
+
+import java.util.concurrent.TimeoutException;
+import org.apache.qpid.amqp_1_0.client.LinkDetachedException;
+import org.apache.qpid.amqp_1_0.client.Message;
+import org.apache.qpid.amqp_1_0.client.Sender;
+import org.apache.qpid.amqp_1_0.client.Session;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.messaging.Data;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+ /**
+  * Sends string payloads to an EventHub entity, or to one partition of it, over
+  * an AMQP session. The underlying sender link is created on the first call to
+  * {@link #send(String)}.
+  */
+ public class EventHubSender {
+
+   private static final Logger logger = LoggerFactory.getLogger(EventHubSender.class);
+
+   // Fixed encoding so the bytes on the wire do not depend on the platform
+   // default charset of the sending JVM.
+   private static final java.nio.charset.Charset PAYLOAD_CHARSET =
+       java.nio.charset.Charset.forName("UTF-8");
+
+   private final Session session;
+   private final String entityPath;
+   private final String partitionId;
+   private final String destinationAddress;
+
+   private Sender sender;
+
+   /**
+    * @param session     session used to create the sender link
+    * @param entityPath  path of the target entity
+    * @param partitionId target partition; null or empty addresses the entity itself
+    */
+   public EventHubSender(Session session, String entityPath, String partitionId) {
+     this.session = session;
+     this.entityPath = entityPath;
+     this.partitionId = partitionId;
+     this.destinationAddress = this.getDestinationAddress();
+   }
+
+   /**
+    * Encodes the string as UTF-8 bytes and sends it as a single data message.
+    *
+    * @param data the payload to send
+    * @throws EventHubException if the link is detached, the send times out, or
+    *         any other failure prevents the message from being sent
+    */
+   public void send(String data) throws EventHubException {
+     try {
+       if (this.sender == null) {
+         this.ensureSenderCreated();
+       }
+
+       //For interop with other language, convert string to bytes
+       Binary bin = new Binary(data.getBytes(PAYLOAD_CHARSET));
+       Message message = new Message(new Data(bin));
+       this.sender.send(message);
+
+     } catch (LinkDetachedException e) {
+       logger.error(e.getMessage());
+
+       throw new EventHubException("Sender has been closed");
+     } catch (TimeoutException e) {
+       logger.error(e.getMessage());
+
+       throw new EventHubException("Timed out while waiting to get credit to send");
+     } catch (Exception e) {
+       logger.error(e.getMessage());
+
+       // Report the failure to the caller; otherwise the message is dropped
+       // while the caller believes it was sent.
+       throw new EventHubException(e);
+     }
+   }
+
+   /**
+    * Closes the sender link if one was created. An error raised while closing is
+    * logged and not rethrown.
+    */
+   public void close() {
+     if (this.sender == null) {
+       // send() was never called successfully, so there is nothing to close.
+       return;
+     }
+     try {
+       this.sender.close();
+     } catch (Sender.SenderClosingException e) {
+       logger.error("Closing a sender encountered error: " + e.getMessage());
+     }
+   }
+
+   /**
+    * Returns the entity path when no partition id is set; otherwise formats the
+    * entity path and partition id with {@code Constants.DestinationAddressFormatString}.
+    */
+   private String getDestinationAddress() {
+     if (this.partitionId == null || this.partitionId.equals("")) {
+       return this.entityPath;
+     } else {
+       return String.format(Constants.DestinationAddressFormatString, this.entityPath, this.partitionId);
+     }
+   }
+
+   /**
+    * Creates the sender for the destination address unless one already exists.
+    */
+   private synchronized void ensureSenderCreated() throws Exception {
+     if (this.sender == null) {
+       this.sender = this.session.createSender(this.destinationAddress);
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/SelectorFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/SelectorFilter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/SelectorFilter.java
new file mode 100755
index 0000000..7869cce
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/SelectorFilter.java
@@ -0,0 +1,38 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.client;
+
+import org.apache.qpid.amqp_1_0.type.messaging.Filter;
+
+ /**
+  * Filter that wraps a single selector string. The string is fixed at
+  * construction and is also what {@link #toString()} returns.
+  */
+ public class SelectorFilter implements Filter {
+
+   private final String value;
+
+   /**
+    * @param value the selector string to carry
+    */
+   public SelectorFilter(String value) {
+     this.value = value;
+   }
+
+   /**
+    * @return the selector string given at construction
+    */
+   public String getValue() {
+     return this.value;
+   }
+
+   @Override
+   public String toString() {
+     return getValue();
+   }
+ }
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/SelectorFilterWriter.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/SelectorFilterWriter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/SelectorFilterWriter.java
new file mode 100755
index 0000000..102b6b6
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/SelectorFilterWriter.java
@@ -0,0 +1,64 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.client;
+
+import org.apache.qpid.amqp_1_0.codec.AbstractDescribedTypeWriter;
+import org.apache.qpid.amqp_1_0.codec.ValueWriter;
+import org.apache.qpid.amqp_1_0.type.UnsignedLong;
+
+ /**
+  * Described-type writer for {@link SelectorFilter}: the described value is the
+  * filter's selector string, written through the registry's writer for it.
+  */
+ public class SelectorFilterWriter extends AbstractDescribedTypeWriter<SelectorFilter> {
+
+   // Numeric descriptor emitted for a selector filter.
+   private static final long DESCRIPTOR_CODE = 0x00000137000000AL;
+
+   private static final ValueWriter.Factory<SelectorFilter> FACTORY = new ValueWriter.Factory<SelectorFilter>() {
+
+     @Override
+     public ValueWriter<SelectorFilter> newInstance(ValueWriter.Registry registry) {
+       return new SelectorFilterWriter(registry);
+     }
+   };
+
+   // Filter currently being written; set by onSetValue and reset by clear.
+   private SelectorFilter value;
+
+   public SelectorFilterWriter(final ValueWriter.Registry registry) {
+     super(registry);
+   }
+
+   /**
+    * Registers this writer's factory for {@link SelectorFilter} in the given registry.
+    */
+   public static void register(ValueWriter.Registry registry) {
+     registry.register(SelectorFilter.class, FACTORY);
+   }
+
+   @Override
+   protected void onSetValue(final SelectorFilter value) {
+     this.value = value;
+   }
+
+   @Override
+   protected void clear() {
+     this.value = null;
+   }
+
+   @Override
+   protected Object getDescriptor() {
+     return UnsignedLong.valueOf(DESCRIPTOR_CODE);
+   }
+
+   @Override
+   protected ValueWriter<String> createDescribedWriter() {
+     final String selector = value.getValue();
+     return getRegistry().getValueWriter(selector);
+   }
+ }
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/AtMostOnceEventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/AtMostOnceEventCount.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/AtMostOnceEventCount.java
new file mode 100755
index 0000000..b0dd33a
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/AtMostOnceEventCount.java
@@ -0,0 +1,54 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples;
+
+import java.io.Serializable;
+
+import org.apache.storm.eventhubs.spout.EventHubSpout;
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+import org.apache.storm.eventhubs.spout.IEventHubReceiver;
+import org.apache.storm.eventhubs.spout.IPartitionManager;
+import org.apache.storm.eventhubs.spout.IPartitionManagerFactory;
+import org.apache.storm.eventhubs.spout.IStateStore;
+import org.apache.storm.eventhubs.spout.SimplePartitionManager;
+
+ /**
+  * Variant of EventCount whose spout is built with a partition manager factory
+  * that creates SimplePartitionManager instances.
+  */
+ public class AtMostOnceEventCount extends EventCount implements Serializable {
+   @Override
+   protected EventHubSpout createEventHubSpout() {
+     final IPartitionManagerFactory simpleManagerFactory = new IPartitionManagerFactory() {
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public IPartitionManager create(EventHubSpoutConfig spoutConfig,
+           String partitionId, IStateStore stateStore,
+           IEventHubReceiver receiver) {
+         IPartitionManager manager = new SimplePartitionManager(
+             spoutConfig, partitionId, stateStore, receiver);
+         return manager;
+       }
+     };
+     return new EventHubSpout(spoutConfig, null, simpleManagerFactory, null);
+   }
+
+   public static void main(String[] args) throws Exception {
+     new AtMostOnceEventCount().runScenario(args);
+   }
+ }
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventCount.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventCount.java
new file mode 100755
index 0000000..dd53e42
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventCount.java
@@ -0,0 +1,155 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples;
+
+import backtype.storm.StormSubmitter;
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.TopologyBuilder;
+
+import org.apache.storm.eventhubs.samples.bolt.GlobalCountBolt;
+import org.apache.storm.eventhubs.samples.bolt.PartialCountBolt;
+import org.apache.storm.eventhubs.spout.EventHubSpout;
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+
+import java.io.FileReader;
+import java.util.Properties;
+
+/**
+ * The basic scenario topology that uses EventHubSpout with PartialCountBolt
+ * and GlobalCountBolt.
+ * To submit this topology:
+ * storm jar {jarfile} {classname} {topologyname} {spoutconffile}
+ */
+public class EventCount {
+ protected EventHubSpoutConfig spoutConfig;
+ protected int numWorkers;
+
+ public EventCount() {
+ }
+
+ protected void readEHConfig(String[] args) throws Exception {
+ Properties properties = new Properties();
+ if(args.length > 1) {
+ properties.load(new FileReader(args[1]));
+ }
+ else {
+ properties.load(EventCount.class.getClassLoader().getResourceAsStream(
+ "Config.properties"));
+ }
+
+ String username = properties.getProperty("eventhubspout.username");
+ String password = properties.getProperty("eventhubspout.password");
+ String namespaceName = properties.getProperty("eventhubspout.namespace");
+ String entityPath = properties.getProperty("eventhubspout.entitypath");
+ String targetFqnAddress = properties.getProperty("eventhubspout.targetfqnaddress");
+ String zkEndpointAddress = properties.getProperty("zookeeper.connectionstring");
+ int partitionCount = Integer.parseInt(properties.getProperty("eventhubspout.partitions.count"));
+ int checkpointIntervalInSeconds = Integer.parseInt(properties.getProperty("eventhubspout.checkpoint.interval"));
+ int receiverCredits = Integer.parseInt(properties.getProperty("eventhub.receiver.credits"));
+ String maxPendingMsgsPerPartitionStr = properties.getProperty("eventhubspout.max.pending.messages.per.partition");
+ if(maxPendingMsgsPerPartitionStr == null) {
+ maxPendingMsgsPerPartitionStr = "1024";
+ }
+ int maxPendingMsgsPerPartition = Integer.parseInt(maxPendingMsgsPerPartitionStr);
+ String enqueueTimeDiffStr = properties.getProperty("eventhub.receiver.filter.timediff");
+ if(enqueueTimeDiffStr == null) {
+ enqueueTimeDiffStr = "0";
+ }
+ int enqueueTimeDiff = Integer.parseInt(enqueueTimeDiffStr);
+ long enqueueTimeFilter = 0;
+ if(enqueueTimeDiff != 0) {
+ enqueueTimeFilter = System.currentTimeMillis() - enqueueTimeDiff*1000;
+ }
+
+ System.out.println("Eventhub spout config: ");
+ System.out.println(" partition count: " + partitionCount);
+ System.out.println(" checkpoint interval: " + checkpointIntervalInSeconds);
+ System.out.println(" receiver credits: " + receiverCredits);
+ spoutConfig = new EventHubSpoutConfig(username, password,
+ namespaceName, entityPath, partitionCount, zkEndpointAddress,
+ checkpointIntervalInSeconds, receiverCredits, maxPendingMsgsPerPartition, enqueueTimeFilter);
+
+ if(targetFqnAddress != null)
+ {
+ spoutConfig.setTargetAddress(targetFqnAddress);
+ }
+
+ //set the number of workers to be the same as partition number.
+ //the idea is to have a spout and a partial count bolt co-exist in one
+ //worker to avoid shuffling messages across workers in storm cluster.
+ numWorkers = spoutConfig.getPartitionCount();
+
+ if(args.length > 0) {
+ //set topology name so that sample Trident topology can use it as stream name.
+ spoutConfig.setTopologyName(args[0]);
+ }
+ }
+
+ protected EventHubSpout createEventHubSpout() {
+ EventHubSpout eventHubSpout = new EventHubSpout(spoutConfig);
+ return eventHubSpout;
+ }
+
+ protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
+ TopologyBuilder topologyBuilder = new TopologyBuilder();
+
+ topologyBuilder.setSpout("EventHubsSpout", eventHubSpout, spoutConfig.getPartitionCount())
+ .setNumTasks(spoutConfig.getPartitionCount());
+ topologyBuilder.setBolt("PartialCountBolt", new PartialCountBolt(), spoutConfig.getPartitionCount())
+ .localOrShuffleGrouping("EventHubsSpout").setNumTasks(spoutConfig.getPartitionCount());
+ topologyBuilder.setBolt("GlobalCountBolt", new GlobalCountBolt(), 1)
+ .globalGrouping("PartialCountBolt").setNumTasks(1);
+ return topologyBuilder.createTopology();
+ }
+
+ protected void submitTopology(String[] args, StormTopology topology) throws Exception {
+ Config config = new Config();
+ config.setDebug(false);
+ //Enable metrics
+ config.registerMetricsConsumer(backtype.storm.metric.LoggingMetricsConsumer.class, 1);
+
+
+ if (args != null && args.length > 0) {
+ config.setNumWorkers(numWorkers);
+ StormSubmitter.submitTopology(args[0], config, topology);
+ } else {
+ config.setMaxTaskParallelism(2);
+
+ LocalCluster localCluster = new LocalCluster();
+ localCluster.submitTopology("test", config, topology);
+
+ Thread.sleep(5000000);
+
+ localCluster.shutdown();
+ }
+ }
+
+ protected void runScenario(String[] args) throws Exception{
+ readEHConfig(args);
+ EventHubSpout eventHubSpout = createEventHubSpout();
+ StormTopology topology = buildTopology(eventHubSpout);
+ submitTopology(args, topology);
+ }
+
+ public static void main(String[] args) throws Exception {
+ EventCount scenario = new EventCount();
+ scenario.runScenario(args);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
new file mode 100755
index 0000000..cae0573
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
@@ -0,0 +1,51 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.TopologyBuilder;
+
+import org.apache.storm.eventhubs.bolt.EventHubBolt;
+import org.apache.storm.eventhubs.spout.EventHubSpout;
+
+/**
+ * A sample topology that loops message back to EventHub
+ */
+public class EventHubLoop extends EventCount {
+
+ @Override
+ protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
+ TopologyBuilder topologyBuilder = new TopologyBuilder();
+
+ topologyBuilder.setSpout("EventHubsSpout", eventHubSpout, spoutConfig.getPartitionCount())
+ .setNumTasks(spoutConfig.getPartitionCount());
+
+ EventHubBolt eventHubBolt = new EventHubBolt(spoutConfig.getConnectionString(),
+ spoutConfig.getEntityPath());
+ //For every spout, let's create multiple bolts because send is much slower
+ int boltTasks = spoutConfig.getPartitionCount() * 50;
+ topologyBuilder.setBolt("EventHubsBolt", eventHubBolt, boltTasks)
+ .localOrShuffleGrouping("EventHubsSpout").setNumTasks(boltTasks);
+ return topologyBuilder.createTopology();
+ }
+
+ public static void main(String[] args) throws Exception {
+ EventHubLoop scenario = new EventHubLoop();
+ scenario.runScenario(args);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java
new file mode 100755
index 0000000..f4fe127
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java
@@ -0,0 +1,53 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples;
+
+import storm.trident.TridentState;
+import storm.trident.TridentTopology;
+import storm.trident.operation.builtin.Count;
+import storm.trident.operation.builtin.Sum;
+import storm.trident.testing.MemoryMapState;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.tuple.Fields;
+
+import org.apache.storm.eventhubs.samples.TransactionalTridentEventCount.LoggingFilter;
+import org.apache.storm.eventhubs.spout.EventHubSpout;
+import org.apache.storm.eventhubs.trident.OpaqueTridentEventHubSpout;
+
+/**
+ * A simple Trident topology uses OpaqueTridentEventHubSpout
+ */
+public class OpaqueTridentEventCount extends EventCount {
+ @Override
+ protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
+ TridentTopology topology = new TridentTopology();
+
+ OpaqueTridentEventHubSpout spout = new OpaqueTridentEventHubSpout(spoutConfig);
+ TridentState state = topology.newStream("stream-" + spoutConfig.getTopologyName(), spout)
+ .parallelismHint(spoutConfig.getPartitionCount())
+ .aggregate(new Count(), new Fields("partial-count"))
+ .persistentAggregate(new MemoryMapState.Factory(), new Fields("partial-count"), new Sum(), new Fields("count"));
+ state.newValuesStream().each(new Fields("count"), new LoggingFilter("got count: ", 10000));
+ return topology.build();
+ }
+
+ public static void main(String[] args) throws Exception {
+ OpaqueTridentEventCount scenario = new OpaqueTridentEventCount();
+ scenario.runScenario(args);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java
new file mode 100755
index 0000000..1e7628b
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java
@@ -0,0 +1,81 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.tuple.Fields;
+
+import org.apache.storm.eventhubs.spout.EventHubSpout;
+import org.apache.storm.eventhubs.trident.TransactionalTridentEventHubSpout;
+
+import storm.trident.TridentState;
+import storm.trident.TridentTopology;
+import storm.trident.operation.BaseFilter;
+import storm.trident.operation.builtin.Count;
+import storm.trident.operation.builtin.Sum;
+import storm.trident.testing.MemoryMapState;
+import storm.trident.tuple.TridentTuple;
+
+/**
+ * A simple Trident topology uses TransactionalTridentEventHubSpout
+ */
+public class TransactionalTridentEventCount extends EventCount {
+ public static class LoggingFilter extends BaseFilter {
+ private static final long serialVersionUID = 1L;
+ private static final Logger logger = LoggerFactory.getLogger(LoggingFilter.class);
+ private final String prefix;
+ private final long logIntervalMs;
+ private long lastTime;
+ public LoggingFilter(String prefix, int logIntervalMs) {
+ this.prefix = prefix;
+ this.logIntervalMs = logIntervalMs;
+ lastTime = System.nanoTime();
+ }
+
+ @Override
+ public boolean isKeep(TridentTuple tuple) {
+ long now = System.nanoTime();
+ if(logIntervalMs < (now - lastTime) / 1000000) {
+ logger.info(prefix + tuple.toString());
+ lastTime = now;
+ }
+ return false;
+ }
+ }
+
+ @Override
+ protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
+ TridentTopology topology = new TridentTopology();
+
+ TransactionalTridentEventHubSpout spout = new TransactionalTridentEventHubSpout(spoutConfig);
+ TridentState state = topology.newStream("stream-" + spoutConfig.getTopologyName(), spout)
+ .parallelismHint(spoutConfig.getPartitionCount())
+ .aggregate(new Count(), new Fields("partial-count"))
+ .persistentAggregate(new MemoryMapState.Factory(), new Fields("partial-count"), new Sum(), new Fields("count"));
+ state.newValuesStream().each(new Fields("count"), new LoggingFilter("got count: ", 10000));
+ return topology.build();
+ }
+
+ public static void main(String[] args) throws Exception {
+ TransactionalTridentEventCount scenario = new TransactionalTridentEventCount();
+ scenario.runScenario(args);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
new file mode 100755
index 0000000..16b34a6
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
@@ -0,0 +1,83 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples.bolt;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.Config;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseBasicBolt;
+import backtype.storm.tuple.Tuple;
+
+/**
+ * Globally count number of messages.
+ *
+ * Adds up the "partial_count" value of every incoming tuple and registers a
+ * "GlobalMessageCount" metric that reports the running total together with
+ * the throughput (messages/second) measured since the previous metric read.
+ */
+public class GlobalCountBolt extends BaseBasicBolt {
+  private static final long serialVersionUID = 1L;
+  private static final Logger logger = LoggerFactory
+      .getLogger(GlobalCountBolt.class);
+  // Sum of all partial counts received since prepare().
+  private long globalCount;
+  // Sum of the partial counts received since the metric was last read.
+  private long globalCountDiff;
+  // System.nanoTime() reading taken in prepare() or at the last metric read.
+  private long lastMetricsTime;
+  // Most recently computed throughput, in messages per second.
+  private long throughput;
+
+  /**
+   * Resets the counters and registers the "GlobalMessageCount" metric.
+   *
+   * @param config topology configuration; must contain
+   *        {@code Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS}
+   * @param context used to register the metric
+   */
+  @Override
+  public void prepare(Map config, TopologyContext context) {
+    globalCount = 0;
+    globalCountDiff = 0;
+    lastMetricsTime = System.nanoTime();
+    // Go through Number rather than casting to Integer: a numeric config
+    // value may arrive as a Long, which would make an (Integer) cast fail.
+    int bucketSizeSecs =
+        ((Number) config.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)).intValue();
+    context.registerMetric("GlobalMessageCount", new IMetric() {
+      @Override
+      public Object getValueAndReset() {
+        long now = System.nanoTime();
+        long millis = (now - lastMetricsTime) / 1000000;
+        // Multiply before dividing so the rate is not truncated to a
+        // multiple of 1000 (or to 0 at low rates), and guard against a
+        // zero-length interval, which would otherwise divide by zero.
+        throughput = millis > 0 ? globalCountDiff * 1000 / millis : 0;
+        Map<String, Long> values = new HashMap<String, Long>();
+        values.put("global_count", globalCount);
+        values.put("throughput", throughput);
+        lastMetricsTime = now;
+        globalCountDiff = 0;
+        return values;
+      }
+    }, bucketSizeSecs);
+  }
+
+  /**
+   * Adds the tuple's "partial_count" to the totals and logs the last
+   * computed throughput on the first tuple after a metric read.
+   *
+   * @param tuple must carry an Integer field named "partial_count"
+   * @param collector unused; this bolt emits nothing
+   */
+  @Override
+  public void execute(Tuple tuple, BasicOutputCollector collector) {
+    int partial = (Integer)tuple.getValueByField("partial_count");
+    globalCount += partial;
+    globalCountDiff += partial;
+    // globalCountDiff == partial means the diff was just reset by a metric
+    // read; globalCount != globalCountDiff excludes the very first tuple.
+    if((globalCountDiff == partial) && (globalCount != globalCountDiff)) {
+      //metrics has just been collected, let's also log it
+      logger.info("Current throughput (messages/second): " + throughput);
+    }
+  }
+
+  /**
+   * Declares no output fields: this bolt is a terminal counter.
+   */
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
new file mode 100755
index 0000000..21f1ab4
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
@@ -0,0 +1,63 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples.bolt;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseBasicBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+/**
+ * Partially count number of messages from EventHubs
+ */
+public class PartialCountBolt extends BaseBasicBolt {
+ private static final long serialVersionUID = 1L;
+ private static final Logger logger = LoggerFactory
+ .getLogger(PartialCountBolt.class);
+ private static final int PartialCountBatchSize = 1000;
+
+ private int partialCount;
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context) {
+ partialCount = 0;
+ }
+
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ partialCount++;
+ if(partialCount == PartialCountBatchSize) {
+ collector.emit(new Values(PartialCountBatchSize));
+ partialCount = 0;
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("partial_count"));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventData.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventData.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventData.java
new file mode 100755
index 0000000..e5834b4
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventData.java
@@ -0,0 +1,48 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import org.apache.qpid.amqp_1_0.client.Message;
+
+public class EventData implements Comparable<EventData> {
+ private final Message message;
+ private final MessageId messageId;
+
+ public EventData(Message message, MessageId messageId) {
+ this.message = message;
+ this.messageId = messageId;
+ }
+
+ public static EventData create(Message message, MessageId messageId) {
+ return new EventData(message, messageId);
+ }
+
+ public Message getMessage() {
+ return this.message;
+ }
+
+ public MessageId getMessageId() {
+ return this.messageId;
+ }
+
+ @Override
+ public int compareTo(EventData ed) {
+ return messageId.getSequenceNumber().
+ compareTo(ed.getMessageId().getSequenceNumber());
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
new file mode 100755
index 0000000..d01050d
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
@@ -0,0 +1,55 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import backtype.storm.tuple.Fields;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.qpid.amqp_1_0.client.Message;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
+import org.apache.qpid.amqp_1_0.type.messaging.Data;
+
+/**
+ * Scheme that turns an AMQP message into a single string field named
+ * {@code FieldConstants.Message}.
+ */
+public class EventDataScheme implements IEventDataScheme {
+
+  private static final long serialVersionUID = 1L;
+
+  // Charset used to decode binary payloads. Written with the fully
+  // qualified name so the file's import list does not need to change.
+  // NOTE(review): assumes producers send UTF-8 text - confirm.
+  private static final java.nio.charset.Charset PAYLOAD_CHARSET =
+      java.nio.charset.Charset.forName("UTF-8");
+
+  /**
+   * Extracts the first {@code Data} or {@code AmqpValue} section of the
+   * message payload as a string.
+   *
+   * @param message the message to decode
+   * @return a one-element list holding the payload text, or {@code null}
+   *         when the payload has no {@code Data} or {@code AmqpValue} section
+   */
+  @Override
+  public List<Object> deserialize(Message message) {
+    List<Object> fieldContents = new ArrayList<Object>();
+
+    for (Section section : message.getPayload()) {
+      if (section instanceof Data) {
+        Data data = (Data) section;
+        // Decode with an explicit charset: new String(byte[]) uses the JVM
+        // default, so the same bytes could decode differently per worker.
+        fieldContents.add(new String(data.getValue().getArray(), PAYLOAD_CHARSET));
+        return fieldContents;
+      } else if (section instanceof AmqpValue) {
+        AmqpValue amqpValue = (AmqpValue) section;
+        fieldContents.add(amqpValue.getValue().toString());
+        return fieldContents;
+      }
+    }
+
+    // No usable section found; callers receive null, as before.
+    return null;
+  }
+
+  /**
+   * @return the single output field, {@code FieldConstants.Message}
+   */
+  @Override
+  public Fields getOutputFields() {
+    return new Fields(FieldConstants.Message);
+  }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverFilter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverFilter.java
new file mode 100755
index 0000000..e80cd25
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverFilter.java
@@ -0,0 +1,56 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+
+/**
+ * Mutable holder for the two receiver filter criteria: an offset and an
+ * enqueue time. A new instance has a {@code null} offset and an enqueue
+ * time of 0 unless a constructor or setter says otherwise.
+ */
+public class EventHubReceiverFilter implements IEventHubReceiverFilter {
+  String offset;
+  long enqueueTime;
+
+  /**
+   * Creates a filter with neither offset nor enqueue time set.
+   */
+  public EventHubReceiverFilter() {
+  }
+
+  /**
+   * Creates offset only filter.
+   *
+   * @param offset the offset to filter on
+   */
+  public EventHubReceiverFilter(String offset) {
+    this.offset = offset;
+  }
+
+  /**
+   * Creates enqueue time only filter.
+   *
+   * @param enqueueTime the enqueue time to filter on
+   */
+  public EventHubReceiverFilter(long enqueueTime) {
+    this.enqueueTime = enqueueTime;
+  }
+
+  /** Replaces the offset criterion. */
+  public void setOffset(String offset) {
+    this.offset = offset;
+  }
+
+  /** Replaces the enqueue time criterion. */
+  public void setEnqueueTime(long enqueueTime) {
+    this.enqueueTime = enqueueTime;
+  }
+
+  /** @return the offset criterion, or {@code null} if none was set */
+  @Override
+  public String getOffset() {
+    return this.offset;
+  }
+
+  /** @return the enqueue time criterion, or 0 if none was set */
+  @Override
+  public long getEnqueueTime() {
+    return this.enqueueTime;
+  }
+
+}
[4/4] storm git commit: STORM-583: incorporate storm-eventhubs into
build (this closes #336)
Posted by pt...@apache.org.
STORM-583: incorporate storm-eventhubs into build (this closes #336)
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/38638474
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/38638474
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/38638474
Branch: refs/heads/master
Commit: 38638474ef6b045f9e61942b2fd882a8ed4ed8e9
Parents: eff6cfa
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri Apr 24 16:29:04 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri Apr 24 16:29:04 2015 -0400
----------------------------------------------------------------------
CHANGELOG.md | 1 +
external/storm-eventhubs/README.md | 4 ++++
external/storm-eventhubs/pom.xml | 4 ++--
pom.xml | 1 +
storm-dist/binary/src/main/assembly/binary.xml | 25 +++++++++++++++++++--
5 files changed, 31 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/38638474/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index fd140ea..23920e3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.11.0
+ * STORM-583: Add Microsoft Azure Event Hub spout implementations
* STORM-712: Storm daemons shutdown if OutOfMemoryError occurs in any thread
* STORM-730: remove extra curly brace
* STORM-735: [storm-redis] Upgrade Jedis to 2.7.0
http://git-wip-us.apache.org/repos/asf/storm/blob/38638474/external/storm-eventhubs/README.md
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/README.md b/external/storm-eventhubs/README.md
index ea8076b..1586ca8 100755
--- a/external/storm-eventhubs/README.md
+++ b/external/storm-eventhubs/README.md
@@ -35,3 +35,7 @@ If you want to send messages to all partitions, use "-1" as partitionId.
### Windows Azure Eventhubs ###
http://azure.microsoft.com/en-us/services/event-hubs/
+## Committer Sponsors
+
+ * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))
+
http://git-wip-us.apache.org/repos/asf/storm/blob/38638474/external/storm-eventhubs/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/pom.xml b/external/storm-eventhubs/pom.xml
index bc11dd1..5ed65c7 100755
--- a/external/storm-eventhubs/pom.xml
+++ b/external/storm-eventhubs/pom.xml
@@ -21,12 +21,12 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
- <version>0.10.0-SNAPSHOT</version>
+ <version>0.11.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<artifactId>storm-eventhubs</artifactId>
- <version>0.10.0-SNAPSHOT</version>
+ <version>0.11.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>storm-eventhubs</name>
<description>EventHubs Storm Spout</description>
http://git-wip-us.apache.org/repos/asf/storm/blob/38638474/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 95aa46d..2e0c898 100644
--- a/pom.xml
+++ b/pom.xml
@@ -168,6 +168,7 @@
<module>external/storm-hive</module>
<module>external/storm-jdbc</module>
<module>external/storm-redis</module>
+ <module>external/storm-eventhubs</module>
</modules>
<scm>
http://git-wip-us.apache.org/repos/asf/storm/blob/38638474/storm-dist/binary/src/main/assembly/binary.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/src/main/assembly/binary.xml b/storm-dist/binary/src/main/assembly/binary.xml
index 61a8027..fef56bb 100644
--- a/storm-dist/binary/src/main/assembly/binary.xml
+++ b/storm-dist/binary/src/main/assembly/binary.xml
@@ -136,7 +136,28 @@
<includes>
<include>README.*</include>
</includes>
- </fileSet>
+ </fileSet>
+ <fileSet>
+ <directory>${project.basedir}/../../external/storm-eventhubs/target</directory>
+ <outputDirectory>external/storm-eventhubs</outputDirectory>
+ <includes>
+ <include>storm*jar</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${project.basedir}/../../external/storm-eventhubs</directory>
+ <outputDirectory>external/storm-eventhubs</outputDirectory>
+ <includes>
+ <include>README.*</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${project.basedir}/../../external/storm-eventhubs/src/main/resources</directory>
+ <outputDirectory>external/storm-eventhubs</outputDirectory>
+ <includes>
+ <include>Config.properties</include>
+ </includes>
+ </fileSet>
<!-- $STORM_HOME/extlib -->
<fileSet>
@@ -160,7 +181,7 @@
<files>
<!-- EXAMPLES -->
<file>
- <source>${project.basedir}/../../examples/storm-starter/target/storm-starter-${project.version}-jar-with-dependencies.jar</source>
+ <source>${project.basedir}/../../examples/storm-starter/target/storm-starter-${project.version}.jar</source>
<outputDirectory>/examples/storm-starter/</outputDirectory>
<destName>storm-starter-topologies-${project.version}.jar</destName>
</file>