You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/04/10 22:39:24 UTC

samza git commit: Upgrade to latest event hub version (1.0.1)

Repository: samza
Updated Branches:
  refs/heads/master 8e000f398 -> fda1e37d0


Upgrade to latest event hub version (1.0.1)

* Upgrade to the latest event hub version 1.0.1
* Adding configs for numClientThreads, prefetchCount and maxEventCountPerPoll
* Fix the high CPU usage issue in SamzaHistogram
* Fixing a race condition in the event hub system producer where a pending future could be removed while it was being checked for completion, resulting in an NPE.

Author: Srinivasulu Punuru <sp...@linkedin.com>

Reviewers: Prateek Maheshwari <pm...@apache.org>

Closes #467 from srinipunuru/upgrade-eh-1.0.1


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fda1e37d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fda1e37d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fda1e37d

Branch: refs/heads/master
Commit: fda1e37d0862493cb59ed0907c597a2dc973ef6f
Parents: 8e000f3
Author: Srinivasulu Punuru <sp...@linkedin.com>
Authored: Tue Apr 10 15:39:21 2018 -0700
Committer: Prateek Maheshwari <pm...@linkedin.com>
Committed: Tue Apr 10 15:39:21 2018 -0700

----------------------------------------------------------------------
 build.gradle                                    |   2 +-
 .../versioned/jobs/configuration-table.html     |  28 ++-
 .../eventhub/EventHubClientManagerFactory.java  |   3 +-
 .../samza/system/eventhub/EventHubConfig.java   |  38 ++++
 .../eventhub/SamzaEventHubClientManager.java    |  37 ++--
 .../eventhub/admin/EventHubSystemAdmin.java     |   8 +-
 .../consumer/EventHubSystemConsumer.java        | 179 ++++++++++---------
 .../system/eventhub/metrics/SamzaHistogram.java |  69 ++++---
 .../eventhub/producer/AsyncSystemProducer.java  |   1 -
 .../producer/EventHubSystemProducer.java        |   7 +-
 .../samza/system/eventhub/MockEventData.java    |  21 ++-
 .../MockEventHubClientManagerFactory.java       |  25 ++-
 .../system/eventhub/TestMetricsRegistry.java    |   7 +-
 .../consumer/TestEventHubSystemConsumer.java    |   6 +-
 .../producer/ITestEventHubSystemProducer.java   |  10 +-
 .../producer/TestEventHubSystemProducer.java    |   4 +-
 .../samza/tools/EventHubConsoleConsumer.java    |  64 ++++---
 17 files changed, 322 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index d96ec96..2f27a03 100644
