You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2017/04/18 21:26:34 UTC

[1/3] storm git commit: Storm 2371: Replace existing AMQP eventhub client with the latest one from Microsoft eventhubs

Repository: storm
Updated Branches:
  refs/heads/master f794d7230 -> 2307b8023


Storm 2371: Replace existing AMQP eventhub client with the latest one from Microsoft eventhubs


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fdb9136b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fdb9136b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fdb9136b

Branch: refs/heads/master
Commit: fdb9136bc69f28175f4dc3bee9297352440f2f13
Parents: 1850dd5
Author: Ranjan Banerjee <ra...@microsoft.com>
Authored: Tue Feb 21 13:51:16 2017 -0800
Committer: Ranjan Banerjee <ra...@microsoft.com>
Committed: Sun Apr 9 13:06:00 2017 -0700

----------------------------------------------------------------------
 external/storm-eventhubs/pom.xml                |  29 +---
 .../storm/eventhubs/bolt/EventHubBolt.java      |  82 ++++++++---
 .../eventhubs/bolt/EventHubBoltConfig.java      |  10 +-
 .../eventhubs/spout/BinaryEventDataScheme.java  |  43 +++---
 .../apache/storm/eventhubs/spout/EventData.java |  48 -------
 .../storm/eventhubs/spout/EventDataScheme.java  |  51 +++----
 .../storm/eventhubs/spout/EventDataWrap.java    |  48 +++++++
 .../eventhubs/spout/EventHubException.java      |  41 ++++++
 .../storm/eventhubs/spout/EventHubFilter.java   |  46 ++++++
 .../eventhubs/spout/EventHubReceiverImpl.java   | 141 +++++++++++--------
 .../storm/eventhubs/spout/EventHubSpout.java    |  24 ++--
 .../eventhubs/spout/EventHubSpoutConfig.java    |  18 ++-
 .../eventhubs/spout/EventHubSpoutException.java |  37 -----
 .../storm/eventhubs/spout/FieldConstants.java   |   1 +
 .../storm/eventhubs/spout/IEventDataScheme.java |   7 +-
 .../storm/eventhubs/spout/IEventFilter.java     |  28 ++++
 .../eventhubs/spout/IEventHubReceiver.java      |  11 +-
 .../eventhubs/spout/IPartitionManager.java      |   2 +-
 .../storm/eventhubs/spout/PartitionManager.java |  28 ++--
 .../spout/SerializeDeserializeUtil.java         |  34 +++++
 .../eventhubs/spout/SimplePartitionManager.java |  32 ++---
 .../spout/StaticPartitionCoordinator.java       |   1 -
 .../eventhubs/spout/StringEventDataScheme.java  |  43 +++---
 .../trident/ITridentPartitionManager.java       |   6 +-
 .../TransactionalTridentEventHubEmitter.java    |  36 ++---
 .../trident/TridentPartitionManager.java        |  29 ++--
 .../eventhubs/spout/EventHubReceiverMock.java   |  36 ++---
 .../spout/PartitionManagerCallerMock.java       |   6 +-
 .../storm/eventhubs/spout/TestEventData.java    |   8 +-
 .../eventhubs/spout/TestEventHubSpout.java      |   7 +-
 pom.xml                                         |   2 +-
 31 files changed, 531 insertions(+), 404 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/pom.xml b/external/storm-eventhubs/pom.xml
index 7fe6071..12a0b2f 100755
--- a/external/storm-eventhubs/pom.xml
+++ b/external/storm-eventhubs/pom.xml
@@ -52,9 +52,9 @@
     </build>
     <dependencies>
         <dependency>
