You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2018/04/27 17:23:04 UTC

samza git commit: SAMZA-1688: use per partition eventhubs client

Repository: samza
Updated Branches:
  refs/heads/master af06c8d0c -> db57304ff


SAMZA-1688: use per partition eventhubs client

Use a per-partition EventHubs client to improve throughput, since each EventHub client maintains only a single TCP connection.
Also reduce the time spent on getPartitionRuntimeInfo at startup by making the calls run in parallel in the system admin and by removing the call from the system consumer entirely.

Benchmark:
We see an 8x improvement in consumption throughput, from ~3Mb/s (or 1.5k QPS) to ~21Mb/s (or 10.5k QPS). The memory footprint does not seem to be affected. CPU usage also increases by 8x. Essentially, with a per-partition client, we are able to saturate the CPU. The benchmark was run on a machine with 8 cores. If the machine has, say, 16 CPU cores, I expect the throughput to improve by around 16x.

Author: Hai Lu <ha...@linkedin.com>

Reviewers: Xinyu Liu <xi...@gmail.com>

Closes #489 from lhaiesp/master


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/db57304f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/db57304f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/db57304f

Branch: refs/heads/master
Commit: db57304ffa29d84912ae4ac69d5a9d1637019405
Parents: af06c8d
Author: Hai Lu <ha...@linkedin.com>
Authored: Fri Apr 27 10:22:57 2018 -0700
Committer: xiliu <xi...@linkedin.com>
Committed: Fri Apr 27 10:22:57 2018 -0700

----------------------------------------------------------------------
 .../samza/system/eventhub/EventHubConfig.java   | 18 ++++
 .../eventhub/admin/EventHubSystemAdmin.java     | 58 +++++-------
 .../consumer/EventHubSystemConsumer.java        | 99 ++++++++++++--------
 .../producer/EventHubSystemProducer.java        | 92 +++++++++++++-----
 .../consumer/TestEventHubSystemConsumer.java    | 21 ++++-
 .../producer/TestEventHubSystemProducer.java    | 23 ++++-
 6 files changed, 214 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/db57304f/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
