You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/06/03 19:35:56 UTC

[42/50] [abbrv] storm git commit: Fix line endings so that the diff is meaningful.

Fix line endings so that the diff is meaningful.

Signed-off-by: Shanyu Zhao <sh...@microsoft.com>


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1f13f15d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1f13f15d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1f13f15d

Branch: refs/heads/master
Commit: 1f13f15d0c0de233fd4cc4ff6d6de586c4736142
Parents: e515492
Author: Shanyu Zhao <sh...@microsoft.com>
Authored: Wed May 13 14:50:02 2015 -0700
Committer: Shanyu Zhao <sh...@microsoft.com>
Committed: Wed May 13 14:50:02 2015 -0700

----------------------------------------------------------------------
 external/storm-eventhubs/pom.xml                | 236 +++++++++----------
 .../eventhubs/bolt/DefaultEventDataFormat.java  |  94 ++++----
 .../storm/eventhubs/bolt/EventHubBolt.java      | 202 ++++++++--------
 .../eventhubs/bolt/EventHubBoltConfig.java      | 214 ++++++++---------
 .../storm/eventhubs/bolt/IEventDataFormat.java  |  56 ++---
 .../storm/eventhubs/client/EventHubClient.java  | 190 +++++++--------
 .../storm/eventhubs/client/EventHubSender.java  | 198 ++++++++--------
 .../storm/eventhubs/samples/EventHubLoop.java   | 104 ++++----
 8 files changed, 647 insertions(+), 647 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1f13f15d/external/storm-eventhubs/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/pom.xml b/external/storm-eventhubs/pom.xml
