You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/05/09 22:32:48 UTC

samza git commit: SAMZA-1706: lazy initialization for eventhub system producer

Repository: samza
Updated Branches:
  refs/heads/master 89beb1fcc -> 9d404bc51


SAMZA-1706: lazy initialization for eventhub system producer

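We are seeing a slow-shutdown issue with Event Hubs system producers for users who only use the Event Hubs consumer (Samza always creates both the consumer and the producer for a system, whether or not the producer is used). As a workaround, add lazy initialization to the producer: its Event Hubs connections are now created on the first send rather than at construction, so a producer that is never used opens no connections and no longer slows down shutdown.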

Author: Hai Lu <ha...@linkedin.com>

Reviewers: Jagadish <ja...@apache.org>

Closes #511 from lhaiesp/master


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9d404bc5
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9d404bc5
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9d404bc5

Branch: refs/heads/master
Commit: 9d404bc515a4cbc81bb9922d8d44b02ef1e84fcf
Parents: 89beb1f
Author: Hai Lu <ha...@linkedin.com>
Authored: Wed May 9 15:32:44 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Wed May 9 15:32:44 2018 -0700

----------------------------------------------------------------------
 .../producer/EventHubSystemProducer.java        | 82 ++++++++++++--------
 1 file changed, 49 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/9d404bc5/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
index 690e59e..b9afea7 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
@@ -95,6 +95,10 @@ public class EventHubSystemProducer extends AsyncSystemProducer {
 
   private volatile boolean isStarted = false;
 
+  // We implement lazy initialization for producer as a workaround for
+  // slow shutdown issue.
+  private boolean isInitialized = false;
+
   /**
    * Per partition event hub client. Partitions from the same stream may share the same client,
    * depends on config PerPartitionConnection. See {@link EventHubConfig}
@@ -127,9 +131,12 @@ public class EventHubSystemProducer extends AsyncSystemProducer {
     this.interceptors = interceptors;
     this.maxMessageSize = config.getSkipMessagesLargerThan(systemName);
     this.eventHubClientManagerFactory = eventHubClientManagerFactory;
+  }
+
+  private void init() {
+    LOG.info("Initializing EventHubSystemProducer");
     // Fetches the stream ids
     List<String> streamIds = config.getStreams(systemName);
-
     // Create and initiate connections to Event Hubs
     // even if PerPartitionConnection == true, we still need a stream level event hub for initial metadata (fetching
     // partition count)
@@ -139,6 +146,40 @@ public class EventHubSystemProducer extends AsyncSystemProducer {
       perStreamEventHubClientManagers.put(streamId, ehClient);
       ehClient.init();
     }
+
+    // Create partition senders if required
+    if (PartitioningMethod.PARTITION_KEY_AS_PARTITION.equals(partitioningMethod)) {
+      // Create all partition senders
+      perStreamEventHubClientManagers.forEach((streamId, samzaEventHubClient) -> {
+          EventHubClient ehClient = samzaEventHubClient.getEventHubClient();
+
+          try {
+            Map<Integer, PartitionSender> partitionSenders = new HashMap<>();
+            long timeoutMs = config.getRuntimeInfoWaitTimeMS(systemName);
+            Integer numPartitions =
+                ehClient.getRuntimeInformation().get(timeoutMs, TimeUnit.MILLISECONDS).getPartitionCount();
+
+            for (int i = 0; i < numPartitions; i++) {
+              String partitionId = String.valueOf(i);
+              EventHubClientManager perPartitionClientManager =
+                  createOrGetEventHubClientManagerForPartition(streamId, i);
+              PartitionSender partitionSender =
+                  perPartitionClientManager.getEventHubClient().createPartitionSenderSync(partitionId);
+              partitionSenders.put(i, partitionSender);
+            }
+
+            streamPartitionSenders.put(streamId, partitionSenders);
+          } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            String msg = "Failed to fetch number of Event Hub partitions for partition sender creation";
+            throw new SamzaException(msg, e);
+          } catch (EventHubException | IllegalArgumentException e) {
+            String msg = "Creation of partition sender failed with exception";
+            throw new SamzaException(msg, e);
+          }
+        });
+    }
+    isInitialized = true;
+    LOG.info("EventHubSystemProducer initialized.");
   }
 
   @Override
@@ -183,38 +224,6 @@ public class EventHubSystemProducer extends AsyncSystemProducer {
     super.start();
     LOG.info("Starting system producer.");
 
-    // Create partition senders if required
-    if (PartitioningMethod.PARTITION_KEY_AS_PARTITION.equals(partitioningMethod)) {
-      // Create all partition senders
-      perStreamEventHubClientManagers.forEach((streamId, samzaEventHubClient) -> {
-          EventHubClient ehClient = samzaEventHubClient.getEventHubClient();
-
-          try {
-            Map<Integer, PartitionSender> partitionSenders = new HashMap<>();
-            long timeoutMs = config.getRuntimeInfoWaitTimeMS(systemName);
-            Integer numPartitions =
-                ehClient.getRuntimeInformation().get(timeoutMs, TimeUnit.MILLISECONDS).getPartitionCount();
-
-            for (int i = 0; i < numPartitions; i++) {
-              String partitionId = String.valueOf(i);
-              EventHubClientManager perPartitionClientManager =
-                  createOrGetEventHubClientManagerForPartition(streamId, i);
-              PartitionSender partitionSender =
-                  perPartitionClientManager.getEventHubClient().createPartitionSenderSync(partitionId);
-              partitionSenders.put(i, partitionSender);
-            }
-
-            streamPartitionSenders.put(streamId, partitionSenders);
-          } catch (InterruptedException | ExecutionException | TimeoutException e) {
-            String msg = "Failed to fetch number of Event Hub partitions for partition sender creation";
-            throw new SamzaException(msg, e);
-          } catch (EventHubException | IllegalArgumentException e) {
-            String msg = "Creation of partition sender failed with exception";
-            throw new SamzaException(msg, e);
-          }
-        });
-    }
-
     // Initiate metrics
     streamIds.forEach((streamId) -> {
         eventSkipRate.put(streamId, metricsRegistry.newCounter(streamId, EVENT_SKIP_RATE));
@@ -245,6 +254,10 @@ public class EventHubSystemProducer extends AsyncSystemProducer {
     if (!isStarted) {
       throw new SamzaException("Trying to call send before the producer is started.");
     }
+    if (!isInitialized) {
+      // lazy initialization on the first send
+      init();
+    }
 
     String streamId = config.getStreamId(envelope.getSystemStream().getStream());
 
@@ -371,6 +384,9 @@ public class EventHubSystemProducer extends AsyncSystemProducer {
           .forEach(ehClient -> ehClient.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS));
       perPartitionEventHubClients.clear();
     }
+    isStarted = false;
+    isInitialized = false;
+    LOG.info("EventHubSystemProducer stopped.");
   }
 
   /**