You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/06/04 04:06:16 UTC
[34/50] [abbrv] storm git commit: Fix line endings so that the diff
is meaningful.
Fix line endings so that the diff is meaningful.
Signed-off-by: Shanyu Zhao <sh...@microsoft.com>
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1f13f15d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1f13f15d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1f13f15d
Branch: refs/heads/0.10.x-branch
Commit: 1f13f15d0c0de233fd4cc4ff6d6de586c4736142
Parents: e515492
Author: Shanyu Zhao <sh...@microsoft.com>
Authored: Wed May 13 14:50:02 2015 -0700
Committer: Shanyu Zhao <sh...@microsoft.com>
Committed: Wed May 13 14:50:02 2015 -0700
----------------------------------------------------------------------
external/storm-eventhubs/pom.xml | 236 +++++++++----------
.../eventhubs/bolt/DefaultEventDataFormat.java | 94 ++++----
.../storm/eventhubs/bolt/EventHubBolt.java | 202 ++++++++--------
.../eventhubs/bolt/EventHubBoltConfig.java | 214 ++++++++---------
.../storm/eventhubs/bolt/IEventDataFormat.java | 56 ++---
.../storm/eventhubs/client/EventHubClient.java | 190 +++++++--------
.../storm/eventhubs/client/EventHubSender.java | 198 ++++++++--------
.../storm/eventhubs/samples/EventHubLoop.java | 104 ++++----
8 files changed, 647 insertions(+), 647 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/1f13f15d/external/storm-eventhubs/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/pom.xml b/external/storm-eventhubs/pom.xml
index 2ceed09..2dfb739 100755
--- a/external/storm-eventhubs/pom.xml
+++ b/external/storm-eventhubs/pom.xml
@@ -1,119 +1,119 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <artifactId>storm</artifactId>
- <groupId>org.apache.storm</groupId>
- <version>0.11.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
-
- <artifactId>storm-eventhubs</artifactId>
- <version>0.11.0-SNAPSHOT</version>
- <packaging>jar</packaging>
- <name>storm-eventhubs</name>
- <description>EventHubs Storm Spout</description>
-
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <qpid.version>0.32</qpid.version>
- </properties>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>2.3</version>
- <executions>
- <execution>
- <goals>
- <goal>shade</goal>
- </goals>
- <phase>package</phase>
- </execution>
- </executions>
- <configuration>
- <transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer">
- </transformer>
- </transformers>
- <outputFile>target/${project.artifactId}-${project.version}-jar-with-dependencies.jar</outputFile>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-antrun-plugin</artifactId>
- <executions>
- <execution>
- <phase>package</phase>
- <configuration>
- <tasks>
- <copy file="src/main/resources/config.properties" tofile="target/eventhubs-config.properties"/>
- </tasks>
- </configuration>
- <goals>
- <goal>run</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- <dependencies>
- <dependency>
- <groupId>org.apache.qpid</groupId>
- <artifactId>qpid-client</artifactId>
- <version>${qpid.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.qpid</groupId>
- <artifactId>qpid-amqp-1-0-client-jms</artifactId>
- <version>${qpid.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <version>${project.version}</version>
- <!-- keep storm out of the jar-with-dependencies -->
- <type>jar</type>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-framework</artifactId>
- <version>${curator.version}</version>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.11</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>0.11.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>storm-eventhubs</artifactId>
+ <version>0.11.0-SNAPSHOT</version>
+ <packaging>jar</packaging>
+ <name>storm-eventhubs</name>
+ <description>EventHubs Storm Spout</description>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <qpid.version>0.32</qpid.version>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.3</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <phase>package</phase>
+ </execution>
+ </executions>
+ <configuration>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer">
+ </transformer>
+ </transformers>
+ <outputFile>target/${project.artifactId}-${project.version}-jar-with-dependencies.jar</outputFile>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <configuration>
+ <tasks>
+ <copy file="src/main/resources/config.properties" tofile="target/eventhubs-config.properties"/>
+ </tasks>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-client</artifactId>
+ <version>${qpid.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-amqp-1-0-client-jms</artifactId>
+ <version>${qpid.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ <!-- keep storm out of the jar-with-dependencies -->
+ <type>jar</type>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ <version>${curator.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.11</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/1f13f15d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java
index 1bd8288..6b3eba7 100644
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java
@@ -1,47 +1,47 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.bolt;
-
-import backtype.storm.tuple.Tuple;
-
-/**
- * A default implementation of IEventDataFormat that converts the tuple
- * into a delimited string.
- */
-public class DefaultEventDataFormat implements IEventDataFormat {
- private static final long serialVersionUID = 1L;
- private String delimiter = ",";
-
- public DefaultEventDataFormat withFieldDelimiter(String delimiter) {
- this.delimiter = delimiter;
- return this;
- }
-
- @Override
- public byte[] serialize(Tuple tuple) {
- StringBuilder sb = new StringBuilder();
- for(Object obj : tuple.getValues()) {
- if(sb.length() != 0) {
- sb.append(delimiter);
- }
- sb.append(obj.toString());
- }
- return sb.toString().getBytes();
- }
-
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.bolt;
+
+import backtype.storm.tuple.Tuple;
+
+/**
+ * A default implementation of IEventDataFormat that converts the tuple
+ * into a delimited string.
+ */
+public class DefaultEventDataFormat implements IEventDataFormat {
+ private static final long serialVersionUID = 1L;
+ private String delimiter = ",";
+
+ public DefaultEventDataFormat withFieldDelimiter(String delimiter) {
+ this.delimiter = delimiter;
+ return this;
+ }
+
+ @Override
+ public byte[] serialize(Tuple tuple) {
+ StringBuilder sb = new StringBuilder();
+ for(Object obj : tuple.getValues()) {
+ if(sb.length() != 0) {
+ sb.append(delimiter);
+ }
+ sb.append(obj.toString());
+ }
+ return sb.toString().getBytes();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/1f13f15d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
index 09f90b1..a817744 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
@@ -1,101 +1,101 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.bolt;
-
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.storm.eventhubs.client.EventHubClient;
-import org.apache.storm.eventhubs.client.EventHubException;
-import org.apache.storm.eventhubs.client.EventHubSender;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Tuple;
-
-/**
- * A bolt that writes event message to EventHub.
- */
-public class EventHubBolt extends BaseRichBolt {
- private static final long serialVersionUID = 1L;
- private static final Logger logger = LoggerFactory
- .getLogger(EventHubBolt.class);
-
- protected OutputCollector collector;
- protected EventHubSender sender;
- protected EventHubBoltConfig boltConfig;
-
-
- public EventHubBolt(String connectionString, String entityPath) {
- boltConfig = new EventHubBoltConfig(connectionString, entityPath);
- }
-
- public EventHubBolt(String userName, String password, String namespace,
- String entityPath, boolean partitionMode) {
- boltConfig = new EventHubBoltConfig(userName, password, namespace,
- entityPath, partitionMode);
- }
-
- public EventHubBolt(EventHubBoltConfig config) {
- boltConfig = config;
- }
-
- @Override
- public void prepare(Map config, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- String myPartitionId = null;
- if(boltConfig.getPartitionMode()) {
- //We can use the task index (starting from 0) as the partition ID
- myPartitionId = "" + context.getThisTaskIndex();
- }
- logger.info("creating sender: " + boltConfig.getConnectionString()
- + ", " + boltConfig.getEntityPath() + ", " + myPartitionId);
- try {
- EventHubClient eventHubClient = EventHubClient.create(
- boltConfig.getConnectionString(), boltConfig.getEntityPath());
- sender = eventHubClient.createPartitionSender(myPartitionId);
- }
- catch(Exception ex) {
- logger.error(ex.getMessage());
- throw new RuntimeException(ex);
- }
-
- }
-
- @Override
- public void execute(Tuple tuple) {
- try {
- sender.send(boltConfig.getEventDataFormat().serialize(tuple));
- collector.ack(tuple);
- }
- catch(EventHubException ex) {
- logger.error(ex.getMessage());
- collector.fail(tuple);
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
- }
-
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.bolt;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.eventhubs.client.EventHubClient;
+import org.apache.storm.eventhubs.client.EventHubException;
+import org.apache.storm.eventhubs.client.EventHubSender;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+
+/**
+ * A bolt that writes event message to EventHub.
+ */
+public class EventHubBolt extends BaseRichBolt {
+ private static final long serialVersionUID = 1L;
+ private static final Logger logger = LoggerFactory
+ .getLogger(EventHubBolt.class);
+
+ protected OutputCollector collector;
+ protected EventHubSender sender;
+ protected EventHubBoltConfig boltConfig;
+
+
+ public EventHubBolt(String connectionString, String entityPath) {
+ boltConfig = new EventHubBoltConfig(connectionString, entityPath);
+ }
+
+ public EventHubBolt(String userName, String password, String namespace,
+ String entityPath, boolean partitionMode) {
+ boltConfig = new EventHubBoltConfig(userName, password, namespace,
+ entityPath, partitionMode);
+ }
+
+ public EventHubBolt(EventHubBoltConfig config) {
+ boltConfig = config;
+ }
+
+ @Override
+ public void prepare(Map config, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ String myPartitionId = null;
+ if(boltConfig.getPartitionMode()) {
+ //We can use the task index (starting from 0) as the partition ID
+ myPartitionId = "" + context.getThisTaskIndex();
+ }
+ logger.info("creating sender: " + boltConfig.getConnectionString()
+ + ", " + boltConfig.getEntityPath() + ", " + myPartitionId);
+ try {
+ EventHubClient eventHubClient = EventHubClient.create(
+ boltConfig.getConnectionString(), boltConfig.getEntityPath());
+ sender = eventHubClient.createPartitionSender(myPartitionId);
+ }
+ catch(Exception ex) {
+ logger.error(ex.getMessage());
+ throw new RuntimeException(ex);
+ }
+
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ try {
+ sender.send(boltConfig.getEventDataFormat().serialize(tuple));
+ collector.ack(tuple);
+ }
+ catch(EventHubException ex) {
+ logger.error(ex.getMessage());
+ collector.fail(tuple);
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/1f13f15d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java
index 909e8ac..4383a72 100644
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java
@@ -1,107 +1,107 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.bolt;
-
-import java.io.Serializable;
-
-import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
-
-/*
- * EventHubs bolt configurations
- *
- * Partition mode:
- * With partitionMode=true you need to create the same number of tasks as the number of
- * EventHubs partitions, and each bolt task will only send data to one partition.
- * The partition ID is the task ID of the bolt.
- *
- * Event format:
- * The formatter to convert tuple to bytes for EventHubs.
- * if null, the default format is common delimited tuple fields.
- */
-public class EventHubBoltConfig implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private String connectionString;
- private final String entityPath;
- protected boolean partitionMode;
- protected IEventDataFormat dataFormat;
-
- public EventHubBoltConfig(String connectionString, String entityPath) {
- this(connectionString, entityPath, false, null);
- }
-
- public EventHubBoltConfig(String connectionString, String entityPath,
- boolean partitionMode) {
- this(connectionString, entityPath, partitionMode, null);
- }
-
- public EventHubBoltConfig(String userName, String password, String namespace,
- String entityPath, boolean partitionMode) {
- this(userName, password, namespace,
- EventHubSpoutConfig.EH_SERVICE_FQDN_SUFFIX, entityPath, partitionMode);
- }
-
- public EventHubBoltConfig(String connectionString, String entityPath,
- boolean partitionMode, IEventDataFormat dataFormat) {
- this.connectionString = connectionString;
- this.entityPath = entityPath;
- this.partitionMode = partitionMode;
- this.dataFormat = dataFormat;
- if(this.dataFormat == null) {
- this.dataFormat = new DefaultEventDataFormat();
- }
- }
-
- public EventHubBoltConfig(String userName, String password, String namespace,
- String targetFqnAddress, String entityPath) {
- this(userName, password, namespace, targetFqnAddress, entityPath, false, null);
- }
-
- public EventHubBoltConfig(String userName, String password, String namespace,
- String targetFqnAddress, String entityPath, boolean partitionMode) {
- this(userName, password, namespace, targetFqnAddress, entityPath, partitionMode, null);
- }
-
- public EventHubBoltConfig(String userName, String password, String namespace,
- String targetFqnAddress, String entityPath, boolean partitionMode,
- IEventDataFormat dataFormat) {
- this.connectionString = EventHubSpoutConfig.buildConnectionString(userName, password, namespace, targetFqnAddress);
- this.entityPath = entityPath;
- this.partitionMode = partitionMode;
- this.dataFormat = dataFormat;
- if(this.dataFormat == null) {
- this.dataFormat = new DefaultEventDataFormat();
- }
- }
-
- public String getConnectionString() {
- return connectionString;
- }
-
- public String getEntityPath() {
- return entityPath;
- }
-
- public boolean getPartitionMode() {
- return partitionMode;
- }
-
- public IEventDataFormat getEventDataFormat() {
- return dataFormat;
- }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.bolt;
+
+import java.io.Serializable;
+
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+
+/*
+ * EventHubs bolt configurations
+ *
+ * Partition mode:
+ * With partitionMode=true you need to create the same number of tasks as the number of
+ * EventHubs partitions, and each bolt task will only send data to one partition.
+ * The partition ID is the task ID of the bolt.
+ *
+ * Event format:
+ * The formatter to convert tuple to bytes for EventHubs.
+ * if null, the default format is common delimited tuple fields.
+ */
+public class EventHubBoltConfig implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private String connectionString;
+ private final String entityPath;
+ protected boolean partitionMode;
+ protected IEventDataFormat dataFormat;
+
+ public EventHubBoltConfig(String connectionString, String entityPath) {
+ this(connectionString, entityPath, false, null);
+ }
+
+ public EventHubBoltConfig(String connectionString, String entityPath,
+ boolean partitionMode) {
+ this(connectionString, entityPath, partitionMode, null);
+ }
+
+ public EventHubBoltConfig(String userName, String password, String namespace,
+ String entityPath, boolean partitionMode) {
+ this(userName, password, namespace,
+ EventHubSpoutConfig.EH_SERVICE_FQDN_SUFFIX, entityPath, partitionMode);
+ }
+
+ public EventHubBoltConfig(String connectionString, String entityPath,
+ boolean partitionMode, IEventDataFormat dataFormat) {
+ this.connectionString = connectionString;
+ this.entityPath = entityPath;
+ this.partitionMode = partitionMode;
+ this.dataFormat = dataFormat;
+ if(this.dataFormat == null) {
+ this.dataFormat = new DefaultEventDataFormat();
+ }
+ }
+
+ public EventHubBoltConfig(String userName, String password, String namespace,
+ String targetFqnAddress, String entityPath) {
+ this(userName, password, namespace, targetFqnAddress, entityPath, false, null);
+ }
+
+ public EventHubBoltConfig(String userName, String password, String namespace,
+ String targetFqnAddress, String entityPath, boolean partitionMode) {
+ this(userName, password, namespace, targetFqnAddress, entityPath, partitionMode, null);
+ }
+
+ public EventHubBoltConfig(String userName, String password, String namespace,
+ String targetFqnAddress, String entityPath, boolean partitionMode,
+ IEventDataFormat dataFormat) {
+ this.connectionString = EventHubSpoutConfig.buildConnectionString(userName, password, namespace, targetFqnAddress);
+ this.entityPath = entityPath;
+ this.partitionMode = partitionMode;
+ this.dataFormat = dataFormat;
+ if(this.dataFormat == null) {
+ this.dataFormat = new DefaultEventDataFormat();
+ }
+ }
+
+ public String getConnectionString() {
+ return connectionString;
+ }
+
+ public String getEntityPath() {
+ return entityPath;
+ }
+
+ public boolean getPartitionMode() {
+ return partitionMode;
+ }
+
+ public IEventDataFormat getEventDataFormat() {
+ return dataFormat;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/1f13f15d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java
index cb05c0f..2003c34 100644
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java
@@ -1,28 +1,28 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.bolt;
-
-import java.io.Serializable;
-import backtype.storm.tuple.Tuple;
-
-/**
- * Serialize a tuple to a byte array to be sent to EventHubs
- */
-public interface IEventDataFormat extends Serializable {
- public byte[] serialize(Tuple tuple);
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.bolt;
+
+import java.io.Serializable;
+import backtype.storm.tuple.Tuple;
+
+/**
+ * Serialize a tuple to a byte array to be sent to EventHubs
+ */
+public interface IEventDataFormat extends Serializable {
+ public byte[] serialize(Tuple tuple);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/1f13f15d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java
index 2afe5b4..564a26f 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java
@@ -1,95 +1,95 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.client;
-
-import org.apache.qpid.amqp_1_0.client.Connection;
-import org.apache.qpid.amqp_1_0.client.ConnectionErrorException;
-import org.apache.qpid.amqp_1_0.client.ConnectionException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class EventHubClient {
-
- private static final String DefaultConsumerGroupName = "$default";
- private static final Logger logger = LoggerFactory.getLogger(EventHubClient.class);
- private static final long ConnectionSyncTimeout = 60000L;
-
- private final String connectionString;
- private final String entityPath;
- private final Connection connection;
-
- private EventHubClient(String connectionString, String entityPath) throws EventHubException {
- this.connectionString = connectionString;
- this.entityPath = entityPath;
- this.connection = this.createConnection();
- }
-
- /**
- * creates a new instance of EventHubClient using the supplied connection string and entity path.
- *
- * @param connectionString connection string to the namespace of event hubs. connection string format:
- * amqps://{userId}:{password}@{namespaceName}.servicebus.windows.net
- * @param entityPath the name of event hub entity.
- *
- * @return EventHubClient
- * @throws org.apache.storm.eventhubs.client.EventHubException
- */
- public static EventHubClient create(String connectionString, String entityPath) throws EventHubException {
- return new EventHubClient(connectionString, entityPath);
- }
-
- public EventHubSender createPartitionSender(String partitionId) throws Exception {
- return new EventHubSender(this.connection.createSession(), this.entityPath, partitionId);
- }
-
- public EventHubConsumerGroup getConsumerGroup(String cgName) {
- if(cgName == null || cgName.length() == 0) {
- cgName = DefaultConsumerGroupName;
- }
- return new EventHubConsumerGroup(connection, entityPath, cgName);
- }
-
- public void close() {
- try {
- this.connection.close();
- } catch (ConnectionErrorException e) {
- logger.error(e.toString());
- }
- }
-
- private Connection createConnection() throws EventHubException {
- ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder(this.connectionString);
- Connection clientConnection;
-
- try {
- clientConnection = new Connection(
- connectionStringBuilder.getHost(),
- connectionStringBuilder.getPort(),
- connectionStringBuilder.getUserName(),
- connectionStringBuilder.getPassword(),
- connectionStringBuilder.getHost(),
- connectionStringBuilder.getSsl());
- } catch (ConnectionException e) {
- logger.error(e.toString());
- throw new EventHubException(e);
- }
- clientConnection.getEndpoint().setSyncTimeout(ConnectionSyncTimeout);
- SelectorFilterWriter.register(clientConnection.getEndpoint().getDescribedTypeRegistry());
- return clientConnection;
- }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.client;
+
+import org.apache.qpid.amqp_1_0.client.Connection;
+import org.apache.qpid.amqp_1_0.client.ConnectionErrorException;
+import org.apache.qpid.amqp_1_0.client.ConnectionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EventHubClient {
+
+ private static final String DefaultConsumerGroupName = "$default";
+ private static final Logger logger = LoggerFactory.getLogger(EventHubClient.class);
+ private static final long ConnectionSyncTimeout = 60000L;
+
+ private final String connectionString;
+ private final String entityPath;
+ private final Connection connection;
+
+ private EventHubClient(String connectionString, String entityPath) throws EventHubException {
+ this.connectionString = connectionString;
+ this.entityPath = entityPath;
+ this.connection = this.createConnection();
+ }
+
+ /**
+ * creates a new instance of EventHubClient using the supplied connection string and entity path.
+ *
+ * @param connectionString connection string to the namespace of event hubs. connection string format:
+ * amqps://{userId}:{password}@{namespaceName}.servicebus.windows.net
+ * @param entityPath the name of event hub entity.
+ *
+ * @return EventHubClient
+ * @throws org.apache.storm.eventhubs.client.EventHubException
+ */
+ public static EventHubClient create(String connectionString, String entityPath) throws EventHubException {
+ return new EventHubClient(connectionString, entityPath);
+ }
+
+ public EventHubSender createPartitionSender(String partitionId) throws Exception {
+ return new EventHubSender(this.connection.createSession(), this.entityPath, partitionId);
+ }
+
+ public EventHubConsumerGroup getConsumerGroup(String cgName) {
+ if(cgName == null || cgName.length() == 0) {
+ cgName = DefaultConsumerGroupName;
+ }
+ return new EventHubConsumerGroup(connection, entityPath, cgName);
+ }
+
+ public void close() {
+ try {
+ this.connection.close();
+ } catch (ConnectionErrorException e) {
+ logger.error(e.toString());
+ }
+ }
+
+ private Connection createConnection() throws EventHubException {
+ ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder(this.connectionString);
+ Connection clientConnection;
+
+ try {
+ clientConnection = new Connection(
+ connectionStringBuilder.getHost(),
+ connectionStringBuilder.getPort(),
+ connectionStringBuilder.getUserName(),
+ connectionStringBuilder.getPassword(),
+ connectionStringBuilder.getHost(),
+ connectionStringBuilder.getSsl());
+ } catch (ConnectionException e) {
+ logger.error(e.toString());
+ throw new EventHubException(e);
+ }
+ clientConnection.getEndpoint().setSyncTimeout(ConnectionSyncTimeout);
+ SelectorFilterWriter.register(clientConnection.getEndpoint().getDescribedTypeRegistry());
+ return clientConnection;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/1f13f15d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java
index 7c45578..435893e 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java
@@ -1,99 +1,99 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.client;
-
-import java.util.concurrent.TimeoutException;
-import org.apache.qpid.amqp_1_0.client.LinkDetachedException;
-import org.apache.qpid.amqp_1_0.client.Message;
-import org.apache.qpid.amqp_1_0.client.Sender;
-import org.apache.qpid.amqp_1_0.client.Session;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.messaging.Data;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class EventHubSender {
-
- private static final Logger logger = LoggerFactory.getLogger(EventHubSender.class);
-
- private final Session session;
- private final String entityPath;
- private final String partitionId;
- private final String destinationAddress;
-
- private Sender sender;
-
- public EventHubSender(Session session, String entityPath, String partitionId) {
- this.session = session;
- this.entityPath = entityPath;
- this.partitionId = partitionId;
- this.destinationAddress = this.getDestinationAddress();
- }
-
- public void send(byte[] data) throws EventHubException {
- try {
- if (this.sender == null) {
- this.ensureSenderCreated();
- }
-
- Binary bin = new Binary(data);
- Message message = new Message(new Data(bin));
- this.sender.send(message);
-
- } catch (LinkDetachedException e) {
- logger.error(e.getMessage());
-
- EventHubException eventHubException = new EventHubException("Sender has been closed");
- throw eventHubException;
- } catch (TimeoutException e) {
- logger.error(e.getMessage());
-
- EventHubException eventHubException = new EventHubException("Timed out while waiting to get credit to send");
- throw eventHubException;
- } catch (Exception e) {
- logger.error(e.getMessage());
- }
- }
-
- public void send(String data) throws EventHubException {
- //For interop with other language, convert string to bytes
- send(data.getBytes());
- }
-
- public void close() {
- try {
- this.sender.close();
- } catch (Sender.SenderClosingException e) {
- logger.error("Closing a sender encountered error: " + e.getMessage());
- }
- }
-
- private String getDestinationAddress() {
- if (this.partitionId == null || this.partitionId.equals("")) {
- return this.entityPath;
- } else {
- return String.format(Constants.DestinationAddressFormatString, this.entityPath, this.partitionId);
- }
- }
-
- private synchronized void ensureSenderCreated() throws Exception {
- if (this.sender == null) {
- this.sender = this.session.createSender(this.destinationAddress);
- }
- }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.client;
+
+import java.util.concurrent.TimeoutException;
+import org.apache.qpid.amqp_1_0.client.LinkDetachedException;
+import org.apache.qpid.amqp_1_0.client.Message;
+import org.apache.qpid.amqp_1_0.client.Sender;
+import org.apache.qpid.amqp_1_0.client.Session;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.messaging.Data;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EventHubSender {
+
+ private static final Logger logger = LoggerFactory.getLogger(EventHubSender.class);
+
+ private final Session session;
+ private final String entityPath;
+ private final String partitionId;
+ private final String destinationAddress;
+
+ private Sender sender;
+
+ public EventHubSender(Session session, String entityPath, String partitionId) {
+ this.session = session;
+ this.entityPath = entityPath;
+ this.partitionId = partitionId;
+ this.destinationAddress = this.getDestinationAddress();
+ }
+
+ public void send(byte[] data) throws EventHubException {
+ try {
+ if (this.sender == null) {
+ this.ensureSenderCreated();
+ }
+
+ Binary bin = new Binary(data);
+ Message message = new Message(new Data(bin));
+ this.sender.send(message);
+
+ } catch (LinkDetachedException e) {
+ logger.error(e.getMessage());
+
+ EventHubException eventHubException = new EventHubException("Sender has been closed");
+ throw eventHubException;
+ } catch (TimeoutException e) {
+ logger.error(e.getMessage());
+
+ EventHubException eventHubException = new EventHubException("Timed out while waiting to get credit to send");
+ throw eventHubException;
+ } catch (Exception e) {
+ logger.error(e.getMessage());
+ }
+ }
+
+ public void send(String data) throws EventHubException {
+ //For interop with other language, convert string to bytes
+ send(data.getBytes());
+ }
+
+ public void close() {
+ try {
+ this.sender.close();
+ } catch (Sender.SenderClosingException e) {
+ logger.error("Closing a sender encountered error: " + e.getMessage());
+ }
+ }
+
+ private String getDestinationAddress() {
+ if (this.partitionId == null || this.partitionId.equals("")) {
+ return this.entityPath;
+ } else {
+ return String.format(Constants.DestinationAddressFormatString, this.entityPath, this.partitionId);
+ }
+ }
+
+ private synchronized void ensureSenderCreated() throws Exception {
+ if (this.sender == null) {
+ this.sender = this.session.createSender(this.destinationAddress);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/1f13f15d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
index c908f9d..2f62a23 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
@@ -1,52 +1,52 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.samples;
-
-import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.TopologyBuilder;
-
-import org.apache.storm.eventhubs.bolt.EventHubBolt;
-import org.apache.storm.eventhubs.bolt.EventHubBoltConfig;
-import org.apache.storm.eventhubs.spout.EventHubSpout;
-
-/**
- * A sample topology that loops message back to EventHub
- */
-public class EventHubLoop extends EventCount {
-
- @Override
- protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
- TopologyBuilder topologyBuilder = new TopologyBuilder();
-
- topologyBuilder.setSpout("EventHubsSpout", eventHubSpout, spoutConfig.getPartitionCount())
- .setNumTasks(spoutConfig.getPartitionCount());
- EventHubBoltConfig boltConfig = new EventHubBoltConfig(spoutConfig.getConnectionString(),
- spoutConfig.getEntityPath(), true);
-
- EventHubBolt eventHubBolt = new EventHubBolt(boltConfig);
- int boltTasks = spoutConfig.getPartitionCount();
- topologyBuilder.setBolt("EventHubsBolt", eventHubBolt, boltTasks)
- .localOrShuffleGrouping("EventHubsSpout").setNumTasks(boltTasks);
- return topologyBuilder.createTopology();
- }
-
- public static void main(String[] args) throws Exception {
- EventHubLoop scenario = new EventHubLoop();
- scenario.runScenario(args);
- }
-}
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.samples;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.TopologyBuilder;
+
+import org.apache.storm.eventhubs.bolt.EventHubBolt;
+import org.apache.storm.eventhubs.bolt.EventHubBoltConfig;
+import org.apache.storm.eventhubs.spout.EventHubSpout;
+
+/**
+ * A sample topology that loops message back to EventHub
+ */
+public class EventHubLoop extends EventCount {
+
+ @Override
+ protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
+ TopologyBuilder topologyBuilder = new TopologyBuilder();
+
+ topologyBuilder.setSpout("EventHubsSpout", eventHubSpout, spoutConfig.getPartitionCount())
+ .setNumTasks(spoutConfig.getPartitionCount());
+ EventHubBoltConfig boltConfig = new EventHubBoltConfig(spoutConfig.getConnectionString(),
+ spoutConfig.getEntityPath(), true);
+
+ EventHubBolt eventHubBolt = new EventHubBolt(boltConfig);
+ int boltTasks = spoutConfig.getPartitionCount();
+ topologyBuilder.setBolt("EventHubsBolt", eventHubBolt, boltTasks)
+ .localOrShuffleGrouping("EventHubsSpout").setNumTasks(boltTasks);
+ return topologyBuilder.createTopology();
+ }
+
+ public static void main(String[] args) throws Exception {
+ EventHubLoop scenario = new EventHubLoop();
+ scenario.runScenario(args);
+ }
+}