You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2017/04/18 21:26:34 UTC
[1/3] storm git commit: Storm 2371: Replace existing AMQP eventhub
client with the lastest one from Microsoft eventhubs
Repository: storm
Updated Branches:
refs/heads/master f794d7230 -> 2307b8023
Storm 2371: Replace existing AMQP eventhub client with the lastest one from Microsoft eventhubs
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fdb9136b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fdb9136b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fdb9136b
Branch: refs/heads/master
Commit: fdb9136bc69f28175f4dc3bee9297352440f2f13
Parents: 1850dd5
Author: Ranjan Banerjee <ra...@microsoft.com>
Authored: Tue Feb 21 13:51:16 2017 -0800
Committer: Ranjan Banerjee <ra...@microsoft.com>
Committed: Sun Apr 9 13:06:00 2017 -0700
----------------------------------------------------------------------
external/storm-eventhubs/pom.xml | 29 +---
.../storm/eventhubs/bolt/EventHubBolt.java | 82 ++++++++---
.../eventhubs/bolt/EventHubBoltConfig.java | 10 +-
.../eventhubs/spout/BinaryEventDataScheme.java | 43 +++---
.../apache/storm/eventhubs/spout/EventData.java | 48 -------
.../storm/eventhubs/spout/EventDataScheme.java | 51 +++----
.../storm/eventhubs/spout/EventDataWrap.java | 48 +++++++
.../eventhubs/spout/EventHubException.java | 41 ++++++
.../storm/eventhubs/spout/EventHubFilter.java | 46 ++++++
.../eventhubs/spout/EventHubReceiverImpl.java | 141 +++++++++++--------
.../storm/eventhubs/spout/EventHubSpout.java | 24 ++--
.../eventhubs/spout/EventHubSpoutConfig.java | 18 ++-
.../eventhubs/spout/EventHubSpoutException.java | 37 -----
.../storm/eventhubs/spout/FieldConstants.java | 1 +
.../storm/eventhubs/spout/IEventDataScheme.java | 7 +-
.../storm/eventhubs/spout/IEventFilter.java | 28 ++++
.../eventhubs/spout/IEventHubReceiver.java | 11 +-
.../eventhubs/spout/IPartitionManager.java | 2 +-
.../storm/eventhubs/spout/PartitionManager.java | 28 ++--
.../spout/SerializeDeserializeUtil.java | 34 +++++
.../eventhubs/spout/SimplePartitionManager.java | 32 ++---
.../spout/StaticPartitionCoordinator.java | 1 -
.../eventhubs/spout/StringEventDataScheme.java | 43 +++---
.../trident/ITridentPartitionManager.java | 6 +-
.../TransactionalTridentEventHubEmitter.java | 36 ++---
.../trident/TridentPartitionManager.java | 29 ++--
.../eventhubs/spout/EventHubReceiverMock.java | 36 ++---
.../spout/PartitionManagerCallerMock.java | 6 +-
.../storm/eventhubs/spout/TestEventData.java | 8 +-
.../eventhubs/spout/TestEventHubSpout.java | 7 +-
pom.xml | 2 +-
31 files changed, 531 insertions(+), 404 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/pom.xml b/external/storm-eventhubs/pom.xml
index 7fe6071..12a0b2f 100755
--- a/external/storm-eventhubs/pom.xml
+++ b/external/storm-eventhubs/pom.xml
@@ -52,9 +52,9 @@
</build>
<dependencies>
<dependency>
- <groupId>com.microsoft.eventhubs.client</groupId>
- <artifactId>eventhubs-client</artifactId>
- <version>${eventhubs.client.version}</version>
+ <groupId>com.microsoft.azure</groupId>
+ <artifactId>azure-eventhubs</artifactId>
+ <version>${azure-eventhubs.version}</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
@@ -86,26 +86,5 @@
</exclusion>
</exclusions>
</dependency>
- <dependency>
- <groupId>org.apache.qpid</groupId>
- <artifactId>qpid-amqp-1-0-client</artifactId>
- <version>${qpid.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.qpid</groupId>
- <artifactId>qpid-amqp-1-0-client-jms</artifactId>
- <version>${qpid.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.qpid</groupId>
- <artifactId>qpid-amqp-1-0-common</artifactId>
- <version>${qpid.version}</version>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.11</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
+ </dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
index 604c62d..7503db3 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
@@ -17,33 +17,37 @@
*******************************************************************************/
package org.apache.storm.eventhubs.bolt;
-import java.util.Map;
-
-import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
-import org.apache.storm.utils.TupleUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.microsoft.eventhubs.client.EventHubClient;
-import com.microsoft.eventhubs.client.EventHubException;
-import com.microsoft.eventhubs.client.EventHubSender;
+import com.microsoft.azure.eventhubs.EventData;
+import com.microsoft.azure.eventhubs.EventHubClient;
+import com.microsoft.azure.eventhubs.PartitionSender;
+import com.microsoft.azure.servicebus.ServiceBusException;
+import org.apache.storm.eventhubs.spout.EventHubException;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
/**
* A bolt that writes event message to EventHub.
*/
-public class EventHubBolt extends BaseTickTupleAwareRichBolt {
+public class EventHubBolt extends BaseRichBolt {
private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory
.getLogger(EventHubBolt.class);
protected OutputCollector collector;
- protected EventHubSender sender;
+ protected PartitionSender sender;
+ protected EventHubClient ehClient;
protected EventHubBoltConfig boltConfig;
public EventHubBolt(String connectionString, String entityPath) {
@@ -72,10 +76,10 @@ public class EventHubBolt extends BaseTickTupleAwareRichBolt {
logger.info("creating sender: " + boltConfig.getConnectionString()
+ ", " + boltConfig.getEntityPath() + ", " + myPartitionId);
try {
- EventHubClient eventHubClient = EventHubClient.create(
- boltConfig.getConnectionString(),
- boltConfig.getEntityPath());
- sender = eventHubClient.createPartitionSender(myPartitionId);
+ ehClient = EventHubClient.createFromConnectionStringSync(boltConfig.getConnectionString());
+ if (boltConfig.getPartitionMode()) {
+ sender = ehClient.createPartitionSenderSync(Integer.toString(context.getThisTaskIndex()));
+ }
} catch (Exception ex) {
collector.reportError(ex);
throw new RuntimeException(ex);
@@ -84,13 +88,53 @@ public class EventHubBolt extends BaseTickTupleAwareRichBolt {
}
@Override
- protected void process(Tuple tuple) {
+ public void execute(Tuple tuple) {
try {
- sender.send(boltConfig.getEventDataFormat().serialize(tuple));
+ EventData sendEvent = new EventData(boltConfig.getEventDataFormat().serialize(tuple));
+ if (boltConfig.getPartitionMode() && sender!=null) {
+ sender.sendSync(sendEvent);
+ }
+ else if (boltConfig.getPartitionMode() && sender==null) {
+ throw new EventHubException("Sender is null");
+ }
+ else if (!boltConfig.getPartitionMode() && ehClient!=null) {
+ ehClient.sendSync(sendEvent);
+ }
+ else if (!boltConfig.getPartitionMode() && ehClient==null) {
+ throw new EventHubException("ehclient is null");
+ }
collector.ack(tuple);
- } catch (EventHubException ex) {
+ } catch (EventHubException ex ) {
collector.reportError(ex);
collector.fail(tuple);
+ } catch (ServiceBusException e) {
+ collector.reportError(e);
+ collector.fail(tuple);
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ if(sender != null) {
+ try {
+ sender.close().whenComplete((voidargs,error)->{
+ try{
+ if(error!=null){
+ logger.error("Exception during sender cleanup phase"+error.toString());
+ }
+ ehClient.closeSync();
+ }catch (Exception e){
+ logger.error("Exception during ehclient cleanup phase"+e.toString());
+ }
+ }).get();
+ } catch (InterruptedException e) {
+ logger.error("Exception occured during cleanup phase"+e.toString());
+ } catch (ExecutionException e) {
+ logger.error("Exception occured during cleanup phase"+e.toString());
+ }
+ logger.info("Eventhub Bolt cleaned up");
+ sender = null;
+ ehClient = null;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java
index 10b4e39..fe2d989 100644
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java
@@ -17,10 +17,12 @@
*******************************************************************************/
package org.apache.storm.eventhubs.bolt;
+import com.microsoft.azure.servicebus.ConnectionStringBuilder;
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+
import java.io.Serializable;
-import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
-import com.microsoft.eventhubs.client.ConnectionStringBuilder;
+import java.io.Serializable;
/*
* EventHubs bolt configurations
@@ -81,8 +83,8 @@ public class EventHubBoltConfig implements Serializable {
public EventHubBoltConfig(String userName, String password, String namespace,
String targetFqnAddress, String entityPath, boolean partitionMode,
IEventDataFormat dataFormat) {
- this.connectionString = new ConnectionStringBuilder(userName, password,
- namespace, targetFqnAddress).getConnectionString();
+ this.connectionString = new ConnectionStringBuilder(namespace,entityPath,
+ userName,password).toString();
this.entityPath = entityPath;
this.partitionMode = partitionMode;
this.dataFormat = dataFormat;
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java
index 7b0d7e5..99d764c 100644
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java
@@ -17,17 +17,17 @@
*******************************************************************************/
package org.apache.storm.eventhubs.spout;
-import org.apache.qpid.amqp_1_0.client.Message;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
-import org.apache.qpid.amqp_1_0.type.messaging.Data;
+import com.microsoft.azure.eventhubs.EventData;
import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
+
/**
* An Event Data Scheme which deserializes message payload into the raw bytes.
*
@@ -37,25 +37,30 @@ import java.util.Map;
*/
public class BinaryEventDataScheme implements IEventDataScheme {
+ private static final Logger logger = LoggerFactory.getLogger(BinaryEventDataScheme.class);
@Override
- public List<Object> deserialize(Message message) {
+ public List<Object> deserialize(EventData eventData){
final List<Object> fieldContents = new ArrayList<Object>();
-
- Map metaDataMap = new HashMap();
- byte[] messageData = new byte[0];
-
- for (Section section : message.getPayload()) {
- if (section instanceof Data) {
- Data data = (Data) section;
- messageData = data.getValue().getArray();
- } else if (section instanceof ApplicationProperties) {
- final ApplicationProperties applicationProperties = (ApplicationProperties) section;
- metaDataMap = applicationProperties.getValue();
+ byte [] messageData = null;
+ if (eventData.getBytes() != null) {
+ messageData = eventData.getBytes();
+ }
+ else if (eventData.getObject()!=null) {
+ try {
+ messageData = SerializeDeserializeUtil.serialize(eventData.getObject());
+ } catch (IOException e) {
+ logger.error("Failed to serialize EventData payload class"
+ + eventData.getObject().getClass());
+ logger.error("Exception encountered while serializing EventData payload is"
+ + e.toString());
+ throw new RuntimeException(e);
}
}
-
+ Map metaDataMap = eventData.getProperties().size() > 0 ? eventData.getProperties() : null;
fieldContents.add(messageData);
- fieldContents.add(metaDataMap);
+ if ( metaDataMap != null ) {
+ fieldContents.add(metaDataMap);
+ }
return fieldContents;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventData.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventData.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventData.java
deleted file mode 100755
index e5834b4..0000000
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventData.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.spout;
-
-import org.apache.qpid.amqp_1_0.client.Message;
-
-public class EventData implements Comparable<EventData> {
- private final Message message;
- private final MessageId messageId;
-
- public EventData(Message message, MessageId messageId) {
- this.message = message;
- this.messageId = messageId;
- }
-
- public static EventData create(Message message, MessageId messageId) {
- return new EventData(message, messageId);
- }
-
- public Message getMessage() {
- return this.message;
- }
-
- public MessageId getMessageId() {
- return this.messageId;
- }
-
- @Override
- public int compareTo(EventData ed) {
- return messageId.getSequenceNumber().
- compareTo(ed.getMessageId().getSequenceNumber());
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
index 90cad0a..a537975 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
@@ -17,18 +17,15 @@
*******************************************************************************/
package org.apache.storm.eventhubs.spout;
+import com.microsoft.azure.eventhubs.EventData;
import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.qpid.amqp_1_0.client.Message;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
-import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
-import org.apache.qpid.amqp_1_0.type.messaging.Data;
-
/**
* An Event Data Scheme which deserializes message payload into the Strings. No
* encoding is assumed. The receiver will need to handle parsing of the string
@@ -44,29 +41,35 @@ import org.apache.qpid.amqp_1_0.type.messaging.Data;
public class EventDataScheme implements IEventDataScheme {
private static final long serialVersionUID = 1L;
-
+ private static final Logger logger = LoggerFactory.getLogger(EventDataScheme.class);
@Override
- public List<Object> deserialize(Message message) {
+ public List<Object> deserialize(EventData eventData) {
final List<Object> fieldContents = new ArrayList<Object>();
-
- Map metaDataMap = new HashMap();
String messageData = "";
-
- for (Section section : message.getPayload()) {
- if (section instanceof Data) {
- Data data = (Data) section;
- messageData = new String(data.getValue().getArray());
- } else if (section instanceof AmqpValue) {
- AmqpValue amqpValue = (AmqpValue) section;
- messageData = amqpValue.getValue().toString();
- } else if (section instanceof ApplicationProperties) {
- final ApplicationProperties applicationProperties = (ApplicationProperties) section;
- metaDataMap = applicationProperties.getValue();
+ if (eventData.getBytes()!=null) {
+ messageData = new String(eventData.getBytes());
+ }
+ /*Will only serialize AMQPValue type*/
+ else if (eventData.getObject()!=null) {
+ try {
+ if (!(eventData.getObject() instanceof List)) {
+ messageData = eventData.getObject().toString();
+ } else {
+ throw new RuntimeException("Cannot serialize the given AMQP type");
+ }
+ } catch (RuntimeException e) {
+ logger.error("Failed to serialize EventData payload class"
+ + eventData.getObject().getClass());
+ logger.error("Exception encountered while serializing EventData payload is"
+ + e.toString());
+ throw e;
}
}
-
+ Map metaDataMap = eventData.getProperties().size() > 0 ? eventData.getProperties() : null;
fieldContents.add(messageData);
- fieldContents.add(metaDataMap);
+ if ( metaDataMap != null ) {
+ fieldContents.add(metaDataMap);
+ }
return fieldContents;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataWrap.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataWrap.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataWrap.java
new file mode 100644
index 0000000..5eeb4d2
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataWrap.java
@@ -0,0 +1,48 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import com.microsoft.azure.eventhubs.EventData;
+
+public class EventDataWrap implements Comparable<EventDataWrap> {
+ private final EventData eventData;
+ private final MessageId messageId;
+
+ public EventDataWrap(EventData eventdata, MessageId messageId) {
+ this.eventData = eventdata;
+ this.messageId = messageId;
+ }
+
+ public static EventDataWrap create(EventData eventData, MessageId messageId) {
+ return new EventDataWrap(eventData, messageId);
+ }
+
+ public EventData getEventData() {
+ return this.eventData;
+ }
+
+ public MessageId getMessageId() {
+ return this.messageId;
+ }
+
+ @Override
+ public int compareTo(EventDataWrap ed) {
+ return messageId.getSequenceNumber().
+ compareTo(ed.getMessageId().getSequenceNumber());
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubException.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubException.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubException.java
new file mode 100644
index 0000000..f81197a
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubException.java
@@ -0,0 +1,41 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+
+package org.apache.storm.eventhubs.spout;
+
+public class EventHubException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+
+ public EventHubException() {
+ super();
+ }
+
+ public EventHubException(String message) {
+ super(message);
+ }
+
+ public EventHubException(Throwable cause) {
+ super(cause);
+ }
+
+ public EventHubException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubFilter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubFilter.java
new file mode 100644
index 0000000..a375380
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubFilter.java
@@ -0,0 +1,46 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import java.time.Instant;
+
+public class EventHubFilter implements IEventFilter{
+
+ String offset = null;
+ Instant time = null;
+
+ public EventHubFilter(String offset){
+ this.offset = offset;
+ this.time = null;
+ }
+
+ public EventHubFilter(Instant time){
+ this.time = time;
+ this.offset = null;
+ }
+
+ @Override
+ public String getOffset(){
+ return offset;
+ }
+
+ @Override
+ public Instant getTime(){
+ return time;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
index 5f9acbd..4949024 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
@@ -17,38 +17,31 @@
*******************************************************************************/
package org.apache.storm.eventhubs.spout;
-import org.apache.qpid.amqp_1_0.client.Message;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import com.microsoft.azure.eventhubs.EventData;
+import com.microsoft.azure.eventhubs.EventHubClient;
+import com.microsoft.azure.eventhubs.PartitionReceiver;
+import com.microsoft.azure.servicebus.ServiceBusException;
import org.apache.storm.metric.api.CountMetric;
import org.apache.storm.metric.api.MeanReducer;
import org.apache.storm.metric.api.ReducedMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import com.microsoft.eventhubs.client.Constants;
-import com.microsoft.eventhubs.client.EventHubException;
-import com.microsoft.eventhubs.client.IEventHubFilter;
-import com.microsoft.eventhubs.client.ResilientEventHubReceiver;
-
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
-
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.Symbol;
-import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations;
+import java.util.concurrent.ExecutionException;
public class EventHubReceiverImpl implements IEventHubReceiver {
private static final Logger logger = LoggerFactory.getLogger(EventHubReceiverImpl.class);
- private static final Symbol OffsetKey = Symbol.valueOf(Constants.OffsetKey);
- private static final Symbol SequenceNumberKey = Symbol.valueOf(Constants.SequenceNumberKey);
private final String connectionString;
private final String entityName;
private final String partitionId;
- private final int defaultCredits;
private final String consumerGroupName;
- private ResilientEventHubReceiver receiver;
+ private PartitionReceiver receiver;
+ private EventHubClient ehClient;
private ReducedMetric receiveApiLatencyMean;
private CountMetric receiveApiCallCount;
private CountMetric receiveMessageCount;
@@ -56,7 +49,6 @@ public class EventHubReceiverImpl implements IEventHubReceiver {
public EventHubReceiverImpl(EventHubSpoutConfig config, String partitionId) {
this.connectionString = config.getConnectionString();
this.entityName = config.getEntityPath();
- this.defaultCredits = config.getReceiverCredits();
this.partitionId = partitionId;
this.consumerGroupName = config.getConsumerGroupName();
receiveApiLatencyMean = new ReducedMetric(new MeanReducer());
@@ -65,14 +57,41 @@ public class EventHubReceiverImpl implements IEventHubReceiver {
}
@Override
- public void open(IEventHubFilter filter) throws EventHubException {
- logger.info("creating eventhub receiver: partitionId=" + partitionId +
- ", filterString=" + filter.getFilterString());
+ public void open(IEventFilter filter) throws EventHubException {
+ logger.info("creating eventhub receiver: partitionId=" + partitionId +
+ ", filter=" + filter.getOffset() != null ?
+ filter.getOffset() : Long.toString(filter.getTime().toEpochMilli()));
long start = System.currentTimeMillis();
- receiver = new ResilientEventHubReceiver(connectionString, entityName,
- partitionId, consumerGroupName, defaultCredits, filter);
- receiver.initialize();
-
+ try {
+ ehClient = EventHubClient.createFromConnectionStringSync(connectionString);
+
+ if (filter.getOffset()!=null) {
+ receiver = ehClient.createEpochReceiverSync(
+ consumerGroupName,
+ partitionId,
+ filter.getOffset(),
+ false,
+ 1);
+ }
+ else if (filter.getTime()!=null) {
+ receiver = ehClient.createEpochReceiverSync(
+ consumerGroupName,
+ partitionId,
+ filter.getTime(),
+ 1);
+ }
+ else{
+ throw new RuntimeException("Eventhub receiver must have " +
+ "an offset or time to be created");
+ }
+ } catch (IOException e) {
+ logger.error("Exception in creating ehclient"+ e.toString());
+ throw new EventHubException(e);
+ }
+ catch (ServiceBusException e) {
+ logger.error("Exception in creating Receiver"+e.toString());
+ throw new EventHubException(e);
+ }
long end = System.currentTimeMillis();
logger.info("created eventhub receiver, time taken(ms): " + (end-start));
}
@@ -80,62 +99,60 @@ public class EventHubReceiverImpl implements IEventHubReceiver {
@Override
public void close() {
if(receiver != null) {
- receiver.close();
+ try {
+ receiver.close().whenComplete((voidargs,error)->{
+ try {
+ if (error!=null) {
+ logger.error("Exception during receiver close phase"+error.toString());
+ }
+ ehClient.closeSync();
+ } catch (Exception e) {
+ logger.error("Exception during ehclient close phase"+e.toString());
+ }
+ }).get();
+ } catch (InterruptedException e) {
+ logger.error("Exception occured during close phase"+e.toString());
+ } catch (ExecutionException e) {
+ logger.error("Exception occured during close phase"+e.toString());
+ }
logger.info("closed eventhub receiver: partitionId=" + partitionId );
receiver = null;
+ ehClient = null;
}
}
-
+
+
@Override
public boolean isOpen() {
return (receiver != null);
}
@Override
- public EventData receive(long timeoutInMilliseconds) {
+ public EventDataWrap receive() {
long start = System.currentTimeMillis();
- Message message = receiver.receive(timeoutInMilliseconds);
+ Iterable<EventData> receivedEvents=null;
+ /*Get one message at a time for backward compatibility behaviour*/
+ try {
+ receivedEvents = receiver.receiveSync(1);
+ } catch (ServiceBusException e) {
+ logger.error("Exception occured during receive"+e.toString());
+ return null;
+ }
long end = System.currentTimeMillis();
long millis = (end - start);
receiveApiLatencyMean.update(millis);
receiveApiCallCount.incr();
-
- if (message == null) {
- //Temporary workaround for AMQP/EH bug of failing to receive messages
- /*if(timeoutInMilliseconds > 100 && millis < timeoutInMilliseconds/2) {
- throw new RuntimeException(
- "Restart EventHubSpout due to failure of receiving messages in "
- + millis + " millisecond");
- }*/
+
+ if (receivedEvents == null || receivedEvents.spliterator().getExactSizeIfKnown() == 0) {
return null;
}
-
receiveMessageCount.incr();
+ EventData receivedEvent = receivedEvents.iterator().next();
+ MessageId messageId = new MessageId(partitionId,
+ receivedEvent.getSystemProperties().getOffset(),
+ receivedEvent.getSystemProperties().getSequenceNumber());
- MessageId messageId = createMessageId(message);
- return EventData.create(message, messageId);
- }
-
- private MessageId createMessageId(Message message) {
- String offset = null;
- long sequenceNumber = 0;
-
- for (Section section : message.getPayload()) {
- if (section instanceof MessageAnnotations) {
- MessageAnnotations annotations = (MessageAnnotations) section;
- HashMap annonationMap = (HashMap) annotations.getValue();
-
- if (annonationMap.containsKey(OffsetKey)) {
- offset = (String) annonationMap.get(OffsetKey);
- }
-
- if (annonationMap.containsKey(SequenceNumberKey)) {
- sequenceNumber = (Long) annonationMap.get(SequenceNumberKey);
- }
- }
- }
-
- return MessageId.create(partitionId, offset, sequenceNumber);
+ return EventDataWrap.create(receivedEvent,messageId);
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
index 6adef42..23a7453 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
@@ -24,16 +24,14 @@ import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import org.apache.qpid.amqp_1_0.client.Message;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
public class EventHubSpout extends BaseRichSpout {
private static final Logger logger = LoggerFactory.getLogger(EventHubSpout.class);
@@ -138,7 +136,7 @@ public class EventHubSpout extends BaseRichSpout {
@Override
public void open(Map config, TopologyContext context, SpoutOutputCollector collector) {
- logger.info("begin: open()");
+ logger.info("begin:start open()");
String topologyName = (String) config.get(Config.TOPOLOGY_NAME);
eventHubConfig.setTopologyName(topologyName);
@@ -174,7 +172,7 @@ public class EventHubSpout extends BaseRichSpout {
@Override
public void nextTuple() {
- EventData eventData = null;
+ EventDataWrap eventDatawrap = null;
List<IPartitionManager> partitionManagers = partitionCoordinator.getMyPartitionManagers();
for (int i = 0; i < partitionManagers.size(); i++) {
@@ -185,20 +183,16 @@ public class EventHubSpout extends BaseRichSpout {
throw new RuntimeException("partitionManager doesn't exist.");
}
- eventData = partitionManager.receive();
+ eventDatawrap = partitionManager.receive();
- if (eventData != null) {
+ if (eventDatawrap != null) {
break;
}
}
-
- if (eventData != null) {
- MessageId messageId = eventData.getMessageId();
- Message message = eventData.getMessage();
-
- List<Object> tuples = scheme.deserialize(message);
-
+ if (eventDatawrap != null) {
+ MessageId messageId = eventDatawrap.getMessageId();
+ List<Object> tuples = scheme.deserialize(eventDatawrap.getEventData());
if (tuples != null) {
collector.emit(tuples, messageId);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
index e06953a..5556970 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
@@ -17,10 +17,12 @@
*******************************************************************************/
package org.apache.storm.eventhubs.spout;
+import com.microsoft.azure.eventhubs.EventHubClient;
+import com.microsoft.azure.servicebus.ConnectionStringBuilder;
+
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
-import com.microsoft.eventhubs.client.ConnectionStringBuilder;
public class EventHubSpoutConfig implements Serializable {
private static final long serialVersionUID = 1L;
@@ -42,8 +44,7 @@ public class EventHubSpoutConfig implements Serializable {
private String connectionString;
private String topologyName;
private IEventDataScheme scheme = new StringEventDataScheme();
- private String consumerGroupName = null; // if null then use default
- // consumer group
+ private String consumerGroupName = EventHubClient.DEFAULT_CONSUMER_GROUP_NAME;
private String outputStreamId;
@@ -52,8 +53,8 @@ public class EventHubSpoutConfig implements Serializable {
String namespace, String entityPath, int partitionCount) {
this.userName = username;
this.password = password;
- this.connectionString = new ConnectionStringBuilder(username, password,
- namespace).getConnectionString();
+ this.connectionString = new ConnectionStringBuilder(namespace,entityPath,
+ username,password).toString();
this.namespace = namespace;
this.entityPath = entityPath;
this.partitionCount = partitionCount;
@@ -232,9 +233,12 @@ public class EventHubSpoutConfig implements Serializable {
return connectionString;
}
+ /*Keeping it for backward compatibility*/
public void setTargetAddress(String targetFqnAddress) {
- this.connectionString = new ConnectionStringBuilder(userName, password,
- namespace, targetFqnAddress).getConnectionString();
+ }
+
+ public void setTargetAddress(){
+
}
public EventHubSpoutConfig withTargetAddress(String targetFqnAddress) {
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutException.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutException.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutException.java
deleted file mode 100755
index 0fd6ac4..0000000
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutException.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.spout;
-
-public class EventHubSpoutException extends Exception {
-
- public EventHubSpoutException() {
- super();
- }
-
- public EventHubSpoutException(String message) {
- super(message);
- }
-
- public EventHubSpoutException(Throwable cause) {
- super(cause);
- }
-
- public EventHubSpoutException(String message, Throwable cause) {
- super(message, cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
index b238391..079d1da 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
@@ -23,4 +23,5 @@ public class FieldConstants {
public static final String Offset = "offset";
public static final String Message = "message";
public static final String META_DATA = "metadata";
+ public static final String DefaultStartingOffset = "-1";
}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
index b8101b9..6c78524 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
@@ -17,10 +17,11 @@
*******************************************************************************/
package org.apache.storm.eventhubs.spout;
+import com.microsoft.azure.eventhubs.EventData;
import org.apache.storm.tuple.Fields;
+
import java.io.Serializable;
import java.util.List;
-import org.apache.qpid.amqp_1_0.client.Message;
public interface IEventDataScheme extends Serializable {
@@ -29,10 +30,10 @@ public interface IEventDataScheme extends Serializable {
*
* @see #getOutputFields() for the list of fields the tuple will contain.
*
- * @param message The Message to Deserialize.
+ * @param eventData The EventData to Deserialize.
* @return A tuple containing the deserialized fields of the message.
*/
- List<Object> deserialize(Message message);
+ List<Object> deserialize(EventData eventData);
/**
* Retrieve the Fields that are present on tuples created by this object.
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventFilter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventFilter.java
new file mode 100644
index 0000000..c67823d
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventFilter.java
@@ -0,0 +1,28 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+
+package org.apache.storm.eventhubs.spout;
+
+import java.time.Instant;
+
+public interface IEventFilter {
+
+ String getOffset();
+
+ Instant getTime();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java
index bc2db14..4ae78c5 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java
@@ -19,18 +19,15 @@ package org.apache.storm.eventhubs.spout;
import java.util.Map;
-import com.microsoft.eventhubs.client.EventHubException;
-import com.microsoft.eventhubs.client.IEventHubFilter;
-
public interface IEventHubReceiver {
- void open(IEventHubFilter filter) throws EventHubException;
+ void open(IEventFilter filter) throws EventHubException;
void close();
-
+
boolean isOpen();
- EventData receive(long timeoutInMilliseconds);
-
+ EventDataWrap receive();
+
Map getMetricsData();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java
index ac986d3..845f508 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java
@@ -25,7 +25,7 @@ public interface IPartitionManager {
void close();
- EventData receive();
+ EventDataWrap receive();
void checkpoint();
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java
index 1054742..20e021a 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java
@@ -29,9 +29,9 @@ public class PartitionManager extends SimplePartitionManager {
private final int ehReceiveTimeoutMs = 5000;
//all sent events are stored in pending
- private final Map<String, EventData> pending;
+ private final Map<String, EventDataWrap> pending;
//all failed events are put in toResend, which is sorted by event's offset
- private final TreeSet<EventData> toResend;
+ private final TreeSet<EventDataWrap> toResend;
public PartitionManager(
EventHubSpoutConfig spoutConfig,
@@ -41,29 +41,29 @@ public class PartitionManager extends SimplePartitionManager {
super(spoutConfig, partitionId, stateStore, receiver);
- this.pending = new LinkedHashMap<String, EventData>();
- this.toResend = new TreeSet<EventData>();
+ this.pending = new LinkedHashMap<String, EventDataWrap>();
+ this.toResend = new TreeSet<EventDataWrap>();
}
@Override
- public EventData receive() {
+ public EventDataWrap receive() {
if(pending.size() >= config.getMaxPendingMsgsPerPartition()) {
return null;
}
- EventData eventData;
+ EventDataWrap eventDatawrap;
if (toResend.isEmpty()) {
- eventData = receiver.receive(ehReceiveTimeoutMs);
+ eventDatawrap = receiver.receive();
} else {
- eventData = toResend.pollFirst();
+ eventDatawrap = toResend.pollFirst();
}
- if (eventData != null) {
- lastOffset = eventData.getMessageId().getOffset();
- pending.put(lastOffset, eventData);
+ if (eventDatawrap != null) {
+ lastOffset = eventDatawrap.getMessageId().getOffset();
+ pending.put(lastOffset, eventDatawrap);
}
- return eventData;
+ return eventDatawrap;
}
@Override
@@ -74,8 +74,8 @@ public class PartitionManager extends SimplePartitionManager {
@Override
public void fail(String offset) {
logger.warn("fail on " + offset);
- EventData eventData = pending.remove(offset);
- toResend.add(eventData);
+ EventDataWrap eventDataWrap = pending.remove(offset);
+ toResend.add(eventDataWrap);
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SerializeDeserializeUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SerializeDeserializeUtil.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SerializeDeserializeUtil.java
new file mode 100644
index 0000000..0439a4f
--- /dev/null
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SerializeDeserializeUtil.java
@@ -0,0 +1,34 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+
+package org.apache.storm.eventhubs.spout;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+
+public class SerializeDeserializeUtil {
+ public static byte[] serialize(Object obj) throws IOException {
+ try (ByteArrayOutputStream b = new ByteArrayOutputStream()) {
+ try (ObjectOutputStream o = new ObjectOutputStream(b)) {
+ o.writeObject(obj);
+ }
+ return b.toByteArray();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java
index b66a785..b76b342 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java
@@ -17,16 +17,11 @@
*******************************************************************************/
package org.apache.storm.eventhubs.spout;
-import java.util.Map;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.Instant;
-import com.microsoft.eventhubs.client.Constants;
-import com.microsoft.eventhubs.client.EventHubEnqueueTimeFilter;
-import com.microsoft.eventhubs.client.EventHubOffsetFilter;
-import com.microsoft.eventhubs.client.IEventHubFilter;
-
+import java.util.Map;
/**
* A simple partition manager that does not re-send failed messages
*/
@@ -62,16 +57,15 @@ public class SimplePartitionManager implements IPartitionManager {
String offset = stateStore.readData(statePath);
logger.info("read offset from state store: " + offset);
if(offset == null) {
- offset = Constants.DefaultStartingOffset;
+ offset = FieldConstants.DefaultStartingOffset;
}
-
- IEventHubFilter filter;
- if (offset.equals(Constants.DefaultStartingOffset)
+ IEventFilter filter;
+ if (offset.equals(FieldConstants.DefaultStartingOffset)
&& config.getEnqueueTimeFilter() != 0) {
- filter = new EventHubEnqueueTimeFilter(config.getEnqueueTimeFilter());
+ filter = new EventHubFilter(Instant.ofEpochMilli(config.getEnqueueTimeFilter()));
}
- else {
- filter = new EventHubOffsetFilter(offset);
+ else{
+ filter = new EventHubFilter(offset);
}
receiver.open(filter);
@@ -98,12 +92,12 @@ public class SimplePartitionManager implements IPartitionManager {
}
@Override
- public EventData receive() {
- EventData eventData = receiver.receive(5000);
- if (eventData != null) {
- lastOffset = eventData.getMessageId().getOffset();
+ public EventDataWrap receive() {
+ EventDataWrap eventDatawrap = receiver.receive();
+ if (eventDatawrap != null) {
+ lastOffset = eventDatawrap.getEventData().getSystemProperties().getOffset();
}
- return eventData;
+ return eventDatawrap;
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java
index 8d2c485..abbecac 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java
@@ -25,7 +25,6 @@ import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.microsoft.eventhubs.client.Constants;
public class StaticPartitionCoordinator implements IPartitionCoordinator {
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java
index 0c6f8b6..a2024bd 100644
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java
@@ -17,19 +17,13 @@
*******************************************************************************/
package org.apache.storm.eventhubs.spout;
+import com.microsoft.azure.eventhubs.EventData;
import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-
-import org.apache.qpid.amqp_1_0.client.Message;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
-import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
-import org.apache.qpid.amqp_1_0.type.messaging.Data;
-import org.apache.storm.tuple.Fields;
/**
* An Event Data Scheme which deserializes message payload into the Strings.
@@ -44,21 +38,32 @@ import org.apache.storm.tuple.Fields;
public class StringEventDataScheme implements IEventDataScheme {
private static final long serialVersionUID = 1L;
+ private static final Logger logger = LoggerFactory.getLogger(StringEventDataScheme.class);
@Override
- public List<Object> deserialize(Message message) {
+ public List<Object> deserialize(EventData eventData) {
final List<Object> fieldContents = new ArrayList<Object>();
-
- for (Section section : message.getPayload()) {
- if (section instanceof Data) {
- Data data = (Data) section;
- fieldContents.add(new String(data.getValue().getArray()));
- } else if (section instanceof AmqpValue) {
- AmqpValue amqpValue = (AmqpValue) section;
- fieldContents.add(amqpValue.getValue().toString());
+ String messageData = "";
+ if (eventData.getBytes()!=null) {
+ messageData = new String(eventData.getBytes());
+ }
+ /*Will only serialize AMQPValue type*/
+ else if (eventData.getObject()!=null) {
+ try {
+ if (!(eventData.getObject() instanceof List)) {
+ messageData = eventData.getObject().toString();
+ } else {
+ throw new RuntimeException("Cannot serialize the given AMQP type.");
+ }
+ } catch (RuntimeException e){
+ logger.error("Failed to serialize EventData payload class"
+ + eventData.getObject().getClass());
+ logger.error("Exception encountered while serializing EventData payload is"
+ + e.toString());
+ throw e;
}
}
-
+ fieldContents.add(messageData);
return fieldContents;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java
index fbe779d..069d819 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java
@@ -17,9 +17,9 @@
*******************************************************************************/
package org.apache.storm.eventhubs.trident;
-import java.util.List;
+import org.apache.storm.eventhubs.spout.EventDataWrap;
-import org.apache.storm.eventhubs.spout.EventData;
+import java.util.List;
public interface ITridentPartitionManager {
boolean open(String offset);
@@ -31,5 +31,5 @@ public interface ITridentPartitionManager {
* @param count max number of messages in this batch
* @return list of EventData, if failed to receive, return empty list
*/
- public List<EventData> receiveBatch(String offset, int count);
+ public List<EventDataWrap> receiveBatch(String offset, int count);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java
index e5c1c50..b78fcd3 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java
@@ -17,24 +17,16 @@
*******************************************************************************/
package org.apache.storm.eventhubs.trident;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.storm.eventhubs.spout.EventData;
-import org.apache.storm.eventhubs.spout.EventHubReceiverImpl;
-import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
-import org.apache.storm.eventhubs.spout.IEventHubReceiver;
-import org.apache.storm.eventhubs.spout.IEventHubReceiverFactory;
-import com.microsoft.eventhubs.client.Constants;
-
+import org.apache.storm.eventhubs.spout.*;
import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
import org.apache.storm.trident.spout.IPartitionedTridentSpout;
import org.apache.storm.trident.topology.TransactionAttempt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
public class TransactionalTridentEventHubEmitter
@@ -111,15 +103,15 @@ public class TransactionalTridentEventHubEmitter
int count = Integer.parseInt((String)meta.get("count"));
logger.info("re-emit for partition " + partition.getId() + ", offset=" + offset + ", count=" + count);
ITridentPartitionManager pm = getOrCreatePartitionManager(partition);
- List<EventData> listEvents = pm.receiveBatch(offset, count);
+ List<EventDataWrap> listEvents = pm.receiveBatch(offset, count);
if(listEvents.size() != count) {
logger.error("failed to refetch eventhub messages, new count=" + listEvents.size());
return;
}
- for(EventData ed: listEvents) {
+ for(EventDataWrap ed: listEvents) {
List<Object> tuples =
- spoutConfig.getEventDataScheme().deserialize(ed.getMessage());
+ spoutConfig.getEventDataScheme().deserialize(ed.getEventData());
collector.emit(tuples);
}
}
@@ -128,20 +120,20 @@ public class TransactionalTridentEventHubEmitter
public Map emitPartitionBatchNew(TransactionAttempt attempt,
TridentCollector collector, Partition partition, Map meta) {
ITridentPartitionManager pm = getOrCreatePartitionManager(partition);
- String offset = Constants.DefaultStartingOffset;
+ String offset = FieldConstants.DefaultStartingOffset;
if(meta != null && meta.containsKey("nextOffset")) {
offset = (String)meta.get("nextOffset");
}
//logger.info("emit for partition " + partition.getId() + ", offset=" + offset);
String nextOffset = offset;
- List<EventData> listEvents = pm.receiveBatch(offset, batchSize);
+ List<EventDataWrap> listEvents = pm.receiveBatch(offset, batchSize);
- for(EventData ed: listEvents) {
+ for(EventDataWrap ed: listEvents) {
//update nextOffset;
nextOffset = ed.getMessageId().getOffset();
List<Object> tuples =
- spoutConfig.getEventDataScheme().deserialize(ed.getMessage());
+ spoutConfig.getEventDataScheme().deserialize(ed.getEventData());
collector.emit(tuples);
}
//logger.info("emitted new batches: " + listEvents.size());
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java
index 159fe41..a384667 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java
@@ -17,27 +17,20 @@
*******************************************************************************/
package org.apache.storm.eventhubs.trident;
-import java.util.ArrayList;
-import java.util.List;
-
+import org.apache.storm.eventhubs.spout.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.microsoft.eventhubs.client.Constants;
-import com.microsoft.eventhubs.client.EventHubEnqueueTimeFilter;
-import com.microsoft.eventhubs.client.EventHubException;
-import com.microsoft.eventhubs.client.EventHubOffsetFilter;
-
-import org.apache.storm.eventhubs.spout.EventData;
-import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
-import org.apache.storm.eventhubs.spout.IEventHubReceiver;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
public class TridentPartitionManager implements ITridentPartitionManager {
private static final Logger logger = LoggerFactory.getLogger(TridentPartitionManager.class);
private final int receiveTimeoutMs = 5000;
private final IEventHubReceiver receiver;
private final EventHubSpoutConfig spoutConfig;
- private String lastOffset = Constants.DefaultStartingOffset;
+ private String lastOffset = FieldConstants.DefaultStartingOffset;
public TridentPartitionManager(EventHubSpoutConfig spoutConfig, IEventHubReceiver receiver) {
this.receiver = receiver;
@@ -47,12 +40,12 @@ public class TridentPartitionManager implements ITridentPartitionManager {
@Override
public boolean open(String offset) {
try {
- if((offset == null || offset.equals(Constants.DefaultStartingOffset))
+ if((offset == null || offset.equals(FieldConstants.DefaultStartingOffset))
&& spoutConfig.getEnqueueTimeFilter() != 0) {
- receiver.open(new EventHubEnqueueTimeFilter(spoutConfig.getEnqueueTimeFilter()));
+ receiver.open(new EventHubFilter(Instant.ofEpochMilli(spoutConfig.getEnqueueTimeFilter())));
}
else {
- receiver.open(new EventHubOffsetFilter(offset));
+ receiver.open(new EventHubFilter(offset));
}
lastOffset = offset;
return true;
@@ -69,8 +62,8 @@ public class TridentPartitionManager implements ITridentPartitionManager {
}
@Override
- public List<EventData> receiveBatch(String offset, int count) {
- List<EventData> batch = new ArrayList<EventData>(count);
+ public List<EventDataWrap> receiveBatch(String offset, int count) {
+ List<EventDataWrap> batch = new ArrayList<EventDataWrap>(count);
if(!offset.equals(lastOffset) || !receiver.isOpen()) {
//re-establish connection to eventhub servers using the right offset
//TBD: might be optimized with cache.
@@ -81,7 +74,7 @@ public class TridentPartitionManager implements ITridentPartitionManager {
}
for(int i=0; i<count; ++i) {
- EventData ed = receiver.receive(receiveTimeoutMs);
+ EventDataWrap ed = receiver.receive();
if(ed == null) {
break;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java
index b176598..7538d4b 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java
@@ -17,22 +17,9 @@
*******************************************************************************/
package org.apache.storm.eventhubs.spout;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.storm.eventhubs.spout.MessageId;
-import org.apache.storm.eventhubs.spout.EventData;
-import org.apache.storm.eventhubs.spout.IEventHubReceiver;
-import org.apache.qpid.amqp_1_0.client.Message;
-import org.apache.qpid.amqp_1_0.jms.impl.TextMessageImpl;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.messaging.Data;
+import com.microsoft.azure.eventhubs.EventData;
-import com.microsoft.eventhubs.client.EventHubException;
-import com.microsoft.eventhubs.client.EventHubOffsetFilter;
-import com.microsoft.eventhubs.client.IEventHubFilter;
+import java.util.Map;
/**
* A mock receiver that emits fake data with offset starting from given offset
@@ -59,8 +46,10 @@ public class EventHubReceiverMock implements IEventHubReceiver {
}
@Override
- public void open(IEventHubFilter filter) throws EventHubException {
- currentOffset = Long.parseLong(filter.getFilterValue());
+ public void open(IEventFilter filter) throws EventHubException {
+ currentOffset = filter.getOffset() != null ?
+ Long.parseLong(filter.getOffset()) :
+ filter.getTime().toEpochMilli();
isOpen = true;
}
@@ -68,26 +57,25 @@ public class EventHubReceiverMock implements IEventHubReceiver {
public void close() {
isOpen = false;
}
-
+
@Override
public boolean isOpen() {
return isOpen;
}
@Override
- public EventData receive(long timeoutInMilliseconds) {
+ public EventDataWrap receive() {
if(isPaused) {
return null;
}
currentOffset++;
- List<Section> body = new ArrayList<Section>();
+
//the body of the message is "message" + currentOffset, e.g. "message123"
- body.add(new Data(new Binary(("message" + currentOffset).getBytes())));
- Message m = new Message(body);
+
MessageId mid = new MessageId(partitionId, "" + currentOffset, currentOffset);
- EventData ed = new EventData(m, mid);
- return ed;
+ EventData ed = new EventData(("message" + currentOffset).getBytes());
+ return EventDataWrap.create(ed,mid);
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java
index dd63d5d..467461c 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java
@@ -17,10 +17,6 @@
*******************************************************************************/
package org.apache.storm.eventhubs.spout;
-import org.apache.storm.eventhubs.spout.PartitionManager;
-import org.apache.storm.eventhubs.spout.EventData;
-import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
-
/**
* This mock exercises PartitionManager
*/
@@ -71,7 +67,7 @@ public class PartitionManagerCallerMock {
count = Integer.parseInt(cmd.substring(1));
}
for(int i=0; i<count; ++i) {
- EventData ed = pm.receive();
+ EventDataWrap ed = pm.receive();
if(ed == null) {
ret.append("null,");
}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java
index aa8d097..f260dea 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java
@@ -17,12 +17,12 @@
*******************************************************************************/
package org.apache.storm.eventhubs.spout;
-import static org.junit.Assert.*;
-
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+
public class TestEventData {
@Before
@@ -37,10 +37,10 @@ public class TestEventData {
public void testEventDataComparision() {
MessageId messageId1 = MessageId.create(null, "3", 1);
- EventData eventData1 = EventData.create(null, messageId1);
+ EventDataWrap eventData1 = EventDataWrap.create(null, messageId1);
MessageId messageId2 = MessageId.create(null, "13", 2);
- EventData eventData2 = EventData.create(null, messageId2);
+ EventDataWrap eventData2 = EventDataWrap.create(null, messageId2);
assertTrue(eventData2.compareTo(eventData1) > 0);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java
index 49e544b..a7b3588 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java
@@ -17,12 +17,13 @@
*******************************************************************************/
package org.apache.storm.eventhubs.spout;
-import static org.junit.Assert.*;
-
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
+
public class TestEventHubSpout {
@@ -40,7 +41,7 @@ public class TestEventHubSpout {
"namespace", "entityname", 16);
conf.setZkConnectionString("zookeeper");
conf.setCheckpointIntervalInSeconds(1);
- assertEquals(conf.getConnectionString(), "amqps://username:pas%5Cs%2Bw%2Ford@namespace.servicebus.windows.net");
+ assertEquals(conf.getConnectionString(), "Endpoint=amqps://namespace.servicebus.windows.net;EntityPath=entityname;SharedAccessKeyName=username;SharedAccessKey=pas\\s+w/ord;OperationTimeout=PT1M;RetryPolicy=Default");
}
@Test
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb9136b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1281171..d63907c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -306,7 +306,7 @@
<mavenVersion>3.1.0</mavenVersion>
<wagonVersion>1.0</wagonVersion>
<qpid.version>0.32</qpid.version>
- <eventhubs.client.version>1.0.1</eventhubs.client.version>
+ <azure-eventhubs.version>0.13.1</azure-eventhubs.version>
<jersey.version>2.24.1</jersey.version>
<!-- see intellij profile below... This fixes an annoyance with intellij -->
[3/3] storm git commit: Added STORM-2371 to CHANGELOG.
Posted by sr...@apache.org.
Added STORM-2371 to CHANGELOG.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2307b802
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2307b802
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2307b802
Branch: refs/heads/master
Commit: 2307b8023c947a408aac6d522d18e13434c0f57e
Parents: 940096a
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Tue Apr 18 14:08:22 2017 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Tue Apr 18 14:08:22 2017 -0700
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/2307b802/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 116c9b3..5f9d118 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
\ufeff## 2.0.0
+ * STORM-2371: Replace existing AMQP eventhub client with the lastest one from Microsoft eventhubs
* STORM-2475: Fix parsing of host:port to deal with IPv6 addresses
* STORM-832: Allow config validation to be used by plugins/etc.
* STORM-2471: Add metric for thread count
[2/3] storm git commit: Merge branch 'eventhub3' of
https://github.com/rban1/storm
Posted by sr...@apache.org.
Merge branch 'eventhub3' of https://github.com/rban1/storm
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/940096a2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/940096a2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/940096a2
Branch: refs/heads/master
Commit: 940096a275562a25eccf5da7a6b048702b1be788
Parents: f794d72 fdb9136
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Tue Apr 18 14:02:58 2017 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Tue Apr 18 14:02:58 2017 -0700
----------------------------------------------------------------------
external/storm-eventhubs/pom.xml | 29 +---
.../storm/eventhubs/bolt/EventHubBolt.java | 82 ++++++++---
.../eventhubs/bolt/EventHubBoltConfig.java | 10 +-
.../eventhubs/spout/BinaryEventDataScheme.java | 43 +++---
.../apache/storm/eventhubs/spout/EventData.java | 48 -------
.../storm/eventhubs/spout/EventDataScheme.java | 51 +++----
.../storm/eventhubs/spout/EventDataWrap.java | 48 +++++++
.../eventhubs/spout/EventHubException.java | 41 ++++++
.../storm/eventhubs/spout/EventHubFilter.java | 46 ++++++
.../eventhubs/spout/EventHubReceiverImpl.java | 141 +++++++++++--------
.../storm/eventhubs/spout/EventHubSpout.java | 24 ++--
.../eventhubs/spout/EventHubSpoutConfig.java | 18 ++-
.../eventhubs/spout/EventHubSpoutException.java | 37 -----
.../storm/eventhubs/spout/FieldConstants.java | 1 +
.../storm/eventhubs/spout/IEventDataScheme.java | 7 +-
.../storm/eventhubs/spout/IEventFilter.java | 28 ++++
.../eventhubs/spout/IEventHubReceiver.java | 11 +-
.../eventhubs/spout/IPartitionManager.java | 2 +-
.../storm/eventhubs/spout/PartitionManager.java | 28 ++--
.../spout/SerializeDeserializeUtil.java | 34 +++++
.../eventhubs/spout/SimplePartitionManager.java | 32 ++---
.../spout/StaticPartitionCoordinator.java | 1 -
.../eventhubs/spout/StringEventDataScheme.java | 43 +++---
.../trident/ITridentPartitionManager.java | 6 +-
.../TransactionalTridentEventHubEmitter.java | 36 ++---
.../trident/TridentPartitionManager.java | 29 ++--
.../eventhubs/spout/EventHubReceiverMock.java | 36 ++---
.../spout/PartitionManagerCallerMock.java | 6 +-
.../storm/eventhubs/spout/TestEventData.java | 8 +-
.../eventhubs/spout/TestEventHubSpout.java | 7 +-
pom.xml | 2 +-
31 files changed, 531 insertions(+), 404 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/940096a2/external/storm-eventhubs/pom.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/940096a2/pom.xml
----------------------------------------------------------------------