You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/04/24 22:32:39 UTC

[1/4] storm git commit: STORM-583: initial import of donated code

Repository: storm
Updated Branches:
  refs/heads/master b4351ede1 -> 38638474e


http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java
new file mode 100755
index 0000000..66ad425
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java
@@ -0,0 +1,66 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.util.Map;
+
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+import org.apache.storm.eventhubs.spout.IEventDataScheme;
+
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Fields;
+import storm.trident.spout.IPartitionedTridentSpout;
+import org.apache.storm.eventhubs.trident.Partition;
+
+/**
+ * Transactional Trident EventHub Spout
+ */
+public class TransactionalTridentEventHubSpout implements 
+  IPartitionedTridentSpout<Partitions, Partition, Map> {
+  private static final long serialVersionUID = 1L;
+  private final IEventDataScheme scheme;
+  private final EventHubSpoutConfig spoutConfig;
+  
+  public TransactionalTridentEventHubSpout(EventHubSpoutConfig config) {
+    spoutConfig = config;
+    scheme = spoutConfig.getEventDataScheme();
+  }
+  
+  @Override
+  public Map getComponentConfiguration() {
+    return null;
+  }
+
+  @Override
+  public IPartitionedTridentSpout.Coordinator<Partitions> getCoordinator(
+      Map conf, TopologyContext context) {
+    return new org.apache.storm.eventhubs.trident.Coordinator(spoutConfig);
+  }
+
+  @Override
+  public IPartitionedTridentSpout.Emitter<Partitions, Partition, Map> getEmitter(
+      Map conf, TopologyContext context) {
+    return new TransactionalTridentEventHubEmitter(spoutConfig);
+  }
+
+  @Override
+  public Fields getOutputFields() {
+    return scheme.getOutputFields();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java
new file mode 100755
index 0000000..60391c3
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java
@@ -0,0 +1,91 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.eventhubs.client.Constants;
+import org.apache.storm.eventhubs.client.EventHubException;
+import org.apache.storm.eventhubs.spout.EventData;
+import org.apache.storm.eventhubs.spout.EventHubReceiverFilter;
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+import org.apache.storm.eventhubs.spout.IEventHubReceiver;
+
+public class TridentPartitionManager implements ITridentPartitionManager {
+  private static final Logger logger = LoggerFactory.getLogger(TridentPartitionManager.class);
+  private final int receiveTimeoutMs = 5000;
+  private final IEventHubReceiver receiver;
+  private final EventHubSpoutConfig spoutConfig;
+  private String lastOffset = Constants.DefaultStartingOffset;
+  
+  public TridentPartitionManager(EventHubSpoutConfig spoutConfig, IEventHubReceiver receiver) {
+    this.receiver = receiver;
+    this.spoutConfig = spoutConfig;
+  }
+  
+  @Override
+  public boolean open(String offset) {
+    try {
+      if((offset == null || offset.equals(Constants.DefaultStartingOffset)) 
+        && spoutConfig.getEnqueueTimeFilter() != 0) {
+          receiver.open(new EventHubReceiverFilter(spoutConfig.getEnqueueTimeFilter()));
+      }
+      else {
+        receiver.open(new EventHubReceiverFilter(offset));
+      }
+      lastOffset = offset;
+      return true;
+    }
+    catch(EventHubException ex) {
+      logger.error("failed to open eventhub receiver: " + ex.getMessage());
+      return false;
+    }
+  }
+  
+  @Override
+  public void close() {
+    receiver.close();
+  }
+  
+  @Override
+  public List<EventData> receiveBatch(String offset, int count) {
+    List<EventData> batch = new ArrayList<EventData>(count);
+    if(!offset.equals(lastOffset) || !receiver.isOpen()) {
+      //re-establish connection to eventhub servers using the right offset
+      //TBD: might be optimized with cache.
+      close();
+      if(!open(offset)) {
+        return batch;
+      }
+    }
+    
+    for(int i=0; i<count; ++i) {
+      EventData ed = receiver.receive(receiveTimeoutMs);
+      if(ed == null) {
+        break;
+      }
+      batch.add(ed);
+      lastOffset = ed.getMessageId().getOffset();
+    }
+    return batch;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/resources/Config.properties
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/resources/Config.properties b/external/storm-eventhubs/src/main/resources/Config.properties
new file mode 100755
index 0000000..82abb48
--- /dev/null
+++ b/external/storm-eventhubs/src/main/resources/Config.properties
@@ -0,0 +1,27 @@
+eventhubspout.username = [username]
+
+eventhubspout.password = [password]
+
+eventhubspout.namespace = [namespace]
+
+eventhubspout.entitypath = [entitypath]
+
+eventhubspout.partitions.count = [partitioncount]
+
+# if not provided, will use storm's zookeeper settings
+# zookeeper.connectionstring=localhost:2181
+
+eventhubspout.checkpoint.interval = 10
+
+eventhub.receiver.credits = 1024
+
+# Defines how many messages can be pending (sent but not acked by storm)
+# per partition in eventhubspout. If set too large, your spout may throw
+# OutOfMemoryError because all pending messages are cached in the spout.
+eventhubspout.max.pending.messages.per.partition = 1024
+
+# Defines how many seconds in the past the spout uses to filter events in
+# the EventHubs entity when we first create the Storm topology. If offsets
+# have been saved in Zookeeper, we'll ignore this configuration.
+# A value of 0 means disable time based filtering when creating the receiver.
+eventhub.receiver.filter.timediff = 0
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java
new file mode 100755
index 0000000..740ef63
--- /dev/null
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java
@@ -0,0 +1,105 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.eventhubs.spout.MessageId;
+import org.apache.storm.eventhubs.spout.EventData;
+import org.apache.storm.eventhubs.spout.IEventHubReceiver;
+
+import org.apache.qpid.amqp_1_0.client.Message;
+import org.apache.qpid.amqp_1_0.jms.impl.TextMessageImpl;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.messaging.Data;
+
+import org.apache.storm.eventhubs.client.EventHubException;
+
+/**
+ * A mock receiver that emits fake data with offset starting from given offset
+ * and increase by 1 each time.
+ */
+public class EventHubReceiverMock implements IEventHubReceiver {
+  private static boolean isPaused = false;
+  private final String partitionId;
+  private long currentOffset;
+  private boolean isOpen;
+
+  public EventHubReceiverMock(String pid) {
+    partitionId = pid;
+    isPaused = false;
+  }
+  
+  /**
+   * Use this method to pause/resume all the receivers.
+   * If paused all receiver will return null.
+   * @param val
+   */
+  public static void setPause(boolean val) {
+    isPaused = val;
+  }
+
+  @Override
+  public void open(IEventHubReceiverFilter filter) throws EventHubException {
+    if(filter.getOffset() != null) {
+      currentOffset = Long.parseLong(filter.getOffset());
+    }
+    else if(filter.getEnqueueTime() != 0) {
+      //assume if it's time based filter the offset matches the enqueue time.
+      currentOffset = filter.getEnqueueTime();
+    }
+    else {
+      throw new EventHubException("Invalid IEventHubReceiverFilter");
+    }
+    isOpen = true;
+  }
+
+  @Override
+  public void close() {
+    isOpen = false;
+  }
+  
+  @Override
+  public boolean isOpen() {
+    return isOpen;
+  }
+
+  @Override
+  public EventData receive(long timeoutInMilliseconds) {
+    if(isPaused) {
+      return null;
+    }
+
+    currentOffset++;
+    List<Section> body = new ArrayList<Section>();
+    //the body of the message is "message" + currentOffset, e.g. "message123"
+    body.add(new Data(new Binary(("message" + currentOffset).getBytes())));
+    Message m = new Message(body);
+    MessageId mid = new MessageId(partitionId, "" + currentOffset, currentOffset);
+    EventData ed = new EventData(m, mid);
+    return ed;
+  }
+  
+  @Override
+  public Map getMetricsData() {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubSpoutCallerMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubSpoutCallerMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubSpoutCallerMock.java
new file mode 100755
index 0000000..d5ba90a
--- /dev/null
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubSpoutCallerMock.java
@@ -0,0 +1,96 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import backtype.storm.spout.SpoutOutputCollector;
+
+/**
+ * Mocks EventHubSpout's caller (storm framework)
+ */
+public class EventHubSpoutCallerMock {
+  public static final String statePathPrefix = "/eventhubspout/TestTopo/namespace/entityname/partitions/";
+  EventHubSpout spout;
+  private IStateStore stateStore;
+  private SpoutOutputCollectorMock collector;
+  
+  public EventHubSpoutCallerMock(int totalPartitions,
+      int totalTasks, int taskIndex, int checkpointInterval) {
+    stateStore = new StateStoreMock();
+    EventHubSpoutConfig conf = new EventHubSpoutConfig("username", "password",
+        "namespace", "entityname", totalPartitions, "zookeeper", checkpointInterval, 1024);
+    conf.setTopologyName("TestTopo");
+    
+    IEventHubReceiverFactory recvFactory = new IEventHubReceiverFactory() {
+      @Override
+      public IEventHubReceiver create(EventHubSpoutConfig config,
+          String partitionId) {
+        return new EventHubReceiverMock(partitionId);
+      }
+    };
+    //mock state store and receiver
+    spout = new EventHubSpout(conf, stateStore, null, recvFactory);
+    
+    collector = new SpoutOutputCollectorMock();
+    
+    try {
+      spout.preparePartitions(null, totalTasks, taskIndex, new SpoutOutputCollector(collector));
+    }
+    catch(Exception ex) {
+    }
+  }
+  
+  /**
+   * Execute a sequence of calls to EventHubSpout.
+   * 
+   * @param callSequence: is represented as a string of commands, 
+   * e.g. "r,r,r,r,a1,f2,...". The commands are:
+   * r[N]: receive() called N times
+   * aP_X: ack(P_X), partition: P, offset: X
+   * fP_Y: fail(P_Y), partition: P, offset: Y
+   */
+  public String execute(String callSequence) {
+    String[] cmds = callSequence.split(",");
+    for(String cmd : cmds) {
+      if(cmd.startsWith("r")) {
+        int count = 1;
+        if(cmd.length() > 1) {
+          count = Integer.parseInt(cmd.substring(1));
+        }
+        for(int i=0; i<count; ++i) {
+          spout.nextTuple();
+        }
+      }
+      else if(cmd.startsWith("a")) {
+        String[] midStrs = cmd.substring(1).split("_");
+        MessageId msgId = new MessageId(midStrs[0], midStrs[1], Long.parseLong(midStrs[1]));
+        spout.ack(msgId);
+      }
+      else if(cmd.startsWith("f")) {
+        String[] midStrs = cmd.substring(1).split("_");
+        MessageId msgId = new MessageId(midStrs[0], midStrs[1], Long.parseLong(midStrs[1]));
+        spout.fail(msgId);
+      }
+    }
+    return collector.getOffsetSequenceAndReset();
+  }
+  
+  public String getCheckpoint(int partitionIndex) {
+    String statePath = statePathPrefix + partitionIndex;
+    return stateStore.readData(statePath);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java
new file mode 100755
index 0000000..dd63d5d
--- /dev/null
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java
@@ -0,0 +1,105 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import org.apache.storm.eventhubs.spout.PartitionManager;
+import org.apache.storm.eventhubs.spout.EventData;
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+
+/**
+ * This mock exercises PartitionManager
+ */
+public class PartitionManagerCallerMock {
+  public static final String statePath = "/eventhubspout/TestTopo/namespace/entityname/partitions/1";
+  private IPartitionManager pm;
+  private IStateStore stateStore;
+
+  public PartitionManagerCallerMock(String partitionId) {
+    this(partitionId, 0);
+  }
+  
+  public PartitionManagerCallerMock(String partitionId, long enqueueTimeFilter) {
+    EventHubReceiverMock receiver = new EventHubReceiverMock(partitionId);
+    EventHubSpoutConfig conf = new EventHubSpoutConfig("username", "password",
+      "namespace", "entityname", 16, "zookeeper", 10, 1024, 1024, enqueueTimeFilter);
+    conf.setTopologyName("TestTopo");
+    stateStore = new StateStoreMock();
+    this.pm = new PartitionManager(conf, partitionId, stateStore, receiver);
+    
+    stateStore.open();
+    try {
+      pm.open();
+    }
+    catch (Exception ex) {
+    }
+  }
+  
+  /**
+   * Execute a sequence of calls to Partition Manager.
+   * 
+   * @param callSequence: is represented as a string of commands, 
+   * e.g. "r,r,r,r,a1,f2,...". The commands are:
+   * r[N]: receive() called N times
+   * aX: ack(X)
+   * fY: fail(Y)
+   * 
+   * @return the sequence of messages the receive call returns
+   */
+  public String execute(String callSequence) {
+    
+    String[] cmds = callSequence.split(",");
+    StringBuilder ret = new StringBuilder();
+    for(String cmd : cmds) {
+      if(cmd.startsWith("r")) {
+        int count = 1;
+        if(cmd.length() > 1) {
+          count = Integer.parseInt(cmd.substring(1));
+        }
+        for(int i=0; i<count; ++i) {
+          EventData ed = pm.receive();
+          if(ed == null) {
+            ret.append("null,");
+          }
+          else {
+            ret.append(ed.getMessageId().getOffset());
+            ret.append(",");
+          }
+        }
+      }
+      else if(cmd.startsWith("a")) {
+        pm.ack(cmd.substring(1));
+      }
+      else if(cmd.startsWith("f")) {
+        pm.fail(cmd.substring(1));
+      }
+    }
+    if(ret.length() > 0) {
+      ret.setLength(ret.length()-1);
+    }
+    return ret.toString();
+  }
+  
+  /**
+   * Exercise the IPartitionManager.checkpoint() method
+   * @return the offset that we write to state store
+   */
+  public String checkpoint() {
+    pm.checkpoint();
+    return stateStore.readData(statePath);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
new file mode 100755
index 0000000..02e6830
--- /dev/null
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
@@ -0,0 +1,61 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import java.util.List;
+
+import backtype.storm.spout.ISpoutOutputCollector;
+
+/**
+ * Mock of ISpoutOutputCollector
+ */
+public class SpoutOutputCollectorMock implements ISpoutOutputCollector {
+  //comma separated offsets
+  StringBuilder emittedOffset;
+  
+  public SpoutOutputCollectorMock() {
+    emittedOffset = new StringBuilder();
+  }
+  
+  public String getOffsetSequenceAndReset() {
+    String ret = null;
+    if(emittedOffset.length() > 0) {
+      emittedOffset.setLength(emittedOffset.length()-1);
+      ret = emittedOffset.toString();
+      emittedOffset.setLength(0);
+    }
+    return ret;
+  }
+
+  @Override
+  public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
+    MessageId mid = (MessageId)messageId;
+    String pid = mid.getPartitionId();
+    String offset = mid.getOffset();
+    emittedOffset.append(pid+"_"+offset+",");
+    return null;
+  }
+
+  @Override
+  public void emitDirect(int arg0, String arg1, List<Object> arg2, Object arg3) {
+  }
+
+  @Override
+  public void reportError(Throwable arg0) {
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/StateStoreMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/StateStoreMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/StateStoreMock.java
new file mode 100755
index 0000000..cd6e13e
--- /dev/null
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/StateStoreMock.java
@@ -0,0 +1,54 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.storm.eventhubs.spout.IStateStore;
+
+/**
+ * A state store mocker
+ */
+public class StateStoreMock implements IStateStore {
+  Map<String, String> myDataMap;
+  @Override
+  public void open() {
+    myDataMap = new HashMap<String, String>();
+  }
+
+  @Override
+  public void close() {
+    myDataMap = null;
+  }
+
+  @Override
+  public void saveData(String path, String data) {
+    if(myDataMap != null) {
+      myDataMap.put(path, data);
+    }
+  }
+
+  @Override
+  public String readData(String path) {
+    if(myDataMap != null) {
+      return myDataMap.get(path);
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java
new file mode 100755
index 0000000..aa8d097
--- /dev/null
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java
@@ -0,0 +1,47 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestEventData {
+
+  @Before
+  public void setUp() throws Exception {
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  @Test
+  public void testEventDataComparision() {
+
+	MessageId messageId1 = MessageId.create(null, "3", 1);
+	EventData eventData1 = EventData.create(null, messageId1);
+
+	MessageId messageId2 = MessageId.create(null, "13", 2);
+	EventData eventData2 = EventData.create(null, messageId2);
+
+	assertTrue(eventData2.compareTo(eventData1) > 0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java
new file mode 100755
index 0000000..6a0d163
--- /dev/null
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java
@@ -0,0 +1,70 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class TestEventHubSpout {
+
+  @Before
+  public void setUp() throws Exception {
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+  
+  @Test
+  public void testSpoutConfig() {
+    EventHubSpoutConfig conf = new EventHubSpoutConfig("username", "pas\\s+w/ord",
+        "namespace", "entityname", 16, "zookeeper");
+    assertEquals(conf.getConnectionString(), "amqps://username:pas%5Cs%2Bw%2Ford@namespace.servicebus.windows.net");
+  }
+
+  @Test
+  public void testSpoutBasic() {
+    //This spout owns 2 partitions: 6 and 14
+    EventHubSpoutCallerMock mock = new EventHubSpoutCallerMock(16, 8, 6,10);
+    String result = mock.execute("r6,f6_0,a6_1,a6_2,a14_0,a14_2,r4,f14_1,r2");
+    assertEquals("6_0,14_0,6_1,14_1,6_2,14_2,6_0,14_3,6_3,14_4,6_4,14_1", result);
+  }
+  
+  @Test
+  public void testSpoutCheckpoint() {
+    //Make sure that even though nextTuple() doesn't receive valid data,
+    //the offset will be checkpointed after checkpointInterval seconds.
+    
+    //This spout owns 1 partitions: 6
+    EventHubSpoutCallerMock mock = new EventHubSpoutCallerMock(8, 8, 6, 1);
+    String result = mock.execute("r6,a6_0,a6_1,a6_2");
+    try {
+      Thread.sleep(2000);
+    }
+    catch(InterruptedException ex) {}
+    EventHubReceiverMock.setPause(true);
+    result = mock.execute("r3");
+    EventHubReceiverMock.setPause(false);
+    assertEquals("3", mock.getCheckpoint(6));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestPartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestPartitionManager.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestPartitionManager.java
new file mode 100755
index 0000000..5b28f1d
--- /dev/null
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestPartitionManager.java
@@ -0,0 +1,117 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestPartitionManager {
+
+  @Before
+  public void setUp() throws Exception {
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  @Test
+  public void testPartitionManagerNoFail() {
+    PartitionManagerCallerMock mock
+      = new PartitionManagerCallerMock("1");
+    String result = mock.execute("r,r,r,a0,a1,a2,r");
+    assertEquals("0,1,2,3", result);
+  }
+
+  @Test
+  public void testPartitionManagerResend() {
+    PartitionManagerCallerMock mock
+      = new PartitionManagerCallerMock("1");
+    String result = mock.execute("r,a0,r,r,r,f3,r,f2,f1,r,r,a1,a2,a3,r");
+    assertEquals("0,1,2,3,3,1,2,4", result);
+  }
+  
+  @Test
+  public void testPMCheckpointWithPending() {
+    PartitionManagerCallerMock mock
+      = new PartitionManagerCallerMock("1");
+    mock.execute("r,r,r");
+    //no ack, so return the first of pending list
+    assertEquals("0", mock.checkpoint());
+    mock.execute("a0,a2");
+    //still need to return the first of pending list
+    assertEquals("1", mock.checkpoint());
+  }
+  
+  @Test
+  public void testPMCheckpointWithResend() {
+    PartitionManagerCallerMock mock
+      = new PartitionManagerCallerMock("1");
+    mock.execute("r,r,r,f2,f1,f0");
+    //pending is empty, return the smallest in toResend
+    assertEquals("0", mock.checkpoint());
+    mock.execute("r,a0");
+    //pending is still empty
+    assertEquals("1", mock.checkpoint());
+  }
+  
+  @Test
+  public void testPMCheckpointWithPendingAndResend() {
+    PartitionManagerCallerMock mock
+      = new PartitionManagerCallerMock("1");
+    mock.execute("r,r,r,f2,f1");
+    //return the smaller of pending and toResend
+    assertEquals("0", mock.checkpoint());
+    mock.execute("a0,r");
+    //now pending: [3], toResend: [1,2]
+    assertEquals("1", mock.checkpoint());
+  }
+  
+  @Test
+  public void testPMCheckpointWithNoPendingAndNoResend() {
+    PartitionManagerCallerMock mock
+      = new PartitionManagerCallerMock("1");
+    //if no event sent, no checkpoint shall be created
+    assertEquals(null, mock.checkpoint());
+    mock.execute("r,r,r,f2,f1,r,r,a2,a1,a0");
+    //all events are sent successfully, return last sent offset
+    assertEquals("2", mock.checkpoint());
+  }
+  
+  @Test
+  public void testPartitionManagerMaxPendingMessages() {
+    PartitionManagerCallerMock mock
+      = new PartitionManagerCallerMock("1");
+    String result = mock.execute("r1024");
+    //any receive call after exceeding max pending messages results in null
+    result = mock.execute("r2");
+    assertEquals("null,null", result);
+    result = mock.execute("a0,a1,r2");
+    assertEquals("1024,1025", result);
+  }
+  
+  @Test
+  public void testPartitionManagerEnqueueTimeFilter() {
+    PartitionManagerCallerMock mock
+      = new PartitionManagerCallerMock("1", 123456);
+    String result = mock.execute("r2");
+    assertEquals("123457,123458", result);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TestTransactionalTridentEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TestTransactionalTridentEmitter.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TestTransactionalTridentEmitter.java
new file mode 100755
index 0000000..03696a2
--- /dev/null
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TestTransactionalTridentEmitter.java
@@ -0,0 +1,93 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import static org.junit.Assert.*;
+
+import java.util.Map;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.storm.eventhubs.spout.EventHubReceiverMock;
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+import org.apache.storm.eventhubs.spout.IEventHubReceiver;
+import org.apache.storm.eventhubs.spout.IEventHubReceiverFactory;
+
+public class TestTransactionalTridentEmitter {
+  private TransactionalTridentEventHubEmitter emitter;
+  private Partition partition;
+  private TridentCollectorMock collectorMock;
+  private final int batchSize = 32;
+
+  @Before
+  public void setUp() throws Exception {
+    EventHubSpoutConfig conf = new EventHubSpoutConfig("username", "password",
+        "namespace", "entityname", 16, "zookeeper");
+    conf.setTopologyName("TestTopo");
+    IEventHubReceiverFactory recvFactory = new IEventHubReceiverFactory() {
+      @Override
+      public IEventHubReceiver create(EventHubSpoutConfig config,
+          String partitionId) {
+        return new EventHubReceiverMock(partitionId);
+      }
+    };
+    partition = new Partition(conf, "0");
+    emitter = new TransactionalTridentEventHubEmitter(conf, batchSize, null, recvFactory);
+    collectorMock = new TridentCollectorMock();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    emitter.close();
+    emitter = null;
+  }
+
+  @Test
+  public void testEmitInSequence() {
+    //test the happy path, emit batches in sequence
+    Map meta = emitter.emitPartitionBatchNew(null, collectorMock, partition, null);
+    String collected = collectorMock.getBuffer();
+    assertTrue(collected.startsWith("message"+0));
+    //System.out.println("collected: " + collected);
+    collectorMock.clear();
+    
+    emitter.emitPartitionBatchNew(null, collectorMock, partition, meta);
+    collected = collectorMock.getBuffer();
+    //System.out.println("collected: " + collected);
+    assertTrue(collected.startsWith("message"+batchSize));
+  }
+
+  @Test
+  public void testReEmit() {
+    //test we can re-emit the second batch
+    Map meta = emitter.emitPartitionBatchNew(null, collectorMock, partition, null);
+    collectorMock.clear();
+    
+    //emit second batch
+    Map meta1 = emitter.emitPartitionBatchNew(null, collectorMock, partition, meta);
+    String collected0 = collectorMock.getBuffer();
+    collectorMock.clear();
+    
+    //re-emit second batch
+    emitter.emitPartitionBatch(null, collectorMock, partition, meta1);
+    String collected1 = collectorMock.getBuffer();
+    assertTrue(collected0.equals(collected1));
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java
new file mode 100755
index 0000000..cc4686e
--- /dev/null
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java
@@ -0,0 +1,52 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.util.List;
+
+import storm.trident.operation.TridentCollector;
+
+/**
+ * A mock of TridentCollector
+ */
+public class TridentCollectorMock implements TridentCollector {
+  StringBuilder buffer;
+  
+  public TridentCollectorMock() {
+    buffer = new StringBuilder();
+  }
+  
+  @Override
+  public void emit(List<Object> tuples) {
+    for(Object o: tuples) {
+      buffer.append(o.toString());
+    }
+  }
+
+  @Override
+  public void reportError(Throwable arg0) {
+  }
+
+  public void clear() {
+    buffer.setLength(0);
+  }
+  
+  public String getBuffer() {
+    return buffer.toString();
+  }
+}


[2/4] storm git commit: STORM-583: initial import of donated code

Posted by pt...@apache.org.
http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
new file mode 100755
index 0000000..5600873
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
@@ -0,0 +1,150 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import org.apache.qpid.amqp_1_0.client.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.metric.api.CountMetric;
+import backtype.storm.metric.api.MeanReducer;
+import backtype.storm.metric.api.ReducedMetric;
+
+import org.apache.storm.eventhubs.client.Constants;
+import org.apache.storm.eventhubs.client.EventHubClient;
+import org.apache.storm.eventhubs.client.EventHubException;
+import org.apache.storm.eventhubs.client.EventHubReceiver;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations;
+
+public class EventHubReceiverImpl implements IEventHubReceiver {
+  private static final Logger logger = LoggerFactory.getLogger(EventHubReceiverImpl.class);
+  private static final Symbol OffsetKey = Symbol.valueOf("x-opt-offset");
+  private static final Symbol SequenceNumberKey = Symbol.valueOf("x-opt-sequence-number");
+
+  private final String connectionString;
+  private final String entityName;
+  private final String partitionId;
+  private final int defaultCredits;
+
+  private EventHubReceiver receiver;
+  private String lastOffset = null;
+  private ReducedMetric receiveApiLatencyMean;
+  private CountMetric receiveApiCallCount;
+  private CountMetric receiveMessageCount;
+
+  public EventHubReceiverImpl(EventHubSpoutConfig config, String partitionId) {
+    this.connectionString = config.getConnectionString();
+    this.entityName = config.getEntityPath();
+    this.defaultCredits = config.getReceiverCredits();
+    this.partitionId = partitionId;
+    receiveApiLatencyMean = new ReducedMetric(new MeanReducer());
+    receiveApiCallCount = new CountMetric();
+    receiveMessageCount = new CountMetric();
+  }
+
+  @Override
+  public void open(IEventHubReceiverFilter filter) throws EventHubException {
+    logger.info("creating eventhub receiver: partitionId=" + partitionId + ", offset=" + filter.getOffset()
+        + ", enqueueTime=" + filter.getEnqueueTime());
+    long start = System.currentTimeMillis();
+    EventHubClient eventHubClient = EventHubClient.create(connectionString, entityName);
+    if(filter.getOffset() != null) {
+      receiver = eventHubClient.getDefaultConsumerGroup().createReceiver(partitionId, filter.getOffset(), defaultCredits);
+    }
+    else if(filter.getEnqueueTime() != 0) {
+      receiver = eventHubClient.getDefaultConsumerGroup().createReceiver(partitionId, filter.getEnqueueTime(), defaultCredits);
+    }
+    else {
+      logger.error("Invalid IEventHubReceiverFilter, use default offset as filter");
+      receiver = eventHubClient.getDefaultConsumerGroup().createReceiver(partitionId, Constants.DefaultStartingOffset, defaultCredits);
+    }
+    long end = System.currentTimeMillis();
+    logger.info("created eventhub receiver, time taken(ms): " + (end-start));
+  }
+
+  @Override
+  public void close() {
+    if(receiver != null) {
+      receiver.close();
+      logger.info("closed eventhub receiver: partitionId=" + partitionId );
+      receiver = null;
+    }
+  }
+  
+  @Override
+  public boolean isOpen() {
+    return (receiver != null);
+  }
+
+  @Override
+  public EventData receive(long timeoutInMilliseconds) {
+    long start = System.currentTimeMillis();
+    Message message = receiver.receive(timeoutInMilliseconds);
+    long end = System.currentTimeMillis();
+    long millis = (end - start);
+    receiveApiLatencyMean.update(millis);
+    receiveApiCallCount.incr();
+
+    if (message == null) {
+      return null;
+    }
+    receiveMessageCount.incr();
+
+    //logger.info(String.format("received a message. PartitionId: %s, Offset: %s", partitionId, this.lastOffset));
+    MessageId messageId = createMessageId(message);
+
+    return EventData.create(message, messageId);
+  }
+  
+  private MessageId createMessageId(Message message) {
+    String offset = null;
+    long sequenceNumber = 0;
+
+    for (Section section : message.getPayload()) {
+      if (section instanceof MessageAnnotations) {
+        MessageAnnotations annotations = (MessageAnnotations) section;
+        HashMap annonationMap = (HashMap) annotations.getValue();
+
+        if (annonationMap.containsKey(OffsetKey)) {
+          offset = (String) annonationMap.get(OffsetKey);
+        }
+
+        if (annonationMap.containsKey(SequenceNumberKey)) {
+          sequenceNumber = (Long) annonationMap.get(SequenceNumberKey);
+        }
+      }
+    }
+
+    return MessageId.create(partitionId, offset, sequenceNumber);
+  }
+
+  @Override
+  public Map getMetricsData() {
+    Map ret = new HashMap();
+    ret.put(partitionId + "/receiveApiLatencyMean", receiveApiLatencyMean.getValueAndReset());
+    ret.put(partitionId + "/receiveApiCallCount", receiveApiCallCount.getValueAndReset());
+    ret.put(partitionId + "/receiveMessageCount", receiveMessageCount.getValueAndReset());
+    return ret;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
new file mode 100755
index 0000000..9290e6e
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
@@ -0,0 +1,258 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import backtype.storm.Config;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.amqp_1_0.client.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EventHubSpout extends BaseRichSpout {
+
+  private static final Logger logger = LoggerFactory.getLogger(EventHubSpout.class);
+
+  private final UUID instanceId;
+  private final EventHubSpoutConfig eventHubConfig;
+  private final IEventDataScheme scheme;
+  private final int checkpointIntervalInSeconds;
+
+  private IStateStore stateStore;
+  private IPartitionCoordinator partitionCoordinator;
+  private IPartitionManagerFactory pmFactory;
+  private IEventHubReceiverFactory recvFactory;
+  private SpoutOutputCollector collector;
+  private long lastCheckpointTime;
+  private int currentPartitionIndex = -1;
+
+  public EventHubSpout(EventHubSpoutConfig spoutConfig) {
+    this(spoutConfig, null, null, null);
+  }
+  
+  public EventHubSpout(EventHubSpoutConfig spoutConfig,
+      IStateStore store,
+      IPartitionManagerFactory pmFactory,
+      IEventHubReceiverFactory recvFactory) {
+    this.eventHubConfig = spoutConfig;
+    this.scheme = spoutConfig.getEventDataScheme();
+    this.instanceId = UUID.randomUUID();
+    this.checkpointIntervalInSeconds = spoutConfig.getCheckpointIntervalInSeconds();
+    this.lastCheckpointTime = System.currentTimeMillis();
+    stateStore = store;
+    this.pmFactory = pmFactory;
+    if(this.pmFactory == null) {
+      this.pmFactory = new IPartitionManagerFactory() {
+        @Override
+        public IPartitionManager create(EventHubSpoutConfig spoutConfig,
+            String partitionId, IStateStore stateStore,
+            IEventHubReceiver receiver) {
+          return new PartitionManager(spoutConfig, partitionId,
+              stateStore, receiver);
+        }
+      };
+    }
+    this.recvFactory = recvFactory;
+    if(this.recvFactory == null) {
+      this.recvFactory = new IEventHubReceiverFactory() {
+        @Override
+        public IEventHubReceiver create(EventHubSpoutConfig spoutConfig,
+            String partitionId) {
+          return new EventHubReceiverImpl(spoutConfig, partitionId);
+        }
+      };
+    }
+    
+  }
+  
+  /**
+   * This is a extracted method that is easy to test
+   * @param config
+   * @param totalTasks
+   * @param taskIndex
+   * @param collector
+   * @throws Exception
+   */
+  public void preparePartitions(Map config, int totalTasks, int taskIndex, SpoutOutputCollector collector) throws Exception {
+    this.collector = collector;
+    if(stateStore == null) {
+      String zkEndpointAddress = eventHubConfig.getZkConnectionString();
+      if (zkEndpointAddress == null || zkEndpointAddress.length() == 0) {
+        //use storm's zookeeper servers if not specified.
+        @SuppressWarnings("unchecked")
+        List<String> zkServers = (List<String>) config.get(Config.STORM_ZOOKEEPER_SERVERS);
+        Integer zkPort = ((Number) config.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
+        StringBuilder sb = new StringBuilder();
+        for (String zk : zkServers) {
+          if (sb.length() > 0) {
+            sb.append(',');
+          }
+          sb.append(zk+":"+zkPort);
+        }
+        zkEndpointAddress = sb.toString();
+      }
+      stateStore = new ZookeeperStateStore(zkEndpointAddress,
+          (Integer)config.get(Config.STORM_ZOOKEEPER_RETRY_TIMES),
+          (Integer)config.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL));
+    }
+    stateStore.open();
+
+    partitionCoordinator = new StaticPartitionCoordinator(
+        eventHubConfig, taskIndex, totalTasks, stateStore, pmFactory, recvFactory);
+
+    for (IPartitionManager partitionManager : 
+      partitionCoordinator.getMyPartitionManagers()) {
+      partitionManager.open();
+    }
+  }
+
+  @Override
+  public void open(Map config, TopologyContext context, SpoutOutputCollector collector) {
+    logger.info("begin: open()");
+    String topologyName = (String) config.get(Config.TOPOLOGY_NAME);
+    eventHubConfig.setTopologyName(topologyName);
+
+    int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
+    int taskIndex = context.getThisTaskIndex();
+    if (totalTasks > eventHubConfig.getPartitionCount()) {
+      throw new RuntimeException("total tasks of EventHubSpout is greater than partition count.");
+    }
+
+    logger.info(String.format("topologyName: %s, totalTasks: %d, taskIndex: %d", topologyName, totalTasks, taskIndex));
+
+    try {
+      preparePartitions(config, totalTasks, taskIndex, collector);
+    } catch (Exception e) {
+      logger.error(e.getMessage());
+      throw new RuntimeException(e);
+    }
+    
+    //register metrics
+    context.registerMetric("EventHubReceiver", new IMetric() {
+      @Override
+      public Object getValueAndReset() {
+          Map concatMetricsDataMaps = new HashMap();
+          for (IPartitionManager partitionManager : 
+            partitionCoordinator.getMyPartitionManagers()) {
+            concatMetricsDataMaps.putAll(partitionManager.getMetricsData());
+          }
+          return concatMetricsDataMaps;
+      }
+    }, (Integer)config.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));
+    logger.info("end open()");
+  }
+
+  @Override
+  public void nextTuple() {
+    EventData eventData = null;
+
+    List<IPartitionManager> partitionManagers = partitionCoordinator.getMyPartitionManagers();
+    for (int i = 0; i < partitionManagers.size(); i++) {
+      currentPartitionIndex = (currentPartitionIndex + 1) % partitionManagers.size();
+      IPartitionManager partitionManager = partitionManagers.get(currentPartitionIndex);
+
+      if (partitionManager == null) {
+        throw new RuntimeException("partitionManager doesn't exist.");
+      }
+
+      eventData = partitionManager.receive();
+
+      if (eventData != null) {
+        break;
+      }
+    }
+
+
+    if (eventData != null) {
+      MessageId messageId = eventData.getMessageId();
+      Message message = eventData.getMessage();
+
+      List<Object> tuples = scheme.deserialize(message);
+
+      if (tuples != null) {
+        collector.emit(tuples, messageId);
+      }
+    }
+    
+    checkpointIfNeeded();
+
+    // We don't need to sleep here because the IPartitionManager.receive() is
+    // a blocked call so it's fine to call this function in a tight loop.
+  }
+
+  @Override
+  public void ack(Object msgId) {
+    MessageId messageId = (MessageId) msgId;
+    IPartitionManager partitionManager = partitionCoordinator.getPartitionManager(messageId.getPartitionId());
+    String offset = messageId.getOffset();
+    partitionManager.ack(offset);
+  }
+
+  @Override
+  public void fail(Object msgId) {
+    MessageId messageId = (MessageId) msgId;
+    IPartitionManager partitionManager = partitionCoordinator.getPartitionManager(messageId.getPartitionId());
+    String offset = messageId.getOffset();
+    partitionManager.fail(offset);
+  }
+
+  @Override
+  public void deactivate() {
+    // let's checkpoint so that we can get the last checkpoint when restarting.
+    checkpoint();
+  }
+
+  @Override
+  public void close() {
+    for (IPartitionManager partitionManager : 
+      partitionCoordinator.getMyPartitionManagers()) {
+      partitionManager.close();
+    }
+    stateStore.close();
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(scheme.getOutputFields());
+  }
+
+  private void checkpointIfNeeded() {
+    long nextCheckpointTime = lastCheckpointTime + checkpointIntervalInSeconds * 1000;
+    if (nextCheckpointTime < System.currentTimeMillis()) {
+
+      checkpoint();
+      lastCheckpointTime = System.currentTimeMillis();
+    }
+  }
+  
+  private void checkpoint() {
+    for (IPartitionManager partitionManager : 
+      partitionCoordinator.getMyPartitionManagers()) {
+      partitionManager.checkpoint();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
new file mode 100755
index 0000000..ae11680
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
@@ -0,0 +1,165 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.List;
+
+public class EventHubSpoutConfig implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+  private final String userName;
+  private final String password;
+  private final String namespace;
+  private final String entityPath;
+  private final String zkConnectionString;
+  private final int partitionCount;
+  private final int checkpointIntervalInSeconds;
+  private final int receiverCredits;
+  private final int maxPendingMsgsPerPartition;
+  private final long enqueueTimeFilter; //timestamp in millisecond
+
+  private String connectionString;
+  private String targetFqnAddress;
+  private String topologyName;
+  private IEventDataScheme scheme;
+
+  public EventHubSpoutConfig(String username, String password, String namespace,
+      String entityPath, int partitionCount, String zkConnectionString) {
+    this(username, password, namespace, entityPath, partitionCount,
+        zkConnectionString, 10, 1024, 1024, 0);
+  }
+  
+  //Keep this constructor for backward compatibility
+  public EventHubSpoutConfig(String username, String password, String namespace,
+      String entityPath, int partitionCount, String zkConnectionString,
+      int checkpointIntervalInSeconds, int receiverCredits) {
+    this(username, password, namespace, entityPath, partitionCount,
+        zkConnectionString, checkpointIntervalInSeconds, receiverCredits, 1024, 0);
+  }
+      
+  public EventHubSpoutConfig(String username, String password, String namespace,
+    String entityPath, int partitionCount, String zkConnectionString,
+    int checkpointIntervalInSeconds, int receiverCredits, int maxPendingMsgsPerPartition, long enqueueTimeFilter) {
+    this.userName = username;
+    this.password = password;
+    this.connectionString = buildConnectionString(username, password, namespace);
+    this.namespace = namespace;
+    this.entityPath = entityPath;
+    this.partitionCount = partitionCount;
+    this.zkConnectionString = zkConnectionString;
+    this.checkpointIntervalInSeconds = checkpointIntervalInSeconds;
+    this.receiverCredits = receiverCredits;
+    this.maxPendingMsgsPerPartition = maxPendingMsgsPerPartition;
+    this.enqueueTimeFilter = enqueueTimeFilter;
+    this.scheme = new EventDataScheme();
+  }
+
+  public String getConnectionString() {
+    return connectionString;
+  }
+
+  public String getNamespace() {
+    return namespace;
+  }
+
+  public String getEntityPath() {
+    return entityPath;
+  }
+
+  public String getZkConnectionString() {
+    return zkConnectionString;
+  }
+
+  public int getCheckpointIntervalInSeconds() {
+    return checkpointIntervalInSeconds;
+  }
+
+  public int getPartitionCount() {
+    return partitionCount;
+  }
+  
+  public int getReceiverCredits() {
+    return receiverCredits;
+  }
+  
+  public int getMaxPendingMsgsPerPartition() {
+    return maxPendingMsgsPerPartition;
+  }
+  
+  public long getEnqueueTimeFilter() {
+    return enqueueTimeFilter;
+  }
+
+  public String getTopologyName() {
+    return topologyName;
+  }
+
+  public void setTopologyName(String value) {
+    topologyName = value;
+  }
+
+  public IEventDataScheme getEventDataScheme() {
+    return scheme;
+  }
+
+  public void setEventDataScheme(IEventDataScheme scheme) {
+    this.scheme = scheme;
+  }
+
+  public List<String> getPartitionList() {
+    List<String> partitionList = new ArrayList<String>();
+
+    for (int i = 0; i < this.partitionCount; i++) {
+      partitionList.add(Integer.toString(i));
+    }
+
+    return partitionList;
+  }
+  
+  public void setTargetAddress(String targetFqnAddress) {
+    this.targetFqnAddress = targetFqnAddress;
+    this.connectionString = buildConnectionString(
+        this.userName, this.password, this.namespace, this.targetFqnAddress);
+  }
+
+  public static String buildConnectionString(String username, String password, String namespace) {
+    String targetFqnAddress = "servicebus.windows.net";
+    return buildConnectionString(username, password, namespace, targetFqnAddress);
+  }
+
+  public static String buildConnectionString(String username, String password,
+      String namespace, String targetFqnAddress) {
+    return "amqps://" + username + ":" + encodeString(password)
+        + "@" + namespace + "." + targetFqnAddress;
+  }	
+
+  private static String encodeString(String input) {
+    try {
+      return URLEncoder.encode(input, "UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      //We don't need to throw this exception because the exception won't
+      //happen because of user input. Our unit tests will catch this error.
+      return "";
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutException.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutException.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutException.java
new file mode 100755
index 0000000..0fd6ac4
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutException.java
@@ -0,0 +1,37 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+public class EventHubSpoutException extends Exception {
+
+  public EventHubSpoutException() {
+    super();
+  }
+
+  public EventHubSpoutException(String message) {
+    super(message);
+  }
+
+  public EventHubSpoutException(Throwable cause) {
+    super(cause);
+  }
+
+  public EventHubSpoutException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
new file mode 100755
index 0000000..bd655d6
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
@@ -0,0 +1,25 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+public class FieldConstants {
+
+  public static final String PartitionKey = "partitionKey";
+  public static final String Offset = "offset";
+  public static final String Message = "message";
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
new file mode 100755
index 0000000..c96767d
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
@@ -0,0 +1,30 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import backtype.storm.tuple.Fields;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.qpid.amqp_1_0.client.Message;
+
+public interface IEventDataScheme extends Serializable {
+
+  List<Object> deserialize(Message message);
+
+  Fields getOutputFields();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java
new file mode 100755
index 0000000..45e9e57
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java
@@ -0,0 +1,35 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import java.util.Map;
+
+import org.apache.storm.eventhubs.client.EventHubException;
+
+public interface IEventHubReceiver {
+
+  void open(IEventHubReceiverFilter filter) throws EventHubException;
+
+  void close();
+  
+  boolean isOpen();
+
+  EventData receive(long timeoutInMilliseconds);
+  
+  Map getMetricsData();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFactory.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFactory.java
new file mode 100755
index 0000000..fbebdc8
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFactory.java
@@ -0,0 +1,30 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+/**
+ * 
+ */
+package org.apache.storm.eventhubs.spout;
+
+import java.io.Serializable;
+
+/**
+ * An abstract factory to generate EventHubReceiver
+ */
+public interface IEventHubReceiverFactory extends Serializable {
+  IEventHubReceiver create(EventHubSpoutConfig config, String partitionId);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFilter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFilter.java
new file mode 100755
index 0000000..e5b93cf
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFilter.java
@@ -0,0 +1,35 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+/**
+ * The filter to create an EventHubs receiver
+ */
+public interface IEventHubReceiverFilter {
+  /**
+   * Get offset to filter events based on offset 
+   * @return null if offset not set
+   */
+  String getOffset();
+  
+  /**
+   * Get timestamp to filter events based on enqueue time.
+   * @return 0 if enqueue time is not set
+   */
+  long getEnqueueTime();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionCoordinator.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionCoordinator.java
new file mode 100755
index 0000000..a460681
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionCoordinator.java
@@ -0,0 +1,27 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import java.util.List;
+
+public interface IPartitionCoordinator {
+
+  List<IPartitionManager> getMyPartitionManagers();
+
+  IPartitionManager getPartitionManager(String partitionId);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java
new file mode 100755
index 0000000..ac986d3
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java
@@ -0,0 +1,37 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import java.util.Map;
+
+public interface IPartitionManager {
+
+  void open() throws Exception;
+
+  void close();
+
+  EventData receive();
+
+  void checkpoint();
+
+  void ack(String offset);
+
+  void fail(String offset);
+  
+  Map getMetricsData();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManagerFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManagerFactory.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManagerFactory.java
new file mode 100755
index 0000000..dc136eb
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManagerFactory.java
@@ -0,0 +1,33 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+/**
+ * 
+ */
+package org.apache.storm.eventhubs.spout;
+
+import java.io.Serializable;
+
+/**
+ * An interface of factory method to create IPartitionManager
+ */
+public interface IPartitionManagerFactory extends Serializable {
+  IPartitionManager create(EventHubSpoutConfig spoutConfig,
+      String partitionId,
+      IStateStore stateStore,
+      IEventHubReceiver receiver);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IStateStore.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IStateStore.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IStateStore.java
new file mode 100755
index 0000000..f0ee2be
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IStateStore.java
@@ -0,0 +1,31 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import java.io.Serializable;
+
+public interface IStateStore extends Serializable {
+
+  public void open();
+
+  public void close();
+
+  public void saveData(String path, String data);
+
+  public String readData(String path);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/MessageId.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/MessageId.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/MessageId.java
new file mode 100755
index 0000000..2247f1f
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/MessageId.java
@@ -0,0 +1,56 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+public class MessageId {
+
+  private final String partitionId;
+  private final String offset;
+  private final long sequenceNumber;
+
+  public MessageId(
+    String partitionId,
+    String offset,
+    long sequenceNumber) {
+    this.partitionId = partitionId;
+    this.offset = offset;
+    this.sequenceNumber = sequenceNumber;
+  }
+
+  public static MessageId create(String partitionId, String offset, long sequenceNumber) {
+    return new MessageId(partitionId, offset, sequenceNumber);
+  }
+
+  public String getPartitionId() {
+    return this.partitionId;
+  }
+
+  public String getOffset() {
+    return this.offset;
+  }
+
+  public Long getSequenceNumber() {
+    return this.sequenceNumber;
+  }
+  
+  @Override
+  public String toString() {
+    return String.format("PartitionId: %s, Offset: %s, SequenceNumber: %s",
+      this.partitionId, this.offset, this.sequenceNumber);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java
new file mode 100755
index 0000000..1054742
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java
@@ -0,0 +1,101 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.TreeSet;
+
+public class PartitionManager extends SimplePartitionManager {
+  private static final Logger logger = LoggerFactory.getLogger(PartitionManager.class);
+  private final int ehReceiveTimeoutMs = 5000;
+
+  //all sent events are stored in pending
+  private final Map<String, EventData> pending;
+  //all failed events are put in toResend, which is sorted by event's offset
+  private final TreeSet<EventData> toResend;
+
+  public PartitionManager(
+    EventHubSpoutConfig spoutConfig,
+    String partitionId,
+    IStateStore stateStore,
+    IEventHubReceiver receiver) {
+
+    super(spoutConfig, partitionId, stateStore, receiver);
+    
+    this.pending = new LinkedHashMap<String, EventData>();
+    this.toResend = new TreeSet<EventData>();
+  }
+
+  @Override
+  public EventData receive() {
+    if(pending.size() >= config.getMaxPendingMsgsPerPartition()) {
+      return null;
+    }
+
+    EventData eventData;
+    if (toResend.isEmpty()) {
+      eventData = receiver.receive(ehReceiveTimeoutMs);
+    } else {
+      eventData = toResend.pollFirst();
+    }
+
+    if (eventData != null) {
+      lastOffset = eventData.getMessageId().getOffset();
+      pending.put(lastOffset, eventData);
+    }
+
+    return eventData;
+  }
+
+  @Override
+  public void ack(String offset) {
+    pending.remove(offset);
+  }
+
+  @Override
+  public void fail(String offset) {
+    logger.warn("fail on " + offset);
+    EventData eventData = pending.remove(offset);
+    toResend.add(eventData);
+  }
+  
+  @Override
+  protected String getCompletedOffset() {
+    String offset = null;
+    
+    if(pending.size() > 0) {
+      //find the smallest offset in pending list
+      offset = pending.keySet().iterator().next();
+    }
+    if(toResend.size() > 0) {
+      //find the smallest offset in toResend list
+      String offset2 = toResend.first().getMessageId().getOffset();
+      if(offset == null || offset2.compareTo(offset) < 0) {
+        offset = offset2;
+      }
+    }
+    if(offset == null) {
+      offset = lastOffset;
+    }
+    return offset;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java
new file mode 100755
index 0000000..bcbcbac
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java
@@ -0,0 +1,136 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.eventhubs.client.Constants;
+
+/**
+ * A simple partition manager that does not re-send failed messages
+ */
+public class SimplePartitionManager implements IPartitionManager {
+  private static final Logger logger = LoggerFactory.getLogger(SimplePartitionManager.class);
+  private static final String statePathPrefix = "/eventhubspout";
+
+  protected final IEventHubReceiver receiver;
+  protected String lastOffset = "-1";
+  protected String committedOffset = "-1";
+  
+  protected final EventHubSpoutConfig config;
+  private final String partitionId;
+  private final IStateStore stateStore;
+  private final String statePath;
+  
+  public SimplePartitionManager(
+      EventHubSpoutConfig spoutConfig,
+      String partitionId,
+      IStateStore stateStore,
+      IEventHubReceiver receiver) {
+    this.receiver = receiver;
+    this.config = spoutConfig;
+    this.partitionId = partitionId;
+    this.statePath = this.getPartitionStatePath();
+    this.stateStore = stateStore;
+  }
+  
+  @Override
+  public void open() throws Exception {
+
+    //read from state store, if not found, use startingOffset
+    String offset = stateStore.readData(statePath);
+    logger.info("read offset from state store: " + offset);
+    if(offset == null) {
+      offset = Constants.DefaultStartingOffset;
+    }
+
+    EventHubReceiverFilter filter = new EventHubReceiverFilter();
+    if (offset.equals(Constants.DefaultStartingOffset)
+        && config.getEnqueueTimeFilter() != 0) {
+      filter.setEnqueueTime(config.getEnqueueTimeFilter());
+    }
+    else {
+      filter.setOffset(offset);
+    }
+
+    receiver.open(filter);
+  }
+  
+  @Override
+  public void close() {
+    this.receiver.close();
+    this.checkpoint();
+  }
+  
+  @Override
+  public void checkpoint() {
+    String completedOffset = getCompletedOffset();
+    if(!committedOffset.equals(completedOffset)) {
+      logger.info("saving state " + completedOffset);
+      stateStore.saveData(statePath, completedOffset);
+      committedOffset = completedOffset;
+    }
+  }
+  
+  protected String getCompletedOffset() {
+    return lastOffset;
+  }
+
+  @Override
+  public EventData receive() {
+    EventData eventData = receiver.receive(5000);
+    if (eventData != null) {
+      lastOffset = eventData.getMessageId().getOffset();
+    }
+    return eventData;
+  }
+
+  @Override
+  public void ack(String offset) {
+    //do nothing
+  }
+
+  @Override
+  public void fail(String offset) {
+    logger.warn("fail on " + offset);
+    //do nothing
+  }
+  
+  private String getPartitionStatePath() {
+
+    // Partition state path = 
+    // "/{prefix}/{topologyName}/{namespace}/{entityPath}/partitions/{partitionId}/state";
+    String namespace = config.getNamespace();
+    String entityPath = config.getEntityPath();
+    String topologyName = config.getTopologyName();
+
+    String partitionStatePath = statePathPrefix + "/" + topologyName + "/" + namespace + "/" + entityPath + "/partitions/" + this.partitionId;
+
+    logger.info("partition state path: " + partitionStatePath);
+
+    return partitionStatePath;
+  }
+  
+  @Override
+  public Map getMetricsData() {
+    return receiver.getMetricsData();
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java
new file mode 100755
index 0000000..3f5f156
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java
@@ -0,0 +1,85 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.eventhubs.client.Constants;
+
+public class StaticPartitionCoordinator implements IPartitionCoordinator {
+
+  private static final Logger logger = LoggerFactory.getLogger(StaticPartitionCoordinator.class);
+
+  protected final EventHubSpoutConfig config;
+  protected final int taskIndex;
+  protected final int totalTasks;
+  protected final List<IPartitionManager> partitionManagers;
+  protected final Map<String, IPartitionManager> partitionManagerMap;
+  protected final IStateStore stateStore;
+
+  public StaticPartitionCoordinator(
+    EventHubSpoutConfig spoutConfig,
+    int taskIndex,
+    int totalTasks,
+    IStateStore stateStore,
+    IPartitionManagerFactory pmFactory,
+    IEventHubReceiverFactory recvFactory) {
+
+    this.config = spoutConfig;
+    this.taskIndex = taskIndex;
+    this.totalTasks = totalTasks;
+    this.stateStore = stateStore;
+    List<String> partitionIds = calculateParititionIdsToOwn();
+    partitionManagerMap = new HashMap<String, IPartitionManager>();
+    partitionManagers = new ArrayList<IPartitionManager>();
+    
+    for (String partitionId : partitionIds) {
+      IEventHubReceiver receiver = recvFactory.create(config, partitionId);
+      IPartitionManager partitionManager = pmFactory.create(
+          config, partitionId, stateStore, receiver);
+      partitionManagerMap.put(partitionId, partitionManager);
+      partitionManagers.add(partitionManager);
+    }
+  }
+
+  @Override
+  public List<IPartitionManager> getMyPartitionManagers() {
+    return partitionManagers;
+  }
+
+  @Override
+  public IPartitionManager getPartitionManager(String partitionId) {
+    return partitionManagerMap.get(partitionId);
+  }
+
+  protected List<String> calculateParititionIdsToOwn() {
+    List<String> taskPartitions = new ArrayList<String>();
+    for (int i = this.taskIndex; i < config.getPartitionCount(); i += this.totalTasks) {
+      taskPartitions.add(Integer.toString(i));
+      logger.info(String.format("taskIndex %d owns partitionId %d.", this.taskIndex, i));
+    }
+
+    return taskPartitions;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/ZookeeperStateStore.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/ZookeeperStateStore.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/ZookeeperStateStore.java
new file mode 100755
index 0000000..6af9df6
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/ZookeeperStateStore.java
@@ -0,0 +1,95 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZookeeperStateStore implements IStateStore {
+  private static final long serialVersionUID = 1L;
+  private static final Logger logger = LoggerFactory.getLogger(ZookeeperStateStore.class);
+
+  private final String zookeeperConnectionString;
+  private final CuratorFramework curatorFramework;
+  
+  public ZookeeperStateStore(String zookeeperConnectionString) {
+    this(zookeeperConnectionString, 3, 100);
+  }
+
+  public ZookeeperStateStore(String connectionString, int retries, int retryInterval) {
+    if (connectionString == null) {
+      zookeeperConnectionString = "localhost:2181";
+    } else {
+      zookeeperConnectionString = connectionString;
+    }
+
+    RetryPolicy retryPolicy = new RetryNTimes(retries, retryInterval);
+    curatorFramework = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
+  }
+
+  @Override
+  public void open() {
+    curatorFramework.start();
+  }
+
+  @Override
+  public void close() {
+    curatorFramework.close();
+  }
+
+  @Override
+  public void saveData(String statePath, String data) {
+    data = data == null ? "" : data;
+    byte[] bytes = data.getBytes();
+
+    try {
+      if (curatorFramework.checkExists().forPath(statePath) == null) {
+        curatorFramework.create().creatingParentsIfNeeded().forPath(statePath, bytes);
+      } else {
+        curatorFramework.setData().forPath(statePath, bytes);
+      }
+
+      logger.info(String.format("data was saved. path: %s, data: %s.", statePath, data));
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public String readData(String statePath) {
+    try {
+      if (curatorFramework.checkExists().forPath(statePath) == null) {
+        // do we want to throw an exception if path doesn't exist??
+        return null;
+      } else {
+        byte[] bytes = curatorFramework.getData().forPath(statePath);
+        String data = new String(bytes);
+
+        logger.info(String.format("data was retrieved. path: %s, data: %s.", statePath, data));
+
+        return data;
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Coordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Coordinator.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Coordinator.java
new file mode 100755
index 0000000..a43193d
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Coordinator.java
@@ -0,0 +1,60 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import storm.trident.spout.IOpaquePartitionedTridentSpout;
+import storm.trident.spout.IPartitionedTridentSpout;
+
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+
+public class Coordinator implements IPartitionedTridentSpout.Coordinator<Partitions>,
+    IOpaquePartitionedTridentSpout.Coordinator<Partitions> {
+  private static final Logger logger = LoggerFactory.getLogger(Coordinator.class);
+  private final EventHubSpoutConfig spoutConfig;
+  Partitions partitions;
+  
+  public Coordinator(EventHubSpoutConfig spoutConfig) {
+    this.spoutConfig = spoutConfig;
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public Partitions getPartitionsForBatch() {
+    if(partitions != null) {
+      return partitions;
+    }
+    
+    partitions = new Partitions();
+    for(int i=0; i<spoutConfig.getPartitionCount(); ++i) {
+      partitions.addPartition(new Partition(spoutConfig, Integer.toString(i)));
+    }
+    logger.info("created partitions, size=" + spoutConfig.getPartitionCount());
+    return partitions;
+  }
+
+  @Override
+  public boolean isReady(long txid) {
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java
new file mode 100755
index 0000000..fbe779d
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java
@@ -0,0 +1,35 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.util.List;
+
+import org.apache.storm.eventhubs.spout.EventData;
+
+public interface ITridentPartitionManager {
+  boolean open(String offset);
+  void close();
+  
+  /**
+   * receive a batch of messages from EvenHub up to "count" messages
+   * @param offset the starting offset
+   * @param count max number of messages in this batch
+   * @return list of EventData, if failed to receive, return empty list
+   */
+  public List<EventData> receiveBatch(String offset, int count);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManagerFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManagerFactory.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManagerFactory.java
new file mode 100755
index 0000000..5804e28
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManagerFactory.java
@@ -0,0 +1,26 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.io.Serializable;
+
+import org.apache.storm.eventhubs.spout.IEventHubReceiver;
+
+public interface ITridentPartitionManagerFactory extends Serializable {
+  ITridentPartitionManager create(IEventHubReceiver receiver);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
new file mode 100755
index 0000000..c7bd8c3
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
@@ -0,0 +1,69 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+import org.apache.storm.eventhubs.spout.IEventHubReceiverFactory;
+
+import storm.trident.operation.TridentCollector;
+import storm.trident.spout.IOpaquePartitionedTridentSpout;
+import storm.trident.topology.TransactionAttempt;
+
+/**
+ * A thin wrapper of TransactionalTridentEventHubEmitter for OpaqueTridentEventHubSpout
+ */
+public class OpaqueTridentEventHubEmitter implements IOpaquePartitionedTridentSpout.Emitter<Partitions, Partition, Map> {
+  private final TransactionalTridentEventHubEmitter transactionalEmitter;
+  public OpaqueTridentEventHubEmitter(EventHubSpoutConfig spoutConfig) {
+    transactionalEmitter = new TransactionalTridentEventHubEmitter(spoutConfig);
+  }
+  
+  public OpaqueTridentEventHubEmitter(EventHubSpoutConfig spoutConfig,
+      int batchSize,
+      ITridentPartitionManagerFactory pmFactory,
+      IEventHubReceiverFactory recvFactory) {
+    transactionalEmitter = new TransactionalTridentEventHubEmitter(spoutConfig,
+        batchSize,
+        pmFactory,
+        recvFactory);
+  }
+
+  @Override
+  public void close() {
+    transactionalEmitter.close();
+  }
+
+  @Override
+  public Map emitPartitionBatch(TransactionAttempt attempt, TridentCollector collector,
+      Partition partition, Map meta) {
+    return transactionalEmitter.emitPartitionBatchNew(attempt, collector, partition, meta);
+  }
+
+  @Override
+  public List<Partition> getOrderedPartitions(Partitions partitions) {
+    return transactionalEmitter.getOrderedPartitions(partitions);
+  }
+
+  @Override
+  public void refreshPartitions(List<Partition> partitionList) {
+    transactionalEmitter.refreshPartitions(partitionList);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubSpout.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubSpout.java
new file mode 100755
index 0000000..46084ef
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubSpout.java
@@ -0,0 +1,64 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.util.Map;
+
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+import org.apache.storm.eventhubs.spout.IEventDataScheme;
+
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Fields;
+import storm.trident.spout.IOpaquePartitionedTridentSpout;
+
+/**
+ * Opaque Trident EventHubs Spout
+ */
+public class OpaqueTridentEventHubSpout implements IOpaquePartitionedTridentSpout<Partitions, Partition, Map> {
+  private static final long serialVersionUID = 1L;
+  private final IEventDataScheme scheme;
+  private final EventHubSpoutConfig spoutConfig;
+
+  public OpaqueTridentEventHubSpout(EventHubSpoutConfig config) {
+    spoutConfig = config;
+    scheme = spoutConfig.getEventDataScheme();
+  }
+
+  @Override
+  public Map getComponentConfiguration() {
+    return null;
+  }
+
+  @Override
+  public IOpaquePartitionedTridentSpout.Coordinator<Partitions> getCoordinator(
+      Map conf, TopologyContext context) {
+    return new org.apache.storm.eventhubs.trident.Coordinator(spoutConfig);
+  }
+
+  @Override
+  public IOpaquePartitionedTridentSpout.Emitter<Partitions, Partition, Map> getEmitter(
+      Map conf, TopologyContext context) {
+    return new OpaqueTridentEventHubEmitter(spoutConfig);
+  }
+
+  @Override
+  public Fields getOutputFields() {
+    return scheme.getOutputFields();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partition.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partition.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partition.java
new file mode 100755
index 0000000..8b166cd
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partition.java
@@ -0,0 +1,39 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.io.Serializable;
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+import storm.trident.spout.ISpoutPartition;
+
+/**
+ * Represents an EventHub partition
+ */
+public class Partition implements ISpoutPartition, Serializable {
+  private static final long serialVersionUID = 1L;
+  String partitionId;
+  
+  public Partition(EventHubSpoutConfig config, String partitionId) {
+    this.partitionId = partitionId;
+  }
+  
+  @Override
+  public String getId() {
+    return partitionId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partitions.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partitions.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partitions.java
new file mode 100755
index 0000000..235f5b6
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partitions.java
@@ -0,0 +1,41 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Represents all EventHub partitions a spout is receiving messages from.
+ */
+public class Partitions implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private List<Partition> partitionList;
+  public Partitions() {
+    partitionList = new ArrayList<Partition>();
+  }
+  
+  public void addPartition(Partition partition) {
+    partitionList.add(partition);
+  }
+  
+  public List<Partition> getPartitions() {
+    return partitionList;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java
new file mode 100755
index 0000000..2b92c3c
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java
@@ -0,0 +1,167 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.trident;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.eventhubs.spout.EventData;
+import org.apache.storm.eventhubs.spout.EventHubReceiverImpl;
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+import org.apache.storm.eventhubs.spout.IEventHubReceiver;
+import org.apache.storm.eventhubs.spout.IEventHubReceiverFactory;
+import org.apache.storm.eventhubs.client.Constants;
+
+import storm.trident.operation.TridentCollector;
+import storm.trident.spout.IOpaquePartitionedTridentSpout;
+import storm.trident.spout.IPartitionedTridentSpout;
+import storm.trident.topology.TransactionAttempt;
+
+
+public class TransactionalTridentEventHubEmitter
+    implements IPartitionedTridentSpout.Emitter<Partitions, Partition, Map> {
+  private static final Logger logger = LoggerFactory.getLogger(TransactionalTridentEventHubEmitter.class);
+  private final int batchSize; 
+  private final EventHubSpoutConfig spoutConfig;
+  private Map<String, ITridentPartitionManager> pmMap;
+  private ITridentPartitionManagerFactory pmFactory;
+  private IEventHubReceiverFactory recvFactory;
+
+  public TransactionalTridentEventHubEmitter(EventHubSpoutConfig spoutConfig) {
+    //use batch size that matches the default credit size
+    this(spoutConfig, spoutConfig.getReceiverCredits(), null, null);
+  }
+      
+  public TransactionalTridentEventHubEmitter(final EventHubSpoutConfig spoutConfig,
+      int batchSize,
+      ITridentPartitionManagerFactory pmFactory,
+      IEventHubReceiverFactory recvFactory) {
+    this.spoutConfig = spoutConfig;
+    this.batchSize = batchSize;
+    this.pmFactory = pmFactory;
+    this.recvFactory = recvFactory;
+    pmMap = new HashMap<String, ITridentPartitionManager>();
+    if(this.pmFactory == null) {
+      this.pmFactory = new ITridentPartitionManagerFactory() {
+        @Override
+        public ITridentPartitionManager create(IEventHubReceiver receiver) {
+          return new TridentPartitionManager(spoutConfig, receiver);
+        }
+      };
+    }
+    if(this.recvFactory == null) {
+      this.recvFactory = new IEventHubReceiverFactory() {
+        @Override
+        public IEventHubReceiver create(EventHubSpoutConfig config,
+            String partitionId) {
+          return new EventHubReceiverImpl(config, partitionId);
+        }
+      };
+    }
+  }
+  
+  @Override
+  public void close() {
+    for(ITridentPartitionManager pm: pmMap.values()) {
+      pm.close();
+    }
+  }
+  
+  /**
+   * Check if partition manager for a given partiton is created
+   * if not, create it.
+   * @param partition
+   */
+  private ITridentPartitionManager getOrCreatePartitionManager(Partition partition) {
+    ITridentPartitionManager pm;
+    if(!pmMap.containsKey(partition.getId())) {
+      IEventHubReceiver receiver = recvFactory.create(spoutConfig, partition.getId());
+      pm = pmFactory.create(receiver);
+      pmMap.put(partition.getId(), pm);
+    }
+    else {
+      pm = pmMap.get(partition.getId());
+    }
+    return pm;
+  }
+
+  @Override
+  public void emitPartitionBatch(TransactionAttempt attempt,
+      TridentCollector collector, Partition partition, Map meta) {
+    String offset = (String)meta.get("offset");
+    int count = Integer.parseInt((String)meta.get("count"));
+    logger.info("re-emit for partition " + partition.getId() + ", offset=" + offset + ", count=" + count);
+    ITridentPartitionManager pm = getOrCreatePartitionManager(partition);
+    List<EventData> listEvents = pm.receiveBatch(offset, count);
+    if(listEvents.size() != count) {
+      logger.error("failed to refetch eventhub messages, new count=" + listEvents.size());
+      return;
+    }
+
+    for(EventData ed: listEvents) {
+      List<Object> tuples = 
+          spoutConfig.getEventDataScheme().deserialize(ed.getMessage());
+      collector.emit(tuples);
+    }
+  }
+
+  @Override
+  public Map emitPartitionBatchNew(TransactionAttempt attempt,
+      TridentCollector collector, Partition partition, Map meta) {
+    ITridentPartitionManager pm = getOrCreatePartitionManager(partition);
+    String offset = Constants.DefaultStartingOffset;
+    if(meta != null && meta.containsKey("nextOffset")) {
+      offset = (String)meta.get("nextOffset");
+    }
+    //logger.info("emit for partition " + partition.getId() + ", offset=" + offset);
+    String nextOffset = offset;
+
+    List<EventData> listEvents = pm.receiveBatch(offset, batchSize);
+
+    for(EventData ed: listEvents) {
+      //update nextOffset;
+      nextOffset = ed.getMessageId().getOffset();
+      List<Object> tuples = 
+          spoutConfig.getEventDataScheme().deserialize(ed.getMessage());
+      collector.emit(tuples);
+    }
+    //logger.info("emitted new batches: " + listEvents.size());
+    
+    Map newMeta = new HashMap();
+    newMeta.put("offset", offset);
+    newMeta.put("nextOffset", nextOffset);
+    newMeta.put("count", ""+listEvents.size());
+    return newMeta;
+  }
+
+  @Override
+  public List<Partition> getOrderedPartitions(Partitions partitions) {
+    return partitions.getPartitions();
+  }
+
+  @Override
+  public void refreshPartitions(List<Partition> partitionList) {
+    //partition info does not change in EventHub
+    return;
+  }
+
+}


[3/4] storm git commit: STORM-583: initial import of donated code

Posted by pt...@apache.org.
STORM-583: initial import of donated code


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/eff6cfa4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/eff6cfa4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/eff6cfa4

Branch: refs/heads/master
Commit: eff6cfa4d9bc6d991fb4e94be72040415f676187
Parents: b4351ed
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri Apr 24 15:50:19 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri Apr 24 15:50:19 2015 -0400

----------------------------------------------------------------------
 external/storm-eventhubs/README.md              |  37 +++
 external/storm-eventhubs/pom.xml                | 122 +++++++++
 .../storm/eventhubs/bolt/EventHubBolt.java      |  81 ++++++
 .../client/ConnectionStringBuilder.java         | 116 +++++++++
 .../storm/eventhubs/client/Constants.java       |  32 +++
 .../storm/eventhubs/client/EventHubClient.java  |  92 +++++++
 .../eventhubs/client/EventHubConsumerGroup.java |  72 ++++++
 .../eventhubs/client/EventHubException.java     |  37 +++
 .../eventhubs/client/EventHubReceiver.java      | 139 ++++++++++
 .../eventhubs/client/EventHubSendClient.java    |  70 +++++
 .../storm/eventhubs/client/EventHubSender.java  |  95 +++++++
 .../storm/eventhubs/client/SelectorFilter.java  |  38 +++
 .../eventhubs/client/SelectorFilterWriter.java  |  64 +++++
 .../eventhubs/samples/AtMostOnceEventCount.java |  54 ++++
 .../storm/eventhubs/samples/EventCount.java     | 155 +++++++++++
 .../storm/eventhubs/samples/EventHubLoop.java   |  51 ++++
 .../samples/OpaqueTridentEventCount.java        |  53 ++++
 .../samples/TransactionalTridentEventCount.java |  81 ++++++
 .../eventhubs/samples/bolt/GlobalCountBolt.java |  83 ++++++
 .../samples/bolt/PartialCountBolt.java          |  63 +++++
 .../apache/storm/eventhubs/spout/EventData.java |  48 ++++
 .../storm/eventhubs/spout/EventDataScheme.java  |  55 ++++
 .../eventhubs/spout/EventHubReceiverFilter.java |  56 ++++
 .../eventhubs/spout/EventHubReceiverImpl.java   | 150 +++++++++++
 .../storm/eventhubs/spout/EventHubSpout.java    | 258 +++++++++++++++++++
 .../eventhubs/spout/EventHubSpoutConfig.java    | 165 ++++++++++++
 .../eventhubs/spout/EventHubSpoutException.java |  37 +++
 .../storm/eventhubs/spout/FieldConstants.java   |  25 ++
 .../storm/eventhubs/spout/IEventDataScheme.java |  30 +++
 .../eventhubs/spout/IEventHubReceiver.java      |  35 +++
 .../spout/IEventHubReceiverFactory.java         |  30 +++
 .../spout/IEventHubReceiverFilter.java          |  35 +++
 .../eventhubs/spout/IPartitionCoordinator.java  |  27 ++
 .../eventhubs/spout/IPartitionManager.java      |  37 +++
 .../spout/IPartitionManagerFactory.java         |  33 +++
 .../storm/eventhubs/spout/IStateStore.java      |  31 +++
 .../apache/storm/eventhubs/spout/MessageId.java |  56 ++++
 .../storm/eventhubs/spout/PartitionManager.java | 101 ++++++++
 .../eventhubs/spout/SimplePartitionManager.java | 136 ++++++++++
 .../spout/StaticPartitionCoordinator.java       |  85 ++++++
 .../eventhubs/spout/ZookeeperStateStore.java    |  95 +++++++
 .../storm/eventhubs/trident/Coordinator.java    |  60 +++++
 .../trident/ITridentPartitionManager.java       |  35 +++
 .../ITridentPartitionManagerFactory.java        |  26 ++
 .../trident/OpaqueTridentEventHubEmitter.java   |  69 +++++
 .../trident/OpaqueTridentEventHubSpout.java     |  64 +++++
 .../storm/eventhubs/trident/Partition.java      |  39 +++
 .../storm/eventhubs/trident/Partitions.java     |  41 +++
 .../TransactionalTridentEventHubEmitter.java    | 167 ++++++++++++
 .../TransactionalTridentEventHubSpout.java      |  66 +++++
 .../trident/TridentPartitionManager.java        |  91 +++++++
 .../src/main/resources/Config.properties        |  27 ++
 .../eventhubs/spout/EventHubReceiverMock.java   | 105 ++++++++
 .../spout/EventHubSpoutCallerMock.java          |  96 +++++++
 .../spout/PartitionManagerCallerMock.java       | 105 ++++++++
 .../spout/SpoutOutputCollectorMock.java         |  61 +++++
 .../storm/eventhubs/spout/StateStoreMock.java   |  54 ++++
 .../storm/eventhubs/spout/TestEventData.java    |  47 ++++
 .../eventhubs/spout/TestEventHubSpout.java      |  70 +++++
 .../eventhubs/spout/TestPartitionManager.java   | 117 +++++++++
 .../TestTransactionalTridentEmitter.java        |  93 +++++++
 .../eventhubs/trident/TridentCollectorMock.java |  52 ++++
 62 files changed, 4545 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/README.md
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/README.md b/external/storm-eventhubs/README.md
new file mode 100755
index 0000000..ea8076b
--- /dev/null
+++ b/external/storm-eventhubs/README.md
@@ -0,0 +1,37 @@
+storm-eventhubs
+=====================
+
+Storm spout and bolt implementation for Microsoft Azure Eventhubs
+
+### build ###
+	mvn clean package
+
+### run sample topology ###
+To run the sample topology, you need to modify the config.properties file with
+the eventhubs configurations. Here is an example:
+
+	eventhubspout.username = [username: policy name in EventHubs Portal]
+	eventhubspout.password = [password: shared access key in EventHubs Portal]
+	eventhubspout.namespace = [namespace]
+	eventhubspout.entitypath = [entitypath]
+	eventhubspout.partitions.count = [partitioncount]
+
+	# if not provided, will use storm's zookeeper settings
+	# zookeeper.connectionstring=zookeeper0:2181,zookeeper1:2181,zookeeper2:2181
+
+	eventhubspout.checkpoint.interval = 10
+	eventhub.receiver.credits = 1024
+
+Then you can use storm.cmd to submit the sample topology:
+	storm jar {jarfile} com.microsoft.eventhubs.samples.EventCount {topologyname} {spoutconffile}
+	where the {jarfile} should be: eventhubs-storm-spout-{version}-jar-with-dependencies.jar
+
+### Run EventHubSendClient ###
+We have included a simple EventHubs send client for testing purpose. You can run the client like this:
+	java -cp .\target\eventhubs-storm-spout-{version}-jar-with-dependencies.jar com.microsoft.eventhubs.client.EventHubSendClient
+ 	[username] [password] [entityPath] [partitionId] [messageSize] [messageCount]
+If you want to send messages to all partitions, use "-1" as partitionId.
+
+### Windows Azure Eventhubs ###
+	http://azure.microsoft.com/en-us/services/event-hubs/
+

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/pom.xml b/external/storm-eventhubs/pom.xml
new file mode 100755
index 0000000..bc11dd1
--- /dev/null
+++ b/external/storm-eventhubs/pom.xml
@@ -0,0 +1,122 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>0.10.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    
+    <artifactId>storm-eventhubs</artifactId>
+    <version>0.10.0-SNAPSHOT</version>
+    <packaging>jar</packaging>
+    <name>storm-eventhubs</name>
+    <description>EventHubs Storm Spout</description>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <qpid.version>0.28</qpid.version>
+    </properties>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>2.4.1</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>attached</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <descriptorRefs>
+                                <descriptorRef>jar-with-dependencies</descriptorRef>
+                            </descriptorRefs>
+                            <archive>
+                                <manifest>
+                                    <mainClass>org.apache.storm.eventhubs.samples.EventCount</mainClass>
+                                </manifest>
+                            </archive>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+		        <artifactId>maven-antrun-plugin</artifactId>
+		        <executions>
+		          <execution>
+		            <phase>package</phase>
+		            <configuration>
+		              <tasks>
+		                <copy file="src/main/resources/config.properties" tofile="target/eventhubs-config.properties"/>
+                    </tasks>
+		            </configuration>
+		            <goals>
+		              <goal>run</goal>
+		            </goals>
+		          </execution>
+		        </executions>
+	        </plugin>
+        </plugins>
+    </build>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-client</artifactId>
+            <version>${qpid.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-amqp-1-0-client-jms</artifactId>
+            <version>${qpid.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <!-- keep storm out of the jar-with-dependencies -->
+            <type>jar</type>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <version>${curator.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies> 
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
new file mode 100755
index 0000000..8016be3
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
@@ -0,0 +1,81 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.bolt;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.eventhubs.client.EventHubClient;
+import org.apache.storm.eventhubs.client.EventHubException;
+import org.apache.storm.eventhubs.client.EventHubSender;
+
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseBasicBolt;
+import backtype.storm.tuple.Tuple;
+
+/**
+ * A bolt that writes message to EventHub.
+ * We assume the incoming tuple has only one field which is a string.
+ */
+public class EventHubBolt extends BaseBasicBolt {
+  private static final long serialVersionUID = 1L;
+  private static final Logger logger = LoggerFactory
+      .getLogger(EventHubBolt.class);
+  
+  private EventHubSender sender;
+  private String connectionString;
+  private String entityPath;
+  
+  public EventHubBolt(String connectionString, String entityPath) {
+    this.connectionString = connectionString;
+    this.entityPath = entityPath;
+  }
+  
+  @Override
+  public void prepare(Map config, TopologyContext context) {
+    try {
+      EventHubClient eventHubClient = EventHubClient.create(connectionString, entityPath);
+      sender = eventHubClient.createPartitionSender(null);
+    }
+    catch(Exception ex) {
+      logger.error(ex.getMessage());
+      throw new RuntimeException(ex);
+    }
+
+  }
+
+  @Override
+  public void execute(Tuple tuple, BasicOutputCollector collector) {
+    try {
+      sender.send((String)tuple.getValue(0));
+    }
+    catch(EventHubException ex) {
+      logger.error(ex.getMessage());
+    }
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/ConnectionStringBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/ConnectionStringBuilder.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/ConnectionStringBuilder.java
new file mode 100755
index 0000000..518c88d
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/ConnectionStringBuilder.java
@@ -0,0 +1,116 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.client;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.net.URLDecoder;
+import java.net.URLStreamHandler;
+
+public class ConnectionStringBuilder {
+
+  private final String connectionString;
+
+  private String host;
+  private int port;
+  private String userName;
+  private String password;
+  private boolean ssl;
+
+  // amqps://[username]:[password]@[namespace].servicebus.windows.net/
+  public ConnectionStringBuilder(String connectionString) throws EventHubException {
+    this.connectionString = connectionString;
+    this.initialize();
+  }
+
+  public String getHost() {
+    return this.host;
+  }
+
+  public void setHost(String value) {
+    this.host = value;
+  }
+
+  public int getPort() {
+    return this.port;
+  }
+
+  public void setPort(int value) {
+    this.port = value;
+  }
+
+  public String getUserName() {
+    return this.userName;
+  }
+
+  public void setUserName(String value) {
+    this.userName = value;
+  }
+
+  public String getPassword() {
+    return this.password;
+  }
+
+  public void setPassword(String value) {
+    this.password = value;
+  }
+
+  public boolean getSsl() {
+    return this.ssl;
+  }
+
+  public void setSsl(boolean value) {
+    this.ssl = value;
+  }
+
+  private void initialize() throws EventHubException {
+
+    URL url;
+    try {
+      url = new URL(null, this.connectionString, new NullURLStreamHandler());
+    } catch (MalformedURLException e) {
+      throw new EventHubException("connectionString is not valid.", e);
+    }
+
+    String protocol = url.getProtocol();
+    this.ssl = protocol.equalsIgnoreCase(Constants.SslScheme);
+    this.host = url.getHost();
+    this.port = url.getPort();
+
+    if (this.port == -1) {
+      this.port = this.ssl ? Constants.DefaultSslPort : Constants.DefaultPort;
+    }
+
+    String userInfo = url.getUserInfo();
+    if (userInfo != null) {
+      String[] credentials = userInfo.split(":", 2);
+      this.userName = URLDecoder.decode(credentials[0]);
+      this.password = URLDecoder.decode(credentials[1]);
+    }
+  }
+
+  class NullURLStreamHandler extends URLStreamHandler {
+
+    @Override
+    protected URLConnection openConnection(URL u) throws IOException {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/Constants.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/Constants.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/Constants.java
new file mode 100755
index 0000000..d87ad53
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/Constants.java
@@ -0,0 +1,32 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.client;
+
+public class Constants {
+
+  public static final String DefaultStartingOffset = "-1";
+  public static final String SelectorFilterName = "apache.org:selector-filter:string";
+  public static final String OffsetFilterFormatString = "amqp.annotation.x-opt-offset > '%s'";
+  public static final String EnqueueTimeFilterFormatString = "amqp.annotation.x-opt-enqueuedtimeutc > %d";
+  public static final String ConsumerAddressFormatString = "%s/ConsumerGroups/%s/Partitions/%s";
+  public static final String DestinationAddressFormatString = "%s/Partitions/%s";
+
+  public static final String SslScheme = "amqps";
+  public static final int DefaultPort = 5672;
+  public static final int DefaultSslPort = 5671;
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java
new file mode 100755
index 0000000..e06091d
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java
@@ -0,0 +1,92 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.client;
+
+import org.apache.qpid.amqp_1_0.client.Connection;
+import org.apache.qpid.amqp_1_0.client.ConnectionErrorException;
+import org.apache.qpid.amqp_1_0.client.ConnectionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EventHubClient {
+
+  private static final String DefaultConsumerGroupName = "$default";
+  private static final Logger logger = LoggerFactory.getLogger(EventHubClient.class);
+  private static final long ConnectionSyncTimeout = 60000L;
+
+  private final String connectionString;
+  private final String entityPath;
+  private final Connection connection;
+
+  private EventHubClient(String connectionString, String entityPath) throws EventHubException {
+    this.connectionString = connectionString;
+    this.entityPath = entityPath;
+    this.connection = this.createConnection();
+  }
+
+  /**
+   * creates a new instance of EventHubClient using the supplied connection string and entity path.
+   *
+   * @param connectionString connection string to the namespace of event hubs. connection string format:
+   * amqps://{userId}:{password}@{namespaceName}.servicebus.windows.net
+   * @param entityPath the name of event hub entity.
+   *
+   * @return EventHubClient
+   * @throws org.apache.storm.eventhubs.client.EventHubException
+   */
+  public static EventHubClient create(String connectionString, String entityPath) throws EventHubException {
+    return new EventHubClient(connectionString, entityPath);
+  }
+
+  public EventHubSender createPartitionSender(String partitionId) throws Exception {
+    return new EventHubSender(this.connection.createSession(), this.entityPath, partitionId);
+  }
+
+  public EventHubConsumerGroup getDefaultConsumerGroup() {
+    return new EventHubConsumerGroup(this.connection, this.entityPath, DefaultConsumerGroupName);
+  }
+
+  public void close() {
+    try {
+      this.connection.close();
+    } catch (ConnectionErrorException e) {
+      logger.error(e.toString());
+    }
+  }
+
+  private Connection createConnection() throws EventHubException {
+    ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder(this.connectionString);
+    Connection clientConnection;
+
+    try {
+      clientConnection = new Connection(
+        connectionStringBuilder.getHost(),
+        connectionStringBuilder.getPort(),
+        connectionStringBuilder.getUserName(),
+        connectionStringBuilder.getPassword(),
+        connectionStringBuilder.getHost(),
+        connectionStringBuilder.getSsl());
+    } catch (ConnectionException e) {
+      logger.error(e.toString());
+      throw new EventHubException(e);
+    }
+    clientConnection.getEndpoint().setSyncTimeout(ConnectionSyncTimeout);
+    SelectorFilterWriter.register(clientConnection.getEndpoint().getDescribedTypeRegistry());
+    return clientConnection;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubConsumerGroup.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubConsumerGroup.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubConsumerGroup.java
new file mode 100755
index 0000000..892ff9c
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubConsumerGroup.java
@@ -0,0 +1,72 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.client;
+
+import org.apache.qpid.amqp_1_0.client.Connection;
+import org.apache.qpid.amqp_1_0.client.ConnectionException;
+import org.apache.qpid.amqp_1_0.client.Session;
+
+public class EventHubConsumerGroup {
+
+  private final Connection connection;
+  private final String entityPath;
+  private final String consumerGroupName;
+
+  private Session session;
+
+  public EventHubConsumerGroup(Connection connection, String entityPath, String consumerGroupName) {
+    this.connection = connection;
+    this.entityPath = entityPath;
+    this.consumerGroupName = consumerGroupName;
+  }
+
+  public EventHubReceiver createReceiver(String partitionId, String startingOffset, int defaultCredits) throws EventHubException {
+    this.ensureSessionCreated();
+
+    if (startingOffset == null || startingOffset.equals("")) {
+      startingOffset = Constants.DefaultStartingOffset;
+    }
+
+    String filterStr = String.format(Constants.OffsetFilterFormatString, startingOffset);
+    return new EventHubReceiver(this.session, this.entityPath, this.consumerGroupName, partitionId, filterStr, defaultCredits);
+  }
+  
+  public EventHubReceiver createReceiver(String partitionId, long timeAfter, int defaultCredits) throws EventHubException {
+    this.ensureSessionCreated();
+
+    String filterStr = String.format(Constants.EnqueueTimeFilterFormatString, timeAfter);
+    return new EventHubReceiver(this.session, this.entityPath, this.consumerGroupName, partitionId, filterStr, defaultCredits);
+  }
+
+  public void close() {
+    if (this.session != null) {
+      this.session.close();
+    }
+  }
+
+  synchronized void ensureSessionCreated() throws EventHubException {
+
+    try {
+      if (this.session == null) {
+        this.session = this.connection.createSession();
+      }
+    } catch (ConnectionException e) {
+      throw new EventHubException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubException.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubException.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubException.java
new file mode 100755
index 0000000..3e94573
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubException.java
@@ -0,0 +1,37 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.client;
+
+public class EventHubException extends Exception {
+
+  public EventHubException() {
+    super();
+  }
+
+  public EventHubException(String message) {
+    super(message);
+  }
+
+  public EventHubException(Throwable cause) {
+    super(cause);
+  }
+
+  public EventHubException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubReceiver.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubReceiver.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubReceiver.java
new file mode 100755
index 0000000..c8900a8
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubReceiver.java
@@ -0,0 +1,139 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.client;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.qpid.amqp_1_0.client.AcknowledgeMode;
+import org.apache.qpid.amqp_1_0.client.ConnectionErrorException;
+import org.apache.qpid.amqp_1_0.client.Message;
+import org.apache.qpid.amqp_1_0.client.Receiver;
+import org.apache.qpid.amqp_1_0.client.Session;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.messaging.Filter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class EventHubReceiver {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(EventHubReceiver.class);
+  private static final String linkName = "eventhubs-receiver-link";
+
+  private final Session session;
+  private final String entityPath;
+  private final String consumerGroupName;
+  private final String partitionId;
+  private final String consumerAddress;
+  private final Map<Symbol, Filter> filters;
+  private final int defaultCredits;
+
+  private Receiver receiver;
+  private boolean isClosed;
+
+  public EventHubReceiver(Session session, String entityPath,
+      String consumerGroupName, String partitionId, String filterStr, int defaultCredits)
+      throws EventHubException {
+
+    this.session = session;
+    this.entityPath = entityPath;
+    this.consumerGroupName = consumerGroupName;
+    this.partitionId = partitionId;
+    this.consumerAddress = this.getConsumerAddress();
+    this.filters = Collections.singletonMap(
+        Symbol.valueOf(Constants.SelectorFilterName),
+        (Filter) new SelectorFilter(filterStr));
+    logger.info("receiver filter string: " + filterStr);
+    this.defaultCredits = defaultCredits;
+
+    this.ensureReceiverCreated();
+  }
+
+  // receive without timeout means wait until a message is delivered.
+  public Message receive() {
+    return this.receive(-1L);
+  }
+
+  public Message receive(long waitTimeInMilliseconds) {
+
+    this.checkIfClosed();
+
+    Message message = this.receiver.receive(waitTimeInMilliseconds);
+
+    if (message != null) {
+      // Let's acknowledge a message although EH service doesn't need it
+      // to avoid AMQP flow issue.
+      receiver.acknowledge(message);
+
+      return message;
+    } else {
+      this.checkError();
+    }
+
+    return null;
+  }
+
+  public void close() {
+    if (!isClosed) {
+      receiver.close();
+      isClosed = true;
+    }
+  }
+
+  private String getConsumerAddress() {
+    return String.format(Constants.ConsumerAddressFormatString,
+        entityPath, consumerGroupName, partitionId);
+  }
+
+  private void ensureReceiverCreated() throws EventHubException {
+    try {
+      logger.info("defaultCredits: " + defaultCredits);
+      receiver = session.createReceiver(consumerAddress,
+          AcknowledgeMode.ALO, linkName, false, filters, null);
+      receiver.setCredit(UnsignedInteger.valueOf(defaultCredits), true);
+    } catch (ConnectionErrorException e) {
+      // caller (EventHubSpout) will log the error
+      throw new EventHubException(e);
+    }
+  }
+
+  private void checkError() {
+    org.apache.qpid.amqp_1_0.type.transport.Error error = this.receiver.getError();
+    if (error != null) {
+      String errorMessage = error.toString();
+      logger.error(errorMessage);
+      this.close();
+
+      throw new RuntimeException(errorMessage);
+    } else {
+      // adding a sleep here to avoid any potential tight-loop issue.
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException e) {
+        logger.error(e.toString());
+      }
+    }
+  }
+  
+  private void checkIfClosed() {
+    if (this.isClosed) {
+      throw new RuntimeException("receiver was closed.");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSendClient.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSendClient.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSendClient.java
new file mode 100755
index 0000000..ad31cc1
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSendClient.java
@@ -0,0 +1,70 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.client;
+
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+
+public class EventHubSendClient {
+  
+  public static void main(String[] args) throws Exception {
+    
+    if (args == null || args.length < 7) {
+      throw new IllegalArgumentException(
+        "arguments are missing. [username] [password] [namespace] [entityPath] [partitionId] [messageSize] [messageCount] are required.");
+    }
+    
+    String username = args[0];
+    String password = args[1];
+    String namespace = args[2];
+    String entityPath = args[3];
+    String partitionId = args[4];
+    int messageSize = Integer.parseInt(args[5]);
+    int messageCount = Integer.parseInt(args[6]);
+    assert(messageSize > 0);
+    assert(messageCount > 0);
+    
+    if (partitionId.equals("-1")) {
+      // -1 means we want to send data to partitions in round-robin fashion.
+      partitionId = null;
+    }
+    
+    try {
+      String connectionString = EventHubSpoutConfig.buildConnectionString(username, password, namespace);
+      EventHubClient client = EventHubClient.create(connectionString, entityPath);
+      EventHubSender sender = client.createPartitionSender(partitionId);
+      
+      StringBuilder sb = new StringBuilder(messageSize);
+      for(int i=1; i<messageCount+1; ++i) {
+        while(sb.length() < messageSize) {
+          sb.append(" current message: " + i);
+        }
+        sb.setLength(messageSize);
+        sender.send(sb.toString());
+        sb.setLength(0);
+        if(i % 1000 == 0) {
+          System.out.println("Number of messages sent: " + i);
+        }
+      }
+      System.out.println("Total Number of messages sent: " + messageCount);
+    } catch (Exception e) {
+      System.out.println("Exception: " + e.getMessage());
+    }
+    
+    System.out.println("done");
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java
new file mode 100755
index 0000000..41b1d97
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java
@@ -0,0 +1,95 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.client;
+
+import java.util.concurrent.TimeoutException;
+import org.apache.qpid.amqp_1_0.client.LinkDetachedException;
+import org.apache.qpid.amqp_1_0.client.Message;
+import org.apache.qpid.amqp_1_0.client.Sender;
+import org.apache.qpid.amqp_1_0.client.Session;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.messaging.Data;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EventHubSender {
+
+  private static final Logger logger = LoggerFactory.getLogger(EventHubSender.class);
+
+  private final Session session;
+  private final String entityPath;
+  private final String partitionId;
+  private final String destinationAddress;
+
+  private Sender sender;
+
+  public EventHubSender(Session session, String entityPath, String partitionId) {
+    this.session = session;
+    this.entityPath = entityPath;
+    this.partitionId = partitionId;
+    this.destinationAddress = this.getDestinationAddress();
+  }
+
+  public void send(String data) throws EventHubException {
+    try {
+      if (this.sender == null) {
+        this.ensureSenderCreated();
+      }
+
+      //For interop with other language, convert string to bytes
+      Binary bin = new Binary(data.getBytes());
+      Message message = new Message(new Data(bin));
+      this.sender.send(message);
+
+    } catch (LinkDetachedException e) {
+      logger.error(e.getMessage());
+
+      EventHubException eventHubException = new EventHubException("Sender has been closed");
+      throw eventHubException;
+    } catch (TimeoutException e) {
+      logger.error(e.getMessage());
+
+      EventHubException eventHubException = new EventHubException("Timed out while waiting to get credit to send");
+      throw eventHubException;
+    } catch (Exception e) {
+      logger.error(e.getMessage());
+    }
+  }
+
+  public void close() {
+    try {
+      this.sender.close();
+    } catch (Sender.SenderClosingException e) {
+      logger.error("Closing a sender encountered error: " + e.getMessage());
+    }
+  }
+
+  private String getDestinationAddress() {
+    if (this.partitionId == null || this.partitionId.equals("")) {
+      return this.entityPath;
+    } else {
+      return String.format(Constants.DestinationAddressFormatString, this.entityPath, this.partitionId);
+    }
+  }
+
+  private synchronized void ensureSenderCreated() throws Exception {
+    if (this.sender == null) {
+      this.sender = this.session.createSender(this.destinationAddress);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/SelectorFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/SelectorFilter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/SelectorFilter.java
new file mode 100755
index 0000000..7869cce
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/SelectorFilter.java
@@ -0,0 +1,38 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.client;
+
+import org.apache.qpid.amqp_1_0.type.messaging.Filter;
+
+public class SelectorFilter implements Filter {
+
+  private final String value;
+
+  public SelectorFilter(String value) {
+    this.value = value;
+  }
+
+  public String getValue() {
+    return value;
+  }
+
+  @Override
+  public String toString() {
+    return value;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/SelectorFilterWriter.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/SelectorFilterWriter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/SelectorFilterWriter.java
new file mode 100755
index 0000000..102b6b6
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/SelectorFilterWriter.java
@@ -0,0 +1,64 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.client;
+
+import org.apache.qpid.amqp_1_0.codec.AbstractDescribedTypeWriter;
+import org.apache.qpid.amqp_1_0.codec.ValueWriter;
+import org.apache.qpid.amqp_1_0.type.UnsignedLong;
+
+public class SelectorFilterWriter extends
+  AbstractDescribedTypeWriter<SelectorFilter> {
+
+  private static final ValueWriter.Factory<SelectorFilter> FACTORY = new ValueWriter.Factory<SelectorFilter>() {
+
+    @Override
+    public ValueWriter<SelectorFilter> newInstance(ValueWriter.Registry registry) {
+      return new SelectorFilterWriter(registry);
+    }
+  };
+
+  private SelectorFilter value;
+
+  public SelectorFilterWriter(final ValueWriter.Registry registry) {
+    super(registry);
+  }
+
+  public static void register(ValueWriter.Registry registry) {
+    registry.register(SelectorFilter.class, FACTORY);
+  }
+
+  @Override
+  protected void onSetValue(final SelectorFilter value) {
+    this.value = value;
+  }
+
+  @Override
+  protected void clear() {
+    value = null;
+  }
+
+  @Override
+  protected Object getDescriptor() {
+    return UnsignedLong.valueOf(0x00000137000000AL);
+  }
+
+  @Override
+  protected ValueWriter<String> createDescribedWriter() {
+    return getRegistry().getValueWriter(value.getValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/AtMostOnceEventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/AtMostOnceEventCount.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/AtMostOnceEventCount.java
new file mode 100755
index 0000000..b0dd33a
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/AtMostOnceEventCount.java
@@ -0,0 +1,54 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples;
+
+import java.io.Serializable;
+
+import org.apache.storm.eventhubs.spout.EventHubSpout;
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+import org.apache.storm.eventhubs.spout.IEventHubReceiver;
+import org.apache.storm.eventhubs.spout.IPartitionManager;
+import org.apache.storm.eventhubs.spout.IPartitionManagerFactory;
+import org.apache.storm.eventhubs.spout.IStateStore;
+import org.apache.storm.eventhubs.spout.SimplePartitionManager;
+
+public class AtMostOnceEventCount extends EventCount implements Serializable {
+  @Override
+  protected EventHubSpout createEventHubSpout() {
+    IPartitionManagerFactory pmFactory = new IPartitionManagerFactory() {
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public IPartitionManager create(EventHubSpoutConfig spoutConfig,
+          String partitionId, IStateStore stateStore,
+          IEventHubReceiver receiver) {
+        return new SimplePartitionManager(spoutConfig, partitionId,
+            stateStore, receiver);
+      }
+    };
+    EventHubSpout eventHubSpout = new EventHubSpout(
+        spoutConfig, null, pmFactory, null);
+    return eventHubSpout;
+  }
+  
+  public static void main(String[] args) throws Exception {
+    AtMostOnceEventCount scenario = new AtMostOnceEventCount();
+
+    scenario.runScenario(args);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventCount.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventCount.java
new file mode 100755
index 0000000..dd53e42
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventCount.java
@@ -0,0 +1,155 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples;
+
+import backtype.storm.StormSubmitter;
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.TopologyBuilder;
+
+import org.apache.storm.eventhubs.samples.bolt.GlobalCountBolt;
+import org.apache.storm.eventhubs.samples.bolt.PartialCountBolt;
+import org.apache.storm.eventhubs.spout.EventHubSpout;
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+
+import java.io.FileReader;
+import java.util.Properties;
+
+/**
+ * The basic scenario topology that uses EventHubSpout with PartialCountBolt
+ * and GlobalCountBolt.
+ * To submit this topology:
+ * storm jar {jarfile} {classname} {topologyname} {spoutconffile}
+ */
+public class EventCount {
+  protected EventHubSpoutConfig spoutConfig;
+  protected int numWorkers;
+  
+  public EventCount() {
+  }
+  
+  protected void readEHConfig(String[] args) throws Exception {
+	  Properties properties = new Properties();
+    if(args.length > 1) {
+      properties.load(new FileReader(args[1]));
+    }
+    else {
+      properties.load(EventCount.class.getClassLoader().getResourceAsStream(
+          "Config.properties"));
+    }
+
+    String username = properties.getProperty("eventhubspout.username");
+    String password = properties.getProperty("eventhubspout.password");
+    String namespaceName = properties.getProperty("eventhubspout.namespace");
+    String entityPath = properties.getProperty("eventhubspout.entitypath");
+    String targetFqnAddress = properties.getProperty("eventhubspout.targetfqnaddress");
+    String zkEndpointAddress = properties.getProperty("zookeeper.connectionstring");
+    int partitionCount = Integer.parseInt(properties.getProperty("eventhubspout.partitions.count"));
+    int checkpointIntervalInSeconds = Integer.parseInt(properties.getProperty("eventhubspout.checkpoint.interval"));
+    int receiverCredits = Integer.parseInt(properties.getProperty("eventhub.receiver.credits"));
+    String maxPendingMsgsPerPartitionStr = properties.getProperty("eventhubspout.max.pending.messages.per.partition");
+    if(maxPendingMsgsPerPartitionStr == null) {
+      maxPendingMsgsPerPartitionStr = "1024";
+    }
+    int maxPendingMsgsPerPartition = Integer.parseInt(maxPendingMsgsPerPartitionStr);
+    String enqueueTimeDiffStr = properties.getProperty("eventhub.receiver.filter.timediff");
+    if(enqueueTimeDiffStr == null) {
+      enqueueTimeDiffStr = "0";
+    }
+    int enqueueTimeDiff = Integer.parseInt(enqueueTimeDiffStr);
+    long enqueueTimeFilter = 0;
+    if(enqueueTimeDiff != 0) {
+      enqueueTimeFilter = System.currentTimeMillis() - enqueueTimeDiff*1000;
+    }
+    
+    System.out.println("Eventhub spout config: ");
+    System.out.println("  partition count: " + partitionCount);
+    System.out.println("  checkpoint interval: " + checkpointIntervalInSeconds);
+    System.out.println("  receiver credits: " + receiverCredits);
+    spoutConfig = new EventHubSpoutConfig(username, password,
+      namespaceName, entityPath, partitionCount, zkEndpointAddress,
+      checkpointIntervalInSeconds, receiverCredits, maxPendingMsgsPerPartition, enqueueTimeFilter);
+
+    if(targetFqnAddress != null)
+    {
+      spoutConfig.setTargetAddress(targetFqnAddress);      
+    }
+
+    //set the number of workers to be the same as partition number.
+    //the idea is to have a spout and a partial count bolt co-exist in one
+    //worker to avoid shuffling messages across workers in storm cluster.
+    numWorkers = spoutConfig.getPartitionCount();
+    
+    if(args.length > 0) {
+      //set topology name so that sample Trident topology can use it as stream name.
+      spoutConfig.setTopologyName(args[0]);
+    }
+	}
+  
+  protected EventHubSpout createEventHubSpout() {
+    EventHubSpout eventHubSpout = new EventHubSpout(spoutConfig);
+    return eventHubSpout;
+  }
+	
+  protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
+    TopologyBuilder topologyBuilder = new TopologyBuilder();
+
+    topologyBuilder.setSpout("EventHubsSpout", eventHubSpout, spoutConfig.getPartitionCount())
+      .setNumTasks(spoutConfig.getPartitionCount());
+    topologyBuilder.setBolt("PartialCountBolt", new PartialCountBolt(), spoutConfig.getPartitionCount())
+      .localOrShuffleGrouping("EventHubsSpout").setNumTasks(spoutConfig.getPartitionCount());
+    topologyBuilder.setBolt("GlobalCountBolt", new GlobalCountBolt(), 1)
+      .globalGrouping("PartialCountBolt").setNumTasks(1);
+    return topologyBuilder.createTopology();
+  }
+	
+  protected void submitTopology(String[] args, StormTopology topology) throws Exception {
+	  Config config = new Config();
+    config.setDebug(false);
+    //Enable metrics
+    config.registerMetricsConsumer(backtype.storm.metric.LoggingMetricsConsumer.class, 1);
+
+    
+	  if (args != null && args.length > 0) {
+      config.setNumWorkers(numWorkers);
+      StormSubmitter.submitTopology(args[0], config, topology);
+    } else {
+      config.setMaxTaskParallelism(2);
+
+      LocalCluster localCluster = new LocalCluster();
+      localCluster.submitTopology("test", config, topology);
+
+      Thread.sleep(5000000);
+
+      localCluster.shutdown();
+    }
+	}
+  
+  protected void runScenario(String[] args) throws Exception{
+    readEHConfig(args);
+    EventHubSpout eventHubSpout = createEventHubSpout();
+    StormTopology topology = buildTopology(eventHubSpout);
+    submitTopology(args, topology);
+  }
+
+  public static void main(String[] args) throws Exception {
+    EventCount scenario = new EventCount();
+    scenario.runScenario(args);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
new file mode 100755
index 0000000..cae0573
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
@@ -0,0 +1,51 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.TopologyBuilder;
+
+import org.apache.storm.eventhubs.bolt.EventHubBolt;
+import org.apache.storm.eventhubs.spout.EventHubSpout;
+
+/**
+ * A sample topology that loops message back to EventHub
+ */
+public class EventHubLoop extends EventCount {
+
+  @Override
+  protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
+    TopologyBuilder topologyBuilder = new TopologyBuilder();
+
+    topologyBuilder.setSpout("EventHubsSpout", eventHubSpout, spoutConfig.getPartitionCount())
+      .setNumTasks(spoutConfig.getPartitionCount());
+    
+    EventHubBolt eventHubBolt = new EventHubBolt(spoutConfig.getConnectionString(),
+        spoutConfig.getEntityPath());
+    //For every spout, let's create multiple bolts because send is much slower
+    int boltTasks = spoutConfig.getPartitionCount() * 50;
+    topologyBuilder.setBolt("EventHubsBolt", eventHubBolt, boltTasks)
+      .localOrShuffleGrouping("EventHubsSpout").setNumTasks(boltTasks);
+    return topologyBuilder.createTopology();
+  }
+  
+  public static void main(String[] args) throws Exception {
+    EventHubLoop scenario = new EventHubLoop();
+    scenario.runScenario(args);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java
new file mode 100755
index 0000000..f4fe127
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java
@@ -0,0 +1,53 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples;
+
+import storm.trident.TridentState;
+import storm.trident.TridentTopology;
+import storm.trident.operation.builtin.Count;
+import storm.trident.operation.builtin.Sum;
+import storm.trident.testing.MemoryMapState;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.tuple.Fields;
+
+import org.apache.storm.eventhubs.samples.TransactionalTridentEventCount.LoggingFilter;
+import org.apache.storm.eventhubs.spout.EventHubSpout;
+import org.apache.storm.eventhubs.trident.OpaqueTridentEventHubSpout;
+
+/**
+ * A simple Trident topology uses OpaqueTridentEventHubSpout
+ */
+public class OpaqueTridentEventCount extends EventCount {
+  @Override
+  protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
+    TridentTopology topology = new TridentTopology();
+    
+    OpaqueTridentEventHubSpout spout = new OpaqueTridentEventHubSpout(spoutConfig);
+    TridentState state = topology.newStream("stream-" + spoutConfig.getTopologyName(), spout)
+        .parallelismHint(spoutConfig.getPartitionCount())
+        .aggregate(new Count(), new Fields("partial-count"))
+        .persistentAggregate(new MemoryMapState.Factory(), new Fields("partial-count"), new Sum(), new Fields("count"));
+    state.newValuesStream().each(new Fields("count"), new LoggingFilter("got count: ", 10000));
+    return topology.build();
+  }
+  
+  public static void main(String[] args) throws Exception {
+    OpaqueTridentEventCount scenario = new OpaqueTridentEventCount();
+    scenario.runScenario(args);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java
new file mode 100755
index 0000000..1e7628b
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java
@@ -0,0 +1,81 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.tuple.Fields;
+
+import org.apache.storm.eventhubs.spout.EventHubSpout;
+import org.apache.storm.eventhubs.trident.TransactionalTridentEventHubSpout;
+
+import storm.trident.TridentState;
+import storm.trident.TridentTopology;
+import storm.trident.operation.BaseFilter;
+import storm.trident.operation.builtin.Count;
+import storm.trident.operation.builtin.Sum;
+import storm.trident.testing.MemoryMapState;
+import storm.trident.tuple.TridentTuple;
+
+/**
+ * A simple Trident topology uses TransactionalTridentEventHubSpout
+ */
+public class TransactionalTridentEventCount extends EventCount {
+  public static class LoggingFilter extends BaseFilter {
+    private static final long serialVersionUID = 1L;
+    private static final Logger logger = LoggerFactory.getLogger(LoggingFilter.class);
+    private final String prefix;
+    private final long logIntervalMs;
+    private long lastTime;
+    public LoggingFilter(String prefix, int logIntervalMs) {
+      this.prefix = prefix;
+      this.logIntervalMs = logIntervalMs;
+      lastTime = System.nanoTime();
+    }
+
+    @Override
+    public boolean isKeep(TridentTuple tuple) {
+      long now = System.nanoTime();
+      if(logIntervalMs < (now - lastTime) / 1000000) {
+        logger.info(prefix + tuple.toString());
+        lastTime = now;
+      }
+      return false;
+    }
+  }
+  
+  @Override
+  protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
+    TridentTopology topology = new TridentTopology();
+    
+    TransactionalTridentEventHubSpout spout = new TransactionalTridentEventHubSpout(spoutConfig);
+    TridentState state = topology.newStream("stream-" + spoutConfig.getTopologyName(), spout)
+        .parallelismHint(spoutConfig.getPartitionCount())
+        .aggregate(new Count(), new Fields("partial-count"))
+        .persistentAggregate(new MemoryMapState.Factory(), new Fields("partial-count"), new Sum(), new Fields("count"));
+    state.newValuesStream().each(new Fields("count"), new LoggingFilter("got count: ", 10000));
+    return topology.build();
+  }
+  
+  public static void main(String[] args) throws Exception {
+    TransactionalTridentEventCount scenario = new TransactionalTridentEventCount();
+    scenario.runScenario(args);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
new file mode 100755
index 0000000..16b34a6
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
@@ -0,0 +1,83 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples.bolt;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.Config;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseBasicBolt;
+import backtype.storm.tuple.Tuple;
+
+/**
+ * Globally count number of messages
+ */
+public class GlobalCountBolt extends BaseBasicBolt {
+  private static final long serialVersionUID = 1L;
+  private static final Logger logger = LoggerFactory
+      .getLogger(GlobalCountBolt.class);
+  private long globalCount;
+  private long globalCountDiff;
+  private long lastMetricsTime;
+  private long throughput;
+  
+  @Override
+  public void prepare(Map config, TopologyContext context) {
+    globalCount = 0;
+    globalCountDiff = 0;
+    lastMetricsTime = System.nanoTime();
+    context.registerMetric("GlobalMessageCount", new IMetric() {
+      @Override
+      public Object getValueAndReset() {
+        long now = System.nanoTime();
+        long millis = (now - lastMetricsTime) / 1000000;
+        throughput = globalCountDiff / millis * 1000;
+        Map values = new HashMap();
+        values.put("global_count", globalCount);
+        values.put("throughput", throughput);
+        lastMetricsTime = now;
+        globalCountDiff = 0;
+        return values;
+      }
+  }, (Integer)config.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));
+  }
+
+  @Override
+  public void execute(Tuple tuple, BasicOutputCollector collector) {
+    int partial = (Integer)tuple.getValueByField("partial_count");
+    globalCount += partial;
+    globalCountDiff += partial;
+    if((globalCountDiff == partial) && (globalCount != globalCountDiff)) {
+      //metrics has just been collected, let's also log it
+      logger.info("Current throughput (messages/second): " + throughput);
+    }
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
new file mode 100755
index 0000000..21f1ab4
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
@@ -0,0 +1,63 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples.bolt;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseBasicBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+/**
+ * Partially count number of messages from EventHubs
+ */
+public class PartialCountBolt extends BaseBasicBolt {
+  private static final long serialVersionUID = 1L;
+  private static final Logger logger = LoggerFactory
+      .getLogger(PartialCountBolt.class);
+  private static final int PartialCountBatchSize = 1000; 
+  
+  private int partialCount;
+  
+  @Override
+  public void prepare(Map stormConf, TopologyContext context) {
+    partialCount = 0;
+  }
+  
+  @Override
+  public void execute(Tuple tuple, BasicOutputCollector collector) {
+    partialCount++;
+    if(partialCount == PartialCountBatchSize) {
+      collector.emit(new Values(PartialCountBatchSize));
+      partialCount = 0;
+    }
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(new Fields("partial_count"));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventData.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventData.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventData.java
new file mode 100755
index 0000000..e5834b4
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventData.java
@@ -0,0 +1,48 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import org.apache.qpid.amqp_1_0.client.Message;
+
+public class EventData implements Comparable<EventData> {
+  private final Message message;
+  private final MessageId messageId;
+
+  public EventData(Message message, MessageId messageId) {
+    this.message = message;
+    this.messageId = messageId;
+  }
+
+  public static EventData create(Message message, MessageId messageId) {
+    return new EventData(message, messageId);
+  }
+
+  public Message getMessage() {
+    return this.message;
+  }
+
+  public MessageId getMessageId() {
+    return this.messageId;
+  }
+
+  @Override
+  public int compareTo(EventData ed) {
+    return messageId.getSequenceNumber().
+        compareTo(ed.getMessageId().getSequenceNumber());
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
new file mode 100755
index 0000000..d01050d
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
@@ -0,0 +1,55 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import backtype.storm.tuple.Fields;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.qpid.amqp_1_0.client.Message;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
+import org.apache.qpid.amqp_1_0.type.messaging.Data;
+
+public class EventDataScheme implements IEventDataScheme {
+
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public List<Object> deserialize(Message message) {
+    List<Object> fieldContents = new ArrayList<Object>();
+
+    for (Section section : message.getPayload()) {
+      if (section instanceof Data) {
+        Data data = (Data) section;
+        fieldContents.add(new String(data.getValue().getArray()));
+        return fieldContents;
+      } else if (section instanceof AmqpValue) {
+        AmqpValue amqpValue = (AmqpValue) section;
+        fieldContents.add(amqpValue.getValue().toString());
+        return fieldContents;
+      }
+    }
+
+    return null;
+  }
+
+  @Override
+  public Fields getOutputFields() {
+    return new Fields(FieldConstants.Message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eff6cfa4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverFilter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverFilter.java
new file mode 100755
index 0000000..e80cd25
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverFilter.java
@@ -0,0 +1,56 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+
+public class EventHubReceiverFilter implements IEventHubReceiverFilter {
+  String offset = null;
+  long enqueueTime = 0;
+  public EventHubReceiverFilter() {
+    
+  }
+  
+  public EventHubReceiverFilter(String offset) {
+    //Creates offset only filter
+    this.offset = offset;
+  }
+  
+  public EventHubReceiverFilter(long enqueueTime) {
+    //Creates enqueue time only filter
+    this.enqueueTime = enqueueTime;
+  }
+  
+  public void setOffset(String offset) {
+    this.offset = offset;
+  }
+  
+  public void setEnqueueTime(long enqueueTime) {
+    this.enqueueTime = enqueueTime;
+  }
+  
+  @Override
+  public String getOffset() {
+    return offset;
+  }
+
+  @Override
+  public long getEnqueueTime() {
+    return enqueueTime;
+  }
+
+}


[4/4] storm git commit: STORM-583: incorporate storm-eventhubs into build (this closes #336)

Posted by pt...@apache.org.
STORM-583: incorporate storm-eventhubs into build (this closes #336)


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/38638474
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/38638474
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/38638474

Branch: refs/heads/master
Commit: 38638474ef6b045f9e61942b2fd882a8ed4ed8e9
Parents: eff6cfa
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri Apr 24 16:29:04 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri Apr 24 16:29:04 2015 -0400

----------------------------------------------------------------------
 CHANGELOG.md                                   |  1 +
 external/storm-eventhubs/README.md             |  4 ++++
 external/storm-eventhubs/pom.xml               |  4 ++--
 pom.xml                                        |  1 +
 storm-dist/binary/src/main/assembly/binary.xml | 25 +++++++++++++++++++--
 5 files changed, 31 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/38638474/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index fd140ea..23920e3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.11.0
+ * STORM-583: Add Microsoft Azure Event Hub spout implementations
  * STORM-712: Storm daemons shutdown if OutOfMemoryError occurs in any thread
  * STORM-730: remove extra curly brace
  * STORM-735: [storm-redis] Upgrade Jedis to 2.7.0

http://git-wip-us.apache.org/repos/asf/storm/blob/38638474/external/storm-eventhubs/README.md
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/README.md b/external/storm-eventhubs/README.md
index ea8076b..1586ca8 100755
--- a/external/storm-eventhubs/README.md
+++ b/external/storm-eventhubs/README.md
@@ -35,3 +35,7 @@ If you want to send messages to all partitions, use "-1" as partitionId.
 ### Windows Azure Eventhubs ###
 	http://azure.microsoft.com/en-us/services/event-hubs/
 
+## Committer Sponsors
+
+ * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))
+

http://git-wip-us.apache.org/repos/asf/storm/blob/38638474/external/storm-eventhubs/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/pom.xml b/external/storm-eventhubs/pom.xml
index bc11dd1..5ed65c7 100755
--- a/external/storm-eventhubs/pom.xml
+++ b/external/storm-eventhubs/pom.xml
@@ -21,12 +21,12 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-        <version>0.10.0-SNAPSHOT</version>
+        <version>0.11.0-SNAPSHOT</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
     
     <artifactId>storm-eventhubs</artifactId>
-    <version>0.10.0-SNAPSHOT</version>
+    <version>0.11.0-SNAPSHOT</version>
     <packaging>jar</packaging>
     <name>storm-eventhubs</name>
     <description>EventHubs Storm Spout</description>

http://git-wip-us.apache.org/repos/asf/storm/blob/38638474/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 95aa46d..2e0c898 100644
--- a/pom.xml
+++ b/pom.xml
@@ -168,6 +168,7 @@
         <module>external/storm-hive</module>
         <module>external/storm-jdbc</module>
         <module>external/storm-redis</module>
+        <module>external/storm-eventhubs</module>
     </modules>
 
     <scm>

http://git-wip-us.apache.org/repos/asf/storm/blob/38638474/storm-dist/binary/src/main/assembly/binary.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/src/main/assembly/binary.xml b/storm-dist/binary/src/main/assembly/binary.xml
index 61a8027..fef56bb 100644
--- a/storm-dist/binary/src/main/assembly/binary.xml
+++ b/storm-dist/binary/src/main/assembly/binary.xml
@@ -136,7 +136,28 @@
             <includes>
                 <include>README.*</include>
             </includes>
-		</fileSet>
+        </fileSet>
+        <fileSet>
+            <directory>${project.basedir}/../../external/storm-eventhubs/target</directory>
+            <outputDirectory>external/storm-eventhubs</outputDirectory>
+            <includes>
+                <include>storm*jar</include>
+            </includes>
+        </fileSet>
+        <fileSet>
+            <directory>${project.basedir}/../../external/storm-eventhubs</directory>
+            <outputDirectory>external/storm-eventhubs</outputDirectory>
+            <includes>
+                <include>README.*</include>
+            </includes>
+        </fileSet>
+        <fileSet>
+            <directory>${project.basedir}/../../external/storm-eventhubs/src/main/resources</directory>
+            <outputDirectory>external/storm-eventhubs</outputDirectory>
+            <includes>
+                <include>Config.properties</include>
+            </includes>
+        </fileSet>
 
         <!-- $STORM_HOME/extlib -->
         <fileSet>
@@ -160,7 +181,7 @@
     <files>
         <!-- EXAMPLES -->
         <file>
-            <source>${project.basedir}/../../examples/storm-starter/target/storm-starter-${project.version}-jar-with-dependencies.jar</source>
+            <source>${project.basedir}/../../examples/storm-starter/target/storm-starter-${project.version}.jar</source>
             <outputDirectory>/examples/storm-starter/</outputDirectory>
             <destName>storm-starter-topologies-${project.version}.jar</destName>
         </file>