You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/06/03 19:35:55 UTC

[41/50] [abbrv] storm git commit: storm-eventhubs improvement

storm-eventhubs improvement

EventHubBolt: add an event formatter to format tuples into bytes
Refactor EventHubSpoutConfig
Add support for specifying the consumer group name
Work around a Qpid issue where, in rare cases, messages cannot be received

Signed-off-by: Shanyu Zhao <sh...@microsoft.com>


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e5154928
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e5154928
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e5154928

Branch: refs/heads/master
Commit: e515492864092c16c2ecc86577a3be73ec79fa56
Parents: 847958c
Author: Shanyu Zhao <sh...@microsoft.com>
Authored: Wed May 13 12:26:53 2015 -0700
Committer: Shanyu Zhao <sh...@microsoft.com>
Committed: Wed May 13 12:26:53 2015 -0700

----------------------------------------------------------------------
 external/storm-eventhubs/pom.xml                | 239 +++++++++----------
 .../eventhubs/bolt/DefaultEventDataFormat.java  |  47 ++++
 .../storm/eventhubs/bolt/EventHubBolt.java      | 182 +++++++-------
 .../eventhubs/bolt/EventHubBoltConfig.java      | 107 +++++++++
 .../storm/eventhubs/bolt/IEventDataFormat.java  |  28 +++
 .../storm/eventhubs/client/EventHubClient.java  | 187 ++++++++-------
 .../storm/eventhubs/client/EventHubSender.java  | 194 +++++++--------
 .../storm/eventhubs/samples/EventCount.java     |   5 +-
 .../storm/eventhubs/samples/EventHubLoop.java   | 103 ++++----
 .../eventhubs/spout/EventHubReceiverImpl.java   |  20 +-
 .../storm/eventhubs/spout/EventHubSpout.java    |   5 +
 .../eventhubs/spout/EventHubSpoutConfig.java    | 105 +++++---
 .../src/main/resources/config.properties        |   5 +-
 .../eventhubs/spout/TestEventHubSpout.java      |   4 +-
 14 files changed, 750 insertions(+), 481 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e5154928/external/storm-eventhubs/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/pom.xml b/external/storm-eventhubs/pom.xml