--- a/build.gradle
+++ b/build.gradle
@@ -202,7 +202,7 @@ project(':samza-azure') {
 
   dependencies {
     compile "com.microsoft.azure:azure-storage:5.3.1"
-    compile "com.microsoft.azure:azure-eventhubs:0.14.5"
+    compile "com.microsoft.azure:azure-eventhubs:1.0.1"
     compile "com.fasterxml.jackson.core:jackson-core:2.8.8"
     compile "io.dropwizard.metrics:metrics-core:3.1.2"
     compile project(':samza-api')

http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 5c41596..4495cb1 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -2279,30 +2279,48 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="eventhub-stream-namespace">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-id</span>.<br>eventhubs.namespace</td>
+                    <td class="property" id="eventhub-stream-namespace">streams.<span class="stream">stream-id</span>.<br>eventhubs.namespace</td>
                     <td class="default"></td>
                     <td class="description">Namespace of the associated <span class="stream">stream-ids</span>. Required to access the Eventhubs entity per stream.</td>
                 </tr>
 
                 <tr>
-                    <td class="property" id="eventhub-stream-entity">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-id</span>.<br>eventhubs.entitypath</td>
+                    <td class="property" id="eventhub-stream-entity">streams.<span class="stream">stream-id</span>.<br>eventhubs.entitypath</td>
                     <td class="default"></td>
                     <td class="description">Entity of the associated <span class="stream">stream-ids</span>. Required to access the Eventhubs entity per stream.</td>
                 </tr>
 
                 <tr>
-                    <td class="property" id="eventhub-stream-sas-keyname">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-id</span>.<br>eventhubs.sas.keyname</td>
+                    <td class="property" id="eventhub-stream-sas-keyname">sensitive.streams.<span class="stream">stream-id</span>.<br>eventhubs.sas.keyname</td>
                     <td class="default"></td>
                     <td class="description">SAS Keyname of the associated <span class="stream">stream-ids</span>. Required to access the Eventhubs entity per stream.</td>
                 </tr>
 
                 <tr>
-                    <td class="property" id="eventhub-stream-sas-token">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-id</span>.<br>eventhubs.sas.token</td>
+                    <td class="property" id="eventhub-stream-sas-token">sensitive.streams.<span class="stream">stream-id</span>.<br>eventhubs.sas.token</td>
                     <td class="default"></td>
                     <td class="description">SAS Token the associated <span class="stream">stream-ids</span>. Required to access the Eventhubs entity per stream.</td>
                 </tr>
 
                 <tr>
+                    <td class="property" id="eventhub-client-threads">streams.<span class="system">stream-name</span>.<br>eventhubs.numClientThreads</td>
+                    <td class="default">10</td>
+                    <td class="description">Number of threads in thread pool that will be used by the EventHubClient. See <a href="https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_hub_client.create">here </a> for more details.</td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="eventhub-prefetch-count">systems.<span class="system">system-name</span>.<br>eventhubs.prefetchCount</td>
+                    <td class="default">999</td>
+                    <td class="description">Number of events that EventHub client should prefetch from the server. See <a href="https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._partition_receiver.setprefetchcount">here </a> for more details.</td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="eventhub-max-event-count">systems.<span class="system">system-name</span>.<br>eventhubs.maxEventCountPerPoll</td>
+                    <td class="default">50</td>
+                    <td class="description">Maximum number of events that EventHub client can return in a receive call. See <a href="https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._partition_receive_handler.getmaxeventcount#com_microsoft_azure_eventhubs__partition_receive_handler_getMaxEventCount__">here </a> for more details.</td>
+                </tr>
+
+                <tr>
                     <td class="property" id="eventhub-runtime-timeout">systems.<span class="system">system-name</span>.<br>eventhubs.runtime.info.timeout</td>
                     <td class="default">60000</td>
                     <td class="description">Timeout for fetching the runtime metadata from an Eventhub entity on startup in millis.</td>
@@ -2333,7 +2351,7 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="eventhub-consumer-group">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-id</span>.<br>eventhubs.consumer.group</td>
+                    <td class="property" id="eventhub-consumer-group">streams.<span class="stream">stream-id</span>.<br>eventhubs.consumer.group</td>
                     <td class="default"><code>$Default</code></td>
                     <td class="description">
                         Consumer only config. Set the consumer group from the upstream Eventhub that the consumer is part of. Defaults to the <code>$Default</code> group that is initially present in all Eventhub entities (unless removed)

http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManagerFactory.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManagerFactory.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManagerFactory.java
index 0578a50..879a07f 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManagerFactory.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManagerFactory.java
@@ -26,7 +26,8 @@ public class EventHubClientManagerFactory {
     String entityPath = config.getStreamEntityPath(systemName, streamName);
     String sasKeyName = config.getStreamSasKeyName(systemName, streamName);
     String sasToken = config.getStreamSasToken(systemName, streamName);
+    int numClientThreads = config.getNumClientThreads(systemName);
 
-    return new SamzaEventHubClientManager(eventHubNamespace, entityPath, sasKeyName, sasToken);
+    return new SamzaEventHubClientManager(eventHubNamespace, entityPath, sasKeyName, sasToken, numClientThreads);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
index 7df92c0..6639dd8 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
@@ -20,6 +20,7 @@
 package org.apache.samza.system.eventhub;
 
 import com.microsoft.azure.eventhubs.EventHubClient;
+import com.microsoft.azure.eventhubs.PartitionReceiver;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
@@ -49,6 +50,15 @@ public class EventHubConfig extends MapConfig {
   public static final String CONFIG_STREAM_CONSUMER_GROUP = "streams.%s.eventhubs.consumer.group";
   public static final String DEFAULT_CONFIG_STREAM_CONSUMER_GROUP = EventHubClient.DEFAULT_CONSUMER_GROUP_NAME;
 
+  public static final String CONFIG_SYSTEM_NUM_CLIENT_THREADS = "streams.%s.eventhubs.numClientThreads";
+  public static final int DEFAULT_CONFIG_SYSTEM_NUM_CLIENT_THREADS = 10;
+
+  public static final String CONFIG_PREFETCH_COUNT = "systems.%s.eventhubs.prefetchCount";
+  public static final int DEFAULT_CONFIG_PREFETCH_COUNT = PartitionReceiver.DEFAULT_PREFETCH_COUNT;
+
+  public static final String CONFIG_MAX_EVENT_COUNT_PER_POLL = "systems.%s.eventhubs.maxEventCountPerPoll";
+  public static final int DEFAULT_CONFIG_MAX_EVENT_COUNT_PER_POLL = 50;
+
   public static final String CONFIG_PRODUCER_PARTITION_METHOD = "systems.%s.eventhubs.partition.method";
   public static final String DEFAULT_CONFIG_PRODUCER_PARTITION_METHOD = EventHubSystemProducer
           .PartitioningMethod.EVENT_HUB_HASHING.name();
@@ -144,6 +154,34 @@ public class EventHubConfig extends MapConfig {
   }
 
   /**
+   * Get the number of client threads, This is used to create the ThreadPool executor that is passed to the
+   * {@link EventHubClient#create}
+   * @param systemName Name of the system.
+   * @return Num of client threads to use.
+   */
+  public Integer getNumClientThreads(String systemName) {
+    return getInt(String.format(CONFIG_SYSTEM_NUM_CLIENT_THREADS, systemName), DEFAULT_CONFIG_SYSTEM_NUM_CLIENT_THREADS);
+  }
+
+  /**
+   * Get the max event count returned per poll
+   * @param systemName Name of the system
+   * @return Max number of events returned per poll
+   */
+  public Integer getMaxEventCountPerPoll(String systemName) {
+    return getInt(String.format(CONFIG_MAX_EVENT_COUNT_PER_POLL, systemName), DEFAULT_CONFIG_MAX_EVENT_COUNT_PER_POLL);
+  }
+
+  /**
+   * Get the per partition prefetch count for the event hub client
+   * @param systemName Name of the system.
+   * @return Per partition Prefetch count for the event hub client.
+   */
+  public Integer getPrefetchCount(String systemName) {
+    return getInt(String.format(CONFIG_PREFETCH_COUNT, systemName), DEFAULT_CONFIG_PREFETCH_COUNT);
+  }
+
+  /**
    * Get the EventHubs max Message size
    *
    * @param systemName name of the system

http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java
index 977e252..a884a79 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java
@@ -19,12 +19,15 @@
 
 package org.apache.samza.system.eventhub;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.microsoft.azure.eventhubs.EventHubClient;
-import com.microsoft.azure.servicebus.ClientConstants;
-import com.microsoft.azure.servicebus.ConnectionStringBuilder;
-import com.microsoft.azure.servicebus.RetryExponential;
-import com.microsoft.azure.servicebus.RetryPolicy;
-import com.microsoft.azure.servicebus.ServiceBusException;
+import com.microsoft.azure.eventhubs.impl.ClientConstants;
+import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
+import com.microsoft.azure.eventhubs.impl.RetryExponential;
+import com.microsoft.azure.eventhubs.RetryPolicy;
+import com.microsoft.azure.eventhubs.EventHubException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import org.apache.samza.SamzaException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,6 +46,7 @@ public class SamzaEventHubClientManager implements EventHubClientManager {
   private static final Duration MAX_RETRY_BACKOFF = Duration.ofMillis(11000);
   private static final int MAX_RETRY_COUNT = 100;
   private static final String SAMZA_EVENTHUB_RETRY = "SAMZA_CONNECTOR_RETRY";
+  private final int numClientThreads;
 
   private EventHubClient eventHubClient;
 
@@ -51,30 +55,38 @@ public class SamzaEventHubClientManager implements EventHubClientManager {
   private final String sasKeyName;
   private final String sasKey;
   private final RetryPolicy retryPolicy;
+  private ExecutorService eventHubClientExecutor;
 
-  public SamzaEventHubClientManager(String eventHubNamespace, String entityPath, String sasKeyName, String sasKey) {
+  public SamzaEventHubClientManager(String eventHubNamespace, String entityPath, String sasKeyName, String sasKey,
+      Integer numClientThreads) {
     this(eventHubNamespace, entityPath, sasKeyName, sasKey,
-            new RetryExponential(MIN_RETRY_BACKOFF, MAX_RETRY_BACKOFF, MAX_RETRY_COUNT, SAMZA_EVENTHUB_RETRY));
+            new RetryExponential(MIN_RETRY_BACKOFF, MAX_RETRY_BACKOFF, MAX_RETRY_COUNT, SAMZA_EVENTHUB_RETRY), numClientThreads);
   }
 
   public SamzaEventHubClientManager(String eventHubNamespace, String entityPath, String sasKeyName, String sasKey,
-                                    RetryPolicy retryPolicy) {
+                                    RetryPolicy retryPolicy, int numClientThreads) {
     this.eventHubNamespace = eventHubNamespace;
     this.entityPath = entityPath;
     this.sasKeyName = sasKeyName;
     this.sasKey = sasKey;
     this.retryPolicy = retryPolicy;
+    this.numClientThreads = numClientThreads;
   }
 
   @Override
   public void init() {
     String remoteHost = String.format(EVENTHUB_REMOTE_HOST_FORMAT, eventHubNamespace);
     try {
-      ConnectionStringBuilder connectionStringBuilder =
-              new ConnectionStringBuilder(eventHubNamespace, entityPath, sasKeyName, sasKey);
+      ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder()
+          .setNamespaceName(eventHubNamespace)
+          .setEventHubName(entityPath)
+          .setSasKeyName(sasKeyName)
+          .setSasKey(sasKey);
 
-      eventHubClient = EventHubClient.createFromConnectionStringSync(connectionStringBuilder.toString(), retryPolicy);
-    } catch (IOException | ServiceBusException e) {
+      ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder().setNameFormat("Samza EventHubClient Thread-%d");
+      eventHubClientExecutor = Executors.newFixedThreadPool(numClientThreads, threadFactoryBuilder.build());
+      eventHubClient = EventHubClient.createSync(connectionStringBuilder.toString(), retryPolicy, eventHubClientExecutor);
+    } catch (IOException | EventHubException e) {
       String msg = String.format("Creation of EventHub client failed for eventHub EntityPath: %s on remote host %s:%d",
               entityPath, remoteHost, ClientConstants.AMQPS_PORT);
       LOG.error(msg, e);
@@ -92,6 +104,7 @@ public class SamzaEventHubClientManager implements EventHubClientManager {
     try {
       if (timeoutMS == EventHubClientManager.BLOCK_UNTIL_CLOSE) {
         eventHubClient.closeSync();
+        eventHubClientExecutor.shutdown();
       } else {
         CompletableFuture<Void> future = eventHubClient.close();
         future.get(timeoutMS, TimeUnit.MILLISECONDS);

http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
index 5564747..acb1775 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
@@ -19,8 +19,8 @@
 
 package org.apache.samza.system.eventhub.admin;
 
-import com.microsoft.azure.eventhubs.EventHubPartitionRuntimeInformation;
 import com.microsoft.azure.eventhubs.EventHubRuntimeInformation;
+import com.microsoft.azure.eventhubs.PartitionRuntimeInformation;
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.system.SystemAdmin;
@@ -144,10 +144,10 @@ public class EventHubSystemAdmin implements SystemAdmin {
   private Map<Partition, SystemStreamPartitionMetadata> getPartitionMetadata(String streamName, String[] partitionIds) {
     EventHubClientManager eventHubClientManager = getOrCreateStreamEventHubClient(streamName);
     Map<Partition, SystemStreamPartitionMetadata> sspMetadataMap = new HashMap<>();
-    Map<String, CompletableFuture<EventHubPartitionRuntimeInformation>> ehRuntimeInfos = new HashMap<>();
+    Map<String, CompletableFuture<PartitionRuntimeInformation>> ehRuntimeInfos = new HashMap<>();
 
     for (String partition : partitionIds) {
-      CompletableFuture<EventHubPartitionRuntimeInformation> partitionRuntimeInfo = eventHubClientManager
+      CompletableFuture<PartitionRuntimeInformation> partitionRuntimeInfo = eventHubClientManager
               .getEventHubClient()
               .getPartitionRuntimeInformation(partition);
 
@@ -157,7 +157,7 @@ public class EventHubSystemAdmin implements SystemAdmin {
     ehRuntimeInfos.forEach((partitionId, ehPartitionRuntimeInfo) -> {
         try {
           long timeoutMs = eventHubConfig.getRuntimeInfoWaitTimeMS(systemName);
-          EventHubPartitionRuntimeInformation ehPartitionInfo = ehPartitionRuntimeInfo.get(timeoutMs, TimeUnit.MILLISECONDS);
+          PartitionRuntimeInformation ehPartitionInfo = ehPartitionRuntimeInfo.get(timeoutMs, TimeUnit.MILLISECONDS);
 
           // Set offsets
           String startingOffset = EventHubSystemConsumer.START_OF_STREAM;

http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
index 90c73dc..f00944b 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
@@ -1,45 +1,31 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 
 package org.apache.samza.system.eventhub.consumer;
 
 import com.microsoft.azure.eventhubs.EventData;
-import com.microsoft.azure.eventhubs.EventHubPartitionRuntimeInformation;
+import com.microsoft.azure.eventhubs.EventHubException;
+import com.microsoft.azure.eventhubs.EventPosition;
 import com.microsoft.azure.eventhubs.PartitionReceiveHandler;
 import com.microsoft.azure.eventhubs.PartitionReceiver;
-import com.microsoft.azure.servicebus.ServiceBusException;
-import org.apache.samza.SamzaException;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.system.eventhub.EventHubClientManagerFactory;
-import org.apache.samza.system.eventhub.EventHubClientManager;
-import org.apache.samza.system.eventhub.EventHubConfig;
-import org.apache.samza.system.eventhub.Interceptor;
-import org.apache.samza.system.eventhub.admin.EventHubSystemAdmin;
-import org.apache.samza.system.eventhub.metrics.SamzaHistogram;
-import org.apache.samza.system.eventhub.producer.EventHubSystemProducer;
-import org.apache.samza.util.BlockingEnvelopeMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import com.microsoft.azure.eventhubs.PartitionRuntimeInformation;
+import com.microsoft.azure.eventhubs.impl.ClientConstants;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
@@ -51,11 +37,26 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import org.apache.samza.SamzaException;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.eventhub.EventHubClientManager;
+import org.apache.samza.system.eventhub.EventHubClientManagerFactory;
+import org.apache.samza.system.eventhub.EventHubConfig;
+import org.apache.samza.system.eventhub.Interceptor;
+import org.apache.samza.system.eventhub.admin.EventHubSystemAdmin;
+import org.apache.samza.system.eventhub.metrics.SamzaHistogram;
+import org.apache.samza.system.eventhub.producer.EventHubSystemProducer;
+import org.apache.samza.util.BlockingEnvelopeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -96,13 +97,12 @@ import java.util.stream.Collectors;
  */
 public class EventHubSystemConsumer extends BlockingEnvelopeMap {
   private static final Logger LOG = LoggerFactory.getLogger(EventHubSystemConsumer.class);
-  private static final int MAX_EVENT_COUNT_PER_PARTITION_POLL = 50;
 
   // Overall timeout for EventHubClient exponential backoff policy
   private static final Duration DEFAULT_EVENTHUB_RECEIVER_TIMEOUT = Duration.ofMinutes(10L);
   private static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = Duration.ofMinutes(1L).toMillis();
 
-  public static final String START_OF_STREAM = PartitionReceiver.START_OF_STREAM; // -1
+  public static final String START_OF_STREAM = ClientConstants.START_OF_STREAM; // -1
   public static final String END_OF_STREAM = "-2";
   public static final String EVENT_READ_RATE = "eventReadRate";
   public static final String EVENT_BYTE_READ_RATE = "eventByteReadRate";
@@ -122,11 +122,14 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
   private final Map<String, SamzaHistogram> readLatencies;
   private final Map<String, Counter> readErrors;
 
-  final ConcurrentHashMap<SystemStreamPartition, PartitionReceiveHandler> streamPartitionHandlers = new ConcurrentHashMap<>();
-  private final ConcurrentHashMap<SystemStreamPartition, PartitionReceiver> streamPartitionReceivers = new ConcurrentHashMap<>();
+  final ConcurrentHashMap<SystemStreamPartition, PartitionReceiveHandler> streamPartitionHandlers =
+      new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<SystemStreamPartition, PartitionReceiver> streamPartitionReceivers =
+      new ConcurrentHashMap<>();
   private final ConcurrentHashMap<String, EventHubClientManager> streamEventHubManagers = new ConcurrentHashMap<>();
   private final ConcurrentHashMap<SystemStreamPartition, String> streamPartitionOffsets = new ConcurrentHashMap<>();
   private final Map<String, Interceptor> interceptors;
+  private final Integer prefetchCount;
   private boolean isStarted = false;
   private final EventHubConfig config;
   private final String systemName;
@@ -135,8 +138,8 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
   private final AtomicReference<Throwable> eventHubHandlerError = new AtomicReference<>(null);
 
   public EventHubSystemConsumer(EventHubConfig config, String systemName,
-                                EventHubClientManagerFactory eventHubClientManagerFactory,
-                                Map<String, Interceptor> interceptors, MetricsRegistry registry) {
+      EventHubClientManagerFactory eventHubClientManagerFactory, Map<String, Interceptor> interceptors,
+      MetricsRegistry registry) {
     super(registry, System::currentTimeMillis);
 
     this.config = config;
@@ -145,21 +148,24 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
     List<String> streamIds = config.getStreams(systemName);
     // Create and initiate connections to Event Hubs
     for (String streamId : streamIds) {
-      EventHubClientManager eventHubClientManager = eventHubClientManagerFactory
-              .getEventHubClientManager(systemName, streamId, config);
+      EventHubClientManager eventHubClientManager =
+          eventHubClientManagerFactory.getEventHubClientManager(systemName, streamId, config);
       streamEventHubManagers.put(streamId, eventHubClientManager);
       eventHubClientManager.init();
     }
+    prefetchCount = config.getPrefetchCount(systemName);
+
+
 
     // Initiate metrics
-    eventReadRates = streamIds.stream()
-            .collect(Collectors.toMap(Function.identity(), x -> registry.newCounter(x, EVENT_READ_RATE)));
+    eventReadRates =
+        streamIds.stream().collect(Collectors.toMap(Function.identity(), x -> registry.newCounter(x, EVENT_READ_RATE)));
     eventByteReadRates = streamIds.stream()
-            .collect(Collectors.toMap(Function.identity(), x -> registry.newCounter(x, EVENT_BYTE_READ_RATE)));
+        .collect(Collectors.toMap(Function.identity(), x -> registry.newCounter(x, EVENT_BYTE_READ_RATE)));
     readLatencies = streamIds.stream()
-            .collect(Collectors.toMap(Function.identity(), x -> new SamzaHistogram(registry, x, READ_LATENCY)));
-    readErrors = streamIds.stream()
-            .collect(Collectors.toMap(Function.identity(), x -> registry.newCounter(x, READ_ERRORS)));
+        .collect(Collectors.toMap(Function.identity(), x -> new SamzaHistogram(registry, x, READ_LATENCY)));
+    readErrors =
+        streamIds.stream().collect(Collectors.toMap(Function.identity(), x -> registry.newCounter(x, READ_ERRORS)));
 
     // Locking to ensure that these aggregated metrics will be created only once across multiple system consumers.
     synchronized (AGGREGATE_METRICS_LOCK) {
@@ -183,7 +189,9 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
 
     if (streamPartitionOffsets.containsKey(systemStreamPartition)) {
       // Only update if new offset is lower than previous offset
-      if (END_OF_STREAM.equals(offset)) return;
+      if (END_OF_STREAM.equals(offset)) {
+        return;
+      }
       String prevOffset = streamPartitionOffsets.get(systemStreamPartition);
       if (!END_OF_STREAM.equals(prevOffset) && EventHubSystemAdmin.compareOffsets(offset, prevOffset) > -1) {
         return;
@@ -192,20 +200,20 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
     streamPartitionOffsets.put(systemStreamPartition, offset);
   }
 
-  private String getNewestEventHubOffset(EventHubClientManager eventHubClientManager, String streamName, Integer partitionId) {
-    CompletableFuture<EventHubPartitionRuntimeInformation> partitionRuntimeInfoFuture = eventHubClientManager
-            .getEventHubClient()
-            .getPartitionRuntimeInformation(partitionId.toString());
+  private String getNewestEventHubOffset(EventHubClientManager eventHubClientManager, String streamName,
+      Integer partitionId) {
+    CompletableFuture<PartitionRuntimeInformation> partitionRuntimeInfoFuture =
+        eventHubClientManager.getEventHubClient().getPartitionRuntimeInformation(partitionId.toString());
     try {
       long timeoutMs = config.getRuntimeInfoWaitTimeMS(systemName);
 
-      EventHubPartitionRuntimeInformation partitionRuntimeInformation = partitionRuntimeInfoFuture
-              .get(timeoutMs, TimeUnit.MILLISECONDS);
+      PartitionRuntimeInformation partitionRuntimeInformation =
+          partitionRuntimeInfoFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
 
       return partitionRuntimeInformation.getLastEnqueuedOffset();
     } catch (InterruptedException | ExecutionException | TimeoutException e) {
-      String msg = String.format(
-              "Error while fetching EventHubPartitionRuntimeInfo for System:%s, Stream:%s, Partition:%s",
+      String msg =
+          String.format("Error while fetching EventHubPartitionRuntimeInfo for System:%s, Stream:%s, Partition:%s",
               systemName, streamName, partitionId);
       throw new SamzaException(msg);
     }
@@ -235,21 +243,23 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
           // If the offset is greater than the newest offset, use the use current Instant as
           // offset to fetch in Eventhub.
           receiver = eventHubClientManager.getEventHubClient()
-                  .createReceiverSync(consumerGroup, partitionId.toString(), Instant.now());
+              .createReceiverSync(consumerGroup, partitionId.toString(), EventPosition.fromEnqueuedTime(Instant.now()));
         } else {
           // If the offset is less or equal to the newest offset in the system, it can be
           // used as the starting offset to receive from. EventHub will return the first
           // message AFTER the offset that was specified in the fetch request.
           // If no such offset exists Eventhub will return an error.
           receiver = eventHubClientManager.getEventHubClient()
-                  .createReceiverSync(consumerGroup, partitionId.toString(), offset,
-                          !offset.equals(EventHubSystemConsumer.START_OF_STREAM));
+              .createReceiverSync(consumerGroup, partitionId.toString(),
+                  EventPosition.fromOffset(offset, !offset.equals(EventHubSystemConsumer.START_OF_STREAM)));
         }
 
-        PartitionReceiveHandler handler = new PartitionReceiverHandlerImpl(ssp, eventReadRates.get(streamId),
-                eventByteReadRates.get(streamId), readLatencies.get(streamId), readErrors.get(streamId),
-                interceptors.getOrDefault(streamId, null));
+        receiver.setPrefetchCount(prefetchCount);
 
+        PartitionReceiveHandler handler =
+            new PartitionReceiverHandlerImpl(ssp, eventReadRates.get(streamId), eventByteReadRates.get(streamId),
+                readLatencies.get(streamId), readErrors.get(streamId), interceptors.getOrDefault(streamId, null),
+                config.getMaxEventCountPerPoll(systemName));
 
         // Timeout for EventHubClient receive
         receiver.setReceiveTimeout(DEFAULT_EVENTHUB_RECEIVER_TIMEOUT);
@@ -261,16 +271,16 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
         streamPartitionReceivers.put(ssp, receiver);
       } catch (Exception e) {
         throw new SamzaException(
-                String.format("Failed to create receiver for EventHubs: namespace=%s, entity=%s, partitionId=%d",
-                        namespace, entityPath, partitionId), e);
+            String.format("Failed to create receiver for EventHubs: namespace=%s, entity=%s, partitionId=%d", namespace,
+                entityPath, partitionId), e);
       }
       LOG.debug(String.format("Connection successfully started for namespace=%s, entity=%s ", namespace, entityPath));
-
     }
   }
 
   @Override
-  public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
+  public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
+      Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
     Throwable handlerError = eventHubHandlerError.get();
 
     if (handlerError != null) {
@@ -305,8 +315,10 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
 
       // Recreate receiver
       PartitionReceiver receiver = eventHubClientManager.getEventHubClient()
-              .createReceiverSync(consumerGroup, partitionId.toString(), offset,
-                      !offset.equals(EventHubSystemConsumer.START_OF_STREAM));
+          .createReceiverSync(consumerGroup, partitionId.toString(),
+              EventPosition.fromOffset(offset, !offset.equals(EventHubSystemConsumer.START_OF_STREAM)));
+
+      receiver.setPrefetchCount(prefetchCount);
 
       // Timeout for EventHubClient receive
       receiver.setReceiveTimeout(DEFAULT_EVENTHUB_RECEIVER_TIMEOUT);
@@ -314,10 +326,9 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
       // Create and start receiver thread with handler
       receiver.setReceiveHandler(streamPartitionHandlers.get(ssp));
       streamPartitionReceivers.put(ssp, receiver);
-
     } catch (Exception e) {
       eventHubHandlerError.set(new SamzaException(
-              String.format("Failed to recreate receiver for EventHubs after ReceiverHandlerError (ssp=%s)", ssp), e));
+          String.format("Failed to recreate receiver for EventHubs after ReceiverHandlerError (ssp=%s)", ssp), e));
     }
   }
 
@@ -336,9 +347,9 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
   }
 
   private boolean isErrorTransient(Throwable throwable) {
-    if (throwable instanceof ServiceBusException) {
-      ServiceBusException serviceBusException = (ServiceBusException) throwable;
-      return serviceBusException.getIsTransient();
+    if (throwable instanceof EventHubException) {
+      EventHubException eventHubException = (EventHubException) throwable;
+      return eventHubException.getIsTransient();
     }
     return false;
   }
@@ -348,24 +359,30 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
     return new LinkedBlockingQueue<>(config.getConsumerBufferCapacity(systemName));
   }
 
-  protected class PartitionReceiverHandlerImpl extends PartitionReceiveHandler {
+  protected class PartitionReceiverHandlerImpl implements PartitionReceiveHandler {
 
     private final Counter eventReadRate;
     private final Counter eventByteReadRate;
     private final SamzaHistogram readLatency;
     private final Counter errorRate;
     private final Interceptor interceptor;
+    private final Integer maxEventCount;
     SystemStreamPartition ssp;
 
     PartitionReceiverHandlerImpl(SystemStreamPartition ssp, Counter eventReadRate, Counter eventByteReadRate,
-                                 SamzaHistogram readLatency, Counter readErrors, Interceptor interceptor) {
-      super(MAX_EVENT_COUNT_PER_PARTITION_POLL);
+        SamzaHistogram readLatency, Counter readErrors, Interceptor interceptor, int maxEventCount) {
       this.ssp = ssp;
       this.eventReadRate = eventReadRate;
       this.eventByteReadRate = eventByteReadRate;
       this.readLatency = readLatency;
       this.errorRate = readErrors;
       this.interceptor = interceptor;
+      this.maxEventCount = maxEventCount;
+    }
+
+    @Override
+    public int getMaxEventCount() {
+      return this.maxEventCount;
     }
 
     @Override
@@ -415,8 +432,8 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
       errorRate.inc();
       aggReadErrors.inc();
 
-      if (throwable instanceof ServiceBusException) {
-        ServiceBusException busException = (ServiceBusException) throwable;
+      if (throwable instanceof EventHubException) {
+        EventHubException busException = (EventHubException) throwable;
 
         if (busException.getIsTransient()) {
 

http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/main/java/org/apache/samza/system/eventhub/metrics/SamzaHistogram.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/metrics/SamzaHistogram.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/metrics/SamzaHistogram.java
index 7d6d408..03fc114 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/metrics/SamzaHistogram.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/metrics/SamzaHistogram.java
@@ -1,35 +1,36 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 
 package org.apache.samza.system.eventhub.metrics;
 
 import com.codahale.metrics.ExponentiallyDecayingReservoir;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Snapshot;
-import org.apache.samza.metrics.Gauge;
-import org.apache.samza.metrics.MetricsRegistry;
-
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.MetricsVisitor;
+
 
 /**
  * Creates a {@link Histogram} metric using {@link ExponentiallyDecayingReservoir}
@@ -49,14 +50,34 @@ public class SamzaHistogram {
     this.histogram = new Histogram(new ExponentiallyDecayingReservoir());
     this.percentiles = percentiles;
     this.gauges = this.percentiles.stream()
-            .filter(x -> x > 0 && x <= 100)
-            .collect(
-                    Collectors.toMap(Function.identity(), x -> registry.newGauge(group, name + "_" + String.valueOf(0), 0D)));
+        .filter(x -> x > 0 && x <= 100)
+        .collect(Collectors.toMap(Function.identity(),
+            x -> registry.newGauge(group, new HistogramGauge(x, name + "_" + String.valueOf(x), 0D))));
   }
 
   public void update(long value) {
     histogram.update(value);
+  }
+
+  public void updateGaugeValues(double percentile) {
     Snapshot values = histogram.getSnapshot();
-    percentiles.forEach(x -> gauges.get(x).set(values.getValue(x / 100)));
+    gauges.get(percentile).set(values.getValue(percentile / 100));
+  }
+
+  /**
+   * Custom gauge whose value is set based on the underlying Histogram
+   */
+  private class HistogramGauge extends Gauge<Double> {
+    private final Double percentile;
+
+    public HistogramGauge(Double percentile, String name, double value) {
+      super(name, value);
+      this.percentile = percentile;
+    }
+
+    public void visit(MetricsVisitor visitor) {
+      updateGaugeValues(percentile);
+      visitor.gauge(this);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/AsyncSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/AsyncSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/AsyncSystemProducer.java
index 901dd6a..307b8f6 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/AsyncSystemProducer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/AsyncSystemProducer.java
@@ -139,7 +139,6 @@ public abstract class AsyncSystemProducer implements SystemProducer {
 
     // Auto update the metrics and possible throwable when futures are complete.
     sendResult.handle((aVoid, throwable) -> {
-        pendingFutures.remove(sendResult);
         long callbackLatencyMs = System.currentTimeMillis() - afterSendTimeMs;
         sendCallbackLatency.get(streamId).update(callbackLatencyMs);
         aggSendCallbackLatency.update(callbackLatencyMs);

http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
index 5139dc6..3639bbc 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
@@ -21,8 +21,9 @@ package org.apache.samza.system.eventhub.producer;
 
 import com.microsoft.azure.eventhubs.EventData;
 import com.microsoft.azure.eventhubs.EventHubClient;
+import com.microsoft.azure.eventhubs.EventHubException;
 import com.microsoft.azure.eventhubs.PartitionSender;
-import com.microsoft.azure.servicebus.ServiceBusException;
+import com.microsoft.azure.eventhubs.impl.EventDataImpl;
 import java.nio.charset.Charset;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -164,7 +165,7 @@ public class EventHubSystemProducer extends AsyncSystemProducer {
           } catch (InterruptedException | ExecutionException | TimeoutException e) {
             String msg = "Failed to fetch number of Event Hub partitions for partition sender creation";
             throw new SamzaException(msg, e);
-          } catch (ServiceBusException | IllegalArgumentException e) {
+          } catch (EventHubException | IllegalArgumentException e) {
             String msg = "Creation of partition sender failed with exception";
             throw new SamzaException(msg, e);
           }
@@ -282,7 +283,7 @@ public class EventHubSystemProducer extends AsyncSystemProducer {
       eventValue = interceptor.get().intercept(eventValue);
     }
 
-    EventData eventData = new EventData(eventValue);
+    EventData eventData = new EventDataImpl(eventValue);
 
     eventData.getProperties().put(PRODUCE_TIMESTAMP, Long.toString(System.currentTimeMillis()));
 

http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventData.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventData.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventData.java
index b5b55dc..1e3d4f5 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventData.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventData.java
@@ -21,15 +21,17 @@ package org.apache.samza.system.eventhub;
 
 import com.microsoft.azure.eventhubs.EventData;
 
+import com.microsoft.azure.eventhubs.impl.EventDataImpl;
 import java.nio.charset.Charset;
 import java.util.*;
 
-public class MockEventData extends EventData {
+public class MockEventData implements EventData {
+  EventData eventData;
 
   private EventData.SystemProperties overridedSystemProperties;
 
   private MockEventData(byte[] data, String partitionKey, String offset) {
-    super(data);
+    eventData = new EventDataImpl(data);
     HashMap<String, Object> properties = new HashMap<>();
     properties.put("x-opt-offset", offset);
     properties.put("x-opt-partition-key", partitionKey);
@@ -51,6 +53,21 @@ public class MockEventData extends EventData {
   }
 
   @Override
+  public Object getObject() {
+    return eventData.getObject();
+  }
+
+  @Override
+  public byte[] getBytes() {
+    return eventData.getBytes();
+  }
+
+  @Override
+  public Map<String, Object> getProperties() {
+    return eventData.getProperties();
+  }
+
+  @Override
   public EventData.SystemProperties getSystemProperties() {
     return overridedSystemProperties;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java
index 368087a..6ee9bcf 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java
@@ -26,7 +26,6 @@ import org.junit.Assert;
 import org.mockito.stubbing.Answer;
 import org.powermock.api.mockito.PowerMockito;
 
-import java.time.Instant;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -39,7 +38,7 @@ import static org.mockito.Matchers.*;
 public class MockEventHubClientManagerFactory extends EventHubClientManagerFactory {
   private Map<SystemStreamPartition, List<EventData>> eventData;
   private Map<String, Map<String, Map<Integer, List<EventData>>>> receivedData;
-  private Map<String, String> startingOffsets = new HashMap<>();
+  private Map<String, EventPosition> startingOffsets = new HashMap<>();
 
   public MockEventHubClientManagerFactory() {
     this.receivedData = new HashMap<>();
@@ -71,7 +70,7 @@ public class MockEventHubClientManagerFactory extends EventHubClientManagerFacto
     handlers.forEach((ssp, value) -> value.onReceive(eventData.get(ssp)));
   }
 
-  public String getPartitionOffset(String partitionId) {
+  public EventPosition getPartitionOffset(String partitionId) {
     return startingOffsets.getOrDefault(partitionId, null);
   }
 
@@ -101,10 +100,10 @@ public class MockEventHubClientManagerFactory extends EventHubClientManagerFacto
           }
           return null;
         });
-      EventHubPartitionRuntimeInformation mockPartitionRuntimeInfo = PowerMockito.mock(EventHubPartitionRuntimeInformation.class);
+      PartitionRuntimeInformation mockPartitionRuntimeInfo = PowerMockito.mock(PartitionRuntimeInformation.class);
       PowerMockito.when(mockPartitionRuntimeInfo.getLastEnqueuedOffset())
               .thenReturn(EventHubSystemConsumer.START_OF_STREAM);
-      CompletableFuture<EventHubPartitionRuntimeInformation> partitionFuture =  new MockPartitionFuture(mockPartitionRuntimeInfo);
+      CompletableFuture<PartitionRuntimeInformation> partitionFuture =  new MockPartitionFuture(mockPartitionRuntimeInfo);
 
       // Producer mocks
       PartitionSender mockPartitionSender0 = PowerMockito.mock(PartitionSender.class);
@@ -128,16 +127,16 @@ public class MockEventHubClientManagerFactory extends EventHubClientManagerFacto
 
       try {
         // Consumer calls
-        PowerMockito.when(mockEventHubClient.createReceiverSync(anyString(), anyString(), any(Instant.class)))
+        PowerMockito.when(mockEventHubClient.createReceiverSync(anyString(), anyString(), anyObject()))
                 .then((Answer<PartitionReceiver>) invocationOnMock -> {
                     String partitionId = invocationOnMock.getArgumentAt(1, String.class);
-                    startingOffsets.put(partitionId, EventHubSystemConsumer.END_OF_STREAM);
+                    startingOffsets.put(partitionId, EventPosition.fromEndOfStream());
                     return mockPartitionReceiver;
                   });
-        PowerMockito.when(mockEventHubClient.createReceiverSync(anyString(), anyString(), anyString(), anyBoolean()))
+        PowerMockito.when(mockEventHubClient.createReceiverSync(anyString(), anyString(), anyObject()))
                 .then((Answer<PartitionReceiver>) invocationOnMock -> {
                     String partitionId = invocationOnMock.getArgumentAt(1, String.class);
-                    String offset = invocationOnMock.getArgumentAt(2, String.class);
+                    EventPosition offset = invocationOnMock.getArgumentAt(2, EventPosition.class);
                     startingOffsets.put(partitionId, offset);
                     return mockPartitionReceiver;
                   });
@@ -196,15 +195,15 @@ public class MockEventHubClientManagerFactory extends EventHubClientManagerFacto
       }
     }
 
-    private class MockPartitionFuture extends CompletableFuture<EventHubPartitionRuntimeInformation> {
-      EventHubPartitionRuntimeInformation runtimeInformation;
+    private class MockPartitionFuture extends CompletableFuture<PartitionRuntimeInformation> {
+      PartitionRuntimeInformation runtimeInformation;
 
-      MockPartitionFuture(EventHubPartitionRuntimeInformation runtimeInformation) {
+      MockPartitionFuture(PartitionRuntimeInformation runtimeInformation) {
         this.runtimeInformation = runtimeInformation;
       }
 
       @Override
-      public EventHubPartitionRuntimeInformation get(long timeout, TimeUnit unit) {
+      public PartitionRuntimeInformation get(long timeout, TimeUnit unit) {
         return runtimeInformation;
       }
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java
index a421cbd..d29b975 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java
@@ -70,7 +70,12 @@ public class TestMetricsRegistry implements MetricsRegistry {
 
   @Override
   public <T> Gauge<T> newGauge(String group, Gauge<T> value) {
-    return null;
+    if (!gauges.containsKey(group)) {
+      gauges.put(group, new ArrayList<>());
+    }
+
+    gauges.get(group).add(value);
+    return value;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
index 4fced77..b40d86d 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
@@ -42,7 +42,7 @@ import java.util.stream.Collectors;
 import static org.apache.samza.system.eventhub.MockEventHubConfigFactory.*;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({EventHubRuntimeInformation.class, EventHubPartitionRuntimeInformation.class,
+@PrepareForTest({EventHubRuntimeInformation.class, PartitionRuntimeInformation.class,
         EventHubClient.class, PartitionReceiver.class, PartitionSender.class})
 public class TestEventHubSystemConsumer {
   private static final String MOCK_ENTITY_1 = "mocktopic1";
@@ -99,8 +99,8 @@ public class TestEventHubSystemConsumer {
     consumer.register(ssp, EventHubSystemConsumer.START_OF_STREAM);
     consumer.start();
 
-    Assert.assertEquals(EventHubSystemConsumer.START_OF_STREAM,
-            eventHubClientWrapperFactory.getPartitionOffset(String.valueOf(partitionId)));
+    Assert.assertEquals(EventPosition.fromOffset(EventHubSystemConsumer.START_OF_STREAM, false).toString(),
+            eventHubClientWrapperFactory.getPartitionOffset(String.valueOf(partitionId)).toString());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/ITestEventHubSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/ITestEventHubSystemProducer.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/ITestEventHubSystemProducer.java
index 32b1604..63f6daa 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/ITestEventHubSystemProducer.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/ITestEventHubSystemProducer.java
@@ -21,13 +21,13 @@ package org.apache.samza.system.eventhub.producer;
 
 import com.microsoft.azure.eventhubs.EventData;
 import com.microsoft.azure.eventhubs.EventHubClient;
+import com.microsoft.azure.eventhubs.EventHubException;
+import com.microsoft.azure.eventhubs.EventPosition;
 import com.microsoft.azure.eventhubs.PartitionReceiver;
-import com.microsoft.azure.servicebus.ServiceBusException;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.system.*;
 import org.apache.samza.system.eventhub.*;
-import org.apache.samza.system.eventhub.consumer.EventHubSystemConsumer;
 import org.apache.samza.util.NoOpMetricsRegistry;
 import org.junit.Assert;
 import org.junit.Ignore;
@@ -92,7 +92,7 @@ public class ITestEventHubSystemProducer {
   }
 
   @Test
-  public void testReceive() throws ServiceBusException {
+  public void testReceive() throws EventHubException {
     EventHubClientManagerFactory clientFactory = new EventHubClientManagerFactory();
     EventHubClientManager wrapper = clientFactory
             .getEventHubClientManager(SYSTEM_NAME, STREAM_NAME1, new EventHubConfig(createEventHubConfig()));
@@ -100,11 +100,11 @@ public class ITestEventHubSystemProducer {
     EventHubClient client = wrapper.getEventHubClient();
     PartitionReceiver receiver =
             client.createReceiverSync(EventHubClient.DEFAULT_CONSUMER_GROUP_NAME, "0",
-                    EventHubSystemConsumer.START_OF_STREAM, true);
+                    EventPosition.fromStartOfStream());
     receiveMessages(receiver, 300);
   }
 
-  private void receiveMessages(PartitionReceiver receiver, int numMessages) throws ServiceBusException {
+  private void receiveMessages(PartitionReceiver receiver, int numMessages) throws EventHubException {
     int count = 0;
     while (count < numMessages) {
 

http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
index b5206bb..9a3bf7d 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
@@ -20,9 +20,9 @@
 package org.apache.samza.system.eventhub.producer;
 
 import com.microsoft.azure.eventhubs.EventHubClient;
-import com.microsoft.azure.eventhubs.EventHubPartitionRuntimeInformation;
 import com.microsoft.azure.eventhubs.EventHubRuntimeInformation;
 import com.microsoft.azure.eventhubs.PartitionReceiver;
+import com.microsoft.azure.eventhubs.PartitionRuntimeInformation;
 import com.microsoft.azure.eventhubs.PartitionSender;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -49,7 +49,7 @@ import static org.apache.samza.system.eventhub.MockEventHubConfigFactory.*;
 
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({EventHubRuntimeInformation.class, EventHubPartitionRuntimeInformation.class, EventHubClient.class, PartitionReceiver.class, PartitionSender.class})
+@PrepareForTest({EventHubRuntimeInformation.class, PartitionRuntimeInformation.class, EventHubClient.class, PartitionReceiver.class, PartitionSender.class})
 public class TestEventHubSystemProducer {
 
   private static final String SOURCE = "TestEventHubSystemProducer";

http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-tools/src/main/java/org/apache/samza/tools/EventHubConsoleConsumer.java
----------------------------------------------------------------------
diff --git a/samza-tools/src/main/java/org/apache/samza/tools/EventHubConsoleConsumer.java b/samza-tools/src/main/java/org/apache/samza/tools/EventHubConsoleConsumer.java
index 096e12d..a72c8b7 100644
--- a/samza-tools/src/main/java/org/apache/samza/tools/EventHubConsoleConsumer.java
+++ b/samza-tools/src/main/java/org/apache/samza/tools/EventHubConsoleConsumer.java
@@ -1,38 +1,41 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 
 package org.apache.samza.tools;
 
+import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
 import com.microsoft.azure.eventhubs.EventData;
 import com.microsoft.azure.eventhubs.EventHubClient;
+import com.microsoft.azure.eventhubs.EventHubException;
 import com.microsoft.azure.eventhubs.EventHubRuntimeInformation;
+import com.microsoft.azure.eventhubs.EventPosition;
 import com.microsoft.azure.eventhubs.PartitionReceiver;
-import com.microsoft.azure.servicebus.ConnectionStringBuilder;
-import com.microsoft.azure.servicebus.ServiceBusException;
 import java.io.IOException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import org.apache.commons.cli.BasicParser;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 
+
 /**
  * Tool to read events from Microsoft Azure event hubs.
  */
@@ -59,17 +62,17 @@ public class EventHubConsoleConsumer {
   private static final String OPT_DESC_TOKEN = "Token corresponding to the key.";
 
   public static void main(String[] args)
-      throws ServiceBusException, IOException, ExecutionException, InterruptedException {
+      throws EventHubException, IOException, ExecutionException, InterruptedException {
     Options options = new Options();
     options.addOption(
         CommandLineHelper.createOption(OPT_SHORT_EVENTHUB_NAME, OPT_LONG_EVENTHUB_NAME, OPT_ARG_EVENTHUB_NAME, true,
-        OPT_DESC_EVENTHUB_NAME));
+            OPT_DESC_EVENTHUB_NAME));
 
-    options.addOption(
-        CommandLineHelper.createOption(OPT_SHORT_NAMESPACE, OPT_LONG_NAMESPACE, OPT_ARG_NAMESPACE, true, OPT_DESC_NAMESPACE));
+    options.addOption(CommandLineHelper.createOption(OPT_SHORT_NAMESPACE, OPT_LONG_NAMESPACE, OPT_ARG_NAMESPACE, true,
+        OPT_DESC_NAMESPACE));
 
-    options.addOption(
-        CommandLineHelper.createOption(OPT_SHORT_KEY_NAME, OPT_LONG_KEY_NAME, OPT_ARG_KEY_NAME, true, OPT_DESC_KEY_NAME));
+    options.addOption(CommandLineHelper.createOption(OPT_SHORT_KEY_NAME, OPT_LONG_KEY_NAME, OPT_ARG_KEY_NAME, true,
+        OPT_DESC_KEY_NAME));
 
     options.addOption(
         CommandLineHelper.createOption(OPT_SHORT_TOKEN, OPT_LONG_TOKEN, OPT_ARG_TOKEN, true, OPT_DESC_TOKEN));
@@ -93,17 +96,20 @@ public class EventHubConsoleConsumer {
   }
 
   private static void consumeEvents(String ehName, String namespace, String keyName, String token)
-      throws ServiceBusException, IOException, ExecutionException, InterruptedException {
-    ConnectionStringBuilder connStr = new ConnectionStringBuilder(namespace, ehName, keyName, token);
+      throws EventHubException, IOException, ExecutionException, InterruptedException {
+    ConnectionStringBuilder connStr = new ConnectionStringBuilder().setNamespaceName(namespace)
+        .setEventHubName(ehName)
+        .setSasKeyName(keyName)
+        .setSasKey(token);
 
-    EventHubClient client = EventHubClient.createFromConnectionStringSync(connStr.toString());
+    EventHubClient client = EventHubClient.createSync(connStr.toString(), Executors.newFixedThreadPool(10));
 
     EventHubRuntimeInformation runTimeInfo = client.getRuntimeInformation().get();
     int numPartitions = runTimeInfo.getPartitionCount();
     for (int partition = 0; partition < numPartitions; partition++) {
       PartitionReceiver receiver =
           client.createReceiverSync(EventHubClient.DEFAULT_CONSUMER_GROUP_NAME, String.valueOf(partition),
-              PartitionReceiver.START_OF_STREAM);
+              EventPosition.fromStartOfStream());
       receiver.receive(10).handle((records, throwable) -> handleComplete(receiver, records, throwable));
     }
   }