index 2ceed09..2dfb739 100755
--- a/external/storm-eventhubs/pom.xml
+++ b/external/storm-eventhubs/pom.xml
@@ -1,119 +1,119 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    
-    <parent>
-        <artifactId>storm</artifactId>
-        <groupId>org.apache.storm</groupId>
-        <version>0.11.0-SNAPSHOT</version>
-        <relativePath>../../pom.xml</relativePath>
-    </parent>
-    
-    <artifactId>storm-eventhubs</artifactId>
-    <version>0.11.0-SNAPSHOT</version>
-    <packaging>jar</packaging>
-    <name>storm-eventhubs</name>
-    <description>EventHubs Storm Spout</description>
-
-    <properties>
-        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <qpid.version>0.32</qpid.version>
-    </properties>
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-shade-plugin</artifactId>
-                <version>2.3</version>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>shade</goal>
-                        </goals>
-                        <phase>package</phase>
-                    </execution>
-                </executions>
-                <configuration>
-                    <transformers>
-                        <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer">
-                        </transformer>
-                    </transformers>
-                    <outputFile>target/${project.artifactId}-${project.version}-jar-with-dependencies.jar</outputFile>
-                </configuration>
-	        </plugin>
-            <plugin>
-		        <artifactId>maven-antrun-plugin</artifactId>
-		        <executions>
-		          <execution>
-		            <phase>package</phase>
-		            <configuration>
-		              <tasks>
-		                <copy file="src/main/resources/config.properties" tofile="target/eventhubs-config.properties"/>
-                    </tasks>
-		            </configuration>
-		            <goals>
-		              <goal>run</goal>
-		            </goals>
-		          </execution>
-		        </executions>
-	        </plugin>
-        </plugins>
-    </build>
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.qpid</groupId>
-            <artifactId>qpid-client</artifactId>
-            <version>${qpid.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.qpid</groupId>
-            <artifactId>qpid-amqp-1-0-client-jms</artifactId>
-            <version>${qpid.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-core</artifactId>
-            <version>${project.version}</version>
-            <!-- keep storm out of the jar-with-dependencies -->
-            <type>jar</type>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-framework</artifactId>
-            <version>${curator.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>4.11</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies> 
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>0.11.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    
+    <artifactId>storm-eventhubs</artifactId>
+    <version>0.11.0-SNAPSHOT</version>
+    <packaging>jar</packaging>
+    <name>storm-eventhubs</name>
+    <description>EventHubs Storm Spout</description>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <qpid.version>0.32</qpid.version>
+    </properties>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>2.3</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <phase>package</phase>
+                    </execution>
+                </executions>
+                <configuration>
+                    <transformers>
+                        <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer">
+                        </transformer>
+                    </transformers>
+                    <outputFile>target/${project.artifactId}-${project.version}-jar-with-dependencies.jar</outputFile>
+                </configuration>
+	        </plugin>
+            <plugin>
+		        <artifactId>maven-antrun-plugin</artifactId>
+		        <executions>
+		          <execution>
+		            <phase>package</phase>
+		            <configuration>
+		              <tasks>
+		                <copy file="src/main/resources/config.properties" tofile="target/eventhubs-config.properties"/>
+                    </tasks>
+		            </configuration>
+		            <goals>
+		              <goal>run</goal>
+		            </goals>
+		          </execution>
+		        </executions>
+	        </plugin>
+        </plugins>
+    </build>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-client</artifactId>
+            <version>${qpid.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-amqp-1-0-client-jms</artifactId>
+            <version>${qpid.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <!-- keep storm out of the jar-with-dependencies -->
+            <type>jar</type>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <version>${curator.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies> 
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/1f13f15d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java
index 1bd8288..6b3eba7 100644
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java
@@ -1,47 +1,47 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.bolt;
-
-import backtype.storm.tuple.Tuple;
-
-/**
- * A default implementation of IEventDataFormat that converts the tuple
- * into a delimited string.
- */
-public class DefaultEventDataFormat implements IEventDataFormat {
-  private static final long serialVersionUID = 1L;
-  private String delimiter = ",";
-  
-  public DefaultEventDataFormat withFieldDelimiter(String delimiter) {
-    this.delimiter = delimiter;
-    return this;
-  }
-
-  @Override
-  public byte[] serialize(Tuple tuple) {
-    StringBuilder sb = new StringBuilder();
-    for(Object obj : tuple.getValues()) {
-      if(sb.length() != 0) {
-        sb.append(delimiter);
-      }
-      sb.append(obj.toString());
-    }
-    return sb.toString().getBytes();
-  }
-
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.bolt;
+
+import backtype.storm.tuple.Tuple;
+
+/**
+ * A default implementation of IEventDataFormat that converts the tuple
+ * into a delimited string.
+ */
+public class DefaultEventDataFormat implements IEventDataFormat {
+  private static final long serialVersionUID = 1L;
+  private String delimiter = ",";
+  
+  public DefaultEventDataFormat withFieldDelimiter(String delimiter) {
+    this.delimiter = delimiter;
+    return this;
+  }
+
+  @Override
+  public byte[] serialize(Tuple tuple) {
+    StringBuilder sb = new StringBuilder();
+    for(Object obj : tuple.getValues()) {
+      if(sb.length() != 0) {
+        sb.append(delimiter);
+      }
+      sb.append(obj.toString());
+    }
+    return sb.toString().getBytes();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1f13f15d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
index 09f90b1..a817744 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
@@ -1,101 +1,101 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.bolt;
-
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.storm.eventhubs.client.EventHubClient;
-import org.apache.storm.eventhubs.client.EventHubException;
-import org.apache.storm.eventhubs.client.EventHubSender;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Tuple;
-
-/**
- * A bolt that writes event message to EventHub.
- */
-public class EventHubBolt extends BaseRichBolt {
-  private static final long serialVersionUID = 1L;
-  private static final Logger logger = LoggerFactory
-      .getLogger(EventHubBolt.class);
-  
-  protected OutputCollector collector;
-  protected EventHubSender sender;
-  protected EventHubBoltConfig boltConfig;
-  
-  
-  public EventHubBolt(String connectionString, String entityPath) {
-    boltConfig = new EventHubBoltConfig(connectionString, entityPath);
-  }
-
-  public EventHubBolt(String userName, String password, String namespace,
-      String entityPath, boolean partitionMode) {
-    boltConfig = new EventHubBoltConfig(userName, password, namespace,
-        entityPath, partitionMode);
-  }
-  
-  public EventHubBolt(EventHubBoltConfig config) {
-    boltConfig = config;
-  }
-
-  @Override
-  public void prepare(Map config, TopologyContext context, OutputCollector collector) {
-    this.collector = collector;
-    String myPartitionId = null;
-    if(boltConfig.getPartitionMode()) {
-      //We can use the task index (starting from 0) as the partition ID
-      myPartitionId = "" + context.getThisTaskIndex();
-    }
-    logger.info("creating sender: " + boltConfig.getConnectionString()
-        + ", " + boltConfig.getEntityPath() + ", " + myPartitionId);
-    try {
-      EventHubClient eventHubClient = EventHubClient.create(
-          boltConfig.getConnectionString(), boltConfig.getEntityPath());
-      sender = eventHubClient.createPartitionSender(myPartitionId);
-    }
-    catch(Exception ex) {
-      logger.error(ex.getMessage());
-      throw new RuntimeException(ex);
-    }
-
-  }
-
-  @Override
-  public void execute(Tuple tuple) {
-    try {
-      sender.send(boltConfig.getEventDataFormat().serialize(tuple));
-      collector.ack(tuple);
-    }
-    catch(EventHubException ex) {
-      logger.error(ex.getMessage());
-      collector.fail(tuple);
-    }
-  }
-
-  @Override
-  public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    
-  }
-
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.bolt;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.eventhubs.client.EventHubClient;
+import org.apache.storm.eventhubs.client.EventHubException;
+import org.apache.storm.eventhubs.client.EventHubSender;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+
+/**
+ * A bolt that writes event message to EventHub.
+ */
+public class EventHubBolt extends BaseRichBolt {
+  private static final long serialVersionUID = 1L;
+  private static final Logger logger = LoggerFactory
+      .getLogger(EventHubBolt.class);
+  
+  protected OutputCollector collector;
+  protected EventHubSender sender;
+  protected EventHubBoltConfig boltConfig;
+  
+  
+  public EventHubBolt(String connectionString, String entityPath) {
+    boltConfig = new EventHubBoltConfig(connectionString, entityPath);
+  }
+
+  public EventHubBolt(String userName, String password, String namespace,
+      String entityPath, boolean partitionMode) {
+    boltConfig = new EventHubBoltConfig(userName, password, namespace,
+        entityPath, partitionMode);
+  }
+  
+  public EventHubBolt(EventHubBoltConfig config) {
+    boltConfig = config;
+  }
+
+  @Override
+  public void prepare(Map config, TopologyContext context, OutputCollector collector) {
+    this.collector = collector;
+    String myPartitionId = null;
+    if(boltConfig.getPartitionMode()) {
+      //We can use the task index (starting from 0) as the partition ID
+      myPartitionId = "" + context.getThisTaskIndex();
+    }
+    logger.info("creating sender: " + boltConfig.getConnectionString()
+        + ", " + boltConfig.getEntityPath() + ", " + myPartitionId);
+    try {
+      EventHubClient eventHubClient = EventHubClient.create(
+          boltConfig.getConnectionString(), boltConfig.getEntityPath());
+      sender = eventHubClient.createPartitionSender(myPartitionId);
+    }
+    catch(Exception ex) {
+      logger.error(ex.getMessage());
+      throw new RuntimeException(ex);
+    }
+
+  }
+
+  @Override
+  public void execute(Tuple tuple) {
+    try {
+      sender.send(boltConfig.getEventDataFormat().serialize(tuple));
+      collector.ack(tuple);
+    }
+    catch(EventHubException ex) {
+      logger.error(ex.getMessage());
+      collector.fail(tuple);
+    }
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1f13f15d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java
index 909e8ac..4383a72 100644
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java
@@ -1,107 +1,107 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.bolt;
-
-import java.io.Serializable;
-
-import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
-
-/*
- * EventHubs bolt configurations
- *
- * Partition mode:
- * With partitionMode=true you need to create the same number of tasks as the number of 
- * EventHubs partitions, and each bolt task will only send data to one partition.
- * The partition ID is the task ID of the bolt.
- * 
- * Event format:
- * The formatter to convert tuple to bytes for EventHubs.
- * if null, the default format is common delimited tuple fields.
- */
-public class EventHubBoltConfig implements Serializable {
-  private static final long serialVersionUID = 1L;
-  
-  private String connectionString;
-  private final String entityPath;
-  protected boolean partitionMode;
-  protected IEventDataFormat dataFormat;
-  
-  public EventHubBoltConfig(String connectionString, String entityPath) {
-    this(connectionString, entityPath, false, null);
-  }
-  
-  public EventHubBoltConfig(String connectionString, String entityPath,
-      boolean partitionMode) {
-    this(connectionString, entityPath, partitionMode, null);
-  }
-  
-  public EventHubBoltConfig(String userName, String password, String namespace,
-      String entityPath, boolean partitionMode) {
-    this(userName, password, namespace,
-        EventHubSpoutConfig.EH_SERVICE_FQDN_SUFFIX, entityPath, partitionMode);
-  }
-  
-  public EventHubBoltConfig(String connectionString, String entityPath,
-      boolean partitionMode, IEventDataFormat dataFormat) {
-    this.connectionString = connectionString;
-    this.entityPath = entityPath;
-    this.partitionMode = partitionMode;
-    this.dataFormat = dataFormat;
-    if(this.dataFormat == null) {
-      this.dataFormat = new DefaultEventDataFormat();
-    }
-  }
-  
-  public EventHubBoltConfig(String userName, String password, String namespace,
-      String targetFqnAddress, String entityPath) {
-    this(userName, password, namespace, targetFqnAddress, entityPath, false, null);
-  }
-  
-  public EventHubBoltConfig(String userName, String password, String namespace,
-      String targetFqnAddress, String entityPath, boolean partitionMode) {
-    this(userName, password, namespace, targetFqnAddress, entityPath, partitionMode, null);
-  }
-  
-  public EventHubBoltConfig(String userName, String password, String namespace,
-      String targetFqnAddress, String entityPath, boolean partitionMode,
-      IEventDataFormat dataFormat) {
-    this.connectionString = EventHubSpoutConfig.buildConnectionString(userName, password, namespace, targetFqnAddress);
-    this.entityPath = entityPath;
-    this.partitionMode = partitionMode;
-    this.dataFormat = dataFormat;
-    if(this.dataFormat == null) {
-      this.dataFormat = new DefaultEventDataFormat();
-    }
-  }
-  
-  public String getConnectionString() {
-    return connectionString;
-  }
-  
-  public String getEntityPath() {
-    return entityPath;
-  }
-  
-  public boolean getPartitionMode() {
-    return partitionMode;
-  }
-  
-  public IEventDataFormat getEventDataFormat() {
-    return dataFormat;
-  }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.bolt;
+
+import java.io.Serializable;
+
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+
+/*
+ * EventHubs bolt configurations
+ *
+ * Partition mode:
+ * With partitionMode=true you need to create the same number of tasks as the number of 
+ * EventHubs partitions, and each bolt task will only send data to one partition.
+ * The partition ID is the task ID of the bolt.
+ * 
+ * Event format:
+ * The formatter to convert tuple to bytes for EventHubs.
+ * if null, the default format is common delimited tuple fields.
+ */
+public class EventHubBoltConfig implements Serializable {
+  private static final long serialVersionUID = 1L;
+  
+  private String connectionString;
+  private final String entityPath;
+  protected boolean partitionMode;
+  protected IEventDataFormat dataFormat;
+  
+  public EventHubBoltConfig(String connectionString, String entityPath) {
+    this(connectionString, entityPath, false, null);
+  }
+  
+  public EventHubBoltConfig(String connectionString, String entityPath,
+      boolean partitionMode) {
+    this(connectionString, entityPath, partitionMode, null);
+  }
+  
+  public EventHubBoltConfig(String userName, String password, String namespace,
+      String entityPath, boolean partitionMode) {
+    this(userName, password, namespace,
+        EventHubSpoutConfig.EH_SERVICE_FQDN_SUFFIX, entityPath, partitionMode);
+  }
+  
+  public EventHubBoltConfig(String connectionString, String entityPath,
+      boolean partitionMode, IEventDataFormat dataFormat) {
+    this.connectionString = connectionString;
+    this.entityPath = entityPath;
+    this.partitionMode = partitionMode;
+    this.dataFormat = dataFormat;
+    if(this.dataFormat == null) {
+      this.dataFormat = new DefaultEventDataFormat();
+    }
+  }
+  
+  public EventHubBoltConfig(String userName, String password, String namespace,
+      String targetFqnAddress, String entityPath) {
+    this(userName, password, namespace, targetFqnAddress, entityPath, false, null);
+  }
+  
+  public EventHubBoltConfig(String userName, String password, String namespace,
+      String targetFqnAddress, String entityPath, boolean partitionMode) {
+    this(userName, password, namespace, targetFqnAddress, entityPath, partitionMode, null);
+  }
+  
+  public EventHubBoltConfig(String userName, String password, String namespace,
+      String targetFqnAddress, String entityPath, boolean partitionMode,
+      IEventDataFormat dataFormat) {
+    this.connectionString = EventHubSpoutConfig.buildConnectionString(userName, password, namespace, targetFqnAddress);
+    this.entityPath = entityPath;
+    this.partitionMode = partitionMode;
+    this.dataFormat = dataFormat;
+    if(this.dataFormat == null) {
+      this.dataFormat = new DefaultEventDataFormat();
+    }
+  }
+  
+  public String getConnectionString() {
+    return connectionString;
+  }
+  
+  public String getEntityPath() {
+    return entityPath;
+  }
+  
+  public boolean getPartitionMode() {
+    return partitionMode;
+  }
+  
+  public IEventDataFormat getEventDataFormat() {
+    return dataFormat;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1f13f15d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java
index cb05c0f..2003c34 100644
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java
@@ -1,28 +1,28 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.bolt;
-
-import java.io.Serializable;
-import backtype.storm.tuple.Tuple;
-
-/**
- * Serialize a tuple to a byte array to be sent to EventHubs
- */
-public interface IEventDataFormat extends Serializable {
-  public byte[] serialize(Tuple tuple);
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.bolt;
+
+import java.io.Serializable;
+import backtype.storm.tuple.Tuple;
+
+/**
+ * Serialize a tuple to a byte array to be sent to EventHubs
+ */
+public interface IEventDataFormat extends Serializable {
+  public byte[] serialize(Tuple tuple);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1f13f15d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java
index 2afe5b4..564a26f 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java
@@ -1,95 +1,95 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.client;
-
-import org.apache.qpid.amqp_1_0.client.Connection;
-import org.apache.qpid.amqp_1_0.client.ConnectionErrorException;
-import org.apache.qpid.amqp_1_0.client.ConnectionException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class EventHubClient {
-
-  private static final String DefaultConsumerGroupName = "$default";
-  private static final Logger logger = LoggerFactory.getLogger(EventHubClient.class);
-  private static final long ConnectionSyncTimeout = 60000L;
-
-  private final String connectionString;
-  private final String entityPath;
-  private final Connection connection;
-
-  private EventHubClient(String connectionString, String entityPath) throws EventHubException {
-    this.connectionString = connectionString;
-    this.entityPath = entityPath;
-    this.connection = this.createConnection();
-  }
-
-  /**
-   * creates a new instance of EventHubClient using the supplied connection string and entity path.
-   *
-   * @param connectionString connection string to the namespace of event hubs. connection string format:
-   * amqps://{userId}:{password}@{namespaceName}.servicebus.windows.net
-   * @param entityPath the name of event hub entity.
-   *
-   * @return EventHubClient
-   * @throws org.apache.storm.eventhubs.client.EventHubException
-   */
-  public static EventHubClient create(String connectionString, String entityPath) throws EventHubException {
-    return new EventHubClient(connectionString, entityPath);
-  }
-
-  public EventHubSender createPartitionSender(String partitionId) throws Exception {
-    return new EventHubSender(this.connection.createSession(), this.entityPath, partitionId);
-  }
-
-  public EventHubConsumerGroup getConsumerGroup(String cgName) {
-    if(cgName == null || cgName.length() == 0) {
-      cgName = DefaultConsumerGroupName;
-    }
-    return new EventHubConsumerGroup(connection, entityPath, cgName);
-  }
-
-  public void close() {
-    try {
-      this.connection.close();
-    } catch (ConnectionErrorException e) {
-      logger.error(e.toString());
-    }
-  }
-
-  private Connection createConnection() throws EventHubException {
-    ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder(this.connectionString);
-    Connection clientConnection;
-
-    try {
-      clientConnection = new Connection(
-        connectionStringBuilder.getHost(),
-        connectionStringBuilder.getPort(),
-        connectionStringBuilder.getUserName(),
-        connectionStringBuilder.getPassword(),
-        connectionStringBuilder.getHost(),
-        connectionStringBuilder.getSsl());
-    } catch (ConnectionException e) {
-      logger.error(e.toString());
-      throw new EventHubException(e);
-    }
-    clientConnection.getEndpoint().setSyncTimeout(ConnectionSyncTimeout);
-    SelectorFilterWriter.register(clientConnection.getEndpoint().getDescribedTypeRegistry());
-    return clientConnection;
-  }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.client;
+
+import org.apache.qpid.amqp_1_0.client.Connection;
+import org.apache.qpid.amqp_1_0.client.ConnectionErrorException;
+import org.apache.qpid.amqp_1_0.client.ConnectionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EventHubClient {
+
+  private static final String DefaultConsumerGroupName = "$default";
+  private static final Logger logger = LoggerFactory.getLogger(EventHubClient.class);
+  private static final long ConnectionSyncTimeout = 60000L;
+
+  private final String connectionString;
+  private final String entityPath;
+  private final Connection connection;
+
+  private EventHubClient(String connectionString, String entityPath) throws EventHubException {
+    this.connectionString = connectionString;
+    this.entityPath = entityPath;
+    this.connection = this.createConnection();
+  }
+
+  /**
+   * creates a new instance of EventHubClient using the supplied connection string and entity path.
+   *
+   * @param connectionString connection string to the namespace of event hubs. connection string format:
+   * amqps://{userId}:{password}@{namespaceName}.servicebus.windows.net
+   * @param entityPath the name of event hub entity.
+   *
+   * @return EventHubClient
+   * @throws org.apache.storm.eventhubs.client.EventHubException
+   */
+  public static EventHubClient create(String connectionString, String entityPath) throws EventHubException {
+    return new EventHubClient(connectionString, entityPath);
+  }
+
+  public EventHubSender createPartitionSender(String partitionId) throws Exception {
+    return new EventHubSender(this.connection.createSession(), this.entityPath, partitionId);
+  }
+
+  public EventHubConsumerGroup getConsumerGroup(String cgName) {
+    if(cgName == null || cgName.length() == 0) {
+      cgName = DefaultConsumerGroupName;
+    }
+    return new EventHubConsumerGroup(connection, entityPath, cgName);
+  }
+
+  public void close() {
+    try {
+      this.connection.close();
+    } catch (ConnectionErrorException e) {
+      logger.error(e.toString());
+    }
+  }
+
+  private Connection createConnection() throws EventHubException {
+    ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder(this.connectionString);
+    Connection clientConnection;
+
+    try {
+      clientConnection = new Connection(
+        connectionStringBuilder.getHost(),
+        connectionStringBuilder.getPort(),
+        connectionStringBuilder.getUserName(),
+        connectionStringBuilder.getPassword(),
+        connectionStringBuilder.getHost(),
+        connectionStringBuilder.getSsl());
+    } catch (ConnectionException e) {
+      logger.error(e.toString());
+      throw new EventHubException(e);
+    }
+    clientConnection.getEndpoint().setSyncTimeout(ConnectionSyncTimeout);
+    SelectorFilterWriter.register(clientConnection.getEndpoint().getDescribedTypeRegistry());
+    return clientConnection;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1f13f15d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java
index 7c45578..435893e 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java
@@ -1,99 +1,99 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.client;
-
-import java.util.concurrent.TimeoutException;
-import org.apache.qpid.amqp_1_0.client.LinkDetachedException;
-import org.apache.qpid.amqp_1_0.client.Message;
-import org.apache.qpid.amqp_1_0.client.Sender;
-import org.apache.qpid.amqp_1_0.client.Session;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.messaging.Data;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class EventHubSender {
-
-  private static final Logger logger = LoggerFactory.getLogger(EventHubSender.class);
-
-  private final Session session;
-  private final String entityPath;
-  private final String partitionId;
-  private final String destinationAddress;
-
-  private Sender sender;
-
-  public EventHubSender(Session session, String entityPath, String partitionId) {
-    this.session = session;
-    this.entityPath = entityPath;
-    this.partitionId = partitionId;
-    this.destinationAddress = this.getDestinationAddress();
-  }
-  
-  public void send(byte[] data) throws EventHubException {
-    try {
-      if (this.sender == null) {
-        this.ensureSenderCreated();
-      }
-
-      Binary bin = new Binary(data);
-      Message message = new Message(new Data(bin));
-      this.sender.send(message);
-
-    } catch (LinkDetachedException e) {
-      logger.error(e.getMessage());
-
-      EventHubException eventHubException = new EventHubException("Sender has been closed");
-      throw eventHubException;
-    } catch (TimeoutException e) {
-      logger.error(e.getMessage());
-
-      EventHubException eventHubException = new EventHubException("Timed out while waiting to get credit to send");
-      throw eventHubException;
-    } catch (Exception e) {
-      logger.error(e.getMessage());
-    }
-  }
-
-  public void send(String data) throws EventHubException {
-    //For interop with other language, convert string to bytes
-    send(data.getBytes());
-  }
-
-  public void close() {
-    try {
-      this.sender.close();
-    } catch (Sender.SenderClosingException e) {
-      logger.error("Closing a sender encountered error: " + e.getMessage());
-    }
-  }
-
-  private String getDestinationAddress() {
-    if (this.partitionId == null || this.partitionId.equals("")) {
-      return this.entityPath;
-    } else {
-      return String.format(Constants.DestinationAddressFormatString, this.entityPath, this.partitionId);
-    }
-  }
-
-  private synchronized void ensureSenderCreated() throws Exception {
-    if (this.sender == null) {
-      this.sender = this.session.createSender(this.destinationAddress);
-    }
-  }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.client;
+
+import java.util.concurrent.TimeoutException;
+import org.apache.qpid.amqp_1_0.client.LinkDetachedException;
+import org.apache.qpid.amqp_1_0.client.Message;
+import org.apache.qpid.amqp_1_0.client.Sender;
+import org.apache.qpid.amqp_1_0.client.Session;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.messaging.Data;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EventHubSender {
+
+  private static final Logger logger = LoggerFactory.getLogger(EventHubSender.class);
+
+  private final Session session;
+  private final String entityPath;
+  private final String partitionId;
+  private final String destinationAddress;
+
+  private Sender sender;
+
+  public EventHubSender(Session session, String entityPath, String partitionId) {
+    this.session = session;
+    this.entityPath = entityPath;
+    this.partitionId = partitionId;
+    this.destinationAddress = this.getDestinationAddress();
+  }
+  
+  public void send(byte[] data) throws EventHubException {
+    try {
+      if (this.sender == null) {
+        this.ensureSenderCreated();
+      }
+
+      Binary bin = new Binary(data);
+      Message message = new Message(new Data(bin));
+      this.sender.send(message);
+
+    } catch (LinkDetachedException e) {
+      logger.error(e.getMessage());
+
+      EventHubException eventHubException = new EventHubException("Sender has been closed");
+      throw eventHubException;
+    } catch (TimeoutException e) {
+      logger.error(e.getMessage());
+
+      EventHubException eventHubException = new EventHubException("Timed out while waiting to get credit to send");
+      throw eventHubException;
+    } catch (Exception e) {
+      logger.error(e.getMessage());
+    }
+  }
+
+  public void send(String data) throws EventHubException {
+    //For interop with other language, convert string to bytes
+    send(data.getBytes());
+  }
+
+  public void close() {
+    try {
+      this.sender.close();
+    } catch (Sender.SenderClosingException e) {
+      logger.error("Closing a sender encountered error: " + e.getMessage());
+    }
+  }
+
+  private String getDestinationAddress() {
+    if (this.partitionId == null || this.partitionId.equals("")) {
+      return this.entityPath;
+    } else {
+      return String.format(Constants.DestinationAddressFormatString, this.entityPath, this.partitionId);
+    }
+  }
+
+  private synchronized void ensureSenderCreated() throws Exception {
+    if (this.sender == null) {
+      this.sender = this.session.createSender(this.destinationAddress);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1f13f15d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
index c908f9d..2f62a23 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
@@ -1,52 +1,52 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.samples;
-
-import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.TopologyBuilder;
-
-import org.apache.storm.eventhubs.bolt.EventHubBolt;
-import org.apache.storm.eventhubs.bolt.EventHubBoltConfig;
-import org.apache.storm.eventhubs.spout.EventHubSpout;
-
-/**
- * A sample topology that loops message back to EventHub
- */
-public class EventHubLoop extends EventCount {
-
-  @Override
-  protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
-    TopologyBuilder topologyBuilder = new TopologyBuilder();
-
-    topologyBuilder.setSpout("EventHubsSpout", eventHubSpout, spoutConfig.getPartitionCount())
-      .setNumTasks(spoutConfig.getPartitionCount());
-    EventHubBoltConfig boltConfig = new EventHubBoltConfig(spoutConfig.getConnectionString(),
-        spoutConfig.getEntityPath(), true);
-    
-    EventHubBolt eventHubBolt = new EventHubBolt(boltConfig);
-    int boltTasks = spoutConfig.getPartitionCount();
-    topologyBuilder.setBolt("EventHubsBolt", eventHubBolt, boltTasks)
-      .localOrShuffleGrouping("EventHubsSpout").setNumTasks(boltTasks);
-    return topologyBuilder.createTopology();
-  }
-  
-  public static void main(String[] args) throws Exception {
-    EventHubLoop scenario = new EventHubLoop();
-    scenario.runScenario(args);
-  }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.TopologyBuilder;
+
+import org.apache.storm.eventhubs.bolt.EventHubBolt;
+import org.apache.storm.eventhubs.bolt.EventHubBoltConfig;
+import org.apache.storm.eventhubs.spout.EventHubSpout;
+
+/**
+ * A sample topology that loops message back to EventHub
+ */
+public class EventHubLoop extends EventCount {
+
+  @Override
+  protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
+    TopologyBuilder topologyBuilder = new TopologyBuilder();
+
+    topologyBuilder.setSpout("EventHubsSpout", eventHubSpout, spoutConfig.getPartitionCount())
+      .setNumTasks(spoutConfig.getPartitionCount());
+    EventHubBoltConfig boltConfig = new EventHubBoltConfig(spoutConfig.getConnectionString(),
+        spoutConfig.getEntityPath(), true);
+    
+    EventHubBolt eventHubBolt = new EventHubBolt(boltConfig);
+    int boltTasks = spoutConfig.getPartitionCount();
+    topologyBuilder.setBolt("EventHubsBolt", eventHubBolt, boltTasks)
+      .localOrShuffleGrouping("EventHubsSpout").setNumTasks(boltTasks);
+    return topologyBuilder.createTopology();
+  }
+  
+  public static void main(String[] args) throws Exception {
+    EventHubLoop scenario = new EventHubLoop();
+    scenario.runScenario(args);
+  }
+}