index e40b3c2..e26d47c 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
@@ -77,6 +77,12 @@ public class EventHubConfig extends MapConfig {
   private static final int MESSAGE_HEADER_OVERHEAD = 24 * 1024;
   private static final int DEFAULT_MAX_MESSAGE_SIZE = 1024 * 1024 - MESSAGE_HEADER_OVERHEAD;
 
+  // Each EventHub client maintains single TCP connection. To improve throughput, we will instantiate one
+  // client for each partition. Allow the option to disable the feature in case too many EventHub clients
+  // end up causing unpredictable issues when number of partitions is really high.
+  public static final String CONFIG_PER_PARTITION_CONNECTION = "systems.%s.eventhubs.perPartition.connection";
+  public static final Boolean DEFAULT_CONFIG_PER_PARTITION_CONNECTION = true;
+
   private final Map<String, String> physcialToId = new HashMap<>();
 
   public EventHubConfig(Config config) {
@@ -280,4 +286,16 @@ public class EventHubConfig extends MapConfig {
     return Integer.parseInt(bufferCapacity);
   }
 
+  /**
+   * Returns whether to create one EventHub client per partition. Each EventHub client maintains
+   * single TCP connection. More EventHub clients will improve throughput in general.
+   * For producer this config is only relevant when partition method is PARTITION_KEY_AS_PARTITION
+   */
+  public Boolean getPerPartitionConnection(String systemName) {
+    String isPerPartitionConnection = get(String.format(CONFIG_PER_PARTITION_CONNECTION, systemName));
+    if (isPerPartitionConnection == null) {
+      return DEFAULT_CONFIG_PER_PARTITION_CONNECTION;
+    }
+    return Boolean.valueOf(isPerPartitionConnection);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/db57304f/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
index c86e31a..2141ebd 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
@@ -22,6 +22,8 @@ package org.apache.samza.system.eventhub.admin;
 import com.microsoft.azure.eventhubs.EventHubClient;
 import com.microsoft.azure.eventhubs.EventHubRuntimeInformation;
 import com.microsoft.azure.eventhubs.PartitionRuntimeInformation;
+
+import java.util.ArrayList;
 import java.util.Arrays;
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
@@ -38,6 +40,7 @@ import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -121,17 +124,6 @@ public class EventHubSystemAdmin implements SystemAdmin {
           EventHubRuntimeInformation ehInfo = runtimeInfo.get(timeoutMs, TimeUnit.MILLISECONDS);
           LOG.info(String.format("Adding partition ids=%s for stream=%s. EHRuntimetInfo=%s",
               Arrays.toString(ehInfo.getPartitionIds()), streamName, printEventHubRuntimeInfo(ehInfo)));
-
-          try {
-            for (String partitionId : ehInfo.getPartitionIds()) {
-              LOG.info(printPartitionRuntimeInfo(
-                  ehClient.getPartitionRuntimeInformation(partitionId).get(timeoutMs, TimeUnit.MILLISECONDS)));
-            }
-          } catch (Exception e) {
-            // ignore failures as this is just for information logging
-            LOG.warn("Failed to fetch and print partition runtime info from EventHubs.", e);
-          }
-
           streamPartitions.put(streamName, ehInfo.getPartitionIds());
         }
         String[] partitionIds = streamPartitions.get(streamName);
@@ -168,39 +160,37 @@ public class EventHubSystemAdmin implements SystemAdmin {
   private Map<Partition, SystemStreamPartitionMetadata> getPartitionMetadata(String streamName, String[] partitionIds) {
     EventHubClientManager eventHubClientManager = getOrCreateStreamEventHubClient(streamName);
     Map<Partition, SystemStreamPartitionMetadata> sspMetadataMap = new HashMap<>();
-    Map<String, CompletableFuture<PartitionRuntimeInformation>> ehRuntimeInfos = new HashMap<>();
+    List<CompletableFuture<PartitionRuntimeInformation>> futureList = new ArrayList<>();
 
     for (String partition : partitionIds) {
       CompletableFuture<PartitionRuntimeInformation> partitionRuntimeInfo = eventHubClientManager
               .getEventHubClient()
               .getPartitionRuntimeInformation(partition);
-
-      ehRuntimeInfos.put(partition, partitionRuntimeInfo);
-    }
-
-    ehRuntimeInfos.forEach((partitionId, ehPartitionRuntimeInfo) -> {
-        try {
-          long timeoutMs = eventHubConfig.getRuntimeInfoWaitTimeMS(systemName);
-          PartitionRuntimeInformation ehPartitionInfo = ehPartitionRuntimeInfo.get(timeoutMs, TimeUnit.MILLISECONDS);
-
+      futureList.add(partitionRuntimeInfo);
+      partitionRuntimeInfo.thenAccept(ehPartitionInfo -> {
+          LOG.info(printPartitionRuntimeInfo(ehPartitionInfo));
           // Set offsets
           String startingOffset = EventHubSystemConsumer.START_OF_STREAM;
           String newestOffset = ehPartitionInfo.getLastEnqueuedOffset();
-          String upcomingOffset = getNextOffset(newestOffset);
+          String upcomingOffset = EventHubSystemConsumer.END_OF_STREAM;
           SystemStreamPartitionMetadata sspMetadata = new SystemStreamPartitionMetadata(startingOffset, newestOffset,
-                  upcomingOffset);
-
-          Partition partition = new Partition(Integer.parseInt(partitionId));
+            upcomingOffset);
+          sspMetadataMap.put(new Partition(Integer.parseInt(partition)), sspMetadata);
+        });
+    }
 
-          sspMetadataMap.put(partition, sspMetadata);
-        } catch (InterruptedException | ExecutionException | TimeoutException e) {
-          String msg = String.format(
-                  "Error while fetching EventHubPartitionRuntimeInfo for System:%s, Stream:%s, Partition:%s",
-                  systemName, streamName, partitionId);
-          LOG.error(msg, e);
-          throw new SamzaException(msg, e);
-        }
-      });
+    CompletableFuture<Void> futureGroup =
+        CompletableFuture.allOf(futureList.toArray(new CompletableFuture[futureList.size()]));
+    long timeoutMs = eventHubConfig.getRuntimeInfoWaitTimeMS(systemName);
+    try {
+      futureGroup.get(timeoutMs, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException | ExecutionException | TimeoutException e) {
+      String msg = String.format(
+          "Error while fetching EventHubPartitionRuntimeInfo for System:%s, Stream:%s",
+          systemName, streamName);
+      LOG.error(msg, e);
+      throw new SamzaException(msg, e);
+    }
     return sspMetadataMap;
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/db57304f/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
index 0fd0f2c..04e361f 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
@@ -19,12 +19,12 @@
 
 package org.apache.samza.system.eventhub.consumer;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.microsoft.azure.eventhubs.EventData;
 import com.microsoft.azure.eventhubs.EventHubException;
 import com.microsoft.azure.eventhubs.EventPosition;
 import com.microsoft.azure.eventhubs.PartitionReceiveHandler;
 import com.microsoft.azure.eventhubs.PartitionReceiver;
-import com.microsoft.azure.eventhubs.PartitionRuntimeInformation;
 import com.microsoft.azure.eventhubs.impl.ClientConstants;
 import java.time.Duration;
 import java.time.Instant;
@@ -42,6 +42,8 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.Validate;
 import org.apache.samza.SamzaException;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.MetricsRegistry;
@@ -122,17 +124,21 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
   private final Map<String, SamzaHistogram> consumptionLagMs;
   private final Map<String, Counter> readErrors;
 
-  final ConcurrentHashMap<SystemStreamPartition, PartitionReceiveHandler> streamPartitionHandlers =
-      new ConcurrentHashMap<>();
-  private final ConcurrentHashMap<SystemStreamPartition, PartitionReceiver> streamPartitionReceivers =
-      new ConcurrentHashMap<>();
-  private final ConcurrentHashMap<String, EventHubClientManager> streamEventHubManagers = new ConcurrentHashMap<>();
-  private final ConcurrentHashMap<SystemStreamPartition, String> streamPartitionOffsets = new ConcurrentHashMap<>();
+  @VisibleForTesting
+  final Map<SystemStreamPartition, PartitionReceiveHandler> streamPartitionHandlers = new ConcurrentHashMap<>();
+  @VisibleForTesting
+  final Map<SystemStreamPartition, EventHubClientManager> perPartitionEventHubManagers = new ConcurrentHashMap<>();
+
+  private final Map<SystemStreamPartition, PartitionReceiver> streamPartitionReceivers = new ConcurrentHashMap<>();
+  // should remain empty if PerPartitionConnection is true
+  private final Map<String, EventHubClientManager> perStreamEventHubManagers = new ConcurrentHashMap<>();
+  private final Map<SystemStreamPartition, String> streamPartitionOffsets = new ConcurrentHashMap<>();
   private final Map<String, Interceptor> interceptors;
   private final Integer prefetchCount;
-  private boolean isStarted = false;
+  private volatile boolean isStarted = false;
   private final EventHubConfig config;
   private final String systemName;
+  private final EventHubClientManagerFactory eventHubClientManagerFactory;
 
   // Partition receiver error propagation
   private final AtomicReference<Throwable> eventHubHandlerError = new AtomicReference<>(null);
@@ -145,14 +151,8 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
     this.config = config;
     this.systemName = systemName;
     this.interceptors = interceptors;
+    this.eventHubClientManagerFactory = eventHubClientManagerFactory;
     List<String> streamIds = config.getStreams(systemName);
-    // Create and initiate connections to Event Hubs
-    for (String streamId : streamIds) {
-      EventHubClientManager eventHubClientManager =
-          eventHubClientManagerFactory.getEventHubClientManager(systemName, streamId, config);
-      streamEventHubManagers.put(streamId, eventHubClientManager);
-      eventHubClientManager.init();
-    }
     prefetchCount = config.getPrefetchCount(systemName);
 
 
@@ -200,47 +200,62 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
     streamPartitionOffsets.put(systemStreamPartition, offset);
   }
 
-  private String getNewestEventHubOffset(EventHubClientManager eventHubClientManager, String streamName,
-      Integer partitionId) {
-    CompletableFuture<PartitionRuntimeInformation> partitionRuntimeInfoFuture =
-        eventHubClientManager.getEventHubClient().getPartitionRuntimeInformation(partitionId.toString());
-    try {
-      long timeoutMs = config.getRuntimeInfoWaitTimeMS(systemName);
-
-      PartitionRuntimeInformation partitionRuntimeInformation =
-          partitionRuntimeInfoFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
-
-      return partitionRuntimeInformation.getLastEnqueuedOffset();
-    } catch (InterruptedException | ExecutionException | TimeoutException e) {
-      String msg =
-          String.format("Error while fetching EventHubPartitionRuntimeInfo for System:%s, Stream:%s, Partition:%s",
-              systemName, streamName, partitionId);
-      throw new SamzaException(msg);
+  // Based on the config PerPartitionConnection, create or get EventHubClientManager for the SSP
+  // Note: this should be used only when starting up. After initialization, directly use perPartitionEventHubManagers
+  // to obtain the corresponding EventHubClientManager
+  private EventHubClientManager createOrGetEventHubClientManagerForSSP(String streamId, SystemStreamPartition ssp) {
+    EventHubClientManager eventHubClientManager;
+    if (config.getPerPartitionConnection(systemName)) {
+      // will create one EventHub client per partition
+      if (perPartitionEventHubManagers.containsKey(ssp)) {
+        LOG.warn(String.format("Trying to create new EventHubClientManager for ssp=%s. But one already exists", ssp));
+        eventHubClientManager = perPartitionEventHubManagers.get(ssp);
+      } else {
+        LOG.info("Creating EventHub client manager for SSP: " + ssp);
+        eventHubClientManager = eventHubClientManagerFactory.getEventHubClientManager(systemName, streamId, config);
+        eventHubClientManager.init();
+        perPartitionEventHubManagers.put(ssp, eventHubClientManager);
+      }
+    } else {
+      // will share one EventHub client per stream
+      if (!perStreamEventHubManagers.containsKey(streamId)) {
+        LOG.info("Creating EventHub client manager for stream: " + streamId);
+        EventHubClientManager perStreamEventHubClientManager =
+            eventHubClientManagerFactory.getEventHubClientManager(systemName, streamId, config);
+        perStreamEventHubClientManager.init();
+        perStreamEventHubManagers.put(streamId, perStreamEventHubClientManager);
+      }
+      eventHubClientManager = perStreamEventHubManagers.get(streamId);
+      perPartitionEventHubManagers.put(ssp, eventHubClientManager);
     }
+    LOG.info("EventHub client created for ssp: " + ssp);
+    Validate.notNull(eventHubClientManager,
+        String.format("Fail to create or get EventHubClientManager for ssp=%s", ssp));
+    return eventHubClientManager;
   }
 
   @Override
   public void start() {
+    if (isStarted) {
+      LOG.warn("Trying to start EventHubSystemConsumer while it's already started. Ignore the request.");
+      return;
+    }
     isStarted = true;
     LOG.info("Starting EventHubSystemConsumer. Count of SSPs registered: " + streamPartitionOffsets.entrySet().size());
     // Create receivers for Event Hubs
     for (Map.Entry<SystemStreamPartition, String> entry : streamPartitionOffsets.entrySet()) {
-
       SystemStreamPartition ssp = entry.getKey();
-      String streamName = ssp.getStream();
       String streamId = config.getStreamId(ssp.getStream());
       Integer partitionId = ssp.getPartition().getPartitionId();
       String offset = entry.getValue();
       String consumerGroup = config.getStreamConsumerGroup(systemName, streamId);
       String namespace = config.getStreamNamespace(systemName, streamId);
       String entityPath = config.getStreamEntityPath(systemName, streamId);
-      EventHubClientManager eventHubClientManager = streamEventHubManagers.get(streamId);
+      EventHubClientManager eventHubClientManager = createOrGetEventHubClientManagerForSSP(streamId, ssp);
 
       try {
-        // Fetch the newest offset
-        String newestEventHubOffset = getNewestEventHubOffset(eventHubClientManager, streamName, partitionId);
         PartitionReceiver receiver;
-        if (END_OF_STREAM.equals(offset) || EventHubSystemAdmin.compareOffsets(newestEventHubOffset, offset) == -1) {
+        if (END_OF_STREAM.equals(offset)) {
           // If the offset is greater than the newest offset, use the use current Instant as
           // offset to fetch in Eventhub.
           receiver = eventHubClientManager.getEventHubClient()
@@ -306,7 +321,7 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
 
   private void renewPartitionReceiver(SystemStreamPartition ssp) {
     String streamId = config.getStreamId(ssp.getStream());
-    EventHubClientManager eventHubClientManager = streamEventHubManagers.get(streamId);
+    EventHubClientManager eventHubClientManager = perPartitionEventHubManagers.get(ssp);
     String offset = streamPartitionOffsets.get(ssp);
     Integer partitionId = ssp.getPartition().getPartitionId();
     String consumerGroup = config.getStreamConsumerGroup(ssp.getSystem(), streamId);
@@ -345,7 +360,13 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
     } catch (ExecutionException | InterruptedException | TimeoutException e) {
       LOG.warn("Failed to close receivers", e);
     }
-    streamEventHubManagers.values().forEach(ehClientManager -> ehClientManager.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS));
+    perPartitionEventHubManagers.values()
+        .parallelStream()
+        .forEach(ehClientManager -> ehClientManager.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS));
+    perPartitionEventHubManagers.clear();
+    perStreamEventHubManagers.clear();
+    isStarted = false;
+    LOG.info("Event hub system consumer stopped.");
   }
 
   private boolean isErrorTransient(Throwable throwable) {

http://git-wip-us.apache.org/repos/asf/samza/blob/db57304f/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
index 55a2ae0..690e59e 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
@@ -19,12 +19,6 @@
 
 package org.apache.samza.system.eventhub.producer;
 
-import com.microsoft.azure.eventhubs.EventData;
-import com.microsoft.azure.eventhubs.EventHubClient;
-import com.microsoft.azure.eventhubs.EventHubException;
-import com.microsoft.azure.eventhubs.PartitionSender;
-import com.microsoft.azure.eventhubs.impl.ClientConstants;
-import com.microsoft.azure.eventhubs.impl.EventDataImpl;
 import java.nio.charset.Charset;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -37,6 +31,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.lang3.Validate;
 import org.apache.samza.SamzaException;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.MetricsRegistry;
@@ -48,6 +44,14 @@ import org.apache.samza.system.eventhub.Interceptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.microsoft.azure.eventhubs.EventData;
+import com.microsoft.azure.eventhubs.EventHubClient;
+import com.microsoft.azure.eventhubs.EventHubException;
+import com.microsoft.azure.eventhubs.PartitionSender;
+import com.microsoft.azure.eventhubs.impl.ClientConstants;
+import com.microsoft.azure.eventhubs.impl.EventDataImpl;
+
 
 /**
  * EventHub system producer that can be used in Samza jobs to send events to Azure EventHubs
@@ -92,10 +96,13 @@ public class EventHubSystemProducer extends AsyncSystemProducer {
   private volatile boolean isStarted = false;
 
   /**
-   * Per stream event hub client
+   * Per partition event hub client. Partitions from the same stream may share the same client,
+   * depends on config PerPartitionConnection. See {@link EventHubConfig}
    */
-  // Map of the system name to the event hub client.
-  private final Map<String, EventHubClientManager> eventHubClients = new HashMap<>();
+  @VisibleForTesting
+  final Map<String, Map<Integer, EventHubClientManager>> perPartitionEventHubClients = new HashMap<>();
+
+  private final Map<String, EventHubClientManager> perStreamEventHubClientManagers = new HashMap<>();
 
   /**
    * PartitionSender for each partition in the stream.
@@ -107,6 +114,8 @@ public class EventHubSystemProducer extends AsyncSystemProducer {
    */
   private final Map<String, Interceptor> interceptors;
 
+  private final EventHubClientManagerFactory eventHubClientManagerFactory;
+
   public EventHubSystemProducer(EventHubConfig config, String systemName,
       EventHubClientManagerFactory eventHubClientManagerFactory, Map<String, Interceptor> interceptors,
       MetricsRegistry registry) {
@@ -117,15 +126,17 @@ public class EventHubSystemProducer extends AsyncSystemProducer {
     this.partitioningMethod = config.getPartitioningMethod(systemName);
     this.interceptors = interceptors;
     this.maxMessageSize = config.getSkipMessagesLargerThan(systemName);
-
+    this.eventHubClientManagerFactory = eventHubClientManagerFactory;
     // Fetches the stream ids
     List<String> streamIds = config.getStreams(systemName);
 
     // Create and initiate connections to Event Hubs
+    // even if PerPartitionConnection == true, we still need a stream level event hub for initial metadata (fetching
+    // partition count)
     for (String streamId : streamIds) {
       EventHubClientManager ehClient =
           eventHubClientManagerFactory.getEventHubClientManager(systemName, streamId, config);
-      eventHubClients.put(streamId, ehClient);
+      perStreamEventHubClientManagers.put(streamId, ehClient);
       ehClient.init();
     }
   }
@@ -139,6 +150,34 @@ public class EventHubSystemProducer extends AsyncSystemProducer {
     }
   }
 
+  private EventHubClientManager createOrGetEventHubClientManagerForPartition(String streamId, int partitionId) {
+    Map<Integer, EventHubClientManager> perStreamMap =
+        perPartitionEventHubClients.computeIfAbsent(streamId, key -> new HashMap<>());
+    EventHubClientManager eventHubClientManager;
+    if (config.getPerPartitionConnection(systemName)) {
+      // will create one EventHub client per partition
+      if (perStreamMap.containsKey(partitionId)) {
+        LOG.warn(String.format("Trying to create new EventHubClientManager for partition=%d. But one already exists",
+            partitionId));
+        eventHubClientManager = perStreamMap.get(partitionId);
+      } else {
+        LOG.info(
+            String.format("Creating EventHub client manager for streamId=%s, partitionId=%d: ", streamId, partitionId));
+        eventHubClientManager = eventHubClientManagerFactory.getEventHubClientManager(systemName, streamId, config);
+        eventHubClientManager.init();
+        perStreamMap.put(partitionId, eventHubClientManager);
+      }
+    } else {
+      // will share one EventHub client per stream
+      eventHubClientManager = perStreamEventHubClientManagers.get(streamId);
+      perStreamMap.put(partitionId, eventHubClientManager);
+    }
+    Validate.notNull(eventHubClientManager,
+        String.format("Fail to create or get EventHubClientManager for streamId=%s, partitionId=%d", streamId,
+            partitionId));
+    return eventHubClientManager;
+  }
+
   @Override
   public synchronized void start() {
     super.start();
@@ -147,7 +186,7 @@ public class EventHubSystemProducer extends AsyncSystemProducer {
     // Create partition senders if required
     if (PartitioningMethod.PARTITION_KEY_AS_PARTITION.equals(partitioningMethod)) {
       // Create all partition senders
-      eventHubClients.forEach((streamId, samzaEventHubClient) -> {
+      perStreamEventHubClientManagers.forEach((streamId, samzaEventHubClient) -> {
           EventHubClient ehClient = samzaEventHubClient.getEventHubClient();
 
           try {
@@ -156,9 +195,12 @@ public class EventHubSystemProducer extends AsyncSystemProducer {
             Integer numPartitions =
                 ehClient.getRuntimeInformation().get(timeoutMs, TimeUnit.MILLISECONDS).getPartitionCount();
 
-            for (int i = 0; i < numPartitions; i++) { // 32 partitions max
+            for (int i = 0; i < numPartitions; i++) {
               String partitionId = String.valueOf(i);
-              PartitionSender partitionSender = ehClient.createPartitionSenderSync(partitionId);
+              EventHubClientManager perPartitionClientManager =
+                  createOrGetEventHubClientManagerForPartition(streamId, i);
+              PartitionSender partitionSender =
+                  perPartitionClientManager.getEventHubClient().createPartitionSenderSync(partitionId);
               partitionSenders.put(i, partitionSender);
             }
 
@@ -206,7 +248,7 @@ public class EventHubSystemProducer extends AsyncSystemProducer {
 
     String streamId = config.getStreamId(envelope.getSystemStream().getStream());
 
-    if (!eventHubClients.containsKey(streamId)) {
+    if (!perStreamEventHubClientManagers.containsKey(streamId)) {
       String msg = String.format("Trying to send event to a destination {%s} that is not registered.", streamId);
       throw new SamzaException(msg);
     }
@@ -229,13 +271,10 @@ public class EventHubSystemProducer extends AsyncSystemProducer {
     aggEventWriteRate.inc();
     eventByteWriteRate.get(streamId).inc(eventDataLength);
     aggEventByteWriteRate.inc(eventDataLength);
-    EventHubClientManager ehClient = eventHubClients.get(streamId);
+    EventHubClientManager ehClient = perStreamEventHubClientManagers.get(streamId);
 
     // Async send call
-    CompletableFuture<Void> sendResult =
-        sendToEventHub(streamId, eventData, getEnvelopePartitionId(envelope), ehClient.getEventHubClient());
-
-    return sendResult;
+    return sendToEventHub(streamId, eventData, getEnvelopePartitionId(envelope), ehClient.getEventHubClient());
   }
 
   private CompletableFuture<Void> sendToEventHub(String streamId, EventData eventData, Object partitionKey,
@@ -321,8 +360,17 @@ public class EventHubSystemProducer extends AsyncSystemProducer {
           LOG.error("Closing the partition sender failed ", e);
         }
       });
-    eventHubClients.values().forEach(ehClient -> ehClient.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS));
-    eventHubClients.clear();
+    perStreamEventHubClientManagers.values()
+        .parallelStream()
+        .forEach(ehClient -> ehClient.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS));
+    perStreamEventHubClientManagers.clear();
+    if (config.getPerPartitionConnection(systemName)) {
+      perPartitionEventHubClients.values()
+          .stream()
+          .flatMap(map -> map.values().stream())
+          .forEach(ehClient -> ehClient.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS));
+      perPartitionEventHubClients.clear();
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/db57304f/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
index b40d86d..6e055c6 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
@@ -203,7 +203,16 @@ public class TestEventHubSystemConsumer {
   }
 
   @Test
-  public void testMultiPartitionConsumptionHappyPath() throws Exception {
+  public void testMultiPartitionConsumptionPerPartitionConnection() throws Exception {
+    testMultiPartitionConsumptionHappyPath(true);
+  }
+
+  @Test
+  public void testMultiPartitionConsumptionShareConnection() throws Exception {
+    testMultiPartitionConsumptionHappyPath(false);
+  }
+
+  private void testMultiPartitionConsumptionHappyPath(boolean perPartitionConnection) throws Exception {
     String systemName = "eventhubs";
     String streamName = "testStream";
     int numEvents = 10; // needs to be less than BLOCKING_QUEUE_SIZE
@@ -229,6 +238,8 @@ public class TestEventHubSystemConsumer {
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamName), EVENTHUB_NAMESPACE);
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamName), EVENTHUB_KEY_NAME);
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamName), EVENTHUB_KEY);
+    configMap.put(String.format(EventHubConfig.CONFIG_PER_PARTITION_CONNECTION, systemName),
+        String.valueOf(perPartitionConnection));
     MapConfig config = new MapConfig(configMap);
 
     MockEventHubClientManagerFactory eventHubClientWrapperFactory = new MockEventHubClientManagerFactory(eventData);
@@ -257,6 +268,14 @@ public class TestEventHubSystemConsumer {
 
     Assert.assertEquals(counters.get(EventHubSystemConsumer.EVENT_READ_RATE).getCount(), numEvents * 2);
     Assert.assertEquals(counters.get(EventHubSystemConsumer.READ_ERRORS).getCount(), 0);
+    if (perPartitionConnection) {
+      Assert.assertNotEquals("perPartitionConnection=true; SSPs should not share the same client",
+          consumer.perPartitionEventHubManagers.get(ssp1), consumer.perPartitionEventHubManagers.get(ssp2));
+    } else {
+
+      Assert.assertEquals("perPartitionConnection=false; SSPs should share the same client",
+          consumer.perPartitionEventHubManagers.get(ssp1), consumer.perPartitionEventHubManagers.get(ssp2));
+    }
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/samza/blob/db57304f/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
index 9a3bf7d..7436468 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
@@ -64,7 +64,16 @@ public class TestEventHubSystemProducer {
   }
 
   @Test
-  public void testSendingToSpecificPartitions() throws Exception {
+  public void testSendingToSpecificPartitionsPerPartitionConnection() throws Exception {
+    testSendingToSpecificPartitions(true);
+  }
+
+  @Test
+  public void testSendingToSpecificPartitionsShareConnection() throws Exception {
+    testSendingToSpecificPartitions(false);
+  }
+
+  private void testSendingToSpecificPartitions(boolean perPartitionConnection) throws Exception {
     String systemName = "eventhubs";
     String streamName = "testStream";
     int numEvents = 10;
@@ -87,6 +96,8 @@ public class TestEventHubSystemProducer {
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamName), EVENTHUB_ENTITY1);
     configMap.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName),
         PartitioningMethod.PARTITION_KEY_AS_PARTITION.toString());
+    configMap.put(String.format(EventHubConfig.CONFIG_PER_PARTITION_CONNECTION, systemName),
+        String.valueOf(perPartitionConnection));
     MapConfig config = new MapConfig(configMap);
 
     MockEventHubClientManagerFactory factory = new MockEventHubClientManagerFactory();
@@ -115,6 +126,16 @@ public class TestEventHubSystemProducer {
 
     Assert.assertTrue(outgoingMessagesP0.equals(receivedData0));
     Assert.assertTrue(outgoingMessagesP1.equals(receivedData1));
+    if (perPartitionConnection) {
+      Assert.assertNotEquals("perPartitionConnection=true; partitions should not share the same client",
+          producer.perPartitionEventHubClients.get(streamName).get(0),
+          producer.perPartitionEventHubClients.get(streamName).get(1));
+    } else {
+
+      Assert.assertEquals("perPartitionConnection=false; partitions should share the same client",
+          producer.perPartitionEventHubClients.get(streamName).get(0),
+          producer.perPartitionEventHubClients.get(streamName).get(1));
+    }
   }
 
   @Test