You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/10/27 05:26:34 UTC
[2/2] samza git commit: SAMZA-1438;
SystemProducer, Consumer and Admin interfaces for EventHubs
SAMZA-1438; SystemProducer, Consumer and Admin interfaces for EventHubs
Author: Daniel Chen <29...@users.noreply.github.com>
Reviewers: Jagadish <ja...@apache.org>, Prateek M<pm...@linkedin.com>
Closes #308 from dxichen/eventHub-Connector
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/663b8daf
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/663b8daf
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/663b8daf
Branch: refs/heads/master
Commit: 663b8daffa56e6a5ca1e00641c6566304d8c832d
Parents: 8b21209
Author: Daniel Chen <29...@users.noreply.github.com>
Authored: Thu Oct 26 22:26:29 2017 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Thu Oct 26 22:26:29 2017 -0700
----------------------------------------------------------------------
build.gradle | 11 +
.../system/eventhub/EventHubClientManager.java | 69 ++++
.../eventhub/EventHubClientManagerFactory.java | 32 ++
.../samza/system/eventhub/EventHubConfig.java | 181 +++++++++
.../system/eventhub/EventHubSystemFactory.java | 65 +++
.../samza/system/eventhub/Interceptor.java | 34 ++
.../eventhub/SamzaEventHubClientManager.java | 104 +++++
.../eventhub/admin/EventHubSystemAdmin.java | 199 +++++++++
.../eventhub/admin/PassThroughInterceptor.java | 33 ++
.../EventHubIncomingMessageEnvelope.java | 42 ++
.../consumer/EventHubSystemConsumer.java | 401 +++++++++++++++++++
.../system/eventhub/metrics/SamzaHistogram.java | 62 +++
.../producer/EventHubSystemProducer.java | 345 ++++++++++++++++
.../samza/system/eventhub/MockEventData.java | 57 +++
.../MockEventHubClientManagerFactory.java | 196 +++++++++
.../eventhub/MockEventHubConfigFactory.java | 61 +++
.../system/eventhub/TestMetricsRegistry.java | 85 ++++
.../eventhub/admin/TestEventHubSystemAdmin.java | 113 ++++++
.../consumer/ITestEventHubSystemConsumer.java | 76 ++++
.../consumer/TestEventHubSystemConsumer.java | 272 +++++++++++++
.../producer/ITestEventHubSystemProducer.java | 163 ++++++++
.../producer/TestEventHubSystemProducer.java | 153 +++++++
22 files changed, 2754 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/663b8daf/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 3a6f9b6..d9025c4 100644
--- a/build.gradle
+++ b/build.gradle
@@ -192,16 +192,27 @@ project(':samza-azure') {
dependencies {
compile "com.microsoft.azure:azure-storage:5.3.1"
+ compile "com.microsoft.azure:azure-eventhubs:0.14.5"
compile "com.fasterxml.jackson.core:jackson-core:2.8.8"
+ compile "io.dropwizard.metrics:metrics-core:3.1.2"
compile project(':samza-api')
compile project(":samza-core_$scalaVersion")
compile "org.slf4j:slf4j-api:$slf4jVersion"
testCompile "junit:junit:$junitVersion"
+ testCompile "org.mockito:mockito-all:$mockitoVersion"
+ testCompile "org.powermock:powermock-api-mockito:$powerMockVersion"
+ testCompile "org.powermock:powermock-core:$powerMockVersion"
+ testCompile "org.powermock:powermock-module-junit4:$powerMockVersion"
}
checkstyle {
configFile = new File(rootDir, "checkstyle/checkstyle.xml")
toolVersion = "$checkstyleVersion"
}
+ test {
+ // Exclude integration tests that require connection to EventHub
+ exclude 'org/apache/samza/system/eventhub/producer/*ITest*'
+ exclude 'org/apache/samza/system/eventhub/consumer/*ITest*'
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/663b8daf/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManager.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManager.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManager.java
new file mode 100644
index 0000000..0b4f18f
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManager.java
@@ -0,0 +1,69 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.eventhub;
+
+import com.microsoft.azure.eventhubs.EventHubClient;
+
+/**
+ * <p>
+ * EventHubClient manager is the interface that must be implemented to wrap the
+ * {@link EventHubClient} with lifecycle hooks for initialization and close.
+ * </p>
+ *
+ * <p>
+ * {@link #init()} should be invoked once during the startup and provides a
+ * hook to perform some initialization before the creation of the underlying
+ * {@link EventHubClient}. {@link #close(long)} is invoked once during shut-down
+ * and can be used to perform clean-ups.
+ * </p>
+ */
+public interface EventHubClientManager {
+  /**
+   * A constant that can be passed as the timeout argument of {@link #close(long)} to
+   * denote that the close invocation should block until all the teardown
+   * operations for the {@link EventHubClient} are completed.
+   * (Interface fields are implicitly {@code public static final}.)
+   */
+  int BLOCK_UNTIL_CLOSE = -1;
+
+  /**
+   * Lifecycle hook to perform initializations for the creation of
+   * the underlying {@link EventHubClient}.
+   */
+  void init();
+
+  /**
+   * Returns the underlying {@link EventHubClient} instance. Multiple invocations
+   * of this method should return the same instance instead of
+   * creating new ones. Intended to be called after {@link #init()}.
+   *
+   * @return EventHub client instance of the wrapper
+   */
+  EventHubClient getEventHubClient();
+
+  /**
+   * Tries to close the {@link EventHubClient} instance within the provided
+   * timeout. Use this method to perform clean-ups after the execution of the
+   * {@link EventHubClient}. Set the timeout to {@link #BLOCK_UNTIL_CLOSE} to
+   * block until the client is closed.
+   *
+   * @param timeoutMs close timeout in milliseconds, or {@link #BLOCK_UNTIL_CLOSE}
+   */
+  void close(long timeoutMs);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/663b8daf/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManagerFactory.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManagerFactory.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManagerFactory.java
new file mode 100644
index 0000000..0578a50
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManagerFactory.java
@@ -0,0 +1,32 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.eventhub;
+
+/**
+ * Creates {@link EventHubClientManager} instances for individual streams of an EventHubs system,
+ * reading the connection settings (namespace, entity path, SAS key name and token) from
+ * {@link EventHubConfig}.
+ */
+public class EventHubClientManagerFactory {
+
+  /**
+   * Builds a client manager for one stream. The returned manager only holds the connection
+   * settings; its {@link EventHubClientManager#init()} has not been called yet.
+   *
+   * @param systemName name of the system the stream belongs to
+   * @param streamName name of the stream
+   * @param config EventHubs configuration to read the connection settings from
+   * @return a new {@link SamzaEventHubClientManager} for the stream
+   */
+  public EventHubClientManager getEventHubClientManager(String systemName, String streamName, EventHubConfig config) {
+    return new SamzaEventHubClientManager(
+        config.getStreamNamespace(systemName, streamName),
+        config.getStreamEntityPath(systemName, streamName),
+        config.getStreamSasKeyName(systemName, streamName),
+        config.getStreamSasToken(systemName, streamName));
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/663b8daf/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
new file mode 100644
index 0000000..3bc04f8
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
@@ -0,0 +1,181 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.eventhub;
+
+import com.microsoft.azure.eventhubs.EventHubClient;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.system.eventhub.producer.EventHubSystemProducer;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Accessors for the EventHubs system configuration. Stream-scoped keys are formatted with the
+ * system name and the stream name; system-scoped keys are formatted with the system name only.
+ */
+public class EventHubConfig extends MapConfig {
+
+  public static final String CONFIG_STREAM_LIST = "systems.%s.stream.list";
+
+  public static final String CONFIG_STREAM_NAMESPACE = "systems.%s.streams.%s.eventhubs.namespace";
+
+  public static final String CONFIG_STREAM_ENTITYPATH = "systems.%s.streams.%s.eventhubs.entitypath";
+
+  public static final String CONFIG_STREAM_SAS_KEY_NAME = "systems.%s.streams.%s.eventhubs.sas.keyname";
+
+  public static final String CONFIG_STREAM_SAS_TOKEN = "systems.%s.streams.%s.eventhubs.sas.token";
+
+  public static final String CONFIG_STREAM_CONSUMER_GROUP = "systems.%s.streams.%s.eventhubs.consumer.group";
+  public static final String DEFAULT_CONFIG_STREAM_CONSUMER_GROUP = EventHubClient.DEFAULT_CONSUMER_GROUP_NAME;
+
+  public static final String CONFIG_PRODUCER_PARTITION_METHOD = "systems.%s.eventhubs.partition.method";
+  public static final String DEFAULT_CONFIG_PRODUCER_PARTITION_METHOD = EventHubSystemProducer
+      .PartitioningMethod.EVENT_HUB_HASHING.name();
+
+  public static final String CONFIG_SEND_KEY_IN_EVENT_PROPERTIES = "systems.%s.eventhubs.send.key";
+  public static final Boolean DEFAULT_CONFIG_SEND_KEY_IN_EVENT_PROPERTIES = false;
+
+  public static final String CONFIG_FETCH_RUNTIME_INFO_TIMEOUT_MILLIS = "systems.%s.eventhubs.runtime.info.timeout";
+  public static final long DEFAULT_CONFIG_FETCH_RUNTIME_INFO_TIMEOUT_MILLIS = Duration.ofMinutes(1L).toMillis();
+
+  public static final String CONFIG_CONSUMER_BUFFER_CAPACITY = "systems.%s.eventhubs.receive.queue.size";
+  public static final int DEFAULT_CONFIG_CONSUMER_BUFFER_CAPACITY = 100;
+
+
+  public EventHubConfig(Map<String, String> config) {
+    super(config);
+  }
+
+  /**
+   * Get the list of streams that are defined. Each stream has enough
+   * information for connecting to a certain EventHub entity.
+   *
+   * @param systemName name of the system
+   * @return list of stream names
+   */
+  public List<String> getStreams(String systemName) {
+    return getList(String.format(CONFIG_STREAM_LIST, systemName));
+  }
+
+  /**
+   * Get the EventHubs namespace for the stream.
+   *
+   * @param systemName name of the system
+   * @param streamName name of stream
+   * @return EventHubs namespace, or null if not configured
+   */
+  public String getStreamNamespace(String systemName, String streamName) {
+    return get(String.format(CONFIG_STREAM_NAMESPACE, systemName, streamName));
+  }
+
+  /**
+   * Get the EventHubs entity path (topic name) for the stream.
+   *
+   * @param systemName name of the system
+   * @param streamName name of stream
+   * @return EventHubs entity path, or null if not configured
+   */
+  public String getStreamEntityPath(String systemName, String streamName) {
+    return get(String.format(CONFIG_STREAM_ENTITYPATH, systemName, streamName));
+  }
+
+  /**
+   * Get the EventHubs SAS (Shared Access Signature) key name for the stream.
+   *
+   * @param systemName name of the system
+   * @param streamName name of stream
+   * @return EventHubs SAS key name, or null if not configured
+   */
+  public String getStreamSasKeyName(String systemName, String streamName) {
+    return get(String.format(CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName));
+  }
+
+  /**
+   * Get the EventHubs SAS (Shared Access Signature) token for the stream.
+   *
+   * @param systemName name of the system
+   * @param streamName name of stream
+   * @return EventHubs SAS token, or null if not configured
+   */
+  public String getStreamSasToken(String systemName, String streamName) {
+    return get(String.format(CONFIG_STREAM_SAS_TOKEN, systemName, streamName));
+  }
+
+  /**
+   * Get the EventHubs consumer group used for consumption for the stream.
+   *
+   * @param systemName name of the system
+   * @param streamName name of stream
+   * @return EventHubs consumer group, defaulting to {@link #DEFAULT_CONFIG_STREAM_CONSUMER_GROUP}
+   */
+  public String getStreamConsumerGroup(String systemName, String streamName) {
+    return get(String.format(CONFIG_STREAM_CONSUMER_GROUP, systemName, streamName), DEFAULT_CONFIG_STREAM_CONSUMER_GROUP);
+  }
+
+  /**
+   * Get the partition method of the system. By default partitioning is handled by EventHub
+   * ({@code EVENT_HUB_HASHING}).
+   *
+   * @param systemName name of the system
+   * @return the method the producer should use to partition the outgoing data
+   * @throws IllegalArgumentException if the configured value is not a valid partitioning method
+   */
+  public EventHubSystemProducer.PartitioningMethod getPartitioningMethod(String systemName) {
+    String configKey = String.format(CONFIG_PRODUCER_PARTITION_METHOD, systemName);
+    String partitioningMethod = get(configKey, DEFAULT_CONFIG_PRODUCER_PARTITION_METHOD);
+    try {
+      return EventHubSystemProducer.PartitioningMethod.valueOf(partitioningMethod);
+    } catch (IllegalArgumentException e) {
+      // Enum.valueOf's own message does not mention the config key; add it so misconfiguration is easy to find.
+      throw new IllegalArgumentException(
+          String.format("Invalid partitioning method '%s' configured for %s", partitioningMethod, configKey), e);
+    }
+  }
+
+  /**
+   * Returns true if the OutgoingMessageEnvelope key should be sent in the outgoing envelope, false otherwise.
+   *
+   * @param systemName name of the system
+   * @return whether the key is included; {@link #DEFAULT_CONFIG_SEND_KEY_IN_EVENT_PROPERTIES} if not configured
+   */
+  public Boolean getSendKeyInEventProperties(String systemName) {
+    String isSendKeyIncluded = get(String.format(CONFIG_SEND_KEY_IN_EVENT_PROPERTIES, systemName));
+    if (isSendKeyIncluded == null) {
+      return DEFAULT_CONFIG_SEND_KEY_IN_EVENT_PROPERTIES;
+    }
+    return Boolean.valueOf(isSendKeyIncluded);
+  }
+
+  /**
+   * Get the timeout for the getRuntimeInfo request to EventHub client.
+   *
+   * @param systemName name of the system
+   * @return timeout in millis for fetching RuntimeInfo; one minute if not configured
+   */
+  public long getRuntimeInfoWaitTimeMS(String systemName) {
+    return getLong(String.format(CONFIG_FETCH_RUNTIME_INFO_TIMEOUT_MILLIS, systemName),
+        DEFAULT_CONFIG_FETCH_RUNTIME_INFO_TIMEOUT_MILLIS);
+  }
+
+  /**
+   * Get the capacity of the Event Hub consumer buffer - the blocking queue used for storing messages.
+   *
+   * @param systemName name of the system
+   * @return number of buffered messages per SystemStreamPartition; 100 if not configured
+   * @throws NumberFormatException if the configured value is not an integer
+   */
+  public int getConsumerBufferCapacity(String systemName) {
+    String bufferCapacity = get(String.format(CONFIG_CONSUMER_BUFFER_CAPACITY, systemName));
+    if (bufferCapacity == null) {
+      return DEFAULT_CONFIG_CONSUMER_BUFFER_CAPACITY;
+    }
+    return Integer.parseInt(bufferCapacity);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/663b8daf/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubSystemFactory.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubSystemFactory.java
new file mode 100644
index 0000000..fd7a99c
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubSystemFactory.java
@@ -0,0 +1,65 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.eventhub;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.eventhub.admin.EventHubSystemAdmin;
+import org.apache.samza.system.eventhub.admin.PassThroughInterceptor;
+import org.apache.samza.system.eventhub.consumer.EventHubSystemConsumer;
+import org.apache.samza.system.eventhub.producer.EventHubSystemProducer;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link SystemFactory} for EventHubs. Every consumer and producer it creates gets one
+ * {@link PassThroughInterceptor} per stream listed in the system's stream list.
+ */
+public class EventHubSystemFactory implements SystemFactory {
+
+  /** Maps each configured stream of the system to a fresh pass-through interceptor. */
+  private Map<String, Interceptor> getInterceptorsMap(EventHubConfig config, String systemName) {
+    List<String> streams = config.getStreams(systemName);
+    Map<String, Interceptor> interceptorsByStream = new HashMap<>();
+    for (String stream : streams) {
+      interceptorsByStream.put(stream, new PassThroughInterceptor());
+    }
+    return interceptorsByStream;
+  }
+
+  @Override
+  public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
+    EventHubConfig ehConfig = new EventHubConfig(config);
+    EventHubClientManagerFactory clientFactory = new EventHubClientManagerFactory();
+    Map<String, Interceptor> interceptors = getInterceptorsMap(ehConfig, systemName);
+    return new EventHubSystemConsumer(ehConfig, systemName, clientFactory, interceptors, registry);
+  }
+
+  @Override
+  public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
+    EventHubConfig ehConfig = new EventHubConfig(config);
+    EventHubClientManagerFactory clientFactory = new EventHubClientManagerFactory();
+    Map<String, Interceptor> interceptors = getInterceptorsMap(ehConfig, systemName);
+    return new EventHubSystemProducer(ehConfig, systemName, clientFactory, interceptors, registry);
+  }
+
+  @Override
+  public SystemAdmin getAdmin(String systemName, Config config) {
+    EventHubConfig ehConfig = new EventHubConfig(config);
+    return new EventHubSystemAdmin(systemName, ehConfig, new EventHubClientManagerFactory());
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/663b8daf/samza-azure/src/main/java/org/apache/samza/system/eventhub/Interceptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/Interceptor.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/Interceptor.java
new file mode 100644
index 0000000..2a6edfb
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/Interceptor.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.eventhub;
+
+/**
+ * Converts an array of bytes into another array of bytes.
+ * Declared as a functional interface so implementations can be supplied as lambdas.
+ */
+@FunctionalInterface
+public interface Interceptor {
+
+  /**
+   * Intercepts and converts the given bytes.
+   *
+   * @param bytes bytes to be converted
+   * @return the converted bytes
+   */
+  byte[] intercept(byte[] bytes);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/663b8daf/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java
new file mode 100644
index 0000000..ada5694
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java
@@ -0,0 +1,104 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.eventhub;
+
+import com.microsoft.azure.eventhubs.EventHubClient;
+import com.microsoft.azure.servicebus.ClientConstants;
+import com.microsoft.azure.servicebus.ConnectionStringBuilder;
+import com.microsoft.azure.servicebus.RetryExponential;
+import com.microsoft.azure.servicebus.RetryPolicy;
+import com.microsoft.azure.servicebus.ServiceBusException;
+import org.apache.samza.SamzaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+public class SamzaEventHubClientManager implements EventHubClientManager {
+ private static final Logger LOG = LoggerFactory.getLogger(SamzaEventHubClientManager.class.getName());
+
+ private static final String EVENTHUB_REMOTE_HOST_FORMAT = "%s.servicebus.windows.net";
+
+ private static final Duration MIN_RETRY_BACKOFF = Duration.ofMillis(100);
+ private static final Duration MAX_RETRY_BACKOFF = Duration.ofMillis(11000);
+ private static final int MAX_RETRY_COUNT = 100;
+ private static final String SAMZA_EVENTHUB_RETRY = "SAMZA_CONNECTOR_RETRY";
+
+ private EventHubClient eventHubClient;
+
+ private final String eventHubNamespace;
+ private final String entityPath;
+ private final String sasKeyName;
+ private final String sasKey;
+ private final RetryPolicy retryPolicy;
+
+ public SamzaEventHubClientManager(String eventHubNamespace, String entityPath, String sasKeyName, String sasKey) {
+ this(eventHubNamespace, entityPath, sasKeyName, sasKey,
+ new RetryExponential(MIN_RETRY_BACKOFF, MAX_RETRY_BACKOFF, MAX_RETRY_COUNT, SAMZA_EVENTHUB_RETRY));
+ }
+
+ public SamzaEventHubClientManager(String eventHubNamespace, String entityPath, String sasKeyName, String sasKey,
+ RetryPolicy retryPolicy) {
+ this.eventHubNamespace = eventHubNamespace;
+ this.entityPath = entityPath;
+ this.sasKeyName = sasKeyName;
+ this.sasKey = sasKey;
+ this.retryPolicy = retryPolicy;
+ }
+
+ @Override
+ public void init() {
+ String remoteHost = String.format(EVENTHUB_REMOTE_HOST_FORMAT, eventHubNamespace);
+ try {
+ ConnectionStringBuilder connectionStringBuilder =
+ new ConnectionStringBuilder(eventHubNamespace, entityPath, sasKeyName, sasKey);
+
+ eventHubClient = EventHubClient.createFromConnectionStringSync(connectionStringBuilder.toString(), retryPolicy);
+ } catch (IOException | ServiceBusException e) {
+ String msg = String.format("Creation of EventHub client failed for eventHub %s %s %s %s on remote host %s:%d",
+ entityPath, eventHubNamespace, sasKeyName, sasKey, remoteHost, ClientConstants.AMQPS_PORT);
+ LOG.error(msg, e);
+ throw new SamzaException(msg, e);
+ }
+ }
+
+ @Override
+ public EventHubClient getEventHubClient() {
+ return eventHubClient;
+ }
+
+ @Override
+ public void close(long timeoutMS) {
+ try {
+ if (timeoutMS == EventHubClientManager.BLOCK_UNTIL_CLOSE) {
+ eventHubClient.closeSync();
+ } else {
+ CompletableFuture<Void> future = eventHubClient.close();
+ future.get(timeoutMS, TimeUnit.MILLISECONDS);
+ }
+ } catch (Exception e) {
+ LOG.error("Closing the EventHub client failed", e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/663b8daf/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
new file mode 100644
index 0000000..11998a4
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
@@ -0,0 +1,199 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.eventhub.admin;
+
+import com.microsoft.azure.eventhubs.EventHubPartitionRuntimeInformation;
+import com.microsoft.azure.eventhubs.EventHubRuntimeInformation;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.eventhub.EventHubClientManager;
+import org.apache.samza.system.eventhub.EventHubConfig;
+import org.apache.samza.system.eventhub.consumer.EventHubSystemConsumer;
+import org.apache.samza.system.eventhub.EventHubClientManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class EventHubSystemAdmin implements SystemAdmin {
+ private static final Logger LOG = LoggerFactory.getLogger(EventHubSystemAdmin.class);
+ private static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = Duration.ofMinutes(1L).toMillis();
+
+ private final EventHubClientManagerFactory eventHubClientManagerFactory;
+ private final String systemName;
+ private final EventHubConfig eventHubConfig;
+ private final Map<String, EventHubClientManager> eventHubClients = new HashMap<>();
+ private final Map<String, String[]> streamPartitions = new HashMap<>();
+
+ public EventHubSystemAdmin(String systemName, EventHubConfig eventHubConfig,
+ EventHubClientManagerFactory eventHubClientManagerFactory) {
+ this.systemName = systemName;
+ this.eventHubConfig = eventHubConfig;
+ this.eventHubClientManagerFactory = eventHubClientManagerFactory;
+ }
+
+ private String getNextOffset(String currentOffset) {
+ // EventHub will return the first message AFTER the offset
+ // that was specified in the fetch request.
+ return currentOffset.equals(EventHubSystemConsumer.END_OF_STREAM) ? currentOffset :
+ String.valueOf(Long.parseLong(currentOffset) + 1);
+ }
+
+ @Override
+ public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
+ Map<SystemStreamPartition, String> results = new HashMap<>();
+
+ offsets.forEach((partition, offset) -> results.put(partition, getNextOffset(offset)));
+ return results;
+ }
+
+ @Override
+ public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
+ final Map<String, SystemStreamMetadata> requestedMetadata = new HashMap<>();
+ final Map<String, CompletableFuture<EventHubRuntimeInformation>> ehRuntimeInfos = new HashMap<>();
+
+ streamNames.forEach((streamName) -> {
+ if (!streamPartitions.containsKey(streamName)) {
+ EventHubClientManager eventHubClientManager = getOrCreateStreamEventHubClient(streamName);
+ CompletableFuture<EventHubRuntimeInformation> runtimeInfo = eventHubClientManager.getEventHubClient()
+ .getRuntimeInformation();
+
+ ehRuntimeInfos.put(streamName, runtimeInfo);
+ }
+ });
+
+ try {
+ ehRuntimeInfos.forEach((streamName, ehRuntimeInfo) -> {
+
+ if (!streamPartitions.containsKey(streamName)) {
+ LOG.debug(String.format("Partition ids for Stream=%s not found", streamName));
+ try {
+
+ long timeoutMs = eventHubConfig.getRuntimeInfoWaitTimeMS(systemName);
+ EventHubRuntimeInformation ehInfo = ehRuntimeInfo.get(timeoutMs, TimeUnit.MILLISECONDS);
+
+ LOG.debug(String.format("Adding partition ids for Stream=%s", streamName));
+ streamPartitions.put(streamName, ehInfo.getPartitionIds());
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+
+ String msg = String.format("Error while fetching EventHubRuntimeInfo for System:%s, Stream:%s",
+ systemName, streamName);
+ throw new SamzaException(msg);
+ }
+ }
+
+ String[] partitionIds = streamPartitions.get(streamName);
+ Map<Partition, SystemStreamPartitionMetadata> sspMetadataMap = getPartitionMetadata(streamName, partitionIds);
+ SystemStreamMetadata systemStreamMetadata = new SystemStreamMetadata(streamName, sspMetadataMap);
+
+ requestedMetadata.put(streamName, systemStreamMetadata);
+ });
+
+ } finally {
+
+ // Closing clients
+ eventHubClients.forEach((streamName, client) -> client.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS));
+ eventHubClients.clear();
+ }
+
+ return requestedMetadata;
+ }
+
+ private EventHubClientManager getOrCreateStreamEventHubClient(String streamName) {
+ if (!eventHubClients.containsKey(streamName)) {
+ LOG.debug(String.format("Creating EventHubClient for Stream=%s", streamName));
+
+ EventHubClientManager eventHubClientManager = eventHubClientManagerFactory
+ .getEventHubClientManager(systemName, streamName, eventHubConfig);
+
+ eventHubClientManager.init();
+ eventHubClients.put(streamName, eventHubClientManager);
+ }
+ return eventHubClients.get(streamName);
+ }
+
+ private Map<Partition, SystemStreamPartitionMetadata> getPartitionMetadata(String streamName, String[] partitionIds) {
+ EventHubClientManager eventHubClientManager = getOrCreateStreamEventHubClient(streamName);
+ Map<Partition, SystemStreamPartitionMetadata> sspMetadataMap = new HashMap<>();
+ Map<String, CompletableFuture<EventHubPartitionRuntimeInformation>> ehRuntimeInfos = new HashMap<>();
+
+ for (String partition : partitionIds) {
+ CompletableFuture<EventHubPartitionRuntimeInformation> partitionRuntimeInfo = eventHubClientManager
+ .getEventHubClient()
+ .getPartitionRuntimeInformation(partition);
+
+ ehRuntimeInfos.put(partition, partitionRuntimeInfo);
+ }
+
+ ehRuntimeInfos.forEach((partitionId, ehPartitionRuntimeInfo) -> {
+ try {
+ long timeoutMs = eventHubConfig.getRuntimeInfoWaitTimeMS(systemName);
+ EventHubPartitionRuntimeInformation ehPartitionInfo = ehPartitionRuntimeInfo.get(timeoutMs, TimeUnit.MILLISECONDS);
+
+ String startingOffset = EventHubSystemConsumer.START_OF_STREAM;
+ String newestOffset = ehPartitionInfo.getLastEnqueuedOffset();
+ String upcomingOffset = getNextOffset(newestOffset);
+ SystemStreamPartitionMetadata sspMetadata = new SystemStreamPartitionMetadata(startingOffset, newestOffset,
+ upcomingOffset);
+
+ Partition partition = new Partition(Integer.parseInt(partitionId));
+
+ sspMetadataMap.put(partition, sspMetadata);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ String msg = String.format(
+ "Error while fetching EventHubPartitionRuntimeInfo for System:%s, Stream:%s, Partition:%s",
+ systemName, streamName, partitionId);
+ throw new SamzaException(msg);
+ }
+ });
+ return sspMetadataMap;
+ }
+
+ public static Integer compareOffsets(String offset1, String offset2) {
+ if (offset1 == null || offset2 == null) {
+ return null;
+ }
+ try {
+ if (offset1.equals(EventHubSystemConsumer.END_OF_STREAM)) {
+ return offset2.equals(EventHubSystemConsumer.END_OF_STREAM) ? 0 : 1;
+ }
+ return offset2.equals(EventHubSystemConsumer.END_OF_STREAM) ? -1 :
+ Long.compare(Long.parseLong(offset1), Long.parseLong(offset2));
+ } catch (NumberFormatException exception) {
+ return null;
+ }
+ }
+
+ @Override
+ public Integer offsetComparator(String offset1, String offset2) {
+ return compareOffsets(offset1, offset2);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/663b8daf/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/PassThroughInterceptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/PassThroughInterceptor.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/PassThroughInterceptor.java
new file mode 100644
index 0000000..79dfb33
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/PassThroughInterceptor.java
@@ -0,0 +1,33 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.eventhub.admin;
+
+import org.apache.samza.system.eventhub.Interceptor;
+
+/**
+ * A default {@link Interceptor} that is a pass-through: the bytes it receives are returned unchanged.
+ */
+public class PassThroughInterceptor implements Interceptor {
+
+  /**
+   * Returns {@code payload} as-is (the same array instance, not a copy).
+   *
+   * @param payload bytes to intercept; a null input is returned as null
+   * @return the unmodified input
+   */
+  @Override
+  public byte[] intercept(byte[] payload) {
+    return payload;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/663b8daf/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubIncomingMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubIncomingMessageEnvelope.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubIncomingMessageEnvelope.java
new file mode 100644
index 0000000..8aa7480
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubIncomingMessageEnvelope.java
@@ -0,0 +1,42 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.eventhub.consumer;
+
+import com.microsoft.azure.eventhubs.EventData;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+
+/**
+ * Extension of {@link IncomingMessageEnvelope} that also carries the {@link EventData} the message was built
+ * from, giving access to its system and user properties metadata.
+ */
+public class EventHubIncomingMessageEnvelope extends IncomingMessageEnvelope {
+  // Never reassigned after construction.
+  private final EventData eventData;
+
+  /**
+   * @param systemStreamPartition partition the message was read from
+   * @param offset offset of the message within its partition
+   * @param key message key (may be null)
+   * @param message message payload
+   * @param eventData the original Event Hubs event
+   */
+  public EventHubIncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, String offset, Object key,
+      Object message, EventData eventData) {
+    super(systemStreamPartition, offset, key, message);
+    this.eventData = eventData;
+  }
+
+  /**
+   * @return the original {@link EventData} this envelope was created from
+   */
+  public EventData getEventData() {
+    return eventData;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/663b8daf/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
new file mode 100644
index 0000000..4de34de
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
@@ -0,0 +1,401 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.eventhub.consumer;
+
+import com.microsoft.azure.eventhubs.EventData;
+import com.microsoft.azure.eventhubs.PartitionReceiveHandler;
+import com.microsoft.azure.eventhubs.PartitionReceiver;
+import com.microsoft.azure.servicebus.ServiceBusException;
+import org.apache.samza.SamzaException;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.eventhub.EventHubClientManagerFactory;
+import org.apache.samza.system.eventhub.EventHubClientManager;
+import org.apache.samza.system.eventhub.EventHubConfig;
+import org.apache.samza.system.eventhub.Interceptor;
+import org.apache.samza.system.eventhub.admin.EventHubSystemAdmin;
+import org.apache.samza.system.eventhub.metrics.SamzaHistogram;
+import org.apache.samza.util.BlockingEnvelopeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+
+/**
+ * Implementation of a system consumer for EventHubs. For each system stream
+ * partition, it registers a handler with the EventHubsClient which constantly
+ * pushes data into a blocking queue. This class extends the BlockingEnvelopeMap
+ * provided by samza-api to simplify the logic around those blocking queues.
+ * <p>
+ * A high level architecture:
+ * <p>
+ * ┌───────────────────────────────────────────────┐
+ * │ EventHubsClient │
+ * │ │
+ * │ ┌───────────────────────────────────────┐ │ ┌─────────────────────┐
+ * │ │ │ │ │ │
+ * │ │ PartitionReceiveHandler_1 │───┼───────▶│ SSP1-BlockingQueue ├──────┐
+ * │ │ │ │ │ │ │
+ * │ └───────────────────────────────────────┘ │ └─────────────────────┘ │
+ * │ │ │
+ * │ ┌───────────────────────────────────────┐ │ ┌─────────────────────┐ │
+ * │ │ │ │ │ │ │
+ * │ │ PartitionReceiveHandler_2 │───┼───────▶│ SSP2-BlockingQueue ├──────┤ ┌──────────────────────────┐
+ * │ │ │ │ │ │ ├───────▶│ │
+ * │ └───────────────────────────────────────┘ │ └─────────────────────┘ └───────▶│ SystemConsumer.poll() │
+ * │ │ ┌───────▶│ │
+ * │ │ │ └──────────────────────────┘
+ * │ ... │ ... │
+ * │ │ │
+ * │ │ │
+ * │ ┌───────────────────────────────────────┐ │ ┌─────────────────────┐ │
+ * │ │ │ │ │ │ │
+ * │ │ PartitionReceiveHandler_N │───┼───────▶│ SSPN-BlockingQueue ├──────┘
+ * │ │ │ │ │ │
+ * │ └───────────────────────────────────────┘ │ └─────────────────────┘
+ * │ │
+ * │ │
+ * └───────────────────────────────────────────────┘
+ */
+public class EventHubSystemConsumer extends BlockingEnvelopeMap {
+  private static final Logger LOG = LoggerFactory.getLogger(EventHubSystemConsumer.class);
+  private static final int MAX_EVENT_COUNT_PER_PARTITION_POLL = 50;
+
+  // Overall timeout for EventHubClient exponential backoff policy
+  private static final Duration DEFAULT_EVENTHUB_RECEIVER_TIMEOUT = Duration.ofMinutes(10L);
+  private static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = Duration.ofMinutes(1L).toMillis();
+
+  public static final String START_OF_STREAM = PartitionReceiver.START_OF_STREAM; // -1
+  // Sentinel offset: receivers for it are created positioned at the current time (Instant.now()).
+  public static final String END_OF_STREAM = "-2";
+  public static final String EVENT_READ_RATE = "eventReadRate";
+  public static final String EVENT_BYTE_READ_RATE = "eventByteReadRate";
+  public static final String READ_LATENCY = "readLatency";
+  public static final String READ_ERRORS = "readErrors";
+  public static final String AGGREGATE = "aggregate";
+
+  private static final Object AGGREGATE_METRICS_LOCK = new Object();
+
+  // Aggregate metrics shared by all consumer instances in this JVM. They are created once, under
+  // AGGREGATE_METRICS_LOCK, in the registry of the first consumer constructed.
+  private static Counter aggEventReadRate = null;
+  private static Counter aggEventByteReadRate = null;
+  private static SamzaHistogram aggReadLatency = null;
+  private static Counter aggReadErrors = null;
+
+  // Per-stream metrics, keyed by stream name.
+  private final Map<String, Counter> eventReadRates;
+  private final Map<String, Counter> eventByteReadRates;
+  private final Map<String, SamzaHistogram> readLatencies;
+  private final Map<String, Counter> readErrors;
+
+  final ConcurrentHashMap<SystemStreamPartition, PartitionReceiveHandler> streamPartitionHandlers = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<SystemStreamPartition, PartitionReceiver> streamPartitionReceivers = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, EventHubClientManager> streamEventHubManagers = new ConcurrentHashMap<>();
+  // Registered starting offset per SSP; once events arrive, the offset of the last event put into the queue.
+  private final ConcurrentHashMap<SystemStreamPartition, String> streamPartitionOffsets = new ConcurrentHashMap<>();
+  private final Map<String, Interceptor> interceptors;
+  private boolean isStarted = false;
+  private final EventHubConfig config;
+  private final String systemName;
+
+  // Partition receiver error propagation: written by the receive handlers, reported by poll().
+  private final AtomicReference<Throwable> eventHubHandlerError = new AtomicReference<>(null);
+
+  /**
+   * Creates the consumer, initializing one {@link EventHubClientManager} per stream configured for
+   * {@code systemName} together with per-stream and (once per JVM) aggregate metrics.
+   *
+   * @param config Event Hubs configuration
+   * @param systemName name of the Samza system this consumer serves
+   * @param eventHubClientManagerFactory factory used to create one client manager per stream
+   * @param interceptors per-stream interceptors applied to each event body, keyed by stream name
+   * @param registry registry in which the metrics are created
+   */
+  public EventHubSystemConsumer(EventHubConfig config, String systemName,
+      EventHubClientManagerFactory eventHubClientManagerFactory,
+      Map<String, Interceptor> interceptors, MetricsRegistry registry) {
+    super(registry, System::currentTimeMillis);
+
+    this.config = config;
+    this.systemName = systemName;
+    this.interceptors = interceptors;
+    List<String> streamNames = config.getStreams(systemName);
+    // Create and initiate connections to Event Hubs
+    for (String streamName : streamNames) {
+      EventHubClientManager eventHubClientManager = eventHubClientManagerFactory
+          .getEventHubClientManager(systemName, streamName, config);
+      streamEventHubManagers.put(streamName, eventHubClientManager);
+      eventHubClientManager.init();
+    }
+
+    // Initiate metrics
+    eventReadRates = streamNames.stream()
+        .collect(Collectors.toMap(Function.identity(), x -> registry.newCounter(x, EVENT_READ_RATE)));
+    eventByteReadRates = streamNames.stream()
+        .collect(Collectors.toMap(Function.identity(), x -> registry.newCounter(x, EVENT_BYTE_READ_RATE)));
+    readLatencies = streamNames.stream()
+        .collect(Collectors.toMap(Function.identity(), x -> new SamzaHistogram(registry, x, READ_LATENCY)));
+    readErrors = streamNames.stream()
+        .collect(Collectors.toMap(Function.identity(), x -> registry.newCounter(x, READ_ERRORS)));
+
+    // Locking to ensure that these aggregated metrics will be created only once across multiple system consumers.
+    synchronized (AGGREGATE_METRICS_LOCK) {
+      if (aggEventReadRate == null) {
+        aggEventReadRate = registry.newCounter(AGGREGATE, EVENT_READ_RATE);
+        aggEventByteReadRate = registry.newCounter(AGGREGATE, EVENT_BYTE_READ_RATE);
+        aggReadLatency = new SamzaHistogram(registry, AGGREGATE, READ_LATENCY);
+        aggReadErrors = registry.newCounter(AGGREGATE, READ_ERRORS);
+      }
+    }
+  }
+
+  /**
+   * Registers {@code systemStreamPartition} to be consumed from {@code offset}. When the same SSP is registered
+   * more than once, the lowest offset (as ordered by {@link EventHubSystemAdmin#compareOffsets}) is kept.
+   *
+   * @throws SamzaException if the consumer has already been started, or if {@code offset} cannot be compared
+   *     with the offset previously registered for the same SSP
+   */
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, String offset) {
+    // Reject before touching any state, so a failed registration leaves no queue behind.
+    if (isStarted) {
+      throw new SamzaException("Trying to add partition when the connection has already started.");
+    }
+    super.register(systemStreamPartition, offset);
+
+    String prevOffset = streamPartitionOffsets.get(systemStreamPartition);
+    if (prevOffset != null) {
+      Integer comparison = EventHubSystemAdmin.compareOffsets(offset, prevOffset);
+      if (comparison == null) {
+        // compareOffsets returns null for null or non-numeric offsets; unboxing it would throw an NPE.
+        throw new SamzaException(String.format("Cannot compare offset %s with previously registered offset %s for %s",
+            offset, prevOffset, systemStreamPartition));
+      }
+      if (comparison >= 0) {
+        // Only update if new offset is lower than previous offset
+        return;
+      }
+    }
+    streamPartitionOffsets.put(systemStreamPartition, offset);
+  }
+
+  /**
+   * Creates one {@link PartitionReceiver} (with its receive handler) per registered SSP, positioned at the SSP's
+   * registered offset. No further SSPs may be registered afterwards.
+   *
+   * @throws SamzaException if a receiver cannot be created
+   */
+  @Override
+  public void start() {
+    isStarted = true;
+    // Create receivers for Event Hubs
+    for (Map.Entry<SystemStreamPartition, String> entry : streamPartitionOffsets.entrySet()) {
+      SystemStreamPartition ssp = entry.getKey();
+      String streamName = ssp.getStream();
+      Integer partitionId = ssp.getPartition().getPartitionId();
+      String offset = entry.getValue();
+      String consumerGroup = config.getStreamConsumerGroup(systemName, streamName);
+      String namespace = config.getStreamNamespace(systemName, streamName);
+      String entityPath = config.getStreamEntityPath(systemName, streamName);
+      EventHubClientManager eventHubClientManager = streamEventHubManagers.get(streamName);
+
+      try {
+        PartitionReceiveHandler handler = new PartitionReceiverHandlerImpl(ssp, eventReadRates.get(streamName),
+            eventByteReadRates.get(streamName), readLatencies.get(streamName), readErrors.get(streamName),
+            interceptors.get(streamName));
+
+        PartitionReceiver receiver =
+            createPartitionReceiver(eventHubClientManager, consumerGroup, partitionId.toString(), offset, handler);
+
+        streamPartitionHandlers.put(ssp, handler);
+        streamPartitionReceivers.put(ssp, receiver);
+      } catch (Exception e) {
+        throw new SamzaException(
+            String.format("Failed to create receiver for EventHubs: namespace=%s, entity=%s, partitionId=%d",
+                namespace, entityPath, partitionId), e);
+      }
+      LOG.debug("Connection successfully started for namespace={}, entity={} ", namespace, entityPath);
+    }
+  }
+
+  /**
+   * Surfaces any error recorded by the receive handlers, then polls the underlying blocking queues.
+   *
+   * <p>A transient error is logged once and cleared; any other error is rethrown.
+   *
+   * @throws SamzaException if a non-transient error was recorded by a receive handler
+   */
+  @Override
+  public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
+    Throwable handlerError = eventHubHandlerError.get();
+
+    if (handlerError != null) {
+      if (isErrorTransient(handlerError)) {
+        // Log a warning if the error is transient
+        // Partition receiver handler OnError should have handled it by recreating the receiver
+        LOG.warn("Received a transient error from event hub partition receiver, restarted receiver", handlerError);
+        // Clear the reported error so it is not re-logged on every poll. compareAndSet leaves untouched any
+        // error recorded after the get() above.
+        eventHubHandlerError.compareAndSet(handlerError, null);
+      } else {
+        // Propagate the error to user if the throwable is either
+        // 1. permanent ServiceBusException error from client
+        // 2. SamzaException thrown by the EventHubConsumer
+        // 2a. Interrupted during put operation to BEM
+        // 2b. Failure in renewing the Partition Receiver
+        String msg = "Received a non transient error from event hub partition receiver";
+        throw new SamzaException(msg, handlerError);
+      }
+    }
+
+    return super.poll(systemStreamPartitions, timeout);
+  }
+
+  /**
+   * Creates a receiver for one partition positioned at {@code offset} and attaches {@code handler} to it.
+   * {@link #END_OF_STREAM} is resolved to the current time; {@link #START_OF_STREAM} is opened exclusively and
+   * any other offset inclusively.
+   */
+  private PartitionReceiver createPartitionReceiver(EventHubClientManager eventHubClientManager, String consumerGroup,
+      String partitionId, String offset, PartitionReceiveHandler handler) throws ServiceBusException {
+    PartitionReceiver receiver;
+    if (END_OF_STREAM.equals(offset)) {
+      receiver = eventHubClientManager.getEventHubClient()
+          .createReceiverSync(consumerGroup, partitionId, Instant.now());
+    } else {
+      receiver = eventHubClientManager.getEventHubClient()
+          .createReceiverSync(consumerGroup, partitionId, offset, !START_OF_STREAM.equals(offset));
+    }
+
+    // Timeout for EventHubClient receive
+    receiver.setReceiveTimeout(DEFAULT_EVENTHUB_RECEIVER_TIMEOUT);
+
+    // Start the receiver thread
+    receiver.setReceiveHandler(handler);
+    return receiver;
+  }
+
+  /**
+   * Closes the current receiver for {@code ssp} and creates a new one at the SSP's cached offset, reusing the
+   * SSP's handler. Failures are recorded in {@code eventHubHandlerError} so that the next poll surfaces them.
+   */
+  private void renewPartitionReceiver(SystemStreamPartition ssp) {
+    EventHubClientManager eventHubClientManager = streamEventHubManagers.get(ssp.getStream());
+    String offset = streamPartitionOffsets.get(ssp);
+    Integer partitionId = ssp.getPartition().getPartitionId();
+    String consumerGroup = config.getStreamConsumerGroup(ssp.getSystem(), ssp.getStream());
+
+    try {
+      // Close current receiver
+      streamPartitionReceivers.get(ssp).close().get(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+      // Recreate through the same path as start(), so an END_OF_STREAM offset (no event received yet) is
+      // resolved to the current time instead of being passed to Event Hubs as the literal offset "-2".
+      // NOTE(review): once events have arrived, offset is the last enqueued event's offset and is opened
+      // inclusively, so that event may be delivered twice - confirm whether that is intended.
+      PartitionReceiver receiver = createPartitionReceiver(eventHubClientManager, consumerGroup,
+          partitionId.toString(), offset, streamPartitionHandlers.get(ssp));
+      streamPartitionReceivers.put(ssp, receiver);
+    } catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+      eventHubHandlerError.set(new SamzaException(
+          String.format("Failed to recreate receiver for EventHubs after ReceiverHandlerError (ssp=%s)", ssp), e));
+    }
+  }
+
+  /**
+   * Closes all partition receivers (waiting up to the shutdown timeout), then closes every client manager.
+   * Close failures are logged, not thrown.
+   */
+  @Override
+  public void stop() {
+    LOG.debug("Stopping event hub system consumer...");
+    List<CompletableFuture<Void>> futures = new ArrayList<>();
+    streamPartitionReceivers.values().forEach((receiver) -> futures.add(receiver.close()));
+    CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
+    boolean interrupted = false;
+    try {
+      future.get(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      interrupted = true;
+      LOG.warn("Failed to close receivers", e);
+    } catch (ExecutionException | TimeoutException e) {
+      LOG.warn("Failed to close receivers", e);
+    }
+    streamEventHubManagers.values().forEach(ehClientManager -> ehClientManager.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS));
+    // Restore the interrupt status only after the client managers were given a chance to close.
+    if (interrupted) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * @return true only for a {@link ServiceBusException} that reports itself as transient
+   */
+  private boolean isErrorTransient(Throwable throwable) {
+    if (throwable instanceof ServiceBusException) {
+      ServiceBusException serviceBusException = (ServiceBusException) throwable;
+      return serviceBusException.getIsTransient();
+    }
+    return false;
+  }
+
+  @Override
+  protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
+    return new LinkedBlockingQueue<>(config.getConsumerBufferCapacity(systemName));
+  }
+
+  /**
+   * Receive handler for a single SSP. It applies the stream's interceptor (if any), records metrics, puts each
+   * event into the SSP's blocking queue, and caches the offset of the last event put.
+   */
+  protected class PartitionReceiverHandlerImpl extends PartitionReceiveHandler {
+
+    private final Counter eventReadRate;
+    private final Counter eventByteReadRate;
+    private final SamzaHistogram readLatency;
+    private final Counter errorRate;
+    private final Interceptor interceptor;
+    final SystemStreamPartition ssp;
+
+    PartitionReceiverHandlerImpl(SystemStreamPartition ssp, Counter eventReadRate, Counter eventByteReadRate,
+        SamzaHistogram readLatency, Counter readErrors, Interceptor interceptor) {
+      super(MAX_EVENT_COUNT_PER_PARTITION_POLL);
+      this.ssp = ssp;
+      this.eventReadRate = eventReadRate;
+      this.eventByteReadRate = eventByteReadRate;
+      this.readLatency = readLatency;
+      this.errorRate = readErrors;
+      this.interceptor = interceptor;
+    }
+
+    @Override
+    public void onReceive(Iterable<EventData> events) {
+      if (events == null) {
+        return;
+      }
+      for (EventData event : events) {
+        byte[] eventDataBody = event.getBytes();
+        if (interceptor != null) {
+          eventDataBody = interceptor.intercept(eventDataBody);
+        }
+        String offset = event.getSystemProperties().getOffset();
+        Object partitionKey = event.getSystemProperties().getPartitionKey();
+        try {
+          updateMetrics(event);
+
+          // note that the partition key can be null
+          put(ssp, new EventHubIncomingMessageEnvelope(ssp, offset, partitionKey, eventDataBody, event));
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          String msg = String.format("Interrupted while adding the event from ssp %s to dispatch queue.", ssp);
+          LOG.error(msg, e);
+          SamzaException failure = new SamzaException(msg, e);
+          // Record the failure so the next poll() propagates it, independent of what the client does with the throw.
+          eventHubHandlerError.set(failure);
+          throw failure;
+        }
+
+        // Cache latest checkpoint
+        streamPartitionOffsets.put(ssp, offset);
+      }
+    }
+
+    private void updateMetrics(EventData event) {
+      byte[] body = event.getBytes();
+      int eventDataLength = body == null ? 0 : body.length;
+      eventReadRate.inc();
+      aggEventReadRate.inc();
+      eventByteReadRate.inc(eventDataLength);
+      aggEventByteReadRate.inc(eventDataLength);
+
+      // Read latency is the time elapsed since the event was enqueued: between(enqueued, now), not the reverse.
+      long latencyMs = Duration.between(event.getSystemProperties().getEnqueuedTime(), Instant.now()).toMillis();
+      readLatency.update(latencyMs);
+      aggReadLatency.update(latencyMs);
+    }
+
+    /**
+     * Counts the error. A transient {@link ServiceBusException} is recorded only if no error is pending and the
+     * receiver is renewed; any other error is recorded (overwriting a pending one) for poll() to rethrow.
+     */
+    @Override
+    public void onError(Throwable throwable) {
+      errorRate.inc();
+      aggReadErrors.inc();
+
+      if (throwable instanceof ServiceBusException) {
+        ServiceBusException busException = (ServiceBusException) throwable;
+
+        if (busException.getIsTransient()) {
+
+          // Only set to transient throwable if there has been no previous errors
+          eventHubHandlerError.compareAndSet(null, throwable);
+
+          // Retry creating a receiver since error likely due to timeout
+          renewPartitionReceiver(ssp);
+          return;
+        }
+      }
+
+      // Propagate non transient or unknown errors
+      eventHubHandlerError.set(throwable);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/663b8daf/samza-azure/src/main/java/org/apache/samza/system/eventhub/metrics/SamzaHistogram.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/metrics/SamzaHistogram.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/metrics/SamzaHistogram.java
new file mode 100644
index 0000000..7d6d408
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/metrics/SamzaHistogram.java
@@ -0,0 +1,62 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.eventhub.metrics;
+
+import com.codahale.metrics.ExponentiallyDecayingReservoir;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Snapshot;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistry;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Creates a {@link Histogram} metric using {@link ExponentiallyDecayingReservoir} and exposes each tracked
+ * percentile as its own {@link Gauge}, named {@code <name>_<percentile>} (e.g. {@code readLatency_99.0}),
+ * refreshed on every {@link #update(long)}.
+ */
+public class SamzaHistogram {
+  private static final List<Double> DEFAULT_HISTOGRAM_PERCENTILES = Arrays.asList(50D, 99D);
+  private final Histogram histogram;
+  // Percentile in (0, 100] -> gauge reporting its current value.
+  private final Map<Double, Gauge<Double>> gauges;
+
+  /**
+   * Creates a histogram tracking the default percentiles (50 and 99).
+   */
+  public SamzaHistogram(MetricsRegistry registry, String group, String name) {
+    this(registry, group, name, DEFAULT_HISTOGRAM_PERCENTILES);
+  }
+
+  /**
+   * Creates a histogram tracking the given percentiles.
+   *
+   * @param registry registry in which the percentile gauges are created
+   * @param group metrics group of the gauges
+   * @param name base name of the gauges; each gauge is named {@code name + "_" + percentile}
+   * @param percentiles percentiles to track, in (0, 100]; values outside that range are ignored and
+   *     duplicates are tracked once
+   */
+  public SamzaHistogram(MetricsRegistry registry, String group, String name, List<Double> percentiles) {
+    this.histogram = new Histogram(new ExponentiallyDecayingReservoir());
+    this.gauges = percentiles.stream()
+        .filter(x -> x > 0 && x <= 100)
+        // Duplicates would make toMap throw IllegalStateException.
+        .distinct()
+        // Each gauge is named after its own percentile; a shared name makes the gauges collide.
+        .collect(Collectors.toMap(Function.identity(),
+            x -> registry.newGauge(group, name + "_" + String.valueOf(x), 0D)));
+  }
+
+  /**
+   * Records {@code value} and refreshes every percentile gauge from a new snapshot.
+   */
+  public void update(long value) {
+    histogram.update(value);
+    Snapshot values = histogram.getSnapshot();
+    // Iterate the gauges rather than the raw percentile list, so filtered-out percentiles are never looked up.
+    gauges.forEach((percentile, gauge) -> gauge.set(values.getValue(percentile / 100)));
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/663b8daf/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
new file mode 100644
index 0000000..c8c5538
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
@@ -0,0 +1,345 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.eventhub.producer;
+
+import com.microsoft.azure.eventhubs.EventData;
+import com.microsoft.azure.eventhubs.EventHubClient;
+import com.microsoft.azure.eventhubs.PartitionSender;
+import com.microsoft.azure.servicebus.ServiceBusException;
+import org.apache.samza.SamzaException;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.eventhub.EventHubClientManager;
+import org.apache.samza.system.eventhub.EventHubConfig;
+import org.apache.samza.system.eventhub.EventHubClientManagerFactory;
+import org.apache.samza.system.eventhub.Interceptor;
+import org.apache.samza.system.eventhub.metrics.SamzaHistogram;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class EventHubSystemProducer implements SystemProducer {
+  private static final Logger LOG = LoggerFactory.getLogger(EventHubSystemProducer.class.getName());
+  private static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = Duration.ofMinutes(1L).toMillis();
+  private static final long DEFAULT_FLUSH_TIMEOUT_MILLIS = Duration.ofMinutes(1L).toMillis();
+
+  public static final String PRODUCE_TIMESTAMP = "produce-timestamp";
+
+  // Metrics recording
+  private static final String EVENT_WRITE_RATE = "eventWriteRate";
+  private static final String EVENT_BYTE_WRITE_RATE = "eventByteWriteRate";
+  private static final String SEND_ERRORS = "sendErrors";
+  private static final String SEND_LATENCY = "sendLatency";
+  private static final String SEND_CALLBACK_LATENCY = "sendCallbackLatency";
+  // Aggregate metrics shared by all producer instances in this JVM; created once in start() under
+  // AGGREGATE_METRICS_LOCK.
+  private static Counter aggEventWriteRate = null;
+  private static Counter aggEventByteWriteRate = null;
+  private static Counter aggSendErrors = null;
+  private static SamzaHistogram aggSendLatency = null;
+  private static SamzaHistogram aggSendCallbackLatency = null;
+  private static final String AGGREGATE = "aggregate";
+
+  private static final Object AGGREGATE_METRICS_LOCK = new Object();
+
+  /**
+   * Strategy for assigning sent events to Event Hubs partitions. With {@code PARTITION_KEY_AS_PARTITION},
+   * start() creates one partition sender per partition.
+   */
+  public enum PartitioningMethod {
+    EVENT_HUB_HASHING,
+    PARTITION_KEY_AS_PARTITION,
+  }
+
+  // Per-stream metrics, keyed by stream name; populated in start().
+  private final Map<String, Counter> eventWriteRate = new HashMap<>();
+  private final Map<String, Counter> eventByteWriteRate = new HashMap<>();
+  private final Map<String, SamzaHistogram> sendLatency = new HashMap<>();
+  private final Map<String, SamzaHistogram> sendCallbackLatency = new HashMap<>();
+  private final Map<String, Counter> sendErrors = new HashMap<>();
+
+  private final EventHubClientManagerFactory eventHubClientManagerFactory;
+  private final EventHubConfig config;
+  private final MetricsRegistry registry;
+  private final PartitioningMethod partitioningMethod;
+  private final String systemName;
+
+  private final AtomicReference<Throwable> sendExceptionOnCallback = new AtomicReference<>(null);
+  private volatile boolean isStarted = false;
+
+  // Map of the stream name to its event hub client manager; populated by register().
+  private final Map<String, EventHubClientManager> eventHubClients = new HashMap<>();
+  // Stream name -> (partition id -> sender); only populated for PARTITION_KEY_AS_PARTITION.
+  private final Map<String, Map<Integer, PartitionSender>> streamPartitionSenders = new HashMap<>();
+
+  private final Map<String, Interceptor> interceptors;
+
+  // Futures of sends issued by send() (thread-safe set).
+  private final Set<CompletableFuture<Void>> pendingFutures = ConcurrentHashMap.newKeySet();
+
+  /**
+   * Creates a producer for {@code systemName}. Client managers are created per stream in register().
+   *
+   * @param config Event Hubs configuration; also supplies the partitioning method for {@code systemName}
+   * @param systemName name of the Samza system this producer serves
+   * @param eventHubClientManagerFactory factory used to create one client manager per registered stream
+   * @param interceptors per-stream interceptors, keyed by stream name
+   * @param registry registry in which the metrics are created
+   */
+  public EventHubSystemProducer(EventHubConfig config, String systemName,
+      EventHubClientManagerFactory eventHubClientManagerFactory,
+      Map<String, Interceptor> interceptors, MetricsRegistry registry) {
+    this.systemName = systemName;
+    this.config = config;
+    this.partitioningMethod = config.getPartitioningMethod(systemName);
+    this.registry = registry;
+    this.eventHubClientManagerFactory = eventHubClientManagerFactory;
+    this.interceptors = interceptors;
+  }
+
+  /**
+   * Registers {@code streamName} as a destination, creating and initializing its client manager. Registering
+   * an already-registered stream only logs a warning.
+   *
+   * @throws SamzaException if the producer has already been started
+   */
+  @Override
+  public synchronized void register(String streamName) {
+    LOG.debug("Trying to register {}.", streamName);
+    if (isStarted) {
+      throw new SamzaException("Cannot register once the producer is started.");
+    }
+
+    if (eventHubClients.containsKey(streamName)) {
+      LOG.warn("Already registered stream {}.", streamName);
+    } else {
+      EventHubClientManager clientManager =
+          eventHubClientManagerFactory.getEventHubClientManager(systemName, streamName, config);
+      clientManager.init();
+      eventHubClients.put(streamName, clientManager);
+    }
+  }
+
+  /**
+   * Starts the producer.
+   *
+   * <p>For {@link PartitioningMethod#PARTITION_KEY_AS_PARTITION} it first creates one {@link PartitionSender}
+   * per partition of every registered stream. It then creates the per-stream metrics and, once per JVM, the
+   * aggregate metrics.
+   *
+   * @throws SamzaException if a stream's partition count cannot be fetched or a partition sender cannot be created
+   */
+  @Override
+  public synchronized void start() {
+    LOG.debug("Starting system producer.");
+
+    if (PartitioningMethod.PARTITION_KEY_AS_PARTITION.equals(partitioningMethod)) {
+      // Create all partition senders
+      eventHubClients.forEach((streamName, samzaEventHubClient) -> {
+        EventHubClient ehClient = samzaEventHubClient.getEventHubClient();
+
+        try {
+          Map<Integer, PartitionSender> partitionSenders = new HashMap<>();
+          long timeoutMs = config.getRuntimeInfoWaitTimeMS(systemName);
+          int numPartitions = ehClient.getRuntimeInformation().get(timeoutMs, TimeUnit.MILLISECONDS)
+              .getPartitionCount();
+
+          for (int i = 0; i < numPartitions; i++) {
+            PartitionSender partitionSender = ehClient.createPartitionSenderSync(String.valueOf(i));
+            partitionSenders.put(i, partitionSender);
+          }
+
+          streamPartitionSenders.put(streamName, partitionSenders);
+        } catch (InterruptedException e) {
+          // Restore the interrupt status before translating into an unchecked exception.
+          Thread.currentThread().interrupt();
+          String msg = "Failed to fetch number of Event Hub partitions for partition sender creation";
+          throw new SamzaException(msg, e);
+        } catch (ExecutionException | TimeoutException e) {
+          String msg = "Failed to fetch number of Event Hub partitions for partition sender creation";
+          throw new SamzaException(msg, e);
+        } catch (ServiceBusException | IllegalArgumentException e) {
+          String msg = "Creation of partition sender failed with exception";
+          throw new SamzaException(msg, e);
+        }
+      });
+    }
+
+    for (String eventHub : eventHubClients.keySet()) {
+      eventWriteRate.put(eventHub, registry.newCounter(eventHub, EVENT_WRITE_RATE));
+      eventByteWriteRate.put(eventHub, registry.newCounter(eventHub, EVENT_BYTE_WRITE_RATE));
+      sendLatency.put(eventHub, new SamzaHistogram(registry, eventHub, SEND_LATENCY));
+      sendCallbackLatency.put(eventHub, new SamzaHistogram(registry, eventHub, SEND_CALLBACK_LATENCY));
+      sendErrors.put(eventHub, registry.newCounter(eventHub, SEND_ERRORS));
+    }
+
+    // Locking to ensure that these aggregated metrics will be created only once across multiple system producers.
+    synchronized (AGGREGATE_METRICS_LOCK) {
+      if (aggEventWriteRate == null) {
+        aggEventWriteRate = registry.newCounter(AGGREGATE, EVENT_WRITE_RATE);
+        aggEventByteWriteRate = registry.newCounter(AGGREGATE, EVENT_BYTE_WRITE_RATE);
+        aggSendLatency = new SamzaHistogram(registry, AGGREGATE, SEND_LATENCY);
+        aggSendCallbackLatency = new SamzaHistogram(registry, AGGREGATE, SEND_CALLBACK_LATENCY);
+        aggSendErrors = registry.newCounter(AGGREGATE, SEND_ERRORS);
+      }
+    }
+
+    isStarted = true;
+  }
+
+ @Override
+ public synchronized void send(String destination, OutgoingMessageEnvelope envelope) {
+ if (!isStarted) {
+ throw new SamzaException("Trying to call send before the producer is started.");
+ }
+
+ if (!eventHubClients.containsKey(destination)) {
+ String msg = String.format("Trying to send event to a destination {%s} that is not registered.", destination);
+ throw new SamzaException(msg);
+ }
+
+ checkCallbackThrowable("Received exception on message send");
+
+ EventData eventData = createEventData(destination, envelope);
+ int eventDataLength = eventData.getBytes() == null ? 0 : eventData.getBytes().length;
+ eventWriteRate.get(destination).inc();
+ aggEventWriteRate.inc();
+ eventByteWriteRate.get(destination).inc(eventDataLength);
+ aggEventByteWriteRate.inc(eventDataLength);
+ EventHubClientManager ehClient = eventHubClients.get(destination);
+
+ long beforeSendTimeMs = System.currentTimeMillis();
+
+ // Async send call
+ CompletableFuture<Void> sendResult = sendToEventHub(destination, eventData, getEnvelopePartitionId(envelope),
+ ehClient.getEventHubClient());
+
+ long afterSendTimeMs = System.currentTimeMillis();
+ long latencyMs = afterSendTimeMs - beforeSendTimeMs;
+ sendLatency.get(destination).update(latencyMs);
+ aggSendLatency.update(latencyMs);
+
+ pendingFutures.add(sendResult);
+
+ // Auto update the metrics and possible throwable when futures are complete.
+ sendResult.handle((aVoid, throwable) -> {
+ long callbackLatencyMs = System.currentTimeMillis() - afterSendTimeMs;
+ sendCallbackLatency.get(destination).update(callbackLatencyMs);
+ aggSendCallbackLatency.update(callbackLatencyMs);
+ if (throwable != null) {
+ sendErrors.get(destination).inc();
+ aggSendErrors.inc();
+ LOG.error("Send message to event hub: {} failed with exception: ", destination, throwable);
+ sendExceptionOnCallback.compareAndSet(null, throwable);
+ }
+ return aVoid;
+ });
+ }
+
+ /**
+  * Hands {@code eventData} to Event Hubs according to the configured partitioning method.
+  *
+  * <p>With {@code EVENT_HUB_HASHING} the key is converted to a string and Event Hubs hashes it to a
+  * partition. With {@code PARTITION_KEY_AS_PARTITION} the key must be an Integer and is mapped onto
+  * one of the pre-created partition senders for the stream.
+  *
+  * @param streamName the registered stream being written to
+  * @param eventData the event to send
+  * @param partitionKey routing key taken from the envelope
+  * @param eventHubClient the client for {@code streamName}
+  * @return the future of the asynchronous send
+  * @throws SamzaException on an unsupported key, a missing sender map, or an unknown partitioning method
+  */
+ private CompletableFuture<Void> sendToEventHub(String streamName, EventData eventData, Object partitionKey,
+     EventHubClient eventHubClient) {
+   if (partitioningMethod == PartitioningMethod.EVENT_HUB_HASHING) {
+     return eventHubClient.send(eventData, convertPartitionKeyToString(partitionKey));
+   } else if (partitioningMethod == PartitioningMethod.PARTITION_KEY_AS_PARTITION) {
+     if (!(partitionKey instanceof Integer)) {
+       String msg = "Partition key should be of type Integer";
+       throw new SamzaException(msg);
+     }
+
+     Map<Integer, PartitionSender> partitionSenders = streamPartitionSenders.get(streamName);
+     if (partitionSenders == null || partitionSenders.isEmpty()) {
+       throw new SamzaException("No partition senders available for stream " + streamName);
+     }
+
+     // Senders are keyed 0..n-1. floorMod keeps the index in range for negative keys, where '%' would
+     // yield a negative index, a null sender and an NPE.
+     int destinationPartition = Math.floorMod((Integer) partitionKey, partitionSenders.size());
+     PartitionSender sender = partitionSenders.get(destinationPartition);
+     return sender.send(eventData);
+   } else {
+     throw new SamzaException("Unknown partitioning method " + partitioningMethod);
+   }
+ }
+
+ /**
+  * Returns the value used to route {@code envelope} to a partition. This implementation uses the
+  * envelope's partition key; being protected and non-final, subclasses may override it.
+  *
+  * @param envelope the outgoing message
+  * @return the envelope's partition key (possibly null)
+  */
+ protected Object getEnvelopePartitionId(OutgoingMessageEnvelope envelope) {
+   Object routingKey = envelope.getPartitionKey();
+   return routingKey;
+ }
+
+ /**
+  * Converts a partition key into the string Event Hubs hashes to pick a partition.
+  *
+  * <p>NOTE(review): byte[] keys are decoded with the platform-default charset, so the same bytes can
+  * map to a different string (and partition) on JVMs with a different default encoding.
+  *
+  * @param partitionKey a String, Integer, Long or byte[] key
+  * @return the string form of the key
+  * @throws SamzaException if the key is null or of an unsupported type
+  */
+ private String convertPartitionKeyToString(Object partitionKey) {
+   if (partitionKey == null) {
+     // Previously surfaced as an NPE from partitionKey.getClass().
+     throw new SamzaException("Partition key cannot be null");
+   }
+   if (partitionKey instanceof String) {
+     return (String) partitionKey;
+   } else if (partitionKey instanceof Integer || partitionKey instanceof Long) {
+     return String.valueOf(partitionKey);
+   } else if (partitionKey instanceof byte[]) {
+     return new String((byte[]) partitionKey, Charset.defaultCharset());
+   } else {
+     throw new SamzaException("Unsupported key type: " + partitionKey.getClass().toString());
+   }
+ }
+
+ /**
+  * Builds the EventData for {@code envelope}: applies the stream's interceptor (if one is registered)
+  * to the byte[] message, stamps a produce timestamp property, and, when configured, copies the
+  * envelope key into the "key" property.
+  *
+  * @param streamName the destination stream, used to look up an interceptor
+  * @param envelope the outgoing message; its message is cast to byte[]
+  * @return the populated event
+  */
+ private EventData createEventData(String streamName, OutgoingMessageEnvelope envelope) {
+   Interceptor interceptor = interceptors.get(streamName);
+   byte[] body = (byte[]) envelope.getMessage();
+   if (interceptor != null) {
+     body = interceptor.intercept(body);
+   }
+
+   EventData eventData = new EventData(body);
+   eventData.getProperties().put(PRODUCE_TIMESTAMP, Long.toString(System.currentTimeMillis()));
+
+   if (config.getSendKeyInEventProperties(systemName)) {
+     Object key = envelope.getKey();
+     String keyValue;
+     if (key == null) {
+       keyValue = "";
+     } else if (key instanceof byte[]) {
+       // Same as new String(byte[]): decoded with the platform-default charset.
+       keyValue = new String((byte[]) key, Charset.defaultCharset());
+     } else {
+       keyValue = key.toString();
+     }
+     eventData.getProperties().put("key", keyValue);
+   }
+   return eventData;
+ }
+
+ @Override
+ public synchronized void flush(String source) {
+ LOG.debug("Trying to flush pending {} sends messages.", pendingFutures.size());
+ checkCallbackThrowable("Received exception on message send");
+
+ CompletableFuture<Void> future = CompletableFuture
+ .allOf(pendingFutures.toArray(new CompletableFuture[pendingFutures.size()]));
+
+ try {
+ // Block until all the pending sends are complete or timeout.
+ future.get(DEFAULT_FLUSH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ String msg = "Flush failed with error";
+ throw new SamzaException(msg, e);
+ }
+
+ checkCallbackThrowable("Sending one or more of the messages failed during flush");
+ }
+
+ private void checkCallbackThrowable(String msg) {
+ // Check for send errors from EventHub side
+ Throwable sendThrowable = sendExceptionOnCallback.get();
+ if (sendThrowable != null) {
+ throw new SamzaException(msg, sendThrowable);
+ }
+ pendingFutures.clear();
+ }
+
+ @Override
+ public synchronized void stop() {
+ LOG.debug("Stopping producer.", pendingFutures.size());
+
+ streamPartitionSenders.values().forEach((streamPartitionSender) -> {
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ streamPartitionSender.forEach((key, value) -> futures.add(value.close()));
+ CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
+ try {
+ future.get(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException | InterruptedException | TimeoutException e) {
+ LOG.error("Closing the partition sender failed ", e);
+ }
+ });
+ eventHubClients.values().forEach(ehClient -> ehClient.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS));
+ eventHubClients.clear();
+ }
+
+ /**
+  * Exposes the live (not copied) collection of tracked send futures. It is package-private,
+  * presumably for tests; verify against callers.
+  *
+  * @return the producer's pending-futures collection
+  */
+ Collection<CompletableFuture<Void>> getPendingFutures() {
+   return this.pendingFutures;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/663b8daf/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventData.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventData.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventData.java
new file mode 100644
index 0000000..b5b55dc
--- /dev/null
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventData.java
@@ -0,0 +1,57 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.eventhub;
+
+import com.microsoft.azure.eventhubs.EventData;
+
+import java.nio.charset.Charset;
+import java.util.*;
+
+/**
+ * Test EventData whose system properties (offset, partition key, enqueued time) are supplied directly
+ * rather than populated by a real Event Hubs receive.
+ */
+public class MockEventData extends EventData {
+
+  /** Returned from {@link #getSystemProperties()} in place of the superclass's properties. */
+  private final EventData.SystemProperties overriddenSystemProperties;
+
+  private MockEventData(byte[] data, String partitionKey, String offset) {
+    super(data);
+    HashMap<String, Object> properties = new HashMap<>();
+    properties.put("x-opt-offset", offset);
+    properties.put("x-opt-partition-key", partitionKey);
+    properties.put("x-opt-enqueued-time", new Date(System.currentTimeMillis()));
+    overriddenSystemProperties = new SystemProperties(properties);
+  }
+
+  /**
+   * Generates {@code numEvents} events with random keys and bodies and offsets "offset_0",
+   * "offset_1", ... The random generator is seeded with the current time.
+   *
+   * @param numEvents number of events to create; a non-positive value yields an empty list
+   * @return the generated events, in offset order
+   */
+  public static List<EventData> generateEventData(int numEvents) {
+    return generateEventData(numEvents, System.currentTimeMillis());
+  }
+
+  /**
+   * Same as {@link #generateEventData(int)}, but with an explicit seed so tests can reproduce the
+   * exact keys and message bodies.
+   *
+   * @param numEvents number of events to create; a non-positive value yields an empty list
+   * @param seed seed for the key/message random generator
+   * @return the generated events, in offset order
+   */
+  public static List<EventData> generateEventData(int numEvents, long seed) {
+    Random rand = new Random(seed);
+    List<EventData> result = new ArrayList<>();
+    for (int i = 0; i < numEvents; i++) {
+      String key = "key_" + rand.nextInt();
+      String message = "message:" + rand.nextInt();
+      String offset = "offset_" + i;
+      EventData eventData = new MockEventData(message.getBytes(Charset.defaultCharset()), key, offset);
+      result.add(eventData);
+    }
+    return result;
+  }
+
+  @Override
+  public EventData.SystemProperties getSystemProperties() {
+    return overriddenSystemProperties;
+  }
+}