-            <groupId>com.microsoft.eventhubs.client</groupId>
-            <artifactId>eventhubs-client</artifactId>
-            <version>${eventhubs.client.version}</version>
+            <groupId>com.microsoft.azure</groupId>
+            <artifactId>azure-eventhubs</artifactId>
+            <version>${azure-eventhubs.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.storm</groupId>
@@ -86,26 +86,5 @@
                 </exclusion>
             </exclusions>
         </dependency>
-        <dependency>
-            <groupId>org.apache.qpid</groupId>
-            <artifactId>qpid-amqp-1-0-client</artifactId>
-            <version>${qpid.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.qpid</groupId>
-            <artifactId>qpid-amqp-1-0-client-jms</artifactId>
-            <version>${qpid.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.qpid</groupId>
-            <artifactId>qpid-amqp-1-0-common</artifactId>
-            <version>${qpid.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>4.11</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies> 
+    </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
index 604c62d..7503db3 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
@@ -17,33 +17,37 @@
  *******************************************************************************/
 package org.apache.storm.eventhubs.bolt;
 
-import java.util.Map;
-
-import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
-import org.apache.storm.utils.TupleUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.microsoft.eventhubs.client.EventHubClient;
-import com.microsoft.eventhubs.client.EventHubException;
-import com.microsoft.eventhubs.client.EventHubSender;
 
+import com.microsoft.azure.eventhubs.EventData;
+import com.microsoft.azure.eventhubs.EventHubClient;
+import com.microsoft.azure.eventhubs.PartitionSender;
+import com.microsoft.azure.servicebus.ServiceBusException;
+import org.apache.storm.eventhubs.spout.EventHubException;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichBolt;
 import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
 
 /**
  * A bolt that writes event message to EventHub.
  */
-public class EventHubBolt extends BaseTickTupleAwareRichBolt {
+public class EventHubBolt extends BaseRichBolt {
 	private static final long serialVersionUID = 1L;
 	private static final Logger logger = LoggerFactory
 			.getLogger(EventHubBolt.class);
 
 	protected OutputCollector collector;
-	protected EventHubSender sender;
+	protected PartitionSender sender;
+	protected EventHubClient ehClient;
 	protected EventHubBoltConfig boltConfig;
 
 	public EventHubBolt(String connectionString, String entityPath) {
@@ -72,10 +76,10 @@ public class EventHubBolt extends BaseTickTupleAwareRichBolt {
 		logger.info("creating sender: " + boltConfig.getConnectionString()
 				+ ", " + boltConfig.getEntityPath() + ", " + myPartitionId);
 		try {
-			EventHubClient eventHubClient = EventHubClient.create(
-					boltConfig.getConnectionString(),
-					boltConfig.getEntityPath());
-			sender = eventHubClient.createPartitionSender(myPartitionId);
+			ehClient = EventHubClient.createFromConnectionStringSync(boltConfig.getConnectionString());
+			if (boltConfig.getPartitionMode()) {
+				sender = ehClient.createPartitionSenderSync(Integer.toString(context.getThisTaskIndex()));
+			}
 		} catch (Exception ex) {
 			collector.reportError(ex);
 			throw new RuntimeException(ex);
@@ -84,13 +88,53 @@ public class EventHubBolt extends BaseTickTupleAwareRichBolt {
 	}
 
 	@Override
-	protected void process(Tuple tuple) {
+	public void execute(Tuple tuple) {
 		try {
-			sender.send(boltConfig.getEventDataFormat().serialize(tuple));
+			EventData sendEvent = new EventData(boltConfig.getEventDataFormat().serialize(tuple));
+			if (boltConfig.getPartitionMode() && sender!=null) {
+				sender.sendSync(sendEvent);
+			}
+			else if (boltConfig.getPartitionMode() && sender==null) {
+				throw new EventHubException("Sender is null");
+			}
+			else if (!boltConfig.getPartitionMode() && ehClient!=null) {
+				ehClient.sendSync(sendEvent);
+			}
+			else if (!boltConfig.getPartitionMode() && ehClient==null) {
+				throw new EventHubException("ehclient is null");
+			}
 			collector.ack(tuple);
-		} catch (EventHubException ex) {
+		} catch (EventHubException ex ) {
 			collector.reportError(ex);
 			collector.fail(tuple);
+		} catch (ServiceBusException e) {
+			collector.reportError(e);
+			collector.fail(tuple);
+		}
+	}
+
+	@Override
+	public void cleanup() {
+		if(sender != null) {
+			try {
+				sender.close().whenComplete((voidargs,error)->{
+					try{
+						if(error!=null){
+							logger.error("Exception during sender cleanup phase"+error.toString());
+						}
+						ehClient.closeSync();
+					}catch (Exception e){
+						logger.error("Exception during ehclient cleanup phase"+e.toString());
+					}
+				}).get();
+			} catch (InterruptedException e) {
+				logger.error("Exception occured during cleanup phase"+e.toString());
+			} catch (ExecutionException e) {
+				logger.error("Exception occured during cleanup phase"+e.toString());
+			}
+			logger.info("Eventhub Bolt cleaned up");
+			sender = null;
+			ehClient =  null;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java
index 10b4e39..fe2d989 100644
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java
@@ -17,10 +17,12 @@
  *******************************************************************************/
 package org.apache.storm.eventhubs.bolt;
 
+import com.microsoft.azure.servicebus.ConnectionStringBuilder;
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+
 import java.io.Serializable;
 
-import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
-import com.microsoft.eventhubs.client.ConnectionStringBuilder;
+import java.io.Serializable;
 
 /*
  * EventHubs bolt configurations
@@ -81,8 +83,8 @@ public class EventHubBoltConfig implements Serializable {
   public EventHubBoltConfig(String userName, String password, String namespace,
       String targetFqnAddress, String entityPath, boolean partitionMode,
       IEventDataFormat dataFormat) {
-    this.connectionString = new ConnectionStringBuilder(userName, password,
-    		namespace, targetFqnAddress).getConnectionString();
+    this.connectionString = new ConnectionStringBuilder(namespace,entityPath,
+            userName,password).toString();
     this.entityPath = entityPath;
     this.partitionMode = partitionMode;
     this.dataFormat = dataFormat;

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java
index 7b0d7e5..99d764c 100644
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java
@@ -17,17 +17,17 @@
  *******************************************************************************/
 package org.apache.storm.eventhubs.spout;
 
-import org.apache.qpid.amqp_1_0.client.Message;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
-import org.apache.qpid.amqp_1_0.type.messaging.Data;
+import com.microsoft.azure.eventhubs.EventData;
 import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+
 /**
  * An Event Data Scheme which deserializes message payload into the raw bytes.
  *
@@ -37,25 +37,30 @@ import java.util.Map;
  */
 public class BinaryEventDataScheme implements IEventDataScheme {
 
+	private static final Logger logger = LoggerFactory.getLogger(BinaryEventDataScheme.class);
 	@Override
-	public List<Object> deserialize(Message message) {
+	public List<Object> deserialize(EventData eventData){
 		final List<Object> fieldContents = new ArrayList<Object>();
-
-		Map metaDataMap = new HashMap();
-		byte[] messageData = new byte[0];
-
-		for (Section section : message.getPayload()) {
-			if (section instanceof Data) {
-				Data data = (Data) section;
-				messageData = data.getValue().getArray();
-			} else if (section instanceof ApplicationProperties) {
-				final ApplicationProperties applicationProperties = (ApplicationProperties) section;
-				metaDataMap = applicationProperties.getValue();
+		byte [] messageData = null;
+		if (eventData.getBytes() != null) {
+			messageData = eventData.getBytes();
+		}
+		else if (eventData.getObject()!=null) {
+			try {
+				messageData = SerializeDeserializeUtil.serialize(eventData.getObject());
+			} catch (IOException e) {
+				logger.error("Failed to serialize EventData payload class"
+						+ eventData.getObject().getClass());
+				logger.error("Exception encountered while serializing EventData payload is"
+						+ e.toString());
+				throw new RuntimeException(e);
 			}
 		}
-
+		Map metaDataMap = eventData.getProperties().size() > 0 ? eventData.getProperties() : null;
 		fieldContents.add(messageData);
-		fieldContents.add(metaDataMap);
+		if ( metaDataMap != null ) {
+			fieldContents.add(metaDataMap);
+		}
 		return fieldContents;
 	}
 

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventData.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventData.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventData.java
deleted file mode 100755
index e5834b4..0000000
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventData.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.spout;
-
-import org.apache.qpid.amqp_1_0.client.Message;
-
-public class EventData implements Comparable<EventData> {
-  private final Message message;
-  private final MessageId messageId;
-
-  public EventData(Message message, MessageId messageId) {
-    this.message = message;
-    this.messageId = messageId;
-  }
-
-  public static EventData create(Message message, MessageId messageId) {
-    return new EventData(message, messageId);
-  }
-
-  public Message getMessage() {
-    return this.message;
-  }
-
-  public MessageId getMessageId() {
-    return this.messageId;
-  }
-
-  @Override
-  public int compareTo(EventData ed) {
-    return messageId.getSequenceNumber().
-        compareTo(ed.getMessageId().getSequenceNumber());
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
index 90cad0a..a537975 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
@@ -17,18 +17,15 @@
  *******************************************************************************/
 package org.apache.storm.eventhubs.spout;
 
+import com.microsoft.azure.eventhubs.EventData;
 import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.qpid.amqp_1_0.client.Message;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
-import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
-import org.apache.qpid.amqp_1_0.type.messaging.Data;
-
 /**
  * An Event Data Scheme which deserializes message payload into the Strings. No
  * encoding is assumed. The receiver will need to handle parsing of the string
@@ -44,29 +41,35 @@ import org.apache.qpid.amqp_1_0.type.messaging.Data;
 public class EventDataScheme implements IEventDataScheme {
 
 	private static final long serialVersionUID = 1L;
-
+	private static final Logger logger = LoggerFactory.getLogger(EventDataScheme.class);
 	@Override
-	public List<Object> deserialize(Message message) {
+	public List<Object> deserialize(EventData eventData) {
 		final List<Object> fieldContents = new ArrayList<Object>();
-
-		Map metaDataMap = new HashMap();
 		String messageData = "";
-
-		for (Section section : message.getPayload()) {
-			if (section instanceof Data) {
-				Data data = (Data) section;
-				messageData = new String(data.getValue().getArray());
-			} else if (section instanceof AmqpValue) {
-				AmqpValue amqpValue = (AmqpValue) section;
-				messageData = amqpValue.getValue().toString();
-			} else if (section instanceof ApplicationProperties) {
-				final ApplicationProperties applicationProperties = (ApplicationProperties) section;
-				metaDataMap = applicationProperties.getValue();
+		if (eventData.getBytes()!=null) {
+			messageData = new String(eventData.getBytes());
+		}
+		/*Will only serialize AMQPValue type*/
+		else if (eventData.getObject()!=null) {
+			try {
+				if (!(eventData.getObject() instanceof List)) {
+					messageData = eventData.getObject().toString();
+				} else {
+					throw new RuntimeException("Cannot serialize the given AMQP type");
+				}
+			} catch (RuntimeException e) {
+				logger.error("Failed to serialize EventData payload class"
+						+ eventData.getObject().getClass());
+				logger.error("Exception encountered while serializing EventData payload is"
+						+ e.toString());
+				throw e;
 			}
 		}
-
+		Map metaDataMap = eventData.getProperties().size() > 0 ? eventData.getProperties() : null;
 		fieldContents.add(messageData);
-		fieldContents.add(metaDataMap);
+		if ( metaDataMap != null ) {
+			fieldContents.add(metaDataMap);
+		}
 		return fieldContents;
 	}
 

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataWrap.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataWrap.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataWrap.java
new file mode 100644
index 0000000..5eeb4d2
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataWrap.java
@@ -0,0 +1,48 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import com.microsoft.azure.eventhubs.EventData;
+
+public class EventDataWrap implements Comparable<EventDataWrap> {
+  private final EventData eventData;
+  private final MessageId messageId;
+
+  public EventDataWrap(EventData eventdata, MessageId messageId) {
+    this.eventData = eventdata;
+    this.messageId = messageId;
+  }
+
+  public static EventDataWrap create(EventData eventData, MessageId messageId) {
+    return new EventDataWrap(eventData, messageId);
+  }
+
+  public EventData getEventData() {
+    return this.eventData;
+  }
+
+  public MessageId getMessageId() {
+    return this.messageId;
+  }
+
+  @Override
+  public int compareTo(EventDataWrap ed) {
+    return messageId.getSequenceNumber().
+        compareTo(ed.getMessageId().getSequenceNumber());
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubException.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubException.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubException.java
new file mode 100644
index 0000000..f81197a
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubException.java
@@ -0,0 +1,41 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+
+package org.apache.storm.eventhubs.spout;
+
+public class EventHubException extends Exception {
+
+    private static final long serialVersionUID = 1L;
+
+    public EventHubException() {
+        super();
+    }
+
+    public EventHubException(String message) {
+        super(message);
+    }
+
+    public EventHubException(Throwable cause) {
+        super(cause);
+    }
+
+    public EventHubException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubFilter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubFilter.java
new file mode 100644
index 0000000..a375380
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubFilter.java
@@ -0,0 +1,46 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import java.time.Instant;
+
+public class EventHubFilter implements IEventFilter{
+
+    String offset = null;
+    Instant time = null;
+
+    public EventHubFilter(String offset){
+        this.offset = offset;
+        this.time = null;
+    }
+
+    public EventHubFilter(Instant time){
+        this.time = time;
+        this.offset = null;
+    }
+
+    @Override
+    public String getOffset(){
+        return offset;
+    }
+
+    @Override
+    public Instant getTime(){
+        return time;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
index 5f9acbd..4949024 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
@@ -17,38 +17,31 @@
  *******************************************************************************/
 package org.apache.storm.eventhubs.spout;
 
-import org.apache.qpid.amqp_1_0.client.Message;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import com.microsoft.azure.eventhubs.EventData;
+import com.microsoft.azure.eventhubs.EventHubClient;
+import com.microsoft.azure.eventhubs.PartitionReceiver;
+import com.microsoft.azure.servicebus.ServiceBusException;
 import org.apache.storm.metric.api.CountMetric;
 import org.apache.storm.metric.api.MeanReducer;
 import org.apache.storm.metric.api.ReducedMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import com.microsoft.eventhubs.client.Constants;
-import com.microsoft.eventhubs.client.EventHubException;
-import com.microsoft.eventhubs.client.IEventHubFilter;
-import com.microsoft.eventhubs.client.ResilientEventHubReceiver;
-
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
-
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.Symbol;
-import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations;
+import java.util.concurrent.ExecutionException;
 
 public class EventHubReceiverImpl implements IEventHubReceiver {
   private static final Logger logger = LoggerFactory.getLogger(EventHubReceiverImpl.class);
-  private static final Symbol OffsetKey = Symbol.valueOf(Constants.OffsetKey);
-  private static final Symbol SequenceNumberKey = Symbol.valueOf(Constants.SequenceNumberKey);
 
   private final String connectionString;
   private final String entityName;
   private final String partitionId;
-  private final int defaultCredits;
   private final String consumerGroupName;
 
-  private ResilientEventHubReceiver receiver;
+  private PartitionReceiver receiver;
+  private EventHubClient ehClient;
   private ReducedMetric receiveApiLatencyMean;
   private CountMetric receiveApiCallCount;
   private CountMetric receiveMessageCount;
@@ -56,7 +49,6 @@ public class EventHubReceiverImpl implements IEventHubReceiver {
   public EventHubReceiverImpl(EventHubSpoutConfig config, String partitionId) {
     this.connectionString = config.getConnectionString();
     this.entityName = config.getEntityPath();
-    this.defaultCredits = config.getReceiverCredits();
     this.partitionId = partitionId;
     this.consumerGroupName = config.getConsumerGroupName();
     receiveApiLatencyMean = new ReducedMetric(new MeanReducer());
@@ -65,14 +57,41 @@ public class EventHubReceiverImpl implements IEventHubReceiver {
   }
 
   @Override
-  public void open(IEventHubFilter filter) throws EventHubException {
-    logger.info("creating eventhub receiver: partitionId=" + partitionId + 
-    		", filterString=" + filter.getFilterString());
+  public void open(IEventFilter filter) throws EventHubException {
+    logger.info("creating eventhub receiver: partitionId=" + partitionId +
+            ", filter=" + filter.getOffset() != null ?
+            filter.getOffset() : Long.toString(filter.getTime().toEpochMilli()));
     long start = System.currentTimeMillis();
-    receiver = new ResilientEventHubReceiver(connectionString, entityName,
-    		partitionId, consumerGroupName, defaultCredits, filter);
-    receiver.initialize();
-    
+    try {
+      ehClient = EventHubClient.createFromConnectionStringSync(connectionString);
+
+      if (filter.getOffset()!=null) {
+        receiver = ehClient.createEpochReceiverSync(
+                   consumerGroupName,
+                   partitionId,
+                   filter.getOffset(),
+                   false,
+                   1);
+      }
+      else if (filter.getTime()!=null) {
+        receiver = ehClient.createEpochReceiverSync(
+                   consumerGroupName,
+                   partitionId,
+                   filter.getTime(),
+                   1);
+      }
+      else{
+        throw new RuntimeException("Eventhub receiver must have " +
+                "an offset or time to be created");
+      }
+    } catch (IOException e) {
+      logger.error("Exception in creating ehclient"+ e.toString());
+      throw new EventHubException(e);
+    }
+    catch (ServiceBusException e) {
+      logger.error("Exception in creating Receiver"+e.toString());
+      throw new EventHubException(e);
+    }
     long end = System.currentTimeMillis();
     logger.info("created eventhub receiver, time taken(ms): " + (end-start));
   }
@@ -80,62 +99,60 @@ public class EventHubReceiverImpl implements IEventHubReceiver {
   @Override
   public void close() {
     if(receiver != null) {
-      receiver.close();
+      try {
+        receiver.close().whenComplete((voidargs,error)->{
+          try {
+            if (error!=null) {
+              logger.error("Exception during receiver close phase"+error.toString());
+            }
+            ehClient.closeSync();
+          } catch (Exception e) {
+            logger.error("Exception during ehclient close phase"+e.toString());
+          }
+        }).get();
+      } catch (InterruptedException e) {
+        logger.error("Exception occured during close phase"+e.toString());
+      } catch (ExecutionException e) {
+        logger.error("Exception occured during close phase"+e.toString());
+      }
       logger.info("closed eventhub receiver: partitionId=" + partitionId );
       receiver = null;
+      ehClient =  null;
     }
   }
-  
+
+
   @Override
   public boolean isOpen() {
     return (receiver != null);
   }
 
   @Override
-  public EventData receive(long timeoutInMilliseconds) {
+  public EventDataWrap receive() {
     long start = System.currentTimeMillis();
-    Message message = receiver.receive(timeoutInMilliseconds);
+    Iterable<EventData> receivedEvents=null;
+    /*Get one message at a time for backward compatibility behaviour*/
+    try {
+      receivedEvents = receiver.receiveSync(1);
+    } catch (ServiceBusException e) {
+      logger.error("Exception occured during receive"+e.toString());
+      return null;
+    }
     long end = System.currentTimeMillis();
     long millis = (end - start);
     receiveApiLatencyMean.update(millis);
     receiveApiCallCount.incr();
-    
-    if (message == null) {
-      //Temporary workaround for AMQP/EH bug of failing to receive messages
-      /*if(timeoutInMilliseconds > 100 && millis < timeoutInMilliseconds/2) {
-        throw new RuntimeException(
-            "Restart EventHubSpout due to failure of receiving messages in "
-            + millis + " millisecond");
-      }*/
+    // Check emptiness via the iterator: the spliterator size may be unknown (-1), not 0.
+    if (receivedEvents == null || !receivedEvents.iterator().hasNext()) {
       return null;
     }
-
     receiveMessageCount.incr();
+    EventData receivedEvent = receivedEvents.iterator().next();
+    MessageId messageId = new MessageId(partitionId,
+            receivedEvent.getSystemProperties().getOffset(),
+            receivedEvent.getSystemProperties().getSequenceNumber());
 
-    MessageId messageId = createMessageId(message);
-    return EventData.create(message, messageId);
-  }
-  
-  private MessageId createMessageId(Message message) {
-    String offset = null;
-    long sequenceNumber = 0;
-
-    for (Section section : message.getPayload()) {
-      if (section instanceof MessageAnnotations) {
-        MessageAnnotations annotations = (MessageAnnotations) section;
-        HashMap annonationMap = (HashMap) annotations.getValue();
-
-        if (annonationMap.containsKey(OffsetKey)) {
-          offset = (String) annonationMap.get(OffsetKey);
-        }
-
-        if (annonationMap.containsKey(SequenceNumberKey)) {
-          sequenceNumber = (Long) annonationMap.get(SequenceNumberKey);
-        }
-      }
-    }
-
-    return MessageId.create(partitionId, offset, sequenceNumber);
+    return EventDataWrap.create(receivedEvent,messageId);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
index 6adef42..23a7453 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
@@ -24,16 +24,14 @@ import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichSpout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-import org.apache.qpid.amqp_1_0.client.Message;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 public class EventHubSpout extends BaseRichSpout {
 
   private static final Logger logger = LoggerFactory.getLogger(EventHubSpout.class);
@@ -138,7 +136,7 @@ public class EventHubSpout extends BaseRichSpout {
 
   @Override
   public void open(Map config, TopologyContext context, SpoutOutputCollector collector) {
-    logger.info("begin: open()");
+    logger.info("begin:start open()");
     String topologyName = (String) config.get(Config.TOPOLOGY_NAME);
     eventHubConfig.setTopologyName(topologyName);
 
@@ -174,7 +172,7 @@ public class EventHubSpout extends BaseRichSpout {
 
   @Override
   public void nextTuple() {
-    EventData eventData = null;
+    EventDataWrap eventDatawrap = null;
 
     List<IPartitionManager> partitionManagers = partitionCoordinator.getMyPartitionManagers();
     for (int i = 0; i < partitionManagers.size(); i++) {
@@ -185,20 +183,16 @@ public class EventHubSpout extends BaseRichSpout {
         throw new RuntimeException("partitionManager doesn't exist.");
       }
 
-      eventData = partitionManager.receive();
+      eventDatawrap = partitionManager.receive();
 
-      if (eventData != null) {
+      if (eventDatawrap != null) {
         break;
       }
     }
 
-
-    if (eventData != null) {
-      MessageId messageId = eventData.getMessageId();
-      Message message = eventData.getMessage();
-
-      List<Object> tuples = scheme.deserialize(message);
-
+    if (eventDatawrap != null) {
+      MessageId messageId = eventDatawrap.getMessageId();
+      List<Object> tuples = scheme.deserialize(eventDatawrap.getEventData());
       if (tuples != null) {
         collector.emit(tuples, messageId);
       }

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
index e06953a..5556970 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
@@ -17,10 +17,12 @@
  *******************************************************************************/
 package org.apache.storm.eventhubs.spout;
 
+import com.microsoft.azure.eventhubs.EventHubClient;
+import com.microsoft.azure.servicebus.ConnectionStringBuilder;
+
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
-import com.microsoft.eventhubs.client.ConnectionStringBuilder;
 
 public class EventHubSpoutConfig implements Serializable {
 	private static final long serialVersionUID = 1L;
@@ -42,8 +44,7 @@ public class EventHubSpoutConfig implements Serializable {
 	private String connectionString;
 	private String topologyName;
 	private IEventDataScheme scheme = new StringEventDataScheme();
-	private String consumerGroupName = null; // if null then use default
-												// consumer group
+	private String consumerGroupName = EventHubClient.DEFAULT_CONSUMER_GROUP_NAME;
 	private String outputStreamId;
 
 
@@ -52,8 +53,8 @@ public class EventHubSpoutConfig implements Serializable {
 			String namespace, String entityPath, int partitionCount) {
 		this.userName = username;
 		this.password = password;
-		this.connectionString = new ConnectionStringBuilder(username, password,
-				namespace).getConnectionString();
+		this.connectionString = new ConnectionStringBuilder(namespace,entityPath,
+				username,password).toString();
 		this.namespace = namespace;
 		this.entityPath = entityPath;
 		this.partitionCount = partitionCount;
@@ -232,9 +233,12 @@ public class EventHubSpoutConfig implements Serializable {
 		return connectionString;
 	}
 
+	/* Retained only for backward compatibility: the argument is ignored and the call is a no-op. */
 	public void setTargetAddress(String targetFqnAddress) {
-		this.connectionString = new ConnectionStringBuilder(userName, password,
-				namespace, targetFqnAddress).getConnectionString();
+	}
+
+	public void setTargetAddress(){
+
 	}
 
 	public EventHubSpoutConfig withTargetAddress(String targetFqnAddress) {

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutException.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutException.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutException.java
deleted file mode 100755
index 0fd6ac4..0000000
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutException.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.spout;
-
-public class EventHubSpoutException extends Exception {
-
-  public EventHubSpoutException() {
-    super();
-  }
-
-  public EventHubSpoutException(String message) {
-    super(message);
-  }
-
-  public EventHubSpoutException(Throwable cause) {
-    super(cause);
-  }
-
-  public EventHubSpoutException(String message, Throwable cause) {
-    super(message, cause);
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
index b238391..079d1da 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
@@ -23,4 +23,5 @@ public class FieldConstants {
   public static final String Offset = "offset";
   public static final String Message = "message";
   public static final String META_DATA = "metadata";
+  public static final String DefaultStartingOffset = "-1";
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
index b8101b9..6c78524 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
@@ -17,10 +17,11 @@
  *******************************************************************************/
 package org.apache.storm.eventhubs.spout;
 
+import com.microsoft.azure.eventhubs.EventData;
 import org.apache.storm.tuple.Fields;
+
 import java.io.Serializable;
 import java.util.List;
-import org.apache.qpid.amqp_1_0.client.Message;
 
 public interface IEventDataScheme extends Serializable {
 
@@ -29,10 +30,10 @@ public interface IEventDataScheme extends Serializable {
    *
    * @see #getOutputFields() for the list of fields the tuple will contain.
    *
-   * @param message The Message to Deserialize.
+   * @param eventData The EventData to Deserialize.
    * @return A tuple containing the deserialized fields of the message.
    */
-  List<Object> deserialize(Message message);
+  List<Object> deserialize(EventData eventData);
 
   /**
    * Retrieve the Fields that are present on tuples created by this object.

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventFilter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventFilter.java
new file mode 100644
index 0000000..c67823d
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventFilter.java
@@ -0,0 +1,28 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+
+package org.apache.storm.eventhubs.spout;
+
+import java.time.Instant;
+
+public interface IEventFilter {
+
+    String getOffset();
+
+    Instant getTime();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java
index bc2db14..4ae78c5 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java
@@ -19,18 +19,15 @@ package org.apache.storm.eventhubs.spout;
 
 import java.util.Map;
 
-import com.microsoft.eventhubs.client.EventHubException;
-import com.microsoft.eventhubs.client.IEventHubFilter;
-
 public interface IEventHubReceiver {
 
-  void open(IEventHubFilter filter) throws EventHubException;
+  void open(IEventFilter filter) throws EventHubException;
 
   void close();
-  
+
   boolean isOpen();
 
-  EventData receive(long timeoutInMilliseconds);
-  
+  EventDataWrap receive();
+
   Map getMetricsData();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java
index ac986d3..845f508 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java
@@ -25,7 +25,7 @@ public interface IPartitionManager {
 
   void close();
 
-  EventData receive();
+  EventDataWrap receive();
 
   void checkpoint();
 

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java
index 1054742..20e021a 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java
@@ -29,9 +29,9 @@ public class PartitionManager extends SimplePartitionManager {
   private final int ehReceiveTimeoutMs = 5000;
 
   //all sent events are stored in pending
-  private final Map<String, EventData> pending;
+  private final Map<String, EventDataWrap> pending;
   //all failed events are put in toResend, which is sorted by event's offset
-  private final TreeSet<EventData> toResend;
+  private final TreeSet<EventDataWrap> toResend;
 
   public PartitionManager(
     EventHubSpoutConfig spoutConfig,
@@ -41,29 +41,29 @@ public class PartitionManager extends SimplePartitionManager {
 
     super(spoutConfig, partitionId, stateStore, receiver);
     
-    this.pending = new LinkedHashMap<String, EventData>();
-    this.toResend = new TreeSet<EventData>();
+    this.pending = new LinkedHashMap<String, EventDataWrap>();
+    this.toResend = new TreeSet<EventDataWrap>();
   }
 
   @Override
-  public EventData receive() {
+  public EventDataWrap receive() {
     if(pending.size() >= config.getMaxPendingMsgsPerPartition()) {
       return null;
     }
 
-    EventData eventData;
+    EventDataWrap eventDatawrap;
     if (toResend.isEmpty()) {
-      eventData = receiver.receive(ehReceiveTimeoutMs);
+      eventDatawrap = receiver.receive();
     } else {
-      eventData = toResend.pollFirst();
+      eventDatawrap = toResend.pollFirst();
     }
 
-    if (eventData != null) {
-      lastOffset = eventData.getMessageId().getOffset();
-      pending.put(lastOffset, eventData);
+    if (eventDatawrap != null) {
+      lastOffset = eventDatawrap.getMessageId().getOffset();
+      pending.put(lastOffset, eventDatawrap);
     }
 
-    return eventData;
+    return eventDatawrap;
   }
 
   @Override
@@ -74,8 +74,8 @@ public class PartitionManager extends SimplePartitionManager {
   @Override
   public void fail(String offset) {
     logger.warn("fail on " + offset);
-    EventData eventData = pending.remove(offset);
-    toResend.add(eventData);
+    EventDataWrap eventDataWrap = pending.remove(offset);
+    toResend.add(eventDataWrap);
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SerializeDeserializeUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SerializeDeserializeUtil.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SerializeDeserializeUtil.java
new file mode 100644
index 0000000..0439a4f
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SerializeDeserializeUtil.java
@@ -0,0 +1,34 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+
+package org.apache.storm.eventhubs.spout;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+
+public class SerializeDeserializeUtil {
+    public static byte[] serialize(Object obj) throws IOException {
+        try (ByteArrayOutputStream b = new ByteArrayOutputStream()) {
+            try (ObjectOutputStream o = new ObjectOutputStream(b)) {
+                o.writeObject(obj);
+            }
+            return b.toByteArray();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java
index b66a785..b76b342 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java
@@ -17,16 +17,11 @@
  *******************************************************************************/
 package org.apache.storm.eventhubs.spout;
 
-import java.util.Map;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import java.time.Instant;
 
-import com.microsoft.eventhubs.client.Constants;
-import com.microsoft.eventhubs.client.EventHubEnqueueTimeFilter;
-import com.microsoft.eventhubs.client.EventHubOffsetFilter;
-import com.microsoft.eventhubs.client.IEventHubFilter;
-
+import java.util.Map;
 /**
  * A simple partition manager that does not re-send failed messages
  */
@@ -62,16 +57,15 @@ public class SimplePartitionManager implements IPartitionManager {
     String offset = stateStore.readData(statePath);
     logger.info("read offset from state store: " + offset);
     if(offset == null) {
-      offset = Constants.DefaultStartingOffset;
+      offset = FieldConstants.DefaultStartingOffset;
     }
-
-    IEventHubFilter filter;
-    if (offset.equals(Constants.DefaultStartingOffset)
+    IEventFilter filter;
+    if (offset.equals(FieldConstants.DefaultStartingOffset)
         && config.getEnqueueTimeFilter() != 0) {
-      filter = new EventHubEnqueueTimeFilter(config.getEnqueueTimeFilter());
+      filter = new EventHubFilter(Instant.ofEpochMilli(config.getEnqueueTimeFilter()));
     }
-    else {
-      filter = new EventHubOffsetFilter(offset);
+    else{
+      filter = new EventHubFilter(offset);
     }
 
     receiver.open(filter);
@@ -98,12 +92,12 @@ public class SimplePartitionManager implements IPartitionManager {
   }
 
   @Override
-  public EventData receive() {
-    EventData eventData = receiver.receive(5000);
-    if (eventData != null) {
-      lastOffset = eventData.getMessageId().getOffset();
+  public EventDataWrap receive() {
+    EventDataWrap eventDatawrap = receiver.receive();
+    if (eventDatawrap != null) {
+      lastOffset = eventDatawrap.getEventData().getSystemProperties().getOffset();
     }
-    return eventData;
+    return eventDatawrap;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java
index 8d2c485..abbecac 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java
@@ -25,7 +25,6 @@ import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.microsoft.eventhubs.client.Constants;
 
 public class StaticPartitionCoordinator implements IPartitionCoordinator {
 

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java
index 0c6f8b6..a2024bd 100644
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java
@@ -17,19 +17,13 @@
  *******************************************************************************/
 package org.apache.storm.eventhubs.spout;
 
+import com.microsoft.azure.eventhubs.EventData;
 import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-
-import org.apache.qpid.amqp_1_0.client.Message;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
-import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
-import org.apache.qpid.amqp_1_0.type.messaging.Data;
-import org.apache.storm.tuple.Fields;
 
 /**
  * An Event Data Scheme which deserializes message payload into the Strings.
@@ -44,21 +38,32 @@ import org.apache.storm.tuple.Fields;
 public class StringEventDataScheme implements IEventDataScheme {
 
   private static final long serialVersionUID = 1L;
+  private static final Logger logger = LoggerFactory.getLogger(StringEventDataScheme.class);
 
   @Override
-  public List<Object> deserialize(Message message) {
+  public List<Object> deserialize(EventData eventData) {
     final List<Object> fieldContents = new ArrayList<Object>();
-
-    for (Section section : message.getPayload()) {
-      if (section instanceof Data) {
-        Data data = (Data) section;
-        fieldContents.add(new String(data.getValue().getArray()));
-      } else if (section instanceof AmqpValue) {
-        AmqpValue amqpValue = (AmqpValue) section;
-        fieldContents.add(amqpValue.getValue().toString());
+    String messageData = "";
+    if (eventData.getBytes()!=null) {
+      messageData = new String(eventData.getBytes());
+    }
+    /*Will only serialize AMQPValue type*/
+    else if (eventData.getObject()!=null) {
+      try {
+        if (!(eventData.getObject() instanceof List)) {
+          messageData = eventData.getObject().toString();
+        } else {
+          throw new RuntimeException("Cannot serialize the given AMQP type.");
+        }
+      } catch (RuntimeException e){
+        logger.error("Failed to serialize EventData payload class"
+                + eventData.getObject().getClass());
+        logger.error("Exception encountered while serializing EventData payload is"
+                + e.toString());
+        throw e;
       }
     }
-    
+    fieldContents.add(messageData);
     return fieldContents;
   }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java
index fbe779d..069d819 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java
@@ -17,9 +17,9 @@
  *******************************************************************************/
 package org.apache.storm.eventhubs.trident;
 
-import java.util.List;
+import org.apache.storm.eventhubs.spout.EventDataWrap;
 
-import org.apache.storm.eventhubs.spout.EventData;
+import java.util.List;
 
 public interface ITridentPartitionManager {
   boolean open(String offset);
@@ -31,5 +31,5 @@ public interface ITridentPartitionManager {
    * @param count max number of messages in this batch
    * @return list of EventData, if failed to receive, return empty list
    */
-  public List<EventData> receiveBatch(String offset, int count);
+  public List<EventDataWrap> receiveBatch(String offset, int count);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java
index e5c1c50..b78fcd3 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java
@@ -17,24 +17,16 @@
  *******************************************************************************/
 package org.apache.storm.eventhubs.trident;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.storm.eventhubs.spout.EventData;
-import org.apache.storm.eventhubs.spout.EventHubReceiverImpl;
-import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
-import org.apache.storm.eventhubs.spout.IEventHubReceiver;
-import org.apache.storm.eventhubs.spout.IEventHubReceiverFactory;
-import com.microsoft.eventhubs.client.Constants;
-
+import org.apache.storm.eventhubs.spout.*;
 import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
 import org.apache.storm.trident.spout.IPartitionedTridentSpout;
 import org.apache.storm.trident.topology.TransactionAttempt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 
 public class TransactionalTridentEventHubEmitter
@@ -111,15 +103,15 @@ public class TransactionalTridentEventHubEmitter
     int count = Integer.parseInt((String)meta.get("count"));
     logger.info("re-emit for partition " + partition.getId() + ", offset=" + offset + ", count=" + count);
     ITridentPartitionManager pm = getOrCreatePartitionManager(partition);
-    List<EventData> listEvents = pm.receiveBatch(offset, count);
+    List<EventDataWrap> listEvents = pm.receiveBatch(offset, count);
     if(listEvents.size() != count) {
       logger.error("failed to refetch eventhub messages, new count=" + listEvents.size());
       return;
     }
 
-    for(EventData ed: listEvents) {
+    for(EventDataWrap ed: listEvents) {
       List<Object> tuples = 
-          spoutConfig.getEventDataScheme().deserialize(ed.getMessage());
+          spoutConfig.getEventDataScheme().deserialize(ed.getEventData());
       collector.emit(tuples);
     }
   }
@@ -128,20 +120,20 @@ public class TransactionalTridentEventHubEmitter
   public Map emitPartitionBatchNew(TransactionAttempt attempt,
       TridentCollector collector, Partition partition, Map meta) {
     ITridentPartitionManager pm = getOrCreatePartitionManager(partition);
-    String offset = Constants.DefaultStartingOffset;
+    String offset = FieldConstants.DefaultStartingOffset;
     if(meta != null && meta.containsKey("nextOffset")) {
       offset = (String)meta.get("nextOffset");
     }
     //logger.info("emit for partition " + partition.getId() + ", offset=" + offset);
     String nextOffset = offset;
 
-    List<EventData> listEvents = pm.receiveBatch(offset, batchSize);
+    List<EventDataWrap> listEvents = pm.receiveBatch(offset, batchSize);
 
-    for(EventData ed: listEvents) {
+    for(EventDataWrap ed: listEvents) {
       //update nextOffset;
       nextOffset = ed.getMessageId().getOffset();
       List<Object> tuples = 
-          spoutConfig.getEventDataScheme().deserialize(ed.getMessage());
+          spoutConfig.getEventDataScheme().deserialize(ed.getEventData());
       collector.emit(tuples);
     }
     //logger.info("emitted new batches: " + listEvents.size());

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java
index 159fe41..a384667 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java
@@ -17,27 +17,20 @@
  *******************************************************************************/
 package org.apache.storm.eventhubs.trident;
 
-import java.util.ArrayList;
-import java.util.List;
-
+import org.apache.storm.eventhubs.spout.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.microsoft.eventhubs.client.Constants;
-import com.microsoft.eventhubs.client.EventHubEnqueueTimeFilter;
-import com.microsoft.eventhubs.client.EventHubException;
-import com.microsoft.eventhubs.client.EventHubOffsetFilter;
-
-import org.apache.storm.eventhubs.spout.EventData;
-import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
-import org.apache.storm.eventhubs.spout.IEventHubReceiver;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
 
 public class TridentPartitionManager implements ITridentPartitionManager {
   private static final Logger logger = LoggerFactory.getLogger(TridentPartitionManager.class);
   private final int receiveTimeoutMs = 5000;
   private final IEventHubReceiver receiver;
   private final EventHubSpoutConfig spoutConfig;
-  private String lastOffset = Constants.DefaultStartingOffset;
+  private String lastOffset = FieldConstants.DefaultStartingOffset;
   
   public TridentPartitionManager(EventHubSpoutConfig spoutConfig, IEventHubReceiver receiver) {
     this.receiver = receiver;
@@ -47,12 +40,12 @@ public class TridentPartitionManager implements ITridentPartitionManager {
   @Override
   public boolean open(String offset) {
     try {
-      if((offset == null || offset.equals(Constants.DefaultStartingOffset)) 
+      if((offset == null || offset.equals(FieldConstants.DefaultStartingOffset))
         && spoutConfig.getEnqueueTimeFilter() != 0) {
-          receiver.open(new EventHubEnqueueTimeFilter(spoutConfig.getEnqueueTimeFilter()));
+          receiver.open(new EventHubFilter(Instant.ofEpochMilli(spoutConfig.getEnqueueTimeFilter())));
       }
       else {
-        receiver.open(new EventHubOffsetFilter(offset));
+        receiver.open(new EventHubFilter(offset));
       }
       lastOffset = offset;
       return true;
@@ -69,8 +62,8 @@ public class TridentPartitionManager implements ITridentPartitionManager {
   }
   
   @Override
-  public List<EventData> receiveBatch(String offset, int count) {
-    List<EventData> batch = new ArrayList<EventData>(count);
+  public List<EventDataWrap> receiveBatch(String offset, int count) {
+    List<EventDataWrap> batch = new ArrayList<EventDataWrap>(count);
     if(!offset.equals(lastOffset) || !receiver.isOpen()) {
       //re-establish connection to eventhub servers using the right offset
       //TBD: might be optimized with cache.
@@ -81,7 +74,7 @@ public class TridentPartitionManager implements ITridentPartitionManager {
     }
     
     for(int i=0; i<count; ++i) {
-      EventData ed = receiver.receive(receiveTimeoutMs);
+      EventDataWrap ed = receiver.receive();
       if(ed == null) {
         break;
       }

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java
index b176598..7538d4b 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java
@@ -17,22 +17,9 @@
  *******************************************************************************/
 package org.apache.storm.eventhubs.spout;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.storm.eventhubs.spout.MessageId;
-import org.apache.storm.eventhubs.spout.EventData;
-import org.apache.storm.eventhubs.spout.IEventHubReceiver;
-import org.apache.qpid.amqp_1_0.client.Message;
-import org.apache.qpid.amqp_1_0.jms.impl.TextMessageImpl;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.messaging.Data;
+import com.microsoft.azure.eventhubs.EventData;
 
-import com.microsoft.eventhubs.client.EventHubException;
-import com.microsoft.eventhubs.client.EventHubOffsetFilter;
-import com.microsoft.eventhubs.client.IEventHubFilter;
+import java.util.Map;
 
 /**
  * A mock receiver that emits fake data with offset starting from given offset
@@ -59,8 +46,10 @@ public class EventHubReceiverMock implements IEventHubReceiver {
   }
 
   @Override
-  public void open(IEventHubFilter filter) throws EventHubException {
-    currentOffset = Long.parseLong(filter.getFilterValue());
+  public void open(IEventFilter filter) throws EventHubException {
+    currentOffset = filter.getOffset() != null ?
+            Long.parseLong(filter.getOffset()) :
+            filter.getTime().toEpochMilli();
     isOpen = true;
   }
 
@@ -68,26 +57,25 @@ public class EventHubReceiverMock implements IEventHubReceiver {
   public void close() {
     isOpen = false;
   }
-  
+
   @Override
   public boolean isOpen() {
     return isOpen;
   }
 
   @Override
-  public EventData receive(long timeoutInMilliseconds) {
+  public EventDataWrap receive() {
     if(isPaused) {
       return null;
     }
 
     currentOffset++;
-    List<Section> body = new ArrayList<Section>();
+
     //the body of the message is "message" + currentOffset, e.g. "message123"
-    body.add(new Data(new Binary(("message" + currentOffset).getBytes())));
-    Message m = new Message(body);
+
     MessageId mid = new MessageId(partitionId, "" + currentOffset, currentOffset);
-    EventData ed = new EventData(m, mid);
-    return ed;
+    EventData ed = new EventData(("message" + currentOffset).getBytes());
+    return EventDataWrap.create(ed,mid);
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java
index dd63d5d..467461c 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java
@@ -17,10 +17,6 @@
  *******************************************************************************/
 package org.apache.storm.eventhubs.spout;
 
-import org.apache.storm.eventhubs.spout.PartitionManager;
-import org.apache.storm.eventhubs.spout.EventData;
-import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
-
 /**
  * This mock exercises PartitionManager
  */
@@ -71,7 +67,7 @@ public class PartitionManagerCallerMock {
           count = Integer.parseInt(cmd.substring(1));
         }
         for(int i=0; i<count; ++i) {
-          EventData ed = pm.receive();
+          EventDataWrap ed = pm.receive();
           if(ed == null) {
             ret.append("null,");
           }

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java
index aa8d097..f260dea 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java
@@ -17,12 +17,12 @@
  *******************************************************************************/
 package org.apache.storm.eventhubs.spout;
 
-import static org.junit.Assert.*;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.junit.Assert.assertTrue;
+
 public class TestEventData {
 
   @Before
@@ -37,10 +37,10 @@ public class TestEventData {
   public void testEventDataComparision() {
 
 	MessageId messageId1 = MessageId.create(null, "3", 1);
-	EventData eventData1 = EventData.create(null, messageId1);
+	EventDataWrap eventData1 = EventDataWrap.create(null, messageId1);
 
 	MessageId messageId2 = MessageId.create(null, "13", 2);
-	EventData eventData2 = EventData.create(null, messageId2);
+	EventDataWrap eventData2 = EventDataWrap.create(null, messageId2);
 
 	assertTrue(eventData2.compareTo(eventData1) > 0);
   }

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java
index 49e544b..a7b3588 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java
@@ -17,12 +17,13 @@
  *******************************************************************************/
 package org.apache.storm.eventhubs.spout;
 
-import static org.junit.Assert.*;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+
+
 
 public class TestEventHubSpout {
 
@@ -40,7 +41,7 @@ public class TestEventHubSpout {
         "namespace", "entityname", 16);
     conf.setZkConnectionString("zookeeper");
     conf.setCheckpointIntervalInSeconds(1);
-    assertEquals(conf.getConnectionString(), "amqps://username:pas%5Cs%2Bw%2Ford@namespace.servicebus.windows.net");
+    assertEquals(conf.getConnectionString(), "Endpoint=amqps://namespace.servicebus.windows.net;EntityPath=entityname;SharedAccessKeyName=username;SharedAccessKey=pas\\s+w/ord;OperationTimeout=PT1M;RetryPolicy=Default");
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1281171..d63907c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -306,7 +306,7 @@
         <mavenVersion>3.1.0</mavenVersion>
         <wagonVersion>1.0</wagonVersion>
         <qpid.version>0.32</qpid.version>
-        <eventhubs.client.version>1.0.1</eventhubs.client.version>
+        <azure-eventhubs.version>0.13.1</azure-eventhubs.version>
         <jersey.version>2.24.1</jersey.version>
 
         <!-- see intellij profile below... This fixes an annoyance with intellij -->


[3/3] storm git commit: Added STORM-2371 to CHANGELOG.

Posted by sr...@apache.org.
Added STORM-2371 to CHANGELOG.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2307b802
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2307b802
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2307b802

Branch: refs/heads/master
Commit: 2307b8023c947a408aac6d522d18e13434c0f57e
Parents: 940096a
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Tue Apr 18 14:08:22 2017 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Tue Apr 18 14:08:22 2017 -0700

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2307b802/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 116c9b3..5f9d118 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 \ufeff## 2.0.0
+ * STORM-2371: Replace existing AMQP eventhub client with the latest one from Microsoft eventhubs
  * STORM-2475: Fix parsing of host:port to deal with IPv6 addresses
  * STORM-832: Allow config validation to be used by plugins/etc.
  * STORM-2471: Add metric for thread count


[2/3] storm git commit: Merge branch 'eventhub3' of https://github.com/rban1/storm

Posted by sr...@apache.org.
Merge branch 'eventhub3' of https://github.com/rban1/storm


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/940096a2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/940096a2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/940096a2

Branch: refs/heads/master
Commit: 940096a275562a25eccf5da7a6b048702b1be788
Parents: f794d72 fdb9136
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Tue Apr 18 14:02:58 2017 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Tue Apr 18 14:02:58 2017 -0700

----------------------------------------------------------------------
 external/storm-eventhubs/pom.xml                |  29 +---
 .../storm/eventhubs/bolt/EventHubBolt.java      |  82 ++++++++---
 .../eventhubs/bolt/EventHubBoltConfig.java      |  10 +-
 .../eventhubs/spout/BinaryEventDataScheme.java  |  43 +++---
 .../apache/storm/eventhubs/spout/EventData.java |  48 -------
 .../storm/eventhubs/spout/EventDataScheme.java  |  51 +++----
 .../storm/eventhubs/spout/EventDataWrap.java    |  48 +++++++
 .../eventhubs/spout/EventHubException.java      |  41 ++++++
 .../storm/eventhubs/spout/EventHubFilter.java   |  46 ++++++
 .../eventhubs/spout/EventHubReceiverImpl.java   | 141 +++++++++++--------
 .../storm/eventhubs/spout/EventHubSpout.java    |  24 ++--
 .../eventhubs/spout/EventHubSpoutConfig.java    |  18 ++-
 .../eventhubs/spout/EventHubSpoutException.java |  37 -----
 .../storm/eventhubs/spout/FieldConstants.java   |   1 +
 .../storm/eventhubs/spout/IEventDataScheme.java |   7 +-
 .../storm/eventhubs/spout/IEventFilter.java     |  28 ++++
 .../eventhubs/spout/IEventHubReceiver.java      |  11 +-
 .../eventhubs/spout/IPartitionManager.java      |   2 +-
 .../storm/eventhubs/spout/PartitionManager.java |  28 ++--
 .../spout/SerializeDeserializeUtil.java         |  34 +++++
 .../eventhubs/spout/SimplePartitionManager.java |  32 ++---
 .../spout/StaticPartitionCoordinator.java       |   1 -
 .../eventhubs/spout/StringEventDataScheme.java  |  43 +++---
 .../trident/ITridentPartitionManager.java       |   6 +-
 .../TransactionalTridentEventHubEmitter.java    |  36 ++---
 .../trident/TridentPartitionManager.java        |  29 ++--
 .../eventhubs/spout/EventHubReceiverMock.java   |  36 ++---
 .../spout/PartitionManagerCallerMock.java       |   6 +-
 .../storm/eventhubs/spout/TestEventData.java    |   8 +-
 .../eventhubs/spout/TestEventHubSpout.java      |   7 +-
 pom.xml                                         |   2 +-
 31 files changed, 531 insertions(+), 404 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/940096a2/external/storm-eventhubs/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/940096a2/pom.xml
----------------------------------------------------------------------