index 5ed65c7..2ceed09 100755
--- a/external/storm-eventhubs/pom.xml
+++ b/external/storm-eventhubs/pom.xml
@@ -1,122 +1,119 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    
-    <parent>
-        <artifactId>storm</artifactId>
-        <groupId>org.apache.storm</groupId>
-        <version>0.11.0-SNAPSHOT</version>
-        <relativePath>../../pom.xml</relativePath>
-    </parent>
-    
-    <artifactId>storm-eventhubs</artifactId>
-    <version>0.11.0-SNAPSHOT</version>
-    <packaging>jar</packaging>
-    <name>storm-eventhubs</name>
-    <description>EventHubs Storm Spout</description>
-
-    <properties>
-        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <qpid.version>0.28</qpid.version>
-    </properties>
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-assembly-plugin</artifactId>
-                <version>2.4.1</version>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>attached</goal>
-                        </goals>
-                        <phase>package</phase>
-                        <configuration>
-                            <descriptorRefs>
-                                <descriptorRef>jar-with-dependencies</descriptorRef>
-                            </descriptorRefs>
-                            <archive>
-                                <manifest>
-                                    <mainClass>org.apache.storm.eventhubs.samples.EventCount</mainClass>
-                                </manifest>
-                            </archive>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-		        <artifactId>maven-antrun-plugin</artifactId>
-		        <executions>
-		          <execution>
-		            <phase>package</phase>
-		            <configuration>
-		              <tasks>
-		                <copy file="src/main/resources/config.properties" tofile="target/eventhubs-config.properties"/>
-                    </tasks>
-		            </configuration>
-		            <goals>
-		              <goal>run</goal>
-		            </goals>
-		          </execution>
-		        </executions>
-	        </plugin>
-        </plugins>
-    </build>
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.qpid</groupId>
-            <artifactId>qpid-client</artifactId>
-            <version>${qpid.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.qpid</groupId>
-            <artifactId>qpid-amqp-1-0-client-jms</artifactId>
-            <version>${qpid.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-core</artifactId>
-            <version>${project.version}</version>
-            <!-- keep storm out of the jar-with-dependencies -->
-            <type>jar</type>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-framework</artifactId>
-            <version>${curator.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>4.11</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies> 
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>0.11.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    
+    <artifactId>storm-eventhubs</artifactId>
+    <version>0.11.0-SNAPSHOT</version>
+    <packaging>jar</packaging>
+    <name>storm-eventhubs</name>
+    <description>EventHubs Storm Spout</description>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <qpid.version>0.32</qpid.version>
+    </properties>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>2.3</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <phase>package</phase>
+                    </execution>
+                </executions>
+                <configuration>
+                    <transformers>
+                        <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer">
+                        </transformer>
+                    </transformers>
+                    <outputFile>target/${project.artifactId}-${project.version}-jar-with-dependencies.jar</outputFile>
+                </configuration>
+	        </plugin>
+            <plugin>
+		        <artifactId>maven-antrun-plugin</artifactId>
+		        <executions>
+		          <execution>
+		            <phase>package</phase>
+		            <configuration>
+		              <tasks>
+		                <copy file="src/main/resources/config.properties" tofile="target/eventhubs-config.properties"/>
+                    </tasks>
+		            </configuration>
+		            <goals>
+		              <goal>run</goal>
+		            </goals>
+		          </execution>
+		        </executions>
+	        </plugin>
+        </plugins>
+    </build>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-client</artifactId>
+            <version>${qpid.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-amqp-1-0-client-jms</artifactId>
+            <version>${qpid.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <!-- keep storm out of the jar-with-dependencies -->
+            <type>jar</type>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <version>${curator.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies> 
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e5154928/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java
new file mode 100644
index 0000000..1bd8288
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java
@@ -0,0 +1,47 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.bolt;
+
+import backtype.storm.tuple.Tuple;
+
+/**
+ * A default implementation of IEventDataFormat that converts the tuple
+ * into a delimited string.
+ */
+public class DefaultEventDataFormat implements IEventDataFormat {
+  private static final long serialVersionUID = 1L;
+  private String delimiter = ",";
+  
+  public DefaultEventDataFormat withFieldDelimiter(String delimiter) {
+    this.delimiter = delimiter;
+    return this;
+  }
+
+  @Override
+  public byte[] serialize(Tuple tuple) {
+    StringBuilder sb = new StringBuilder();
+    for(Object obj : tuple.getValues()) {
+      if(sb.length() != 0) {
+        sb.append(delimiter);
+      }
+      sb.append(obj.toString());
+    }
+    return sb.toString().getBytes();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e5154928/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
index 8016be3..09f90b1 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
@@ -1,81 +1,101 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.bolt;
-
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.storm.eventhubs.client.EventHubClient;
-import org.apache.storm.eventhubs.client.EventHubException;
-import org.apache.storm.eventhubs.client.EventHubSender;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.tuple.Tuple;
-
-/**
- * A bolt that writes message to EventHub.
- * We assume the incoming tuple has only one field which is a string.
- */
-public class EventHubBolt extends BaseBasicBolt {
-  private static final long serialVersionUID = 1L;
-  private static final Logger logger = LoggerFactory
-      .getLogger(EventHubBolt.class);
-  
-  private EventHubSender sender;
-  private String connectionString;
-  private String entityPath;
-  
-  public EventHubBolt(String connectionString, String entityPath) {
-    this.connectionString = connectionString;
-    this.entityPath = entityPath;
-  }
-  
-  @Override
-  public void prepare(Map config, TopologyContext context) {
-    try {
-      EventHubClient eventHubClient = EventHubClient.create(connectionString, entityPath);
-      sender = eventHubClient.createPartitionSender(null);
-    }
-    catch(Exception ex) {
-      logger.error(ex.getMessage());
-      throw new RuntimeException(ex);
-    }
-
-  }
-
-  @Override
-  public void execute(Tuple tuple, BasicOutputCollector collector) {
-    try {
-      sender.send((String)tuple.getValue(0));
-    }
-    catch(EventHubException ex) {
-      logger.error(ex.getMessage());
-    }
-  }
-
-  @Override
-  public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    
-  }
-
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.bolt;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.eventhubs.client.EventHubClient;
+import org.apache.storm.eventhubs.client.EventHubException;
+import org.apache.storm.eventhubs.client.EventHubSender;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+
+/**
+ * A bolt that writes event messages to EventHub.
+ */
+public class EventHubBolt extends BaseRichBolt {
+  private static final long serialVersionUID = 1L;
+  private static final Logger logger = LoggerFactory
+      .getLogger(EventHubBolt.class);
+  
+  protected OutputCollector collector;
+  protected EventHubSender sender;
+  protected EventHubBoltConfig boltConfig;
+  
+  
+  public EventHubBolt(String connectionString, String entityPath) {
+    boltConfig = new EventHubBoltConfig(connectionString, entityPath);
+  }
+
+  public EventHubBolt(String userName, String password, String namespace,
+      String entityPath, boolean partitionMode) {
+    boltConfig = new EventHubBoltConfig(userName, password, namespace,
+        entityPath, partitionMode);
+  }
+  
+  public EventHubBolt(EventHubBoltConfig config) {
+    boltConfig = config;
+  }
+
+  @Override
+  public void prepare(Map config, TopologyContext context, OutputCollector collector) {
+    this.collector = collector;
+    String myPartitionId = null;
+    if(boltConfig.getPartitionMode()) {
+      //We can use the task index (starting from 0) as the partition ID
+      myPartitionId = "" + context.getThisTaskIndex();
+    }
+    logger.info("creating sender: " + boltConfig.getConnectionString()
+        + ", " + boltConfig.getEntityPath() + ", " + myPartitionId);
+    try {
+      EventHubClient eventHubClient = EventHubClient.create(
+          boltConfig.getConnectionString(), boltConfig.getEntityPath());
+      sender = eventHubClient.createPartitionSender(myPartitionId);
+    }
+    catch(Exception ex) {
+      logger.error(ex.getMessage());
+      throw new RuntimeException(ex);
+    }
+
+  }
+
+  @Override
+  public void execute(Tuple tuple) {
+    try {
+      sender.send(boltConfig.getEventDataFormat().serialize(tuple));
+      collector.ack(tuple);
+    }
+    catch(EventHubException ex) {
+      logger.error(ex.getMessage());
+      collector.fail(tuple);
+    }
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e5154928/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java
new file mode 100644
index 0000000..909e8ac
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java
@@ -0,0 +1,107 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.bolt;
+
+import java.io.Serializable;
+
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+
+/*
+ * EventHubs bolt configurations
+ *
+ * Partition mode:
+ * With partitionMode=true you need to create the same number of tasks as the number of 
+ * EventHubs partitions, and each bolt task will only send data to one partition.
+ * The partition ID is the task index of the bolt (starting from 0).
+ * 
+ * Event format:
+ * The formatter to convert tuple to bytes for EventHubs.
+ * If null, the default format is comma-delimited tuple fields.
+ */
+public class EventHubBoltConfig implements Serializable {
+  private static final long serialVersionUID = 1L;
+  
+  private String connectionString;
+  private final String entityPath;
+  protected boolean partitionMode;
+  protected IEventDataFormat dataFormat;
+  
+  public EventHubBoltConfig(String connectionString, String entityPath) {
+    this(connectionString, entityPath, false, null);
+  }
+  
+  public EventHubBoltConfig(String connectionString, String entityPath,
+      boolean partitionMode) {
+    this(connectionString, entityPath, partitionMode, null);
+  }
+  
+  public EventHubBoltConfig(String userName, String password, String namespace,
+      String entityPath, boolean partitionMode) {
+    this(userName, password, namespace,
+        EventHubSpoutConfig.EH_SERVICE_FQDN_SUFFIX, entityPath, partitionMode);
+  }
+  
+  public EventHubBoltConfig(String connectionString, String entityPath,
+      boolean partitionMode, IEventDataFormat dataFormat) {
+    this.connectionString = connectionString;
+    this.entityPath = entityPath;
+    this.partitionMode = partitionMode;
+    this.dataFormat = dataFormat;
+    if(this.dataFormat == null) {
+      this.dataFormat = new DefaultEventDataFormat();
+    }
+  }
+  
+  public EventHubBoltConfig(String userName, String password, String namespace,
+      String targetFqnAddress, String entityPath) {
+    this(userName, password, namespace, targetFqnAddress, entityPath, false, null);
+  }
+  
+  public EventHubBoltConfig(String userName, String password, String namespace,
+      String targetFqnAddress, String entityPath, boolean partitionMode) {
+    this(userName, password, namespace, targetFqnAddress, entityPath, partitionMode, null);
+  }
+  
+  public EventHubBoltConfig(String userName, String password, String namespace,
+      String targetFqnAddress, String entityPath, boolean partitionMode,
+      IEventDataFormat dataFormat) {
+    this.connectionString = EventHubSpoutConfig.buildConnectionString(userName, password, namespace, targetFqnAddress);
+    this.entityPath = entityPath;
+    this.partitionMode = partitionMode;
+    this.dataFormat = dataFormat;
+    if(this.dataFormat == null) {
+      this.dataFormat = new DefaultEventDataFormat();
+    }
+  }
+  
+  public String getConnectionString() {
+    return connectionString;
+  }
+  
+  public String getEntityPath() {
+    return entityPath;
+  }
+  
+  public boolean getPartitionMode() {
+    return partitionMode;
+  }
+  
+  public IEventDataFormat getEventDataFormat() {
+    return dataFormat;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e5154928/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java
new file mode 100644
index 0000000..cb05c0f
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java
@@ -0,0 +1,28 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.bolt;
+
+import java.io.Serializable;
+import backtype.storm.tuple.Tuple;
+
+/**
+ * Serialize a tuple to a byte array to be sent to EventHubs
+ */
+public interface IEventDataFormat extends Serializable {
+  public byte[] serialize(Tuple tuple);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e5154928/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java
index e06091d..2afe5b4 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java
@@ -1,92 +1,95 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.client;
-
-import org.apache.qpid.amqp_1_0.client.Connection;
-import org.apache.qpid.amqp_1_0.client.ConnectionErrorException;
-import org.apache.qpid.amqp_1_0.client.ConnectionException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class EventHubClient {
-
-  private static final String DefaultConsumerGroupName = "$default";
-  private static final Logger logger = LoggerFactory.getLogger(EventHubClient.class);
-  private static final long ConnectionSyncTimeout = 60000L;
-
-  private final String connectionString;
-  private final String entityPath;
-  private final Connection connection;
-
-  private EventHubClient(String connectionString, String entityPath) throws EventHubException {
-    this.connectionString = connectionString;
-    this.entityPath = entityPath;
-    this.connection = this.createConnection();
-  }
-
-  /**
-   * creates a new instance of EventHubClient using the supplied connection string and entity path.
-   *
-   * @param connectionString connection string to the namespace of event hubs. connection string format:
-   * amqps://{userId}:{password}@{namespaceName}.servicebus.windows.net
-   * @param entityPath the name of event hub entity.
-   *
-   * @return EventHubClient
-   * @throws org.apache.storm.eventhubs.client.EventHubException
-   */
-  public static EventHubClient create(String connectionString, String entityPath) throws EventHubException {
-    return new EventHubClient(connectionString, entityPath);
-  }
-
-  public EventHubSender createPartitionSender(String partitionId) throws Exception {
-    return new EventHubSender(this.connection.createSession(), this.entityPath, partitionId);
-  }
-
-  public EventHubConsumerGroup getDefaultConsumerGroup() {
-    return new EventHubConsumerGroup(this.connection, this.entityPath, DefaultConsumerGroupName);
-  }
-
-  public void close() {
-    try {
-      this.connection.close();
-    } catch (ConnectionErrorException e) {
-      logger.error(e.toString());
-    }
-  }
-
-  private Connection createConnection() throws EventHubException {
-    ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder(this.connectionString);
-    Connection clientConnection;
-
-    try {
-      clientConnection = new Connection(
-        connectionStringBuilder.getHost(),
-        connectionStringBuilder.getPort(),
-        connectionStringBuilder.getUserName(),
-        connectionStringBuilder.getPassword(),
-        connectionStringBuilder.getHost(),
-        connectionStringBuilder.getSsl());
-    } catch (ConnectionException e) {
-      logger.error(e.toString());
-      throw new EventHubException(e);
-    }
-    clientConnection.getEndpoint().setSyncTimeout(ConnectionSyncTimeout);
-    SelectorFilterWriter.register(clientConnection.getEndpoint().getDescribedTypeRegistry());
-    return clientConnection;
-  }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.client;
+
+import org.apache.qpid.amqp_1_0.client.Connection;
+import org.apache.qpid.amqp_1_0.client.ConnectionErrorException;
+import org.apache.qpid.amqp_1_0.client.ConnectionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Entry point for talking to one EventHub entity over a single AMQP connection. */
+public class EventHubClient {
+
+  private static final String DefaultConsumerGroupName = "$default";
+  private static final Logger logger = LoggerFactory.getLogger(EventHubClient.class);
+  private static final long ConnectionSyncTimeout = 60000L;
+
+  private final String connectionString;
+  private final String entityPath;
+  private final Connection connection;
+
+  private EventHubClient(String connectionString, String entityPath) throws EventHubException {
+    this.connectionString = connectionString;
+    this.entityPath = entityPath;
+    this.connection = this.createConnection();
+  }
+
+  /**
+   * Creates a new instance of EventHubClient using the supplied connection string and entity path.
+   *
+   * @param connectionString connection string to the namespace of event hubs. connection string format:
+   * amqps://{userId}:{password}@{namespaceName}.servicebus.windows.net
+   * @param entityPath the name of event hub entity.
+   *
+   * @return EventHubClient
+   * @throws org.apache.storm.eventhubs.client.EventHubException if the AMQP connection cannot be created
+   */
+  public static EventHubClient create(String connectionString, String entityPath) throws EventHubException {
+    return new EventHubClient(connectionString, entityPath);
+  }
+
+  /** Creates a sender for the given partition on a new session of this client's connection. */
+  public EventHubSender createPartitionSender(String partitionId) throws Exception {
+    return new EventHubSender(this.connection.createSession(), this.entityPath, partitionId);
+  }
+
+  /** Returns the named consumer group; a null or empty name selects the "$default" group. */
+  public EventHubConsumerGroup getConsumerGroup(String cgName) {
+    boolean unnamed = cgName == null || cgName.length() == 0;
+    String groupName = unnamed ? DefaultConsumerGroupName : cgName;
+    return new EventHubConsumerGroup(this.connection, this.entityPath, groupName);
+  }
+
+  /** Closes the connection; a failure to close is logged (with stack trace) and not rethrown. */
+  public void close() {
+    try {
+      this.connection.close();
+    } catch (ConnectionErrorException e) {
+      logger.error("Failed to close EventHub connection", e);
+    }
+  }
+
+  /** Opens the AMQP connection described by the connection string and configures its endpoint. */
+  private Connection createConnection() throws EventHubException {
+    ConnectionStringBuilder parts = new ConnectionStringBuilder(this.connectionString);
+    Connection clientConnection;
+
+    try {
+      // The host doubles as the first and the fifth constructor argument.
+      clientConnection = new Connection(parts.getHost(), parts.getPort(),
+          parts.getUserName(), parts.getPassword(), parts.getHost(), parts.getSsl());
+    } catch (ConnectionException e) {
+      logger.error("Failed to create EventHub connection", e);
+      throw new EventHubException(e);
+    }
+    clientConnection.getEndpoint().setSyncTimeout(ConnectionSyncTimeout);
+    SelectorFilterWriter.register(clientConnection.getEndpoint().getDescribedTypeRegistry());
+    return clientConnection;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e5154928/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java
index 41b1d97..7c45578 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java
@@ -1,95 +1,99 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.client;
-
-import java.util.concurrent.TimeoutException;
-import org.apache.qpid.amqp_1_0.client.LinkDetachedException;
-import org.apache.qpid.amqp_1_0.client.Message;
-import org.apache.qpid.amqp_1_0.client.Sender;
-import org.apache.qpid.amqp_1_0.client.Session;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.messaging.Data;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class EventHubSender {
-
-  private static final Logger logger = LoggerFactory.getLogger(EventHubSender.class);
-
-  private final Session session;
-  private final String entityPath;
-  private final String partitionId;
-  private final String destinationAddress;
-
-  private Sender sender;
-
-  public EventHubSender(Session session, String entityPath, String partitionId) {
-    this.session = session;
-    this.entityPath = entityPath;
-    this.partitionId = partitionId;
-    this.destinationAddress = this.getDestinationAddress();
-  }
-
-  public void send(String data) throws EventHubException {
-    try {
-      if (this.sender == null) {
-        this.ensureSenderCreated();
-      }
-
-      //For interop with other language, convert string to bytes
-      Binary bin = new Binary(data.getBytes());
-      Message message = new Message(new Data(bin));
-      this.sender.send(message);
-
-    } catch (LinkDetachedException e) {
-      logger.error(e.getMessage());
-
-      EventHubException eventHubException = new EventHubException("Sender has been closed");
-      throw eventHubException;
-    } catch (TimeoutException e) {
-      logger.error(e.getMessage());
-
-      EventHubException eventHubException = new EventHubException("Timed out while waiting to get credit to send");
-      throw eventHubException;
-    } catch (Exception e) {
-      logger.error(e.getMessage());
-    }
-  }
-
-  public void close() {
-    try {
-      this.sender.close();
-    } catch (Sender.SenderClosingException e) {
-      logger.error("Closing a sender encountered error: " + e.getMessage());
-    }
-  }
-
-  private String getDestinationAddress() {
-    if (this.partitionId == null || this.partitionId.equals("")) {
-      return this.entityPath;
-    } else {
-      return String.format(Constants.DestinationAddressFormatString, this.entityPath, this.partitionId);
-    }
-  }
-
-  private synchronized void ensureSenderCreated() throws Exception {
-    if (this.sender == null) {
-      this.sender = this.session.createSender(this.destinationAddress);
-    }
-  }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.client;
+
+import java.util.concurrent.TimeoutException;
+import org.apache.qpid.amqp_1_0.client.LinkDetachedException;
+import org.apache.qpid.amqp_1_0.client.Message;
+import org.apache.qpid.amqp_1_0.client.Sender;
+import org.apache.qpid.amqp_1_0.client.Session;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.messaging.Data;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Sends events to an EventHub entity, or to one partition of it, over an AMQP session. */
+public class EventHubSender {
+
+  private static final Logger logger = LoggerFactory.getLogger(EventHubSender.class);
+  // Fixed charset for String payloads so the bytes do not depend on the JVM's platform default.
+  private static final java.nio.charset.Charset UTF8 = java.nio.charset.Charset.forName("UTF-8");
+
+  private final Session session;
+  private final String entityPath;
+  private final String partitionId;
+  private final String destinationAddress;
+  // Created lazily on first send; volatile so the unsynchronized null check in send() is safe.
+  private volatile Sender sender;
+
+  public EventHubSender(Session session, String entityPath, String partitionId) {
+    this.session = session;
+    this.entityPath = entityPath;
+    this.partitionId = partitionId;
+    this.destinationAddress = this.getDestinationAddress();
+  }
+
+  /** Sends data as one AMQP data message; every failure surfaces as an EventHubException. */
+  public void send(byte[] data) throws EventHubException {
+    try {
+      if (this.sender == null) {
+        this.ensureSenderCreated();
+      }
+      this.sender.send(new Message(new Data(new Binary(data))));
+    } catch (LinkDetachedException e) {
+      logger.error("Sender link detached", e);
+      throw new EventHubException("Sender has been closed");
+    } catch (TimeoutException e) {
+      logger.error("Send timed out", e);
+      throw new EventHubException("Timed out while waiting to get credit to send");
+    } catch (Exception e) {
+      // Do not swallow: callers must not treat a lost event as successfully sent.
+      logger.error("Failed to send event", e);
+      throw new EventHubException(e);
+    }
+  }
+
+  /** Sends the string encoded as UTF-8 bytes, for interop with clients in other languages. */
+  public void send(String data) throws EventHubException {
+    send(data.getBytes(UTF8));
+  }
+
+  /** Closes the sender if one was created; a closing error is logged rather than thrown. */
+  public void close() {
+    try {
+      if (this.sender != null) { // null when send() was never called
+        this.sender.close();
+      }
+    } catch (Sender.SenderClosingException e) {
+      logger.error("Closing a sender encountered error: " + e.getMessage(), e);
+    }
+  }
+
+  private String getDestinationAddress() {
+    if (this.partitionId == null || this.partitionId.equals("")) {
+      return this.entityPath;
+    }
+    return String.format(Constants.DestinationAddressFormatString, this.entityPath, this.partitionId);
+  }
+
+  private synchronized void ensureSenderCreated() throws Exception {
+    if (this.sender == null) {
+      this.sender = this.session.createSender(this.destinationAddress);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e5154928/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventCount.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventCount.java
index dd53e42..94fdb49 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventCount.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventCount.java
@@ -77,6 +77,7 @@ public class EventCount {
     if(enqueueTimeDiff != 0) {
       enqueueTimeFilter = System.currentTimeMillis() - enqueueTimeDiff*1000;
     }
+    String consumerGroupName = properties.getProperty("eventhubspout.consumer.group.name");
     
     System.out.println("Eventhub spout config: ");
     System.out.println("  partition count: " + partitionCount);
@@ -84,12 +85,14 @@ public class EventCount {
     System.out.println("  receiver credits: " + receiverCredits);
     spoutConfig = new EventHubSpoutConfig(username, password,
       namespaceName, entityPath, partitionCount, zkEndpointAddress,
-      checkpointIntervalInSeconds, receiverCredits, maxPendingMsgsPerPartition, enqueueTimeFilter);
+      checkpointIntervalInSeconds, receiverCredits, maxPendingMsgsPerPartition,
+      enqueueTimeFilter);
 
     if(targetFqnAddress != null)
     {
       spoutConfig.setTargetAddress(targetFqnAddress);      
     }
+    spoutConfig.setConsumerGroupName(consumerGroupName);
 
     //set the number of workers to be the same as partition number.
     //the idea is to have a spout and a partial count bolt co-exist in one

http://git-wip-us.apache.org/repos/asf/storm/blob/e5154928/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
index cae0573..c908f9d 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
@@ -1,51 +1,52 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.samples;
-
-import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.TopologyBuilder;
-
-import org.apache.storm.eventhubs.bolt.EventHubBolt;
-import org.apache.storm.eventhubs.spout.EventHubSpout;
-
-/**
- * A sample topology that loops message back to EventHub
- */
-public class EventHubLoop extends EventCount {
-
-  @Override
-  protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
-    TopologyBuilder topologyBuilder = new TopologyBuilder();
-
-    topologyBuilder.setSpout("EventHubsSpout", eventHubSpout, spoutConfig.getPartitionCount())
-      .setNumTasks(spoutConfig.getPartitionCount());
-    
-    EventHubBolt eventHubBolt = new EventHubBolt(spoutConfig.getConnectionString(),
-        spoutConfig.getEntityPath());
-    //For every spout, let's create multiple bolts because send is much slower
-    int boltTasks = spoutConfig.getPartitionCount() * 50;
-    topologyBuilder.setBolt("EventHubsBolt", eventHubBolt, boltTasks)
-      .localOrShuffleGrouping("EventHubsSpout").setNumTasks(boltTasks);
-    return topologyBuilder.createTopology();
-  }
-  
-  public static void main(String[] args) throws Exception {
-    EventHubLoop scenario = new EventHubLoop();
-    scenario.runScenario(args);
-  }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.TopologyBuilder;
+
+import org.apache.storm.eventhubs.bolt.EventHubBolt;
+import org.apache.storm.eventhubs.bolt.EventHubBoltConfig;
+import org.apache.storm.eventhubs.spout.EventHubSpout;
+
+/**
+ * A sample topology that loops message back to EventHub
+ */
+public class EventHubLoop extends EventCount {
+
+  /** Wires the EventHub spout to an EventHub bolt, one task of each per partition. */
+  @Override
+  protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
+    final int partitions = spoutConfig.getPartitionCount();
+    TopologyBuilder builder = new TopologyBuilder();
+    builder.setSpout("EventHubsSpout", eventHubSpout, partitions).setNumTasks(partitions);
+
+    // The bolt writes back to the same connection string and entity the spout reads from.
+    // NOTE(review): the meaning of the trailing 'true' flag is defined by EventHubBoltConfig.
+    EventHubBolt loopBolt = new EventHubBolt(new EventHubBoltConfig(
+        spoutConfig.getConnectionString(), spoutConfig.getEntityPath(), true));
+    builder.setBolt("EventHubsBolt", loopBolt, partitions)
+        .localOrShuffleGrouping("EventHubsSpout")
+        .setNumTasks(partitions);
+    return builder.createTopology();
+  }
+
+  public static void main(String[] args) throws Exception {
+    new EventHubLoop().runScenario(args);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e5154928/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
index 5600873..68302af 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
@@ -46,6 +46,7 @@ public class EventHubReceiverImpl implements IEventHubReceiver {
   private final String entityName;
   private final String partitionId;
   private final int defaultCredits;
+  private final String consumerGroupName;
 
   private EventHubReceiver receiver;
   private String lastOffset = null;
@@ -58,6 +59,7 @@ public class EventHubReceiverImpl implements IEventHubReceiver {
     this.entityName = config.getEntityPath();
     this.defaultCredits = config.getReceiverCredits();
     this.partitionId = partitionId;
+    this.consumerGroupName = config.getConsumerGroupName();
     receiveApiLatencyMean = new ReducedMetric(new MeanReducer());
     receiveApiCallCount = new CountMetric();
     receiveMessageCount = new CountMetric();
@@ -70,14 +72,20 @@ public class EventHubReceiverImpl implements IEventHubReceiver {
     long start = System.currentTimeMillis();
     EventHubClient eventHubClient = EventHubClient.create(connectionString, entityName);
     if(filter.getOffset() != null) {
-      receiver = eventHubClient.getDefaultConsumerGroup().createReceiver(partitionId, filter.getOffset(), defaultCredits);
+      receiver = eventHubClient
+          .getConsumerGroup(consumerGroupName)
+          .createReceiver(partitionId, filter.getOffset(), defaultCredits);
     }
     else if(filter.getEnqueueTime() != 0) {
-      receiver = eventHubClient.getDefaultConsumerGroup().createReceiver(partitionId, filter.getEnqueueTime(), defaultCredits);
+      receiver = eventHubClient
+          .getConsumerGroup(consumerGroupName)
+          .createReceiver(partitionId, filter.getEnqueueTime(), defaultCredits);
     }
     else {
       logger.error("Invalid IEventHubReceiverFilter, use default offset as filter");
-      receiver = eventHubClient.getDefaultConsumerGroup().createReceiver(partitionId, Constants.DefaultStartingOffset, defaultCredits);
+      receiver = eventHubClient
+          .getConsumerGroup(consumerGroupName)
+          .createReceiver(partitionId, Constants.DefaultStartingOffset, defaultCredits);
     }
     long end = System.currentTimeMillis();
     logger.info("created eventhub receiver, time taken(ms): " + (end-start));
@@ -107,6 +115,12 @@ public class EventHubReceiverImpl implements IEventHubReceiver {
     receiveApiCallCount.incr();
 
     if (message == null) {
+      //Temporary workaround for AMQP/EH bug of failing to receive messages
+      if(timeoutInMilliseconds > 100 && millis < timeoutInMilliseconds/2) {
+        throw new RuntimeException(
+            "Restart EventHubSpout due to failure of receiving messages in "
+            + millis + " millisecond");
+      }
       return null;
     }
     receiveMessageCount.incr();

http://git-wip-us.apache.org/repos/asf/storm/blob/e5154928/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
index 9290e6e..d08ec3a 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
@@ -50,6 +50,11 @@ public class EventHubSpout extends BaseRichSpout {
   private long lastCheckpointTime;
   private int currentPartitionIndex = -1;
 
+  public EventHubSpout(String username, String password, String namespace,
+      String entityPath, int partitionCount) {
+    this(new EventHubSpoutConfig(username, password, namespace, entityPath, partitionCount));
+  }
+
   public EventHubSpout(EventHubSpoutConfig spoutConfig) {
     this(spoutConfig, null, null, null);
   }

http://git-wip-us.apache.org/repos/asf/storm/blob/e5154928/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
index ae11680..0238e40 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
@@ -24,28 +24,41 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class EventHubSpoutConfig implements Serializable {
+  private static final long serialVersionUID = 1L; 
 
-  private static final long serialVersionUID = 1L;
+  public static final String EH_SERVICE_FQDN_SUFFIX = "servicebus.windows.net";
   private final String userName;
   private final String password;
   private final String namespace;
   private final String entityPath;
-  private final String zkConnectionString;
   private final int partitionCount;
-  private final int checkpointIntervalInSeconds;
-  private final int receiverCredits;
-  private final int maxPendingMsgsPerPartition;
-  private final long enqueueTimeFilter; //timestamp in millisecond
 
+  private String zkConnectionString = null; //if null then use zookeeper used by Storm
+  private int checkpointIntervalInSeconds = 10;
+  private int receiverCredits = 1024;
+  private int maxPendingMsgsPerPartition = 1024;
+  private long enqueueTimeFilter = 0; //timestamp in millisecond, 0 means disabling filter
   private String connectionString;
-  private String targetFqnAddress;
   private String topologyName;
-  private IEventDataScheme scheme;
+  private IEventDataScheme scheme = new EventDataScheme();
+  private String consumerGroupName = null; //if null then use default consumer group
 
+  //These are mandatory parameters
+  public EventHubSpoutConfig(String username, String password, String namespace,
+      String entityPath, int partitionCount) {
+    this.userName = username;
+    this.password = password;
+    this.connectionString = buildConnectionString(username, password, namespace);
+    this.namespace = namespace;
+    this.entityPath = entityPath;
+    this.partitionCount = partitionCount;
+  }
+
+  //Keep this constructor for backward compatibility
   public EventHubSpoutConfig(String username, String password, String namespace,
       String entityPath, int partitionCount, String zkConnectionString) {
-    this(username, password, namespace, entityPath, partitionCount,
-        zkConnectionString, 10, 1024, 1024, 0);
+    this(username, password, namespace, entityPath, partitionCount);
+    setZkConnectionString(zkConnectionString);
   }
   
   //Keep this constructor for backward compatibility
@@ -53,28 +66,20 @@ public class EventHubSpoutConfig implements Serializable {
       String entityPath, int partitionCount, String zkConnectionString,
       int checkpointIntervalInSeconds, int receiverCredits) {
     this(username, password, namespace, entityPath, partitionCount,
-        zkConnectionString, checkpointIntervalInSeconds, receiverCredits, 1024, 0);
+        zkConnectionString);
+    setCheckpointIntervalInSeconds(checkpointIntervalInSeconds);
+    setReceiverCredits(receiverCredits);
   }
-      
+
+  //Keep this constructor for backward compatibility
   public EventHubSpoutConfig(String username, String password, String namespace,
     String entityPath, int partitionCount, String zkConnectionString,
     int checkpointIntervalInSeconds, int receiverCredits, int maxPendingMsgsPerPartition, long enqueueTimeFilter) {
-    this.userName = username;
-    this.password = password;
-    this.connectionString = buildConnectionString(username, password, namespace);
-    this.namespace = namespace;
-    this.entityPath = entityPath;
-    this.partitionCount = partitionCount;
-    this.zkConnectionString = zkConnectionString;
-    this.checkpointIntervalInSeconds = checkpointIntervalInSeconds;
-    this.receiverCredits = receiverCredits;
-    this.maxPendingMsgsPerPartition = maxPendingMsgsPerPartition;
-    this.enqueueTimeFilter = enqueueTimeFilter;
-    this.scheme = new EventDataScheme();
-  }
-
-  public String getConnectionString() {
-    return connectionString;
+    
+    this(username, password, namespace, entityPath, partitionCount,
+        zkConnectionString, checkpointIntervalInSeconds, receiverCredits);
+    setMaxPendingMsgsPerPartition(maxPendingMsgsPerPartition);
+    setEnqueueTimeFilter(enqueueTimeFilter);
   }
 
   public String getNamespace() {
@@ -85,30 +90,50 @@ public class EventHubSpoutConfig implements Serializable {
     return entityPath;
   }
 
+  public int getPartitionCount() {
+    return partitionCount;
+  }
+
   public String getZkConnectionString() {
     return zkConnectionString;
   }
 
+  public void setZkConnectionString(String value) {
+    zkConnectionString = value;
+  }
+
   public int getCheckpointIntervalInSeconds() {
     return checkpointIntervalInSeconds;
   }
 
-  public int getPartitionCount() {
-    return partitionCount;
+  public void setCheckpointIntervalInSeconds(int value) {
+    checkpointIntervalInSeconds = value;
   }
   
   public int getReceiverCredits() {
     return receiverCredits;
   }
+
+  public void setReceiverCredits(int value) {
+    receiverCredits = value;
+  }
   
   public int getMaxPendingMsgsPerPartition() {
     return maxPendingMsgsPerPartition;
   }
+
+  public void setMaxPendingMsgsPerPartition(int value) {
+    maxPendingMsgsPerPartition = value;
+  }
   
   public long getEnqueueTimeFilter() {
     return enqueueTimeFilter;
   }
 
+  public void setEnqueueTimeFilter(long value) {
+    enqueueTimeFilter = value;
+  }
+
   public String getTopologyName() {
     return topologyName;
   }
@@ -125,6 +150,14 @@ public class EventHubSpoutConfig implements Serializable {
     this.scheme = scheme;
   }
 
+  public String getConsumerGroupName() {
+    return consumerGroupName;
+  }
+
+  public void setConsumerGroupName(String value) {
+    consumerGroupName = value;
+  }
+
   public List<String> getPartitionList() {
     List<String> partitionList = new ArrayList<String>();
 
@@ -134,16 +167,18 @@ public class EventHubSpoutConfig implements Serializable {
 
     return partitionList;
   }
-  
+
+  public String getConnectionString() {
+    return connectionString;
+  }
+
   public void setTargetAddress(String targetFqnAddress) {
-    this.targetFqnAddress = targetFqnAddress;
     this.connectionString = buildConnectionString(
-        this.userName, this.password, this.namespace, this.targetFqnAddress);
+        userName, password, namespace, targetFqnAddress);
   }
 
   public static String buildConnectionString(String username, String password, String namespace) {
-    String targetFqnAddress = "servicebus.windows.net";
-    return buildConnectionString(username, password, namespace, targetFqnAddress);
+    return buildConnectionString(username, password, namespace, EH_SERVICE_FQDN_SUFFIX);
   }
 
   public static String buildConnectionString(String username, String password,

http://git-wip-us.apache.org/repos/asf/storm/blob/e5154928/external/storm-eventhubs/src/main/resources/config.properties
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/resources/config.properties b/external/storm-eventhubs/src/main/resources/config.properties
index 82abb48..a8a520e 100755
--- a/external/storm-eventhubs/src/main/resources/config.properties
+++ b/external/storm-eventhubs/src/main/resources/config.properties
@@ -24,4 +24,7 @@ eventhubspout.max.pending.messages.per.partition = 1024
 # the EventHubs entity when we first create the Storm topology. If offsets
 # have been saved in Zookeeper, we'll ignore this configuration.
 # A value of 0 means disable time based filtering when creating the receiver.
-eventhub.receiver.filter.timediff = 0
\ No newline at end of file
+eventhub.receiver.filter.timediff = 0
+
+# Uncomment to specify a consumer group name here; otherwise the default is used
+#eventhubspout.consumer.group.name = yourconsumergroupname
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e5154928/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java
index 6a0d163..49e544b 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java
@@ -37,7 +37,9 @@ public class TestEventHubSpout {
   @Test
   public void testSpoutConfig() {
     EventHubSpoutConfig conf = new EventHubSpoutConfig("username", "pas\\s+w/ord",
-        "namespace", "entityname", 16, "zookeeper");
+        "namespace", "entityname", 16);
+    conf.setZkConnectionString("zookeeper");
+    conf.setCheckpointIntervalInSeconds(1);
     assertEquals(conf.getConnectionString(), "amqps://username:pas%5Cs%2Bw%2Ford@namespace.servicebus.windows.net");
   }