You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@pulsar.apache.org by je...@apache.org on 2020/11/10 22:15:28 UTC
[pulsar] branch master updated: Optimize batch source discovery and task ack (#8498)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5c756ee Optimize batch source discovery and task ack (#8498)
5c756ee is described below
commit 5c756ee1b8c7a4360520bde2c3f5596826d70fc6
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Tue Nov 10 14:14:54 2020 -0800
Optimize batch source discovery and task ack (#8498)
Co-authored-by: Jerry Peng <je...@splunk.com>
---
.../functions/instance/JavaInstanceRunnable.java | 6 +-
.../source/batch/BatchSourceExecutor.java | 99 +++++++++++++----
.../source/batch/BatchSourceExecutorTest.java | 120 +++++++++++++--------
3 files changed, 154 insertions(+), 71 deletions(-)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 2b2f360..6b27868 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -460,7 +460,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
}
}
- private Record readInput() {
+ private Record readInput() throws Exception {
Record record;
if (!(this.source instanceof PulsarSource)) {
Thread.currentThread().setContextClassLoader(functionClassLoader);
@@ -469,8 +469,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
record = this.source.read();
} catch (Exception e) {
stats.incrSourceExceptions(e);
- log.info("Encountered exception in source read: ", e);
- throw new RuntimeException(e);
+ log.error("Encountered exception in source read", e);
+ throw e;
} finally {
Thread.currentThread().setContextClassLoader(instanceClassLoader);
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java
index 4474b9b..5937616 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.functions.source.batch;
import com.google.gson.Gson;
+import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -41,6 +42,9 @@ import org.apache.pulsar.io.core.SourceContext;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
/**
* BatchSourceExecutor wraps BatchSource as Source. Thus from Pulsar IO perspective, it is running a regular
@@ -63,13 +67,16 @@ public class BatchSourceExecutor<T> implements Source<T> {
private String batchSourceClassName;
private BatchSource<T> batchSource;
private String intermediateTopicName;
+ private volatile Exception currentError = null;
+ private volatile boolean isRunning = false;
+ private ExecutorService discoveryThread = Executors.newSingleThreadExecutor(new DefaultThreadFactory("batch-source-discovery"));
@Override
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
this.config = config;
this.sourceContext = sourceContext;
this.intermediateTopicName = SourceConfigUtils.computeBatchSourceIntermediateTopicName(sourceContext.getTenant(),
- sourceContext.getNamespace(), sourceContext.getSourceName()).toString();
+ sourceContext.getNamespace(), sourceContext.getSourceName()).toString();
this.getBatchSourceConfigs(config);
this.initializeBatchSource();
this.start();
@@ -78,14 +85,21 @@ public class BatchSourceExecutor<T> implements Source<T> {
@Override
public Record<T> read() throws Exception {
while (true) {
+ if (currentError != null) {
+ throw currentError;
+ }
if (currentTask == null) {
- retrieveNextTask();
- prepareInternal();
+ currentTask = retrieveNextTask();
+ prepareInternal(currentTask);
}
Record<T> retval = batchSource.readNext();
if (retval == null) {
// signals end if this batch
- intermediateTopicConsumer.acknowledge(currentTask.getMessageId());
+ intermediateTopicConsumer.acknowledgeAsync(currentTask.getMessageId()).exceptionally(throwable -> {
+ log.error("Encountered error when acknowledging completed task with id {}", currentTask.getMessageId(), throwable);
+ setCurrentError(throwable);
+ return null;
+ });
currentTask = null;
} else {
return retval;
@@ -95,7 +109,7 @@ public class BatchSourceExecutor<T> implements Source<T> {
private void getBatchSourceConfigs(Map<String, Object> config) {
if (!config.containsKey(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY)
- || !config.containsKey(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY)) {
+ || !config.containsKey(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY)) {
throw new IllegalArgumentException("Batch Configs cannot be found");
}
@@ -108,18 +122,18 @@ public class BatchSourceExecutor<T> implements Source<T> {
// First init the batchsource
ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
Object userClassObject = Reflections.createInstance(
- batchSourceClassName,
- clsLoader);
+ batchSourceClassName,
+ clsLoader);
if (userClassObject instanceof BatchSource) {
- batchSource = (BatchSource) userClassObject;
+ batchSource = (BatchSource) userClassObject;
} else {
throw new IllegalArgumentException("BatchSource does not implement the correct interface");
}
// next init the discovery triggerer
Object discoveryClassObject = Reflections.createInstance(
- batchSourceConfig.getDiscoveryTriggererClassName(),
- clsLoader);
+ batchSourceConfig.getDiscoveryTriggererClassName(),
+ clsLoader);
if (discoveryClassObject instanceof BatchSourceTriggerer) {
discoveryTriggerer = (BatchSourceTriggerer) discoveryClassObject;
} else {
@@ -128,22 +142,38 @@ public class BatchSourceExecutor<T> implements Source<T> {
}
private void start() throws Exception {
+ isRunning = true;
createIntermediateTopicConsumer();
batchSource.open(this.config, this.sourceContext);
if (sourceContext.getInstanceId() == 0) {
discoveryTriggerer.init(batchSourceConfig.getDiscoveryTriggererConfig(),
- this.sourceContext);
+ this.sourceContext);
discoveryTriggerer.start(this::triggerDiscover);
}
}
- private void triggerDiscover(String discoveredEvent) {
- try {
- batchSource.discover((task) -> this.taskEater(discoveredEvent, task));
- } catch (Exception e) {
- log.error("Error on discover", e);
- throw new RuntimeException(e);
+ volatile boolean discoverInProgress = false;
+ private synchronized void triggerDiscover(String discoveredEvent) {
+
+ if (discoverInProgress) {
+ log.info("Discovery is already in progress");
+ return;
+ } else {
+ discoverInProgress = true;
}
+ // Run this code asynchronous so it doesn't block processing of the tasks
+ discoveryThread.submit(() -> {
+ try {
+ batchSource.discover(task -> taskEater(discoveredEvent, task));
+ } catch (Exception e) {
+ if (isRunning || !(e instanceof InterruptedException)) {
+ log.error("Encountered error during task discovery", e);
+ setCurrentError(e);
+ }
+ } finally {
+ discoverInProgress = false;
+ }
+ });
}
private void taskEater(String discoveredEvent, byte[] task) {
@@ -153,6 +183,7 @@ public class BatchSourceExecutor<T> implements Source<T> {
properties.put("produceTime", String.valueOf(System.currentTimeMillis()));
TypedMessageBuilder<byte[]> message = sourceContext.newOutputMessage(intermediateTopicName, Schema.BYTES);
message.value(task).properties(properties);
+ // Note: we can only make this send async if the api returns a future to the connector so that errors can be handled by the connector
message.send();
} catch (Exception e) {
log.error("error writing discovered task to intermediate topic", e);
@@ -160,9 +191,9 @@ public class BatchSourceExecutor<T> implements Source<T> {
}
}
- private void prepareInternal() {
+ private void prepareInternal(Message<byte[]> task) {
try {
- batchSource.prepare(currentTask.getValue());
+ batchSource.prepare(task.getValue());
} catch (Exception e) {
log.error("Error on prepare", e);
throw new RuntimeException(e);
@@ -175,6 +206,7 @@ public class BatchSourceExecutor<T> implements Source<T> {
}
private void stop() throws Exception {
+ isRunning = false;
Exception ex = null;
if (discoveryTriggerer != null) {
try {
@@ -185,6 +217,14 @@ public class BatchSourceExecutor<T> implements Source<T> {
}
discoveryTriggerer = null;
}
+
+ discoveryThread.shutdownNow();
+ try {
+ discoveryThread.awaitTermination(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ log.warn("Shutdown of discovery thread was interrupted");
+ }
+
if (intermediateTopicConsumer != null) {
try {
intermediateTopicConsumer.close();
@@ -255,11 +295,24 @@ public class BatchSourceExecutor<T> implements Source<T> {
}
}
-
- private void retrieveNextTask() throws Exception {
- currentTask = intermediateTopicConsumer.receive();
- return;
+ private Message<byte[]> retrieveNextTask() throws Exception {
+ while(true) {
+ if (currentError != null) {
+ throw currentError;
+ }
+ Message<byte[]> taskMessage = intermediateTopicConsumer.receive(5, TimeUnit.SECONDS);
+ if (taskMessage != null) {
+ return taskMessage;
+ }
+ }
}
+ private void setCurrentError(Throwable error) {
+ if (error instanceof Exception) {
+ currentError = (Exception) error;
+ } else {
+ currentError = new RuntimeException(error.getCause());
+ }
+ }
}
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java
index 5125453..65a8928 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java
@@ -21,25 +21,28 @@ package org.apache.pulsar.functions.source.batch;
import com.google.gson.Gson;
import lombok.Getter;
-import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.io.BatchSourceConfig;
import org.apache.pulsar.functions.api.Record;
-
import org.apache.pulsar.io.core.BatchPushSource;
import org.apache.pulsar.io.core.BatchSource;
import org.apache.pulsar.io.core.BatchSourceTriggerer;
import org.apache.pulsar.io.core.SourceContext;
import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
/**
@@ -49,13 +52,13 @@ public class BatchSourceExecutorTest {
public static class TestBatchSource implements BatchSource<String> {
@Getter
- private static int prepareCount;
+ public static int prepareCount;
@Getter
- private static int discoverCount;
+ public static int discoverCount;
@Getter
- private static int recordCount;
+ public static int recordCount;
@Getter
- private static int closeCount;
+ public static int closeCount;
private Record record = Mockito.mock(Record.class);
public TestBatchSource() { }
@@ -93,15 +96,22 @@ public class BatchSourceExecutorTest {
}
}
+ public static class TestBatchSourceFailDiscovery extends TestBatchSource {
+ @Override
+ public void discover(Consumer<byte[]> taskEater) throws Exception {
+ throw new Exception("test");
+ }
+ }
+
public static class TestBatchPushSource extends BatchPushSource<String> {
@Getter
- private static int prepareCount;
+ public static int prepareCount;
@Getter
- private static int discoverCount;
+ public static int discoverCount;
@Getter
- private static int recordCount;
+ public static int recordCount;
@Getter
- private static int closeCount;
+ public static int closeCount;
private Record record = Mockito.mock(Record.class);
public TestBatchPushSource() { }
@@ -135,7 +145,10 @@ public class BatchSourceExecutorTest {
}
}
+ public static LinkedBlockingQueue<String> triggerQueue = new LinkedBlockingQueue<>();
+ public static LinkedBlockingQueue<String> completedQueue = new LinkedBlockingQueue<>();
public static class TestDiscoveryTriggerer implements BatchSourceTriggerer {
+ @Getter
private Consumer<String> trigger;
private Thread thread;
@@ -150,12 +163,12 @@ public class BatchSourceExecutorTest {
@Override
public void start(Consumer<String> trigger) {
+
this.trigger = trigger;
thread = new Thread(() -> {
while(true) {
try {
- Thread.sleep(100);
- trigger.accept("Triggered");
+ trigger.accept(triggerQueue.take());
} catch (InterruptedException e) {
break;
}
@@ -186,7 +199,6 @@ public class BatchSourceExecutorTest {
private ConsumerBuilder consumerBuilder;
private org.apache.pulsar.client.api.Consumer<byte[]> consumer;
private TypedMessageBuilder<byte[]> messageBuilder;
- private CyclicBarrier discoveryBarrier;
private Message<byte[]> discoveredTask;
private static Map<String, Object> createConfig(String className, BatchSourceConfig batchConfig) {
@@ -208,6 +220,14 @@ public class BatchSourceExecutorTest {
@BeforeMethod
public void setUp() throws Exception {
+ TestBatchSource.closeCount = 0;
+ TestBatchSource.discoverCount = 0;
+ TestBatchSource.prepareCount = 0;
+ TestBatchSource.recordCount = 0;
+ TestBatchPushSource.closeCount = 0;
+ TestBatchPushSource.discoverCount = 0;
+ TestBatchPushSource.prepareCount = 0;
+ TestBatchPushSource.recordCount = 0;
testBatchSource = new TestBatchSource();
testBatchPushSource = new TestBatchPushSource();
batchSourceExecutor = new BatchSourceExecutor<>();
@@ -225,9 +245,12 @@ public class BatchSourceExecutorTest {
Mockito.doReturn(consumerBuilder).when(consumerBuilder).properties(Mockito.anyMap());
Mockito.doReturn(consumerBuilder).when(consumerBuilder).topic(Mockito.any());
discoveredTask = Mockito.mock(Message.class);
+ Mockito.doReturn(MessageId.latest).when(discoveredTask).getMessageId();
consumer = Mockito.mock(org.apache.pulsar.client.api.Consumer.class);
Mockito.doReturn(discoveredTask).when(consumer).receive();
+ Mockito.doReturn(discoveredTask).when(consumer).receive(Mockito.anyInt(), Mockito.any());
Mockito.doReturn(CompletableFuture.completedFuture(consumer)).when(consumerBuilder).subscribeAsync();
+ Mockito.doReturn(CompletableFuture.completedFuture(null)).when(consumer).acknowledgeAsync(Mockito.any(MessageId.class));
Mockito.doReturn(consumerBuilder).when(context).newConsumerBuilder(Schema.BYTES);
messageBuilder = Mockito.mock(TypedMessageBuilder.class);
Mockito.doReturn(messageBuilder).when(messageBuilder).value(Mockito.any());
@@ -235,16 +258,13 @@ public class BatchSourceExecutorTest {
Mockito.doReturn(messageBuilder).when(context).newOutputMessage(Mockito.anyString(), Mockito.any());
// Discovery
- discoveryBarrier = new CyclicBarrier(2);
- Mockito.doAnswer(new Answer<MessageId>() {
- @Override public MessageId answer(InvocationOnMock invocation) {
- try {
- discoveryBarrier.await();
- } catch (Exception e) {
- throw new RuntimeException();
- }
- return null;
+ Mockito.doAnswer((Answer<MessageId>) invocation -> {
+ try {
+ completedQueue.put("done");
+ } catch (Exception e) {
+ throw new RuntimeException();
}
+ return null;
}).when(messageBuilder).send();
}
@@ -333,43 +353,53 @@ public class BatchSourceExecutorTest {
batchSourceExecutor.open(pushConfig, context);
}
- @Test
+ @Test (timeOut = 5000)
public void testLifeCycle() throws Exception {
batchSourceExecutor.open(config, context);
- Assert.assertTrue(testBatchSource.getDiscoverCount() < 1);
- discoveryBarrier.await();
- Assert.assertTrue(testBatchSource.getDiscoverCount() >= 1);
- Assert.assertTrue(testBatchSource.getDiscoverCount() <= 2);
+ Assert.assertEquals(testBatchSource.getDiscoverCount(), 0);
+ triggerQueue.put("trigger");
+ completedQueue.take();
+ Assert.assertEquals(testBatchSource.getDiscoverCount(), 1);
for (int i = 0; i < 5; ++i) {
batchSourceExecutor.read();
}
Assert.assertEquals(testBatchSource.getRecordCount(), 6);
- Assert.assertTrue(testBatchSource.getDiscoverCount() >= 1);
- Assert.assertTrue(testBatchSource.getDiscoverCount() <= 2);
- discoveryBarrier.await();
- Assert.assertTrue(testBatchSource.getDiscoverCount() >= 2);
- Assert.assertTrue(testBatchSource.getDiscoverCount() <= 3);
+ Assert.assertEquals(testBatchSource.getDiscoverCount(), 1);
+ triggerQueue.put("trigger");
+ completedQueue.take();
+ Assert.assertTrue(testBatchSource.getDiscoverCount() == 2);
batchSourceExecutor.close();
Assert.assertEquals(testBatchSource.getCloseCount(), 1);
}
- @Test
+ @Test (timeOut = 5000)
public void testPushLifeCycle() throws Exception {
batchSourceExecutor.open(pushConfig, context);
- Assert.assertTrue(testBatchPushSource.getDiscoverCount() < 1);
- discoveryBarrier.await();
- Assert.assertTrue(testBatchPushSource.getDiscoverCount() >= 1);
- Assert.assertTrue(testBatchPushSource.getDiscoverCount() <= 2);
+ Assert.assertEquals(testBatchPushSource.getDiscoverCount(), 0);
+ triggerQueue.put("trigger");
+ completedQueue.take();
+ Assert.assertEquals(testBatchPushSource.getDiscoverCount(), 1);
for (int i = 0; i < 5; ++i) {
batchSourceExecutor.read();
}
Assert.assertEquals(testBatchPushSource.getRecordCount(), 5);
- Assert.assertTrue(testBatchPushSource.getDiscoverCount() >= 1);
- Assert.assertTrue(testBatchPushSource.getDiscoverCount() <= 2);
- discoveryBarrier.await();
- Assert.assertTrue(testBatchPushSource.getDiscoverCount() >= 2);
- Assert.assertTrue(testBatchPushSource.getDiscoverCount() <= 3);
+ Assert.assertEquals(testBatchPushSource.getDiscoverCount(), 1);
+ triggerQueue.put("trigger");
+ completedQueue.take();
+ Assert.assertEquals(testBatchPushSource.getDiscoverCount(), 2);
batchSourceExecutor.close();
Assert.assertEquals(testBatchPushSource.getCloseCount(), 1);
}
+
+
+ @Test(expectedExceptions = Exception.class, expectedExceptionsMessageRegExp = "test", timeOut = 1000)
+ public void testDiscoveryPhaseError() throws Exception {
+ config = createConfig(TestBatchSourceFailDiscovery.class.getName(), testBatchConfig);
+ batchSourceExecutor.open(config, context);
+ triggerQueue.put("trigger");
+ while (true) {
+ batchSourceExecutor.read();
+ Thread.sleep(100);
+ }
+ }
}
\ No newline at end of file