You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2019/02/16 02:49:15 UTC

[samza] branch master updated: Adding a bound on EventHubSystemConsumer's initializeEventHubsManagers

This is an automated email from the ASF dual-hosted git repository.

jagadish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 298c07c  Adding a bound on EventHubSystemConsumer's initializeEventHubsManagers
298c07c is described below

commit 298c07ceaa8af94cdea3eb87fcfbb823c0897220
Author: Ray Matharu <rm...@linkedin.com>
AuthorDate: Fri Feb 15 18:49:07 2019 -0800

    Adding a bound on EventHubSystemConsumer's initializeEventHubsManagers
    
    Because EventHubClient.createReceiverSync is a synchronous call that waits without a timeout,
    initializeEventHubsManagers can block the container start or the run loop, as in this thread dump:
    "Samza StreamProcessor Container Thread-0" # 340 daemon prio=5 os_prio=0 tid=0x00007f5e38427800 nid=0x579e waiting on condition [0x00007f5e888cc000]
       java.lang.Thread.State: WAITING (parking)
    	at sun.misc.Unsafe.park(Native Method)
    	- parking to wait for  <0x00000005c1e04bf0> (a java.util.concurrent.CompletableFuture$Signaller)
    	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    	at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
    	at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
    	at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
    	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    	at com.microsoft.azure.eventhubs.EventHubClient.lambda$createReceiverSync$7(EventHubClient.java:336)
    	at com.microsoft.azure.eventhubs.EventHubClient$$Lambda$371/65833266.execute(Unknown Source)
    	at com.microsoft.azure.eventhubs.impl.ExceptionUtil.sync(ExceptionUtil.java:191)
    	at com.microsoft.azure.eventhubs.EventHubClient.createReceiverSync(EventHubClient.java:336)
    	at org.apache.samza.system.eventhub.consumer.EventHubSystemConsumer.initializeEventHubsManagers(EventHubSystemConsumer.java:320)
    	- locked <0x00000005c1e04cd0> (a com.linkedin.samza.eventhub.consumer.LiEventHubSystemConsumer)
    	at org.apache.samza.system.eventhub.consumer.EventHubSystemConsumer.start(EventHubSystemConsumer.java:355)
    	at org.apache.samza.system.SystemConsumers$$anonfun$start$5.apply(SystemConsumers.scala:168)
    	at org.apache.samza.system.SystemConsumers$$anonfun$start$5.apply(SystemConsumers.scala:168)
    	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    	at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
    	at org.apache.samza.system.SystemConsumers.start(SystemConsumers.scala:168)
    	at org.apache.samza.container.SamzaContainer.startConsumers(SamzaContainer.scala:999)
    	at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:779)
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    	at java.lang.Thread.run(Thread.java:745)
    
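    This PR bounds that wait: it switches to the asynchronous createReceiver and createPartitionSender
    calls and waits on the returned futures with a one-minute timeout.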
    
    Author: Ray Matharu <rm...@linkedin.com>
    
    Reviewers: Jagadish <ja...@apache.org>
    
    Closes #923 from rmatharu/test-bounded-event-hub-init
---
 .../eventhub/consumer/EventHubSystemConsumer.java  |  8 ++++--
 .../eventhub/producer/EventHubSystemProducer.java  |  3 +-
 .../eventhub/MockEventHubClientManagerFactory.java | 32 ++++++++++++----------
 3 files changed, 24 insertions(+), 19 deletions(-)

diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
index 887b3fe..2f2873e 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
@@ -105,6 +105,8 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
 
   // Overall timeout for EventHubClient exponential backoff policy
   private static final Duration DEFAULT_EVENTHUB_RECEIVER_TIMEOUT = Duration.ofMinutes(10);
+  private static final Duration DEFAULT_EVENTHUB_CREATE_RECEIVER_TIMEOUT = Duration.ofMinutes(1);
+
   private static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = Duration.ofSeconds(15).toMillis();
 
   public static final String START_OF_STREAM = ClientConstants.START_OF_STREAM; // -1
@@ -274,13 +276,13 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
           // If the offset is greater than the newest offset, use the use current Instant as
           // offset to fetch in Eventhub.
           receiver = eventHubClientManager.getEventHubClient()
-              .createReceiverSync(consumerGroup, partitionId.toString(), EventPosition.fromEnqueuedTime(Instant.now()));
+              .createReceiver(consumerGroup, partitionId.toString(), EventPosition.fromEnqueuedTime(Instant.now())).get(DEFAULT_EVENTHUB_CREATE_RECEIVER_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
         } else {
           // EventHub will return the first message AFTER the offset that was specified in the fetch request.
           // If no such offset exists Eventhub will return an error.
           receiver = eventHubClientManager.getEventHubClient()
-              .createReceiverSync(consumerGroup, partitionId.toString(),
-                  EventPosition.fromOffset(offset, /* inclusiveFlag */false));
+              .createReceiver(consumerGroup, partitionId.toString(),
+                  EventPosition.fromOffset(offset, /* inclusiveFlag */false)).get(DEFAULT_EVENTHUB_CREATE_RECEIVER_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
         }
 
         receiver.setPrefetchCount(prefetchCount);
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
index b9afea7..f021f36 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
@@ -59,6 +59,7 @@ import com.microsoft.azure.eventhubs.impl.EventDataImpl;
 public class EventHubSystemProducer extends AsyncSystemProducer {
   private static final Logger LOG = LoggerFactory.getLogger(EventHubSystemProducer.class.getName());
   private static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = Duration.ofMinutes(1L).toMillis();
+  private static final long DEFAULT_CREATE_PARTITION_SENDER_TIMEOUT_MILLIS = Duration.ofMinutes(1L).toMillis();
 
   public static final String PRODUCE_TIMESTAMP = "produce-timestamp";
   public static final String KEY = "key";
@@ -164,7 +165,7 @@ public class EventHubSystemProducer extends AsyncSystemProducer {
               EventHubClientManager perPartitionClientManager =
                   createOrGetEventHubClientManagerForPartition(streamId, i);
               PartitionSender partitionSender =
-                  perPartitionClientManager.getEventHubClient().createPartitionSenderSync(partitionId);
+                  perPartitionClientManager.getEventHubClient().createPartitionSender(partitionId).get(DEFAULT_CREATE_PARTITION_SENDER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
               partitionSenders.put(i, partitionSender);
             }
 
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java
index 3b4f1ec..00bffc3 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java
@@ -131,24 +131,26 @@ public class MockEventHubClientManagerFactory extends EventHubClientManagerFacto
 
       try {
         // Consumer calls
-        PowerMockito.when(mockEventHubClient.createReceiverSync(anyString(), anyString(), anyObject()))
-                .then((Answer<PartitionReceiver>) invocationOnMock -> {
-                    String partitionId = invocationOnMock.getArgumentAt(1, String.class);
-                    startingOffsets.put(partitionId, EventPosition.fromEndOfStream());
-                    return mockPartitionReceiver;
-                  });
-        PowerMockito.when(mockEventHubClient.createReceiverSync(anyString(), anyString(), anyObject()))
-                .then((Answer<PartitionReceiver>) invocationOnMock -> {
-                    String partitionId = invocationOnMock.getArgumentAt(1, String.class);
-                    EventPosition offset = invocationOnMock.getArgumentAt(2, EventPosition.class);
-                    startingOffsets.put(partitionId, offset);
-                    return mockPartitionReceiver;
-                  });
+        PowerMockito.when(mockEventHubClient.createReceiver(anyString(), anyString(), anyObject()))
+              .then((Answer<CompletableFuture<PartitionReceiver>>) invocationOnMock -> {
+                  String partitionId = invocationOnMock.getArgumentAt(1, String.class);
+                  startingOffsets.put(partitionId, EventPosition.fromEndOfStream());
+                  return CompletableFuture.completedFuture(mockPartitionReceiver);
+                });
+
+        PowerMockito.when(mockEventHubClient.createReceiver(anyString(), anyString(), anyObject()))
+              .then((Answer<CompletableFuture<PartitionReceiver>>) invocationOnMock -> {
+                  String partitionId = invocationOnMock.getArgumentAt(1, String.class);
+                  EventPosition offset = invocationOnMock.getArgumentAt(2, EventPosition.class);
+                  startingOffsets.put(partitionId, offset);
+                  return CompletableFuture.completedFuture(mockPartitionReceiver);
+                });
+
         PowerMockito.when(mockEventHubClient.getPartitionRuntimeInformation(anyString())).thenReturn(partitionFuture);
 
         // Producer calls
-        PowerMockito.when(mockEventHubClient.createPartitionSenderSync("0")).thenReturn(mockPartitionSender0);
-        PowerMockito.when(mockEventHubClient.createPartitionSenderSync("1")).thenReturn(mockPartitionSender1);
+        PowerMockito.when(mockEventHubClient.createPartitionSender("0")).thenReturn(CompletableFuture.completedFuture(mockPartitionSender0));
+        PowerMockito.when(mockEventHubClient.createPartitionSender("1")).thenReturn(CompletableFuture.completedFuture(mockPartitionSender1));
 
         PowerMockito.when(mockEventHubClient.getRuntimeInformation()).thenReturn(future);