You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/10/27 05:26:34 UTC

[2/2] samza git commit: SAMZA-1438; SystemProducer, Consumer and Admin interfaces for EventHubs

SAMZA-1438; SystemProducer, Consumer and Admin interfaces for EventHubs

Author: Daniel Chen <29...@users.noreply.github.com>

Reviewers: Jagadish <ja...@apache.org>, Prateek M<pm...@linkedin.com>

Closes #308 from dxichen/eventHub-Connector


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/663b8daf
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/663b8daf
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/663b8daf

Branch: refs/heads/master
Commit: 663b8daffa56e6a5ca1e00641c6566304d8c832d
Parents: 8b21209
Author: Daniel Chen <29...@users.noreply.github.com>
Authored: Thu Oct 26 22:26:29 2017 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Thu Oct 26 22:26:29 2017 -0700

----------------------------------------------------------------------
 build.gradle                                    |  11 +
 .../system/eventhub/EventHubClientManager.java  |  69 ++++
 .../eventhub/EventHubClientManagerFactory.java  |  32 ++
 .../samza/system/eventhub/EventHubConfig.java   | 181 +++++++++
 .../system/eventhub/EventHubSystemFactory.java  |  65 +++
 .../samza/system/eventhub/Interceptor.java      |  34 ++
 .../eventhub/SamzaEventHubClientManager.java    | 104 +++++
 .../eventhub/admin/EventHubSystemAdmin.java     | 199 +++++++++
 .../eventhub/admin/PassThroughInterceptor.java  |  33 ++
 .../EventHubIncomingMessageEnvelope.java        |  42 ++
 .../consumer/EventHubSystemConsumer.java        | 401 +++++++++++++++++++
 .../system/eventhub/metrics/SamzaHistogram.java |  62 +++
 .../producer/EventHubSystemProducer.java        | 345 ++++++++++++++++
 .../samza/system/eventhub/MockEventData.java    |  57 +++
 .../MockEventHubClientManagerFactory.java       | 196 +++++++++
 .../eventhub/MockEventHubConfigFactory.java     |  61 +++
 .../system/eventhub/TestMetricsRegistry.java    |  85 ++++
 .../eventhub/admin/TestEventHubSystemAdmin.java | 113 ++++++
 .../consumer/ITestEventHubSystemConsumer.java   |  76 ++++
 .../consumer/TestEventHubSystemConsumer.java    | 272 +++++++++++++
 .../producer/ITestEventHubSystemProducer.java   | 163 ++++++++
 .../producer/TestEventHubSystemProducer.java    | 153 +++++++
 22 files changed, 2754 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/663b8daf/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 3a6f9b6..d9025c4 100644
--- a/build.gradle
+++ b/build.gradle
@@ -192,16 +192,27 @@ project(':samza-azure') {
 
   dependencies {
     compile "com.microsoft.azure:azure-storage:5.3.1"
+    compile "com.microsoft.azure:azure-eventhubs:0.14.5"
     compile "com.fasterxml.jackson.core:jackson-core:2.8.8"
+    compile "io.dropwizard.metrics:metrics-core:3.1.2"
     compile project(':samza-api')
     compile project(":samza-core_$scalaVersion")
     compile "org.slf4j:slf4j-api:$slf4jVersion"
     testCompile "junit:junit:$junitVersion"
+    testCompile "org.mockito:mockito-all:$mockitoVersion"
+    testCompile "org.powermock:powermock-api-mockito:$powerMockVersion"
+    testCompile "org.powermock:powermock-core:$powerMockVersion"
+    testCompile "org.powermock:powermock-module-junit4:$powerMockVersion"
   }
   checkstyle {
     configFile = new File(rootDir, "checkstyle/checkstyle.xml")
     toolVersion = "$checkstyleVersion"
   }
+  test {
+    // Exclude integration tests that require connection to EventHub
+    exclude 'org/apache/samza/system/eventhub/producer/*ITest*'
+    exclude 'org/apache/samza/system/eventhub/consumer/*ITest*'
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/samza/blob/663b8daf/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManager.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManager.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManager.java
new file mode 100644
index 0000000..0b4f18f
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManager.java
@@ -0,0 +1,69 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.eventhub;
+
+import com.microsoft.azure.eventhubs.EventHubClient;
+
+/**
+ * <p>
+ * EventHubClient manager is the interface that must be implemented to wrap the
+ * {@link EventHubClient} with lifecycle hooks for initialization and close.
+ * </p>
+ *
+ * <p>
+ * {@link #init()} should be invoked once during the startup and provides a
+ * hook to perform some initialization before the creation of the underlying
+ * {@link EventHubClient}. {@link #close(long)} is invoked once during shut-down
+ * and can be used to perform clean-ups.
+ * </p>
+ */
+public interface EventHubClientManager {
+  /**
+   * A constant that can be used in the close method's timeout parameter to
+   * denote that the close invocation should block until all the teardown
+   * operations for the {@link EventHubClient} are completed
+   */
+  public static int BLOCK_UNTIL_CLOSE = -1;
+
+  /**
+   * Lifecycle hook to perform initializations for the creation of
+   * the underlying {@link EventHubClient}.
+   */
+  void init();
+
+  /**
+   * Returns the underlying {@link EventHubClient} instance. Multiple invocations
+   * of this method should return the same instance instead of
+   * creating new ones.
+   *
+   * @return EventHub client instance of the wrapper
+   */
+  EventHubClient getEventHubClient();
+
+  /**
+   * Tries to close the {@link EventHubClient} instance within the provided
+   * timeout. Use this method to perform clean-ups after the execution of the
+   * {@link EventHubClient}. Set the timeout to {@link #BLOCK_UNTIL_CLOSE} to
+   * block until the client is closed.
+   *
+   * @param timeoutMs Close timeout in Milliseconds
+   */
+  void close(long timeoutMs);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/663b8daf/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManagerFactory.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManagerFactory.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManagerFactory.java
new file mode 100644
index 0000000..0578a50
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManagerFactory.java
@@ -0,0 +1,32 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.eventhub;
+
+public class EventHubClientManagerFactory {
+  public EventHubClientManager getEventHubClientManager(String systemName, String streamName, EventHubConfig config) {
+
+    String eventHubNamespace = config.getStreamNamespace(systemName, streamName);
+    String entityPath = config.getStreamEntityPath(systemName, streamName);
+    String sasKeyName = config.getStreamSasKeyName(systemName, streamName);
+    String sasToken = config.getStreamSasToken(systemName, streamName);
+
+    return new SamzaEventHubClientManager(eventHubNamespace, entityPath, sasKeyName, sasToken);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/663b8daf/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
new file mode 100644
index 0000000..3bc04f8
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
@@ -0,0 +1,181 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.eventhub;
+
+import com.microsoft.azure.eventhubs.EventHubClient;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.system.eventhub.producer.EventHubSystemProducer;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+
+public class EventHubConfig extends MapConfig {
+
+  public static final String CONFIG_STREAM_LIST = "systems.%s.stream.list";
+
+  public static final String CONFIG_STREAM_NAMESPACE = "systems.%s.streams.%s.eventhubs.namespace";
+
+  public static final String CONFIG_STREAM_ENTITYPATH = "systems.%s.streams.%s.eventhubs.entitypath";
+
+  public static final String CONFIG_STREAM_SAS_KEY_NAME = "systems.%s.streams.%s.eventhubs.sas.keyname";
+
+  public static final String CONFIG_STREAM_SAS_TOKEN = "systems.%s.streams.%s.eventhubs.sas.token";
+
+  public static final String CONFIG_STREAM_CONSUMER_GROUP = "systems.%s.streams.%s.eventhubs.consumer.group";
+  public static final String DEFAULT_CONFIG_STREAM_CONSUMER_GROUP = EventHubClient.DEFAULT_CONSUMER_GROUP_NAME;
+
+  public static final String CONFIG_PRODUCER_PARTITION_METHOD = "systems.%s.eventhubs.partition.method";
+  public static final String DEFAULT_CONFIG_PRODUCER_PARTITION_METHOD = EventHubSystemProducer
+          .PartitioningMethod.EVENT_HUB_HASHING.name();
+
+  public static final String CONFIG_SEND_KEY_IN_EVENT_PROPERTIES = "systems.%s.eventhubs.send.key";
+  public static final Boolean DEFAULT_CONFIG_SEND_KEY_IN_EVENT_PROPERTIES = false;
+
+  public static final String CONFIG_FETCH_RUNTIME_INFO_TIMEOUT_MILLIS = "systems.%s.eventhubs.runtime.info.timeout";
+  public static final long DEFAULT_CONFIG_FETCH_RUNTIME_INFO_TIMEOUT_MILLIS = Duration.ofMinutes(1L).toMillis();
+
+  public static final String CONFIG_CONSUMER_BUFFER_CAPACITY = "systems.%s.eventhubs.receive.queue.size";
+  public static final int DEFAULT_CONFIG_CONSUMER_BUFFER_CAPACITY = 100;
+
+
+  public EventHubConfig(Map<String, String> config) {
+    super(config);
+  }
+
+  /**
+   * Get the list of streams that are defined. Each stream has enough
+   * information for connecting to a certain EventHub entity.
+   *
+   * @param systemName name of the system
+   * @return list of stream names
+   */
+  public List<String> getStreams(String systemName) {
+    return getList(String.format(CONFIG_STREAM_LIST, systemName));
+  }
+
+  /**
+   * Get the EventHubs namespace for the stream
+   *
+   * @param systemName name of the system
+   * @param streamName name of stream
+   * @return EventHubs namespace
+   */
+  public String getStreamNamespace(String systemName, String streamName) {
+    return get(String.format(CONFIG_STREAM_NAMESPACE, systemName, streamName));
+  }
+
+  /**
+   * Get the EventHubs entity path (topic name) for the stream
+   *
+   * @param systemName name of the system
+   * @param streamName name of stream
+   * @return EventHubs entity path
+   */
+  public String getStreamEntityPath(String systemName, String streamName) {
+    return get(String.format(CONFIG_STREAM_ENTITYPATH, systemName, streamName));
+  }
+
+  /**
+   * Get the EventHubs SAS (Shared Access Signature) key name for the stream
+   *
+   * @param systemName name of the system
+   * @param streamName name of stream
+   * @return EventHubs SAS key name
+   */
+  public String getStreamSasKeyName(String systemName, String streamName) {
+    return get(String.format(CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName));
+  }
+
+  /**
+   * Get the EventHubs SAS (Shared Access Signature) token for the stream
+   *
+   * @param systemName name of the system
+   * @param streamName name of stream
+   * @return EventHubs SAS token
+   */
+  public String getStreamSasToken(String systemName, String streamName) {
+    return get(String.format(CONFIG_STREAM_SAS_TOKEN, systemName, streamName));
+  }
+
+  /**
+   * Get the EventHubs consumer group used for consumption for the stream
+   *
+   * @param systemName name of the system
+   * @param streamName name of stream
+   * @return EventHubs consumer group
+   */
+  public String getStreamConsumerGroup(String systemName, String streamName) {
+    return get(String.format(CONFIG_STREAM_CONSUMER_GROUP, systemName, streamName), DEFAULT_CONFIG_STREAM_CONSUMER_GROUP);
+  }
+
+  /**
+   * Get the partition method of the systemName. By default partitioning is handled by EventHub.
+   *
+   * @param systemName name of the system
+   * @return The method the producer should use to partition the outgoing data
+   */
+  public EventHubSystemProducer.PartitioningMethod getPartitioningMethod(String systemName) {
+    String partitioningMethod = get(String.format(CONFIG_PRODUCER_PARTITION_METHOD, systemName),
+            DEFAULT_CONFIG_PRODUCER_PARTITION_METHOD);
+    return EventHubSystemProducer.PartitioningMethod.valueOf(partitioningMethod);
+
+  }
+
+  /**
+   * Returns true if the OutgoingMessageEnvelope key should be sent in the outgoing envelope, false otherwise
+   *
+   * @param systemName name of the system
+   * @return Boolean, is send key included
+   */
+  public Boolean getSendKeyInEventProperties(String systemName) {
+    String isSendKeyIncluded = get(String.format(CONFIG_SEND_KEY_IN_EVENT_PROPERTIES, systemName));
+    if (isSendKeyIncluded == null) {
+      return DEFAULT_CONFIG_SEND_KEY_IN_EVENT_PROPERTIES;
+    }
+    return Boolean.valueOf(isSendKeyIncluded);
+  }
+
+  /**
+   * Get the timeout for the getRuntimeInfo request to EventHub client
+   *
+   * @param systemName name of the system
+   * @return long, timeout in millis for fetching RuntimeInfo
+   */
+  public long getRuntimeInfoWaitTimeMS(String systemName) {
+    return getLong(String.format(CONFIG_FETCH_RUNTIME_INFO_TIMEOUT_MILLIS, systemName),
+            DEFAULT_CONFIG_FETCH_RUNTIME_INFO_TIMEOUT_MILLIS);
+  }
+
+  /**
+   * Get the capacity of the Event Hub consumer buffer - the blocking queue used for storing messages
+   *
+   * @param systemName name of the system
+   * @return int, number of buffered messages per SystemStreamPartition
+   */
+  public int getConsumerBufferCapacity(String systemName) {
+    String bufferCapacity = get(String.format(CONFIG_CONSUMER_BUFFER_CAPACITY, systemName));
+    if (bufferCapacity == null) {
+      return DEFAULT_CONFIG_CONSUMER_BUFFER_CAPACITY;
+    }
+    return Integer.parseInt(bufferCapacity);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/663b8daf/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubSystemFactory.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubSystemFactory.java
new file mode 100644
index 0000000..fd7a99c
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubSystemFactory.java
@@ -0,0 +1,65 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.eventhub;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.eventhub.admin.EventHubSystemAdmin;
+import org.apache.samza.system.eventhub.admin.PassThroughInterceptor;
+import org.apache.samza.system.eventhub.consumer.EventHubSystemConsumer;
+import org.apache.samza.system.eventhub.producer.EventHubSystemProducer;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class EventHubSystemFactory implements SystemFactory {
+
+  private Map<String, Interceptor> getInterceptorsMap(EventHubConfig config, String systemName) {
+    Map<String, Interceptor> interceptors = new HashMap<>();
+    List<String> streamList = config.getStreams(systemName);
+    streamList.forEach((streamName) -> interceptors.put(streamName, new PassThroughInterceptor()));
+    return interceptors;
+  }
+
+  @Override
+  public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
+    EventHubConfig eventHubConfig = new EventHubConfig(config);
+    return new EventHubSystemConsumer(eventHubConfig, systemName, new EventHubClientManagerFactory(),
+            getInterceptorsMap(eventHubConfig, systemName), registry);
+  }
+
+  @Override
+  public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
+    EventHubConfig eventHubConfig = new EventHubConfig(config);
+    return new EventHubSystemProducer(eventHubConfig, systemName, new EventHubClientManagerFactory(),
+            getInterceptorsMap(eventHubConfig, systemName),
+            registry);
+  }
+
+  @Override
+  public SystemAdmin getAdmin(String systemName, Config config) {
+    return new EventHubSystemAdmin(systemName, new EventHubConfig(config), new EventHubClientManagerFactory());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/663b8daf/samza-azure/src/main/java/org/apache/samza/system/eventhub/Interceptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/Interceptor.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/Interceptor.java
new file mode 100644
index 0000000..2a6edfb
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/Interceptor.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.eventhub;
+
+/**
+ * Interface that is used for converting bytes to bytes.
+ */
+public interface Interceptor {
+
+  /**
+   * Intercepts and converts bytes.
+   *
+   * @param bytes bytes to be converted
+   * @return the converted array of bytes
+   */
+  byte[] intercept(byte[] bytes);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/663b8daf/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java
new file mode 100644
index 0000000..ada5694
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java
@@ -0,0 +1,104 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.eventhub;
+
+import com.microsoft.azure.eventhubs.EventHubClient;
+import com.microsoft.azure.servicebus.ClientConstants;
+import com.microsoft.azure.servicebus.ConnectionStringBuilder;
+import com.microsoft.azure.servicebus.RetryExponential;
+import com.microsoft.azure.servicebus.RetryPolicy;
+import com.microsoft.azure.servicebus.ServiceBusException;
+import org.apache.samza.SamzaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+public class SamzaEventHubClientManager implements EventHubClientManager {
+  private static final Logger LOG = LoggerFactory.getLogger(SamzaEventHubClientManager.class.getName());
+
+  private static final String EVENTHUB_REMOTE_HOST_FORMAT = "%s.servicebus.windows.net";
+
+  private static final Duration MIN_RETRY_BACKOFF = Duration.ofMillis(100);
+  private static final Duration MAX_RETRY_BACKOFF = Duration.ofMillis(11000);
+  private static final int MAX_RETRY_COUNT = 100;
+  private static final String SAMZA_EVENTHUB_RETRY = "SAMZA_CONNECTOR_RETRY";
+
+  private EventHubClient eventHubClient;
+
+  private final String eventHubNamespace;
+  private final String entityPath;
+  private final String sasKeyName;
+  private final String sasKey;
+  private final RetryPolicy retryPolicy;
+
+  public SamzaEventHubClientManager(String eventHubNamespace, String entityPath, String sasKeyName, String sasKey) {
+    this(eventHubNamespace, entityPath, sasKeyName, sasKey,
+            new RetryExponential(MIN_RETRY_BACKOFF, MAX_RETRY_BACKOFF, MAX_RETRY_COUNT, SAMZA_EVENTHUB_RETRY));
+  }
+
+  public SamzaEventHubClientManager(String eventHubNamespace, String entityPath, String sasKeyName, String sasKey,
+                                    RetryPolicy retryPolicy) {
+    this.eventHubNamespace = eventHubNamespace;
+    this.entityPath = entityPath;
+    this.sasKeyName = sasKeyName;
+    this.sasKey = sasKey;
+    this.retryPolicy = retryPolicy;
+  }
+
+  @Override
+  public void init() {
+    String remoteHost = String.format(EVENTHUB_REMOTE_HOST_FORMAT, eventHubNamespace);
+    try {
+      ConnectionStringBuilder connectionStringBuilder =
+              new ConnectionStringBuilder(eventHubNamespace, entityPath, sasKeyName, sasKey);
+      eventHubClient = EventHubClient.createFromConnectionStringSync(connectionStringBuilder.toString(), retryPolicy);
+    } catch (IOException | ServiceBusException e) {
+      // The SAS key is a credential: keep it out of the log and the exception message.
+      String msg = String.format("Creation of EventHub client failed for eventHub %s %s %s on remote host %s:%d",
+              entityPath, eventHubNamespace, sasKeyName, remoteHost, ClientConstants.AMQPS_PORT);
+      LOG.error(msg, e);
+      throw new SamzaException(msg, e);
+    }
+  }
+
+  @Override
+  public EventHubClient getEventHubClient() {
+    return eventHubClient;
+  }
+
+  @Override
+  public void close(long timeoutMS) {
+    try {
+      if (timeoutMS == EventHubClientManager.BLOCK_UNTIL_CLOSE) {
+        eventHubClient.closeSync();
+      } else {
+        CompletableFuture<Void> future = eventHubClient.close();
+        future.get(timeoutMS, TimeUnit.MILLISECONDS);
+      }
+    } catch (Exception e) {
+      LOG.error("Closing the EventHub client failed", e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/663b8daf/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
new file mode 100644
index 0000000..11998a4
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
@@ -0,0 +1,199 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.eventhub.admin;
+
+import com.microsoft.azure.eventhubs.EventHubPartitionRuntimeInformation;
+import com.microsoft.azure.eventhubs.EventHubRuntimeInformation;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.eventhub.EventHubClientManager;
+import org.apache.samza.system.eventhub.EventHubConfig;
+import org.apache.samza.system.eventhub.consumer.EventHubSystemConsumer;
+import org.apache.samza.system.eventhub.EventHubClientManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * {@link SystemAdmin} for EventHubs. Partition ids are fetched through
+ * {@link EventHubClientManager}s that are created on demand and closed again at the end of
+ * every metadata request; the partition ids themselves are cached per stream.
+ */
+public class EventHubSystemAdmin implements SystemAdmin {
+  private static final Logger LOG = LoggerFactory.getLogger(EventHubSystemAdmin.class);
+  private static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = Duration.ofMinutes(1L).toMillis();
+
+  private final EventHubClientManagerFactory eventHubClientManagerFactory;
+  private final String systemName;
+  private final EventHubConfig eventHubConfig;
+  // Clients opened while serving a metadata request, keyed by stream name.
+  private final Map<String, EventHubClientManager> eventHubClients = new HashMap<>();
+  // Partition ids per stream name, filled the first time a stream is looked up.
+  private final Map<String, String[]> streamPartitions = new HashMap<>();
+
+  public EventHubSystemAdmin(String systemName, EventHubConfig eventHubConfig,
+                             EventHubClientManagerFactory eventHubClientManagerFactory) {
+    this.systemName = systemName;
+    this.eventHubConfig = eventHubConfig;
+    this.eventHubClientManagerFactory = eventHubClientManagerFactory;
+  }
+
+  /**
+   * Returns the offset following {@code currentOffset}. The end-of-stream marker is returned
+   * unchanged; any other offset is parsed as a long and incremented by one.
+   */
+  private String getNextOffset(String currentOffset) {
+    // EventHub will return the first message AFTER the offset
+    // that was specified in the fetch request.
+    return currentOffset.equals(EventHubSystemConsumer.END_OF_STREAM) ? currentOffset :
+            String.valueOf(Long.parseLong(currentOffset) + 1);
+  }
+
+  @Override
+  public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
+    Map<SystemStreamPartition, String> results = new HashMap<>();
+
+    offsets.forEach((partition, offset) -> results.put(partition, getNextOffset(offset)));
+    return results;
+  }
+
+  /**
+   * Builds the metadata of every requested stream.
+   *
+   * @param streamNames names of the streams to describe
+   * @return metadata keyed by stream name, one entry per requested stream
+   * @throws SamzaException if the runtime information of a stream or partition cannot be fetched
+   */
+  @Override
+  public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
+    final Map<String, SystemStreamMetadata> requestedMetadata = new HashMap<>();
+    final Map<String, CompletableFuture<EventHubRuntimeInformation>> ehRuntimeInfos = new HashMap<>();
+
+    try {
+      // Issue the runtime info request of every stream whose partition ids are not cached yet
+      // before awaiting any of them. This happens inside the try block so that clients created
+      // here are closed even when a later client fails to initialize.
+      for (String streamName : streamNames) {
+        if (!streamPartitions.containsKey(streamName)) {
+          EventHubClientManager eventHubClientManager = getOrCreateStreamEventHubClient(streamName);
+          ehRuntimeInfos.put(streamName, eventHubClientManager.getEventHubClient().getRuntimeInformation());
+        }
+      }
+
+      // Walk over ALL requested streams, not only the uncached ones: streams whose partition ids
+      // were cached by an earlier call must still show up in the returned metadata.
+      for (String streamName : streamNames) {
+        CompletableFuture<EventHubRuntimeInformation> ehRuntimeInfo = ehRuntimeInfos.get(streamName);
+
+        if (ehRuntimeInfo != null) {
+          LOG.debug(String.format("Partition ids for Stream=%s not found", streamName));
+          try {
+            long timeoutMs = eventHubConfig.getRuntimeInfoWaitTimeMS(systemName);
+            EventHubRuntimeInformation ehInfo = ehRuntimeInfo.get(timeoutMs, TimeUnit.MILLISECONDS);
+
+            LOG.debug(String.format("Adding partition ids for Stream=%s", streamName));
+            streamPartitions.put(streamName, ehInfo.getPartitionIds());
+          } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            String msg = String.format("Error while fetching EventHubRuntimeInfo for System:%s, Stream:%s",
+                    systemName, streamName);
+            throw fetchFailure(msg, e);
+          }
+        }
+
+        String[] partitionIds = streamPartitions.get(streamName);
+        Map<Partition, SystemStreamPartitionMetadata> sspMetadataMap = getPartitionMetadata(streamName, partitionIds);
+        SystemStreamMetadata systemStreamMetadata = new SystemStreamMetadata(streamName, sspMetadataMap);
+
+        requestedMetadata.put(streamName, systemStreamMetadata);
+      }
+    } finally {
+
+      // Closing clients
+      eventHubClients.forEach((streamName, client) -> client.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS));
+      eventHubClients.clear();
+    }
+
+    return requestedMetadata;
+  }
+
+  /**
+   * Wraps a failed fetch into a {@link SamzaException} that keeps the original cause, restoring
+   * the thread's interrupt flag when the failure was an interruption.
+   */
+  private static SamzaException fetchFailure(String msg, Exception cause) {
+    if (cause instanceof InterruptedException) {
+      Thread.currentThread().interrupt();
+    }
+    return new SamzaException(msg, cause);
+  }
+
+  /**
+   * Returns the client manager registered for {@code streamName}, creating and initializing
+   * one through the factory when none is registered yet.
+   */
+  private EventHubClientManager getOrCreateStreamEventHubClient(String streamName) {
+    if (!eventHubClients.containsKey(streamName)) {
+      LOG.debug(String.format("Creating EventHubClient for Stream=%s", streamName));
+
+      EventHubClientManager eventHubClientManager = eventHubClientManagerFactory
+              .getEventHubClientManager(systemName, streamName, eventHubConfig);
+
+      eventHubClientManager.init();
+      eventHubClients.put(streamName, eventHubClientManager);
+    }
+    return eventHubClients.get(streamName);
+  }
+
+  /**
+   * Fetches the runtime information of every given partition and converts it into Samza
+   * partition metadata (oldest = start-of-stream marker, newest = last enqueued offset,
+   * upcoming = offset after the newest one).
+   *
+   * @throws SamzaException if the runtime information of a partition cannot be fetched
+   */
+  private Map<Partition, SystemStreamPartitionMetadata> getPartitionMetadata(String streamName, String[] partitionIds) {
+    EventHubClientManager eventHubClientManager = getOrCreateStreamEventHubClient(streamName);
+    Map<Partition, SystemStreamPartitionMetadata> sspMetadataMap = new HashMap<>();
+    Map<String, CompletableFuture<EventHubPartitionRuntimeInformation>> ehRuntimeInfos = new HashMap<>();
+
+    // Issue all requests first, then await them one by one below.
+    for (String partition : partitionIds) {
+      CompletableFuture<EventHubPartitionRuntimeInformation> partitionRuntimeInfo = eventHubClientManager
+              .getEventHubClient()
+              .getPartitionRuntimeInformation(partition);
+
+      ehRuntimeInfos.put(partition, partitionRuntimeInfo);
+    }
+
+    ehRuntimeInfos.forEach((partitionId, ehPartitionRuntimeInfo) -> {
+        try {
+          long timeoutMs = eventHubConfig.getRuntimeInfoWaitTimeMS(systemName);
+          EventHubPartitionRuntimeInformation ehPartitionInfo = ehPartitionRuntimeInfo.get(timeoutMs, TimeUnit.MILLISECONDS);
+
+          String startingOffset = EventHubSystemConsumer.START_OF_STREAM;
+          String newestOffset = ehPartitionInfo.getLastEnqueuedOffset();
+          String upcomingOffset = getNextOffset(newestOffset);
+          SystemStreamPartitionMetadata sspMetadata = new SystemStreamPartitionMetadata(startingOffset, newestOffset,
+                  upcomingOffset);
+
+          Partition partition = new Partition(Integer.parseInt(partitionId));
+
+          sspMetadataMap.put(partition, sspMetadata);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+          String msg = String.format(
+                  "Error while fetching EventHubPartitionRuntimeInfo for System:%s, Stream:%s, Partition:%s",
+                  systemName, streamName, partitionId);
+          throw fetchFailure(msg, e);
+        }
+      });
+    return sspMetadataMap;
+  }
+
+  /**
+   * Compares two offsets. The end-of-stream marker is greater than every other offset and equal
+   * to itself; all other offsets are compared as longs.
+   *
+   * @return a negative, zero or positive value, or {@code null} when either offset is
+   *         {@code null} or is not a parsable long
+   */
+  public static Integer compareOffsets(String offset1, String offset2) {
+    if (offset1 == null || offset2 == null) {
+      return null;
+    }
+    try {
+      if (offset1.equals(EventHubSystemConsumer.END_OF_STREAM)) {
+        return offset2.equals(EventHubSystemConsumer.END_OF_STREAM) ? 0 : 1;
+      }
+      return offset2.equals(EventHubSystemConsumer.END_OF_STREAM) ? -1 :
+              Long.compare(Long.parseLong(offset1), Long.parseLong(offset2));
+    } catch (NumberFormatException exception) {
+      return null;
+    }
+  }
+
+  @Override
+  public Integer offsetComparator(String offset1, String offset2) {
+    return compareOffsets(offset1, offset2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/663b8daf/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/PassThroughInterceptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/PassThroughInterceptor.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/PassThroughInterceptor.java
new file mode 100644
index 0000000..79dfb33
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/PassThroughInterceptor.java
@@ -0,0 +1,33 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.eventhub.admin;
+
+import org.apache.samza.system.eventhub.Interceptor;
+
+/**
+ * A default {@link Interceptor} that passes the payload through untouched.
+ */
+public class PassThroughInterceptor implements Interceptor {
+
+  /**
+   * Returns the given array as is; no copy is made and no bytes are modified.
+   *
+   * @param bytes the payload to intercept
+   * @return the same {@code bytes} reference that was passed in
+   */
+  @Override
+  public byte[] intercept(byte[] bytes) {
+    return bytes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/663b8daf/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubIncomingMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubIncomingMessageEnvelope.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubIncomingMessageEnvelope.java
new file mode 100644
index 0000000..8aa7480
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubIncomingMessageEnvelope.java
@@ -0,0 +1,42 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.eventhub.consumer;
+
+import com.microsoft.azure.eventhubs.EventData;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+
+/**
+ * Extension of {@link IncomingMessageEnvelope} which additionally carries the originating
+ * {@link EventData}, giving access to its system and user properties metadata.
+ */
+public class EventHubIncomingMessageEnvelope extends IncomingMessageEnvelope {
+  // Assigned once in the constructor and never changed afterwards.
+  private final EventData eventData;
+
+  /**
+   * @param systemStreamPartition partition the message was read from
+   * @param offset offset of the message
+   * @param key key of the message
+   * @param message payload of the message
+   * @param eventData the EventHubs event this envelope was built from
+   */
+  public EventHubIncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, String offset, Object key,
+                                         Object message, EventData eventData) {
+    super(systemStreamPartition, offset, key, message);
+
+    this.eventData = eventData;
+  }
+
+  /**
+   * @return the {@link EventData} passed to the constructor
+   */
+  public EventData getEventData() {
+    return eventData;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/663b8daf/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
new file mode 100644
index 0000000..4de34de
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
@@ -0,0 +1,401 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.eventhub.consumer;
+
+import com.microsoft.azure.eventhubs.EventData;
+import com.microsoft.azure.eventhubs.PartitionReceiveHandler;
+import com.microsoft.azure.eventhubs.PartitionReceiver;
+import com.microsoft.azure.servicebus.ServiceBusException;
+import org.apache.samza.SamzaException;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.eventhub.EventHubClientManagerFactory;
+import org.apache.samza.system.eventhub.EventHubClientManager;
+import org.apache.samza.system.eventhub.EventHubConfig;
+import org.apache.samza.system.eventhub.Interceptor;
+import org.apache.samza.system.eventhub.admin.EventHubSystemAdmin;
+import org.apache.samza.system.eventhub.metrics.SamzaHistogram;
+import org.apache.samza.util.BlockingEnvelopeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+
+/**
+ * Implementation of a system consumer for EventHubs. For each system stream
+ * partition, it registers a handler with the EventHubsClient which constantly
+ * pushes data into a blocking queue. This class extends the BlockingEnvelopeMap
+ * provided by samza-api to simplify the logic around those blocking queues.
+ * <p>
+ * A high level architecture:
+ * <p>
+ * ┌───────────────────────────────────────────────┐
+ * │ EventHubsClient                               │
+ * │                                               │
+ * │   ┌───────────────────────────────────────┐   │        ┌─────────────────────┐
+ * │   │                                       │   │        │                     │
+ * │   │       PartitionReceiveHandler_1       │───┼───────▶│ SSP1-BlockingQueue  ├──────┐
+ * │   │                                       │   │        │                     │      │
+ * │   └───────────────────────────────────────┘   │        └─────────────────────┘      │
+ * │                                               │                                     │
+ * │   ┌───────────────────────────────────────┐   │        ┌─────────────────────┐      │
+ * │   │                                       │   │        │                     │      │
+ * │   │       PartitionReceiveHandler_2       │───┼───────▶│ SSP2-BlockingQueue  ├──────┤        ┌──────────────────────────┐
+ * │   │                                       │   │        │                     │      ├───────▶│                          │
+ * │   └───────────────────────────────────────┘   │        └─────────────────────┘      └───────▶│  SystemConsumer.poll()   │
+ * │                                               │                                     ┌───────▶│                          │
+ * │                                               │                                     │        └──────────────────────────┘
+ * │                      ...                      │                  ...                │
+ * │                                               │                                     │
+ * │                                               │                                     │
+ * │   ┌───────────────────────────────────────┐   │        ┌─────────────────────┐      │
+ * │   │                                       │   │        │                     │      │
+ * │   │       PartitionReceiveHandler_N       │───┼───────▶│ SSPN-BlockingQueue  ├──────┘
+ * │   │                                       │   │        │                     │
+ * │   └───────────────────────────────────────┘   │        └─────────────────────┘
+ * │                                               │
+ * │                                               │
+ * └───────────────────────────────────────────────┘
+ */
+public class EventHubSystemConsumer extends BlockingEnvelopeMap {
+  private static final Logger LOG = LoggerFactory.getLogger(EventHubSystemConsumer.class);
+  private static final int MAX_EVENT_COUNT_PER_PARTITION_POLL = 50;
+
+  // Overall timeout for EventHubClient exponential backoff policy
+  private static final Duration DEFAULT_EVENTHUB_RECEIVER_TIMEOUT = Duration.ofMinutes(10L);
+  private static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = Duration.ofMinutes(1L).toMillis();
+
+  public static final String START_OF_STREAM = PartitionReceiver.START_OF_STREAM; // -1
+  public static final String END_OF_STREAM = "-2";
+  public static final String EVENT_READ_RATE = "eventReadRate";
+  public static final String EVENT_BYTE_READ_RATE = "eventByteReadRate";
+  public static final String READ_LATENCY = "readLatency";
+  public static final String READ_ERRORS = "readErrors";
+  public static final String AGGREGATE = "aggregate";
+
+  private static final Object AGGREGATE_METRICS_LOCK = new Object();
+
+  // Aggregated metrics shared by all consumer instances; created by the first instance.
+  private static Counter aggEventReadRate = null;
+  private static Counter aggEventByteReadRate = null;
+  private static SamzaHistogram aggReadLatency = null;
+  private static Counter aggReadErrors = null;
+
+  // Per-stream metrics, keyed by stream name.
+  private final Map<String, Counter> eventReadRates;
+  private final Map<String, Counter> eventByteReadRates;
+  private final Map<String, SamzaHistogram> readLatencies;
+  private final Map<String, Counter> readErrors;
+
+  final ConcurrentHashMap<SystemStreamPartition, PartitionReceiveHandler> streamPartitionHandlers = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<SystemStreamPartition, PartitionReceiver> streamPartitionReceivers = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, EventHubClientManager> streamEventHubManagers = new ConcurrentHashMap<>();
+  // Registered starting offset per partition; overwritten with the offset of every received event.
+  private final ConcurrentHashMap<SystemStreamPartition, String> streamPartitionOffsets = new ConcurrentHashMap<>();
+  private final Map<String, Interceptor> interceptors;
+  private boolean isStarted = false;
+  private final EventHubConfig config;
+  private final String systemName;
+
+  // Partition receiver error propagation
+  private final AtomicReference<Throwable> eventHubHandlerError = new AtomicReference<>(null);
+
+  /**
+   * Creates the consumer, initializes one client manager per configured stream of the system
+   * and registers the per-stream and aggregated metrics.
+   *
+   * @param config EventHubs configuration
+   * @param systemName name of the system this consumer belongs to
+   * @param eventHubClientManagerFactory factory used to create one client manager per stream
+   * @param interceptors optional per-stream interceptors applied to the event body, keyed by stream name
+   * @param registry registry the metrics are created in
+   */
+  public EventHubSystemConsumer(EventHubConfig config, String systemName,
+                                EventHubClientManagerFactory eventHubClientManagerFactory,
+                                Map<String, Interceptor> interceptors, MetricsRegistry registry) {
+    super(registry, System::currentTimeMillis);
+
+    this.config = config;
+    this.systemName = systemName;
+    this.interceptors = interceptors;
+    List<String> streamNames = config.getStreams(systemName);
+    // Create and initiate connections to Event Hubs
+    for (String streamName : streamNames) {
+      EventHubClientManager eventHubClientManager = eventHubClientManagerFactory
+              .getEventHubClientManager(systemName, streamName, config);
+      streamEventHubManagers.put(streamName, eventHubClientManager);
+      eventHubClientManager.init();
+    }
+
+    // Initiate metrics
+    eventReadRates = streamNames.stream()
+            .collect(Collectors.toMap(Function.identity(), x -> registry.newCounter(x, EVENT_READ_RATE)));
+    eventByteReadRates = streamNames.stream()
+            .collect(Collectors.toMap(Function.identity(), x -> registry.newCounter(x, EVENT_BYTE_READ_RATE)));
+    readLatencies = streamNames.stream()
+            .collect(Collectors.toMap(Function.identity(), x -> new SamzaHistogram(registry, x, READ_LATENCY)));
+    readErrors = streamNames.stream()
+            .collect(Collectors.toMap(Function.identity(), x -> registry.newCounter(x, READ_ERRORS)));
+
+    // Locking to ensure that these aggregated metrics will be created only once across multiple system consumers.
+    synchronized (AGGREGATE_METRICS_LOCK) {
+      if (aggEventReadRate == null) {
+        aggEventReadRate = registry.newCounter(AGGREGATE, EVENT_READ_RATE);
+        aggEventByteReadRate = registry.newCounter(AGGREGATE, EVENT_BYTE_READ_RATE);
+        aggReadLatency = new SamzaHistogram(registry, AGGREGATE, READ_LATENCY);
+        aggReadErrors = registry.newCounter(AGGREGATE, READ_ERRORS);
+      }
+    }
+  }
+
+  /**
+   * Registers a partition with its starting offset. When the same partition is registered more
+   * than once, the lowest offset wins.
+   *
+   * @throws SamzaException if the consumer has already been started, or if the offset cannot be
+   *         compared with a previously registered one
+   */
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, String offset) {
+    // Reject late registrations before touching any state.
+    if (isStarted) {
+      throw new SamzaException("Trying to add partition when the connection has already started.");
+    }
+
+    super.register(systemStreamPartition, offset);
+
+    String prevOffset = streamPartitionOffsets.get(systemStreamPartition);
+    if (prevOffset != null) {
+      // compareOffsets returns null for offsets it cannot compare; unboxing that would be an NPE.
+      Integer comparison = EventHubSystemAdmin.compareOffsets(offset, prevOffset);
+      if (comparison == null) {
+        throw new SamzaException(String.format("Cannot compare offset %s with previously registered offset %s for %s",
+                offset, prevOffset, systemStreamPartition));
+      }
+      if (comparison > -1) {
+        // Only update if new offset is lower than previous offset
+        return;
+      }
+    }
+    streamPartitionOffsets.put(systemStreamPartition, offset);
+  }
+
+  /**
+   * Creates a receiver with its own handler for every registered partition.
+   *
+   * @throws SamzaException if a receiver cannot be created
+   */
+  @Override
+  public void start() {
+    isStarted = true;
+    // Create receivers for Event Hubs
+    for (Map.Entry<SystemStreamPartition, String> entry : streamPartitionOffsets.entrySet()) {
+
+      SystemStreamPartition ssp = entry.getKey();
+      String streamName = ssp.getStream();
+      Integer partitionId = ssp.getPartition().getPartitionId();
+      String offset = entry.getValue();
+      String namespace = config.getStreamNamespace(systemName, streamName);
+      String entityPath = config.getStreamEntityPath(systemName, streamName);
+
+      try {
+        PartitionReceiveHandler handler = new PartitionReceiverHandlerImpl(ssp, eventReadRates.get(streamName),
+                eventByteReadRates.get(streamName), readLatencies.get(streamName), readErrors.get(streamName),
+                interceptors.getOrDefault(streamName, null));
+
+        openReceiver(ssp, offset, handler);
+        streamPartitionHandlers.put(ssp, handler);
+      } catch (Exception e) {
+        throw new SamzaException(
+                String.format("Failed to create receiver for EventHubs: namespace=%s, entity=%s, partitionId=%d",
+                        namespace, entityPath, partitionId), e);
+      }
+      LOG.debug(String.format("Connection successfully started for namespace=%s, entity=%s ", namespace, entityPath));
+
+    }
+  }
+
+  /**
+   * Creates a receiver for {@code ssp} positioned at {@code offset}, attaches {@code handler}
+   * to it and records it as the current receiver of the partition. Shared by {@link #start()}
+   * and the receiver renewal so that both treat the offset markers the same way.
+   */
+  private void openReceiver(SystemStreamPartition ssp, String offset, PartitionReceiveHandler handler)
+          throws ServiceBusException {
+    String streamName = ssp.getStream();
+    String partitionId = String.valueOf(ssp.getPartition().getPartitionId());
+    String consumerGroup = config.getStreamConsumerGroup(systemName, streamName);
+    EventHubClientManager eventHubClientManager = streamEventHubManagers.get(streamName);
+
+    PartitionReceiver receiver;
+    if (END_OF_STREAM.equals(offset)) {
+      // END_OF_STREAM is a marker of this class, not an EventHubs offset: read from "now" on.
+      receiver = eventHubClientManager.getEventHubClient()
+              .createReceiverSync(consumerGroup, partitionId, Instant.now());
+    } else {
+      receiver = eventHubClientManager.getEventHubClient()
+              .createReceiverSync(consumerGroup, partitionId, offset, !START_OF_STREAM.equals(offset));
+    }
+
+    // Timeout for EventHubClient receive
+    receiver.setReceiveTimeout(DEFAULT_EVENTHUB_RECEIVER_TIMEOUT);
+
+    // Start the receiver thread
+    receiver.setReceiveHandler(handler);
+
+    streamPartitionReceivers.put(ssp, receiver);
+  }
+
+  /**
+   * Polls the buffered envelopes, first surfacing any error reported by a partition handler.
+   *
+   * @throws SamzaException if a handler reported a non transient error
+   */
+  @Override
+  public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
+    Throwable handlerError = eventHubHandlerError.get();
+
+    if (handlerError != null) {
+      if (isErrorTransient(handlerError)) {
+        // Log a warning if the error is transient
+        // Partition receiver handler OnError should have handled it by recreating the receiver
+        LOG.warn("Received a transient error from event hub partition receiver, restarted receiver", handlerError);
+
+        // Clear the reported error so the same warning is not logged again on every poll.
+        // compareAndSet keeps any other error that a handler stored in the meantime.
+        eventHubHandlerError.compareAndSet(handlerError, null);
+      } else {
+        // Propagate the error to user if the throwable is either
+        // 1. permanent ServiceBusException error from client
+        // 2. SamzaException thrown by the EventHubConsumer
+        //   2a. Interrupted during put operation to BEM
+        //   2b. Failure in renewing the Partition Receiver
+        String msg = "Received a non transient error from event hub partition receiver";
+        throw new SamzaException(msg, handlerError);
+      }
+    }
+
+    return super.poll(systemStreamPartitions, timeout);
+  }
+
+  /**
+   * Closes the current receiver of {@code ssp} and opens a new one at the latest cached offset,
+   * re-using the partition's existing handler. Failures are recorded in the handler error
+   * reference instead of being thrown.
+   */
+  private void renewPartitionReceiver(SystemStreamPartition ssp) {
+    try {
+      // Close current receiver
+      streamPartitionReceivers.get(ssp).close().get(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+      // Recreate receiver and restart the receiver thread with the existing handler
+      openReceiver(ssp, streamPartitionOffsets.get(ssp), streamPartitionHandlers.get(ssp));
+
+    } catch (Exception e) {
+      eventHubHandlerError.set(new SamzaException(
+              String.format("Failed to recreate receiver for EventHubs after ReceiverHandlerError (ssp=%s)", ssp), e));
+    }
+  }
+
+  /**
+   * Closes all receivers, waiting a bounded time for them, and then closes all client managers.
+   */
+  @Override
+  public void stop() {
+    LOG.debug("Stopping event hub system consumer...");
+    List<CompletableFuture<Void>> futures = new ArrayList<>();
+    streamPartitionReceivers.values().forEach((receiver) -> futures.add(receiver.close()));
+    CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
+    try {
+      future.get(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag for callers, then continue closing the clients.
+      Thread.currentThread().interrupt();
+      LOG.warn("Failed to close receivers", e);
+    } catch (ExecutionException | TimeoutException e) {
+      LOG.warn("Failed to close receivers", e);
+    }
+    streamEventHubManagers.values().forEach(ehClientManager -> ehClientManager.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS));
+  }
+
+  /**
+   * @return {@code true} only for a {@link ServiceBusException} that reports itself as transient
+   */
+  private boolean isErrorTransient(Throwable throwable) {
+    if (throwable instanceof ServiceBusException) {
+      ServiceBusException serviceBusException = (ServiceBusException) throwable;
+      return serviceBusException.getIsTransient();
+    }
+    return false;
+  }
+
+  @Override
+  protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
+    return new LinkedBlockingQueue<>(config.getConsumerBufferCapacity(systemName));
+  }
+
+  /**
+   * Handler attached to the receiver of one partition: converts received events into
+   * envelopes, updates the metrics and reports receiver errors to the enclosing consumer.
+   */
+  protected class PartitionReceiverHandlerImpl extends PartitionReceiveHandler {
+
+    private final Counter eventReadRate;
+    private final Counter eventByteReadRate;
+    private final SamzaHistogram readLatency;
+    private final Counter errorRate;
+    private final Interceptor interceptor;
+    SystemStreamPartition ssp;
+
+    PartitionReceiverHandlerImpl(SystemStreamPartition ssp, Counter eventReadRate, Counter eventByteReadRate,
+                                 SamzaHistogram readLatency, Counter readErrors, Interceptor interceptor) {
+      super(MAX_EVENT_COUNT_PER_PARTITION_POLL);
+      this.ssp = ssp;
+      this.eventReadRate = eventReadRate;
+      this.eventByteReadRate = eventByteReadRate;
+      this.readLatency = readLatency;
+      this.errorRate = readErrors;
+      this.interceptor = interceptor;
+    }
+
+    @Override
+    public void onReceive(Iterable<EventData> events) {
+      if (events != null) {
+        events.forEach(event -> {
+            byte[] eventDataBody = event.getBytes();
+            if (interceptor != null) {
+              eventDataBody = interceptor.intercept(eventDataBody);
+            }
+            String offset = event.getSystemProperties().getOffset();
+            Object partitionKey = event.getSystemProperties().getPartitionKey();
+            try {
+              updateMetrics(event);
+
+              // note that the partition key can be null
+              put(ssp, new EventHubIncomingMessageEnvelope(ssp, offset, partitionKey, eventDataBody, event));
+            } catch (InterruptedException e) {
+              String msg = String.format("Interrupted while adding the event from ssp %s to dispatch queue.", ssp);
+              LOG.error(msg, e);
+              throw new SamzaException(msg, e);
+            }
+
+            // Cache latest checkpoint
+            streamPartitionOffsets.put(ssp, offset);
+          });
+      }
+    }
+
+    /**
+     * Updates the per-stream and aggregated read counters and latency histograms for one event.
+     */
+    private void updateMetrics(EventData event) {
+      int eventDataLength = event.getBytes() == null ? 0 : event.getBytes().length;
+      eventReadRate.inc();
+      aggEventReadRate.inc();
+      eventByteReadRate.inc(eventDataLength);
+      aggEventByteReadRate.inc(eventDataLength);
+
+      // Latency is the time elapsed from enqueue until now: Duration.between(start, end).
+      long latencyMs = Duration.between(event.getSystemProperties().getEnqueuedTime(), Instant.now()).toMillis();
+      readLatency.update(latencyMs);
+      aggReadLatency.update(latencyMs);
+    }
+
+    @Override
+    public void onError(Throwable throwable) {
+      errorRate.inc();
+      aggReadErrors.inc();
+
+      if (throwable instanceof ServiceBusException) {
+        ServiceBusException busException = (ServiceBusException) throwable;
+
+        if (busException.getIsTransient()) {
+
+          // Only set to transient throwable if there has been no previous errors
+          eventHubHandlerError.compareAndSet(null, throwable);
+
+          // Retry creating a receiver since error likely due to timeout
+          renewPartitionReceiver(ssp);
+          return;
+        }
+      }
+
+      // Propagate non transient or unknown errors
+      eventHubHandlerError.set(throwable);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/663b8daf/samza-azure/src/main/java/org/apache/samza/system/eventhub/metrics/SamzaHistogram.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/metrics/SamzaHistogram.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/metrics/SamzaHistogram.java
new file mode 100644
index 0000000..7d6d408
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/metrics/SamzaHistogram.java
@@ -0,0 +1,62 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.eventhub.metrics;
+
+import com.codahale.metrics.ExponentiallyDecayingReservoir;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Snapshot;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistry;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Creates a {@link Histogram} metric using {@link ExponentiallyDecayingReservoir}
+ * Keeps a {@link Gauge} for each percentile
+ */
+public class SamzaHistogram {
+  private static final List<Double> DEFAULT_HISTOGRAM_PERCENTILES = Arrays.asList(50D, 99D);
+  private final Histogram histogram;
+  // Only the valid, de-duplicated percentiles; every entry has a gauge in the map below.
+  private final List<Double> percentiles;
+  private final Map<Double, Gauge<Double>> gauges;
+
+  /**
+   * Creates a histogram that publishes the default percentiles (50th and 99th).
+   *
+   * @param registry registry in which the percentile gauges are created
+   * @param group metrics group of the gauges
+   * @param name base name of the gauges
+   */
+  public SamzaHistogram(MetricsRegistry registry, String group, String name) {
+    this(registry, group, name, DEFAULT_HISTOGRAM_PERCENTILES);
+  }
+
+  /**
+   * Creates a histogram that publishes one gauge per requested percentile.
+   * Percentiles outside the range (0, 100] and repeated values are ignored.
+   * Each gauge is named {@code name + "_" + percentile} so that the percentiles
+   * do not collide on a single gauge name.
+   *
+   * @param registry registry in which the percentile gauges are created
+   * @param group metrics group of the gauges
+   * @param name base name of the gauges
+   * @param percentiles percentiles to publish, expressed on a 0-100 scale
+   */
+  public SamzaHistogram(MetricsRegistry registry, String group, String name, List<Double> percentiles) {
+    this.histogram = new Histogram(new ExponentiallyDecayingReservoir());
+    // Keep only what a gauge is created for, so update() can never look up a missing gauge;
+    // distinct() also prevents Collectors.toMap from failing on a duplicate key.
+    this.percentiles = percentiles.stream()
+            .filter(x -> x > 0 && x <= 100)
+            .distinct()
+            .collect(Collectors.toList());
+    this.gauges = this.percentiles.stream()
+            .collect(
+                    Collectors.toMap(Function.identity(), x -> registry.newGauge(group, name + "_" + String.valueOf(x), 0D)));
+  }
+
+  /**
+   * Records a value and refreshes every percentile gauge from a new snapshot.
+   *
+   * @param value the value to add to the histogram
+   */
+  public void update(long value) {
+    histogram.update(value);
+    Snapshot values = histogram.getSnapshot();
+    // Snapshot#getValue expects a quantile in [0, 1], hence the division by 100.
+    percentiles.forEach(x -> gauges.get(x).set(values.getValue(x / 100)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/663b8daf/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
new file mode 100644
index 0000000..c8c5538
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
@@ -0,0 +1,345 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.eventhub.producer;
+
+import com.microsoft.azure.eventhubs.EventData;
+import com.microsoft.azure.eventhubs.EventHubClient;
+import com.microsoft.azure.eventhubs.PartitionSender;
+import com.microsoft.azure.servicebus.ServiceBusException;
+import org.apache.samza.SamzaException;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.eventhub.EventHubClientManager;
+import org.apache.samza.system.eventhub.EventHubConfig;
+import org.apache.samza.system.eventhub.EventHubClientManagerFactory;
+import org.apache.samza.system.eventhub.Interceptor;
+import org.apache.samza.system.eventhub.metrics.SamzaHistogram;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * {@link SystemProducer} that writes messages to Event Hubs.
+ * Streams are registered before {@link #start()}; sends are asynchronous and
+ * their futures are tracked until the next {@link #flush(String)}, which blocks
+ * until they complete. The first failure reported by a send callback is
+ * remembered and rethrown by the next {@code send} or {@code flush}.
+ */
+public class EventHubSystemProducer implements SystemProducer {
+  private static final Logger LOG = LoggerFactory.getLogger(EventHubSystemProducer.class.getName());
+  private static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = Duration.ofMinutes(1L).toMillis();
+  private static final long DEFAULT_FLUSH_TIMEOUT_MILLIS = Duration.ofMinutes(1L).toMillis();
+
+  public static final String PRODUCE_TIMESTAMP = "produce-timestamp";
+
+  // Metrics recording
+  private static final String EVENT_WRITE_RATE = "eventWriteRate";
+  private static final String EVENT_BYTE_WRITE_RATE = "eventByteWriteRate";
+  private static final String SEND_ERRORS = "sendErrors";
+  private static final String SEND_LATENCY = "sendLatency";
+  private static final String SEND_CALLBACK_LATENCY = "sendCallbackLatency";
+  private static Counter aggEventWriteRate = null;
+  private static Counter aggEventByteWriteRate = null;
+  private static Counter aggSendErrors = null;
+  private static SamzaHistogram aggSendLatency = null;
+  private static SamzaHistogram aggSendCallbackLatency = null;
+  private static final String AGGREGATE = "aggregate";
+
+  private static final Object AGGREGATE_METRICS_LOCK = new Object();
+
+  public enum PartitioningMethod {
+    EVENT_HUB_HASHING,
+    PARTITION_KEY_AS_PARTITION,
+  }
+
+  // Per-stream metrics, keyed by stream name and populated in start().
+  private final Map<String, Counter> eventWriteRate = new HashMap<>();
+  private final Map<String, Counter> eventByteWriteRate = new HashMap<>();
+  private final Map<String, SamzaHistogram> sendLatency = new HashMap<>();
+  private final Map<String, SamzaHistogram> sendCallbackLatency = new HashMap<>();
+  private final Map<String, Counter> sendErrors = new HashMap<>();
+
+  private final EventHubClientManagerFactory eventHubClientManagerFactory;
+  private final EventHubConfig config;
+  private final MetricsRegistry registry;
+  private final PartitioningMethod partitioningMethod;
+  private final String systemName;
+
+  // First failure reported by an asynchronous send callback; never reset once set.
+  private final AtomicReference<Throwable> sendExceptionOnCallback = new AtomicReference<>(null);
+  private volatile boolean isStarted = false;
+
+  // Map of the registered stream name to its event hub client manager.
+  private final Map<String, EventHubClientManager> eventHubClients = new HashMap<>();
+  // Map of the stream name to its partition senders, keyed by partition number.
+  // Only populated when the partitioning method is PARTITION_KEY_AS_PARTITION.
+  private final Map<String, Map<Integer, PartitionSender>> streamPartitionSenders = new HashMap<>();
+
+  private final Map<String, Interceptor> interceptors;
+
+  // Futures of the sends issued since the last successful flush.
+  private final Set<CompletableFuture<Void>> pendingFutures = ConcurrentHashMap.newKeySet();
+
+  /**
+   * @param config Event Hubs configuration
+   * @param systemName name of the system this producer belongs to
+   * @param eventHubClientManagerFactory factory for the per-stream client managers
+   * @param interceptors optional per-stream payload interceptors, keyed by stream name
+   * @param registry registry in which the producer metrics are created
+   */
+  public EventHubSystemProducer(EventHubConfig config, String systemName,
+                                EventHubClientManagerFactory eventHubClientManagerFactory,
+                                Map<String, Interceptor> interceptors, MetricsRegistry registry) {
+    this.config = config;
+    this.registry = registry;
+    this.systemName = systemName;
+    this.partitioningMethod = config.getPartitioningMethod(systemName);
+    this.eventHubClientManagerFactory = eventHubClientManagerFactory;
+    this.interceptors = interceptors;
+  }
+
+  /**
+   * Creates and initializes the client manager for a stream. Registering the
+   * same stream twice is a no-op.
+   *
+   * @param streamName stream to register
+   * @throws SamzaException if the producer has already been started
+   */
+  @Override
+  public synchronized void register(String streamName) {
+    LOG.debug("Trying to register {}.", streamName);
+    if (isStarted) {
+      String msg = "Cannot register once the producer is started.";
+      throw new SamzaException(msg);
+    }
+
+    if (eventHubClients.containsKey(streamName)) {
+      LOG.warn("Already registered stream {}.", streamName);
+      return;
+    }
+
+    EventHubClientManager ehClient = eventHubClientManagerFactory.getEventHubClientManager(systemName, streamName, config);
+
+    ehClient.init();
+    eventHubClients.put(streamName, ehClient);
+  }
+
+  /**
+   * Creates the partition senders (when partition keys address partitions
+   * directly) and the metrics for every registered stream, then marks the
+   * producer as started.
+   *
+   * @throws SamzaException if the partition count cannot be fetched or a
+   *         partition sender cannot be created
+   */
+  @Override
+  public synchronized void start() {
+    LOG.debug("Starting system producer.");
+
+    if (PartitioningMethod.PARTITION_KEY_AS_PARTITION.equals(partitioningMethod)) {
+      // Create all partition senders
+      eventHubClients.forEach((streamName, samzaEventHubClient) ->
+          streamPartitionSenders.put(streamName, createPartitionSenders(samzaEventHubClient.getEventHubClient())));
+    }
+
+    for (String eventHub : eventHubClients.keySet()) {
+      eventWriteRate.put(eventHub, registry.newCounter(eventHub, EVENT_WRITE_RATE));
+      eventByteWriteRate.put(eventHub, registry.newCounter(eventHub, EVENT_BYTE_WRITE_RATE));
+      sendLatency.put(eventHub, new SamzaHistogram(registry, eventHub, SEND_LATENCY));
+      sendCallbackLatency.put(eventHub, new SamzaHistogram(registry, eventHub, SEND_CALLBACK_LATENCY));
+      sendErrors.put(eventHub, registry.newCounter(eventHub, SEND_ERRORS));
+    }
+
+    // Locking to ensure that these aggregated metrics will be created only once across multiple system producers.
+    synchronized (AGGREGATE_METRICS_LOCK) {
+      if (aggEventWriteRate == null) {
+        aggEventWriteRate = registry.newCounter(AGGREGATE, EVENT_WRITE_RATE);
+        aggEventByteWriteRate = registry.newCounter(AGGREGATE, EVENT_BYTE_WRITE_RATE);
+        aggSendLatency = new SamzaHistogram(registry, AGGREGATE, SEND_LATENCY);
+        aggSendCallbackLatency = new SamzaHistogram(registry, AGGREGATE, SEND_CALLBACK_LATENCY);
+        aggSendErrors = registry.newCounter(AGGREGATE, SEND_ERRORS);
+      }
+    }
+
+    isStarted = true;
+  }
+
+  /** Creates one partition sender per partition reported by the event hub's runtime information. */
+  private Map<Integer, PartitionSender> createPartitionSenders(EventHubClient ehClient) {
+    String fetchFailedMsg = "Failed to fetch number of Event Hub partitions for partition sender creation";
+    try {
+      Map<Integer, PartitionSender> partitionSenders = new HashMap<>();
+      long timeoutMs = config.getRuntimeInfoWaitTimeMS(systemName);
+      int numPartitions = ehClient.getRuntimeInformation().get(timeoutMs, TimeUnit.MILLISECONDS)
+              .getPartitionCount();
+
+      for (int i = 0; i < numPartitions; i++) { // 32 partitions max
+        String partitionId = String.valueOf(i);
+        PartitionSender partitionSender = ehClient.createPartitionSenderSync(partitionId);
+        partitionSenders.put(i, partitionSender);
+      }
+
+      return partitionSenders;
+    } catch (InterruptedException e) {
+      // Restore the flag so code further up the stack can still observe the interruption.
+      Thread.currentThread().interrupt();
+      throw new SamzaException(fetchFailedMsg, e);
+    } catch (ExecutionException | TimeoutException e) {
+      throw new SamzaException(fetchFailedMsg, e);
+    } catch (ServiceBusException | IllegalArgumentException e) {
+      String msg = "Creation of partition sender failed with exception";
+      throw new SamzaException(msg, e);
+    }
+  }
+
+  /**
+   * Sends a message asynchronously and tracks its future until the next flush.
+   * NOTE(review): the first argument is used as the registered stream name to
+   * look up the client and metrics - confirm that callers pass the stream here.
+   *
+   * @param destination registered stream the message is written to
+   * @param envelope message to send; its payload is cast to {@code byte[]}
+   * @throws SamzaException if the producer is not started, the destination is
+   *         not registered, or an earlier send has failed
+   */
+  @Override
+  public synchronized void send(String destination, OutgoingMessageEnvelope envelope) {
+    if (!isStarted) {
+      throw new SamzaException("Trying to call send before the producer is started.");
+    }
+
+    if (!eventHubClients.containsKey(destination)) {
+      String msg = String.format("Trying to send event to a destination {%s} that is not registered.", destination);
+      throw new SamzaException(msg);
+    }
+
+    checkCallbackThrowable("Received exception on message send");
+
+    EventData eventData = createEventData(destination, envelope);
+    byte[] eventBytes = eventData.getBytes();
+    int eventDataLength = eventBytes == null ? 0 : eventBytes.length;
+    eventWriteRate.get(destination).inc();
+    aggEventWriteRate.inc();
+    eventByteWriteRate.get(destination).inc(eventDataLength);
+    aggEventByteWriteRate.inc(eventDataLength);
+    EventHubClientManager ehClient = eventHubClients.get(destination);
+
+    long beforeSendTimeMs = System.currentTimeMillis();
+
+    // Async send call
+    CompletableFuture<Void> sendResult = sendToEventHub(destination, eventData, getEnvelopePartitionId(envelope),
+            ehClient.getEventHubClient());
+
+    long afterSendTimeMs = System.currentTimeMillis();
+    long latencyMs = afterSendTimeMs - beforeSendTimeMs;
+    sendLatency.get(destination).update(latencyMs);
+    aggSendLatency.update(latencyMs);
+
+    pendingFutures.add(sendResult);
+
+    // Auto update the metrics and possible throwable when futures are complete.
+    sendResult.handle((aVoid, throwable) -> {
+        long callbackLatencyMs = System.currentTimeMillis() - afterSendTimeMs;
+        sendCallbackLatency.get(destination).update(callbackLatencyMs);
+        aggSendCallbackLatency.update(callbackLatencyMs);
+        if (throwable != null) {
+          sendErrors.get(destination).inc();
+          aggSendErrors.inc();
+          LOG.error("Send message to event hub: {} failed with exception: ", destination, throwable);
+          sendExceptionOnCallback.compareAndSet(null, throwable);
+        }
+        return aVoid;
+      });
+  }
+
+  /** Dispatches the event according to the configured partitioning method. */
+  private CompletableFuture<Void> sendToEventHub(String streamName, EventData eventData, Object partitionKey,
+                                                 EventHubClient eventHubClient) {
+    if (partitioningMethod == PartitioningMethod.EVENT_HUB_HASHING) {
+      return eventHubClient.send(eventData, convertPartitionKeyToString(partitionKey));
+    } else if (partitioningMethod == PartitioningMethod.PARTITION_KEY_AS_PARTITION) {
+      if (!(partitionKey instanceof Integer)) {
+        String msg = "Partition key should be of type Integer";
+        throw new SamzaException(msg);
+      }
+
+      Map<Integer, PartitionSender> partitionSenders = streamPartitionSenders.get(streamName);
+      // floorMod keeps the index in [0, size) for negative keys too; '%' would yield a
+      // negative index, no sender would be found and the send would fail with an NPE.
+      int destinationPartition = Math.floorMod((Integer) partitionKey, partitionSenders.size());
+
+      PartitionSender sender = partitionSenders.get(destinationPartition);
+      return sender.send(eventData);
+    } else {
+      throw new SamzaException("Unknown partitioning method " + partitioningMethod);
+    }
+  }
+
+  /** Returns the partition key of the envelope; overridable by subclasses. */
+  protected Object getEnvelopePartitionId(OutgoingMessageEnvelope envelope) {
+    return envelope.getPartitionKey();
+  }
+
+  /** Converts a String, Integer or byte[] partition key to the String form passed to the client. */
+  private String convertPartitionKeyToString(Object partitionKey) {
+    if (partitionKey instanceof String) {
+      return (String) partitionKey;
+    } else if (partitionKey instanceof Integer) {
+      return String.valueOf(partitionKey);
+    } else if (partitionKey instanceof byte[]) {
+      return new String((byte[]) partitionKey, Charset.defaultCharset());
+    } else if (partitionKey == null) {
+      // Fail with a descriptive error instead of an NPE from partitionKey.getClass()
+      throw new SamzaException("Unsupported key type: null");
+    } else {
+      throw new SamzaException("Unsupported key type: " + partitionKey.getClass().toString());
+    }
+  }
+
+  /**
+   * Builds the event for an envelope: applies the stream's interceptor (if any)
+   * to the payload, stamps the produce time and, when configured, attaches the
+   * message key as the "key" property.
+   */
+  private EventData createEventData(String streamName, OutgoingMessageEnvelope envelope) {
+    byte[] eventValue = (byte[]) envelope.getMessage();
+    Interceptor interceptor = interceptors.get(streamName);
+    if (interceptor != null) {
+      eventValue = interceptor.intercept(eventValue);
+    }
+
+    EventData eventData = new EventData(eventValue);
+
+    eventData.getProperties().put(PRODUCE_TIMESTAMP, Long.toString(System.currentTimeMillis()));
+
+    if (config.getSendKeyInEventProperties(systemName)) {
+      String keyValue = "";
+      Object key = envelope.getKey();
+      if (key != null) {
+        // Same platform charset as before, now spelled out like the partition key conversion.
+        keyValue = (key instanceof byte[]) ? new String((byte[]) key, Charset.defaultCharset())
+                : key.toString();
+      }
+      eventData.getProperties().put("key", keyValue);
+    }
+    return eventData;
+  }
+
+  /**
+   * Blocks until every send issued since the last flush has completed, or the
+   * flush timeout expires.
+   *
+   * @param source unused by this implementation
+   * @throws SamzaException if a send failed, the wait timed out or the thread
+   *         was interrupted
+   */
+  @Override
+  public synchronized void flush(String source) {
+    LOG.debug("Trying to flush pending {} sends messages.", pendingFutures.size());
+    checkCallbackThrowable("Received exception on message send");
+
+    CompletableFuture<Void> future = CompletableFuture
+              .allOf(pendingFutures.toArray(new CompletableFuture[0]));
+
+    String msg = "Flush failed with error";
+    try {
+      // Block until all the pending sends are complete or timeout.
+      future.get(DEFAULT_FLUSH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      // Restore the flag so code further up the stack can still observe the interruption.
+      Thread.currentThread().interrupt();
+      throw new SamzaException(msg, e);
+    } catch (ExecutionException | TimeoutException e) {
+      throw new SamzaException(msg, e);
+    }
+
+    // Forget the futures only once they are known to be complete. Clearing them any
+    // earlier would leave nothing to wait on and flush would return before the sends finish.
+    pendingFutures.clear();
+
+    checkCallbackThrowable("Sending one or more of the messages failed during flush");
+  }
+
+  /** Rethrows, wrapped in a SamzaException, the first failure recorded by a send callback. */
+  private void checkCallbackThrowable(String msg) {
+    // Check for send errors from EventHub side
+    Throwable sendThrowable = sendExceptionOnCallback.get();
+    if (sendThrowable != null) {
+      throw new SamzaException(msg, sendThrowable);
+    }
+  }
+
+  /**
+   * Closes all partition senders and client managers on a best-effort basis;
+   * failures to close a sender are logged rather than thrown.
+   */
+  @Override
+  public synchronized void stop() {
+    LOG.debug("Stopping producer.");
+
+    boolean interrupted = false;
+    for (Map<Integer, PartitionSender> streamPartitionSender : streamPartitionSenders.values()) {
+      List<CompletableFuture<Void>> futures = new ArrayList<>();
+      streamPartitionSender.forEach((key, value) -> futures.add(value.close()));
+      CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+      try {
+        future.get(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        // Remember the interruption but keep closing the remaining resources.
+        interrupted = true;
+        LOG.error("Closing the partition sender failed ", e);
+      } catch (ExecutionException | TimeoutException e) {
+        LOG.error("Closing the partition sender failed ", e);
+      }
+    }
+    eventHubClients.values().forEach(ehClient -> ehClient.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS));
+    eventHubClients.clear();
+
+    if (interrupted) {
+      // Restore the flag only after shutdown so it does not cut the remaining closes short.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /** Exposes the live set of futures tracked since the last flush. */
+  Collection<CompletableFuture<Void>> getPendingFutures() {
+    return pendingFutures;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/663b8daf/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventData.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventData.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventData.java
new file mode 100644
index 0000000..b5b55dc
--- /dev/null
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventData.java
@@ -0,0 +1,57 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.eventhub;
+
+import com.microsoft.azure.eventhubs.EventData;
+
+import java.nio.charset.Charset;
+import java.util.*;
+
+public class MockEventData extends EventData {
+
+  private EventData.SystemProperties overridedSystemProperties;
+
+  private MockEventData(byte[] data, String partitionKey, String offset) {
+    super(data);
+    HashMap<String, Object> properties = new HashMap<>();
+    properties.put("x-opt-offset", offset);
+    properties.put("x-opt-partition-key", partitionKey);
+    properties.put("x-opt-enqueued-time", new Date(System.currentTimeMillis()));
+    overridedSystemProperties = new SystemProperties(properties);
+  }
+
+  public static List<EventData> generateEventData(int numEvents) {
+    Random rand = new Random(System.currentTimeMillis());
+    List<EventData> result = new ArrayList<>();
+    for (int i = 0; i < numEvents; i++) {
+      String key = "key_" + rand.nextInt();
+      String message = "message:" + rand.nextInt();
+      String offset = "offset_" + i;
+      EventData eventData = new MockEventData(message.getBytes(Charset.defaultCharset()), key, offset);
+      result.add(eventData);
+    }
+    return result;
+  }
+
+  @Override
+  public EventData.SystemProperties getSystemProperties() {
+    return overridedSystemProperties;
+  }
+}