You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this copy.
Posted to commits@pulsar.apache.org by je...@apache.org on 2020/11/10 22:15:28 UTC

[pulsar] branch master updated: Optimize batch source discovery and task ack (#8498)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c756ee  Optimize batch source discovery and task ack (#8498)
5c756ee is described below

commit 5c756ee1b8c7a4360520bde2c3f5596826d70fc6
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Tue Nov 10 14:14:54 2020 -0800

    Optimize batch source discovery and task ack (#8498)
    
    Co-authored-by: Jerry Peng <je...@splunk.com>
---
 .../functions/instance/JavaInstanceRunnable.java   |   6 +-
 .../source/batch/BatchSourceExecutor.java          |  99 +++++++++++++----
 .../source/batch/BatchSourceExecutorTest.java      | 120 +++++++++++++--------
 3 files changed, 154 insertions(+), 71 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 2b2f360..6b27868 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -460,7 +460,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         }
     }
 
-    private Record readInput() {
+    private Record readInput() throws Exception {
         Record record;
         if (!(this.source instanceof PulsarSource)) {
             Thread.currentThread().setContextClassLoader(functionClassLoader);
@@ -469,8 +469,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
             record = this.source.read();
         } catch (Exception e) {
             stats.incrSourceExceptions(e);
-            log.info("Encountered exception in source read: ", e);
-            throw new RuntimeException(e);
+            log.error("Encountered exception in source read", e);
+            throw e;
         } finally {
             Thread.currentThread().setContextClassLoader(instanceClassLoader);
         }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java
index 4474b9b..5937616 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.functions.source.batch;
 
 import com.google.gson.Gson;
+import io.netty.util.concurrent.DefaultThreadFactory;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -41,6 +42,9 @@ import org.apache.pulsar.io.core.SourceContext;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 /**
  * BatchSourceExecutor wraps BatchSource as Source. Thus from Pulsar IO perspective, it is running a regular
@@ -63,13 +67,16 @@ public class BatchSourceExecutor<T> implements Source<T> {
   private String batchSourceClassName;
   private BatchSource<T> batchSource;
   private String intermediateTopicName;
+  private volatile Exception currentError = null;
+  private volatile boolean isRunning = false;
+  private ExecutorService discoveryThread = Executors.newSingleThreadExecutor(new DefaultThreadFactory("batch-source-discovery"));
 
   @Override
   public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
     this.config = config;
     this.sourceContext = sourceContext;
     this.intermediateTopicName = SourceConfigUtils.computeBatchSourceIntermediateTopicName(sourceContext.getTenant(),
-            sourceContext.getNamespace(), sourceContext.getSourceName()).toString();
+      sourceContext.getNamespace(), sourceContext.getSourceName()).toString();
     this.getBatchSourceConfigs(config);
     this.initializeBatchSource();
     this.start();
@@ -78,14 +85,21 @@ public class BatchSourceExecutor<T> implements Source<T> {
   @Override
   public Record<T> read() throws Exception {
     while (true) {
+      if (currentError != null) {
+        throw currentError;
+      }
       if (currentTask == null) {
-        retrieveNextTask();
-        prepareInternal();
+        currentTask = retrieveNextTask();
+        prepareInternal(currentTask);
       }
       Record<T> retval = batchSource.readNext();
       if (retval == null) {
         // signals end if this batch
-        intermediateTopicConsumer.acknowledge(currentTask.getMessageId());
+        intermediateTopicConsumer.acknowledgeAsync(currentTask.getMessageId()).exceptionally(throwable -> {
+          log.error("Encountered error when acknowledging completed task with id {}", currentTask.getMessageId(), throwable);
+          setCurrentError(throwable);
+          return null;
+        });
         currentTask = null;
       } else {
         return retval;
@@ -95,7 +109,7 @@ public class BatchSourceExecutor<T> implements Source<T> {
 
   private void getBatchSourceConfigs(Map<String, Object> config) {
     if (!config.containsKey(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY)
-        || !config.containsKey(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY)) {
+      || !config.containsKey(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY)) {
       throw new IllegalArgumentException("Batch Configs cannot be found");
     }
 
@@ -108,18 +122,18 @@ public class BatchSourceExecutor<T> implements Source<T> {
     // First init the batchsource
     ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
     Object userClassObject = Reflections.createInstance(
-            batchSourceClassName,
-            clsLoader);
+      batchSourceClassName,
+      clsLoader);
     if (userClassObject instanceof BatchSource) {
-        batchSource = (BatchSource) userClassObject;
+      batchSource = (BatchSource) userClassObject;
     } else {
       throw new IllegalArgumentException("BatchSource does not implement the correct interface");
     }
 
     // next init the discovery triggerer
     Object discoveryClassObject = Reflections.createInstance(
-            batchSourceConfig.getDiscoveryTriggererClassName(),
-            clsLoader);
+      batchSourceConfig.getDiscoveryTriggererClassName(),
+      clsLoader);
     if (discoveryClassObject instanceof BatchSourceTriggerer) {
       discoveryTriggerer = (BatchSourceTriggerer) discoveryClassObject;
     } else {
@@ -128,22 +142,38 @@ public class BatchSourceExecutor<T> implements Source<T> {
   }
 
   private void start() throws Exception {
+    isRunning = true;
     createIntermediateTopicConsumer();
     batchSource.open(this.config, this.sourceContext);
     if (sourceContext.getInstanceId() == 0) {
       discoveryTriggerer.init(batchSourceConfig.getDiscoveryTriggererConfig(),
-                              this.sourceContext);
+        this.sourceContext);
       discoveryTriggerer.start(this::triggerDiscover);
     }
   }
 
-  private void triggerDiscover(String discoveredEvent) {
-    try {
-      batchSource.discover((task) -> this.taskEater(discoveredEvent, task));
-    } catch (Exception e) {
-      log.error("Error on discover", e);
-      throw new RuntimeException(e);
+  volatile boolean discoverInProgress = false;
+  private synchronized void triggerDiscover(String discoveredEvent) {
+
+    if (discoverInProgress) {
+      log.info("Discovery is already in progress");
+      return;
+    } else {
+      discoverInProgress = true;
     }
+    // Run this code asynchronous so it doesn't block processing of the tasks
+    discoveryThread.submit(() -> {
+      try {
+        batchSource.discover(task -> taskEater(discoveredEvent, task));
+      } catch (Exception e) {
+        if (isRunning || !(e instanceof InterruptedException)) {
+          log.error("Encountered error during task discovery", e);
+          setCurrentError(e);
+        }
+      } finally {
+        discoverInProgress = false;
+      }
+    });
   }
 
   private void taskEater(String discoveredEvent, byte[] task) {
@@ -153,6 +183,7 @@ public class BatchSourceExecutor<T> implements Source<T> {
       properties.put("produceTime", String.valueOf(System.currentTimeMillis()));
       TypedMessageBuilder<byte[]> message = sourceContext.newOutputMessage(intermediateTopicName, Schema.BYTES);
       message.value(task).properties(properties);
+      // Note: we can only make this send async if the api returns a future to the connector so that errors can be handled by the connector
       message.send();
     } catch (Exception e) {
       log.error("error writing discovered task to intermediate topic", e);
@@ -160,9 +191,9 @@ public class BatchSourceExecutor<T> implements Source<T> {
     }
   }
 
-  private void prepareInternal() {
+  private void prepareInternal(Message<byte[]> task) {
     try {
-      batchSource.prepare(currentTask.getValue());
+      batchSource.prepare(task.getValue());
     } catch (Exception e) {
       log.error("Error on prepare", e);
       throw new RuntimeException(e);
@@ -175,6 +206,7 @@ public class BatchSourceExecutor<T> implements Source<T> {
   }
 
   private void stop() throws Exception {
+    isRunning = false;
     Exception ex = null;
     if (discoveryTriggerer != null) {
       try {
@@ -185,6 +217,14 @@ public class BatchSourceExecutor<T> implements Source<T> {
       }
       discoveryTriggerer = null;
     }
+
+    discoveryThread.shutdownNow();
+    try {
+      discoveryThread.awaitTermination(10, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      log.warn("Shutdown of discovery thread was interrupted");
+    }
+
     if (intermediateTopicConsumer != null) {
       try {
         intermediateTopicConsumer.close();
@@ -255,11 +295,24 @@ public class BatchSourceExecutor<T> implements Source<T> {
     }
   }
 
-
-  private void retrieveNextTask() throws Exception {
-    currentTask = intermediateTopicConsumer.receive();
-    return;
+  private Message<byte[]> retrieveNextTask() throws Exception {
+    while(true) {
+      if (currentError != null) {
+        throw currentError;
+      }
+      Message<byte[]> taskMessage = intermediateTopicConsumer.receive(5, TimeUnit.SECONDS);
+      if (taskMessage != null) {
+        return taskMessage;
+      }
+    }
   }
 
+  private void setCurrentError(Throwable error) {
+    if (error instanceof Exception) {
+      currentError = (Exception) error;
+    } else {
+      currentError = new RuntimeException(error.getCause());
+    }
+  }
 }
 
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java
index 5125453..65a8928 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java
@@ -21,25 +21,28 @@ package org.apache.pulsar.functions.source.batch;
 
 import com.google.gson.Gson;
 import lombok.Getter;
-import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.common.io.BatchSourceConfig;
 import org.apache.pulsar.functions.api.Record;
-
 import org.apache.pulsar.io.core.BatchPushSource;
 import org.apache.pulsar.io.core.BatchSource;
 import org.apache.pulsar.io.core.BatchSourceTriggerer;
 import org.apache.pulsar.io.core.SourceContext;
 import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.util.*;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.Consumer;
 
 /**
@@ -49,13 +52,13 @@ public class BatchSourceExecutorTest {
 
   public static class TestBatchSource implements BatchSource<String> {
     @Getter
-    private static int prepareCount;
+    public static int prepareCount;
     @Getter
-    private static int discoverCount;
+    public static int discoverCount;
     @Getter
-    private static int recordCount;
+    public static int recordCount;
     @Getter
-    private static int closeCount;
+    public static int closeCount;
     private Record record = Mockito.mock(Record.class);
     public TestBatchSource() { }
 
@@ -93,15 +96,22 @@ public class BatchSourceExecutorTest {
     }
   }
 
+  public static class TestBatchSourceFailDiscovery extends TestBatchSource {
+    @Override
+    public void discover(Consumer<byte[]> taskEater) throws Exception {
+      throw new Exception("test");
+    }
+  }
+
   public static class TestBatchPushSource extends BatchPushSource<String> {
     @Getter
-    private static int prepareCount;
+    public static int prepareCount;
     @Getter
-    private static int discoverCount;
+    public static int discoverCount;
     @Getter
-    private static int recordCount;
+    public static int recordCount;
     @Getter
-    private static int closeCount;
+    public static int closeCount;
     private Record record = Mockito.mock(Record.class);
     public TestBatchPushSource() { }
 
@@ -135,7 +145,10 @@ public class BatchSourceExecutorTest {
     }
   }
 
+  public static LinkedBlockingQueue<String> triggerQueue = new LinkedBlockingQueue<>();
+  public static LinkedBlockingQueue<String> completedQueue = new LinkedBlockingQueue<>();
   public static class TestDiscoveryTriggerer implements BatchSourceTriggerer {
+    @Getter
     private Consumer<String> trigger;
     private Thread thread;
 
@@ -150,12 +163,12 @@ public class BatchSourceExecutorTest {
 
     @Override
     public void start(Consumer<String> trigger) {
+
       this.trigger = trigger;
       thread = new Thread(() -> {
         while(true) {
           try {
-            Thread.sleep(100);
-            trigger.accept("Triggered");
+            trigger.accept(triggerQueue.take());
           } catch (InterruptedException e) {
             break;
           }
@@ -186,7 +199,6 @@ public class BatchSourceExecutorTest {
   private ConsumerBuilder consumerBuilder;
   private org.apache.pulsar.client.api.Consumer<byte[]> consumer;
   private TypedMessageBuilder<byte[]> messageBuilder;
-  private CyclicBarrier discoveryBarrier;
   private Message<byte[]> discoveredTask;
 
   private static Map<String, Object> createConfig(String className, BatchSourceConfig batchConfig) {
@@ -208,6 +220,14 @@ public class BatchSourceExecutorTest {
 
   @BeforeMethod
   public void setUp() throws Exception {
+    TestBatchSource.closeCount = 0;
+    TestBatchSource.discoverCount = 0;
+    TestBatchSource.prepareCount = 0;
+    TestBatchSource.recordCount = 0;
+    TestBatchPushSource.closeCount = 0;
+    TestBatchPushSource.discoverCount = 0;
+    TestBatchPushSource.prepareCount = 0;
+    TestBatchPushSource.recordCount = 0;
     testBatchSource = new TestBatchSource();
     testBatchPushSource = new TestBatchPushSource();
     batchSourceExecutor = new BatchSourceExecutor<>();
@@ -225,9 +245,12 @@ public class BatchSourceExecutorTest {
     Mockito.doReturn(consumerBuilder).when(consumerBuilder).properties(Mockito.anyMap());
     Mockito.doReturn(consumerBuilder).when(consumerBuilder).topic(Mockito.any());
     discoveredTask = Mockito.mock(Message.class);
+    Mockito.doReturn(MessageId.latest).when(discoveredTask).getMessageId();
     consumer = Mockito.mock(org.apache.pulsar.client.api.Consumer.class);
     Mockito.doReturn(discoveredTask).when(consumer).receive();
+    Mockito.doReturn(discoveredTask).when(consumer).receive(Mockito.anyInt(), Mockito.any());
     Mockito.doReturn(CompletableFuture.completedFuture(consumer)).when(consumerBuilder).subscribeAsync();
+    Mockito.doReturn(CompletableFuture.completedFuture(null)).when(consumer).acknowledgeAsync(Mockito.any(MessageId.class));
     Mockito.doReturn(consumerBuilder).when(context).newConsumerBuilder(Schema.BYTES);
     messageBuilder = Mockito.mock(TypedMessageBuilder.class);
     Mockito.doReturn(messageBuilder).when(messageBuilder).value(Mockito.any());
@@ -235,16 +258,13 @@ public class BatchSourceExecutorTest {
     Mockito.doReturn(messageBuilder).when(context).newOutputMessage(Mockito.anyString(), Mockito.any());
 
     // Discovery
-    discoveryBarrier = new CyclicBarrier(2);
-    Mockito.doAnswer(new Answer<MessageId>() {
-      @Override public MessageId answer(InvocationOnMock invocation) {
-        try {
-          discoveryBarrier.await();
-        } catch (Exception e) {
-          throw new RuntimeException();
-        }
-        return null;
+    Mockito.doAnswer((Answer<MessageId>) invocation -> {
+      try {
+        completedQueue.put("done");
+      } catch (Exception e) {
+        throw new RuntimeException();
       }
+      return null;
     }).when(messageBuilder).send();
   }
 
@@ -333,43 +353,53 @@ public class BatchSourceExecutorTest {
     batchSourceExecutor.open(pushConfig, context);
   }
 
-  @Test
+  @Test (timeOut = 5000)
   public void testLifeCycle() throws Exception {
     batchSourceExecutor.open(config, context);
-    Assert.assertTrue(testBatchSource.getDiscoverCount() < 1);
-    discoveryBarrier.await();
-    Assert.assertTrue(testBatchSource.getDiscoverCount() >= 1);
-    Assert.assertTrue(testBatchSource.getDiscoverCount() <= 2);
+    Assert.assertEquals(testBatchSource.getDiscoverCount(), 0);
+    triggerQueue.put("trigger");
+    completedQueue.take();
+    Assert.assertEquals(testBatchSource.getDiscoverCount(), 1);
     for (int i = 0; i < 5; ++i) {
       batchSourceExecutor.read();
     }
     Assert.assertEquals(testBatchSource.getRecordCount(), 6);
-    Assert.assertTrue(testBatchSource.getDiscoverCount() >= 1);
-    Assert.assertTrue(testBatchSource.getDiscoverCount() <= 2);
-    discoveryBarrier.await();
-    Assert.assertTrue(testBatchSource.getDiscoverCount() >= 2);
-    Assert.assertTrue(testBatchSource.getDiscoverCount() <= 3);
+    Assert.assertEquals(testBatchSource.getDiscoverCount(), 1);
+    triggerQueue.put("trigger");
+    completedQueue.take();
+    Assert.assertTrue(testBatchSource.getDiscoverCount() == 2);
     batchSourceExecutor.close();
     Assert.assertEquals(testBatchSource.getCloseCount(), 1);
   }
 
-  @Test
+  @Test (timeOut = 5000)
   public void testPushLifeCycle() throws Exception {
     batchSourceExecutor.open(pushConfig, context);
-    Assert.assertTrue(testBatchPushSource.getDiscoverCount() < 1);
-    discoveryBarrier.await();
-    Assert.assertTrue(testBatchPushSource.getDiscoverCount() >= 1);
-    Assert.assertTrue(testBatchPushSource.getDiscoverCount() <= 2);
+    Assert.assertEquals(testBatchPushSource.getDiscoverCount(), 0);
+    triggerQueue.put("trigger");
+    completedQueue.take();
+    Assert.assertEquals(testBatchPushSource.getDiscoverCount(), 1);
     for (int i = 0; i < 5; ++i) {
       batchSourceExecutor.read();
     }
     Assert.assertEquals(testBatchPushSource.getRecordCount(), 5);
-    Assert.assertTrue(testBatchPushSource.getDiscoverCount() >= 1);
-    Assert.assertTrue(testBatchPushSource.getDiscoverCount() <= 2);
-    discoveryBarrier.await();
-    Assert.assertTrue(testBatchPushSource.getDiscoverCount() >= 2);
-    Assert.assertTrue(testBatchPushSource.getDiscoverCount() <= 3);
+    Assert.assertEquals(testBatchPushSource.getDiscoverCount(), 1);
+    triggerQueue.put("trigger");
+    completedQueue.take();
+    Assert.assertEquals(testBatchPushSource.getDiscoverCount(), 2);
     batchSourceExecutor.close();
     Assert.assertEquals(testBatchPushSource.getCloseCount(), 1);
   }
+
+
+  @Test(expectedExceptions = Exception.class, expectedExceptionsMessageRegExp = "test", timeOut = 1000)
+  public void testDiscoveryPhaseError() throws Exception {
+    config = createConfig(TestBatchSourceFailDiscovery.class.getName(), testBatchConfig);
+    batchSourceExecutor.open(config, context);
+    triggerQueue.put("trigger");
+    while (true) {
+      batchSourceExecutor.read();
+      Thread.sleep(100);
+    }
+  }
 }
\ No newline at end of file