You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/06/09 23:58:48 UTC
[pulsar] branch master updated: Have metadata tailer use its own thread for processing (#7211)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e64d951 Have metadata tailer use its own thread for processing (#7211)
e64d951 is described below
commit e64d951bb6002e07a64674a7373d945d3e224bb6
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Tue Jun 9 16:58:32 2020 -0700
Have metadata tailer use its own thread for processing (#7211)
* Have metadata tailer use its own thread for processing
* Merged with master
* Address comments
* Address comments
Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
.../functions/worker/FunctionMetaDataManager.java | 18 ++---
.../worker/FunctionMetaDataTopicTailer.java | 90 +++++++++++++---------
.../pulsar/functions/worker/WorkerService.java | 2 +-
.../worker/FunctionMetaDataManagerTest.java | 24 +++---
.../worker/FunctionMetaDataTopicTailerTest.java | 37 ++++++---
5 files changed, 101 insertions(+), 70 deletions(-)
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index 2ab913b..80f577d 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -60,6 +60,7 @@ public class FunctionMetaDataManager implements AutoCloseable {
private final SchedulerManager schedulerManager;
private final WorkerConfig workerConfig;
private final PulsarClient pulsarClient;
+ private final ErrorNotifier errorNotifier;
private FunctionMetaDataTopicTailer functionMetaDataTopicTailer;
@@ -69,12 +70,14 @@ public class FunctionMetaDataManager implements AutoCloseable {
public FunctionMetaDataManager(WorkerConfig workerConfig,
SchedulerManager schedulerManager,
- PulsarClient pulsarClient) throws PulsarClientException {
+ PulsarClient pulsarClient,
+ ErrorNotifier errorNotifier) throws PulsarClientException {
this.workerConfig = workerConfig;
this.pulsarClient = pulsarClient;
this.serviceRequestManager = getServiceRequestManager(
this.pulsarClient, this.workerConfig.getFunctionMetadataTopic());
this.schedulerManager = schedulerManager;
+ this.errorNotifier = errorNotifier;
}
/**
@@ -88,17 +91,12 @@ public class FunctionMetaDataManager implements AutoCloseable {
public void initialize() {
log.info("/** Initializing Function Metadata Manager **/");
try {
- Reader<byte[]> reader = pulsarClient.newReader()
- .topic(this.workerConfig.getFunctionMetadataTopic())
- .startMessageId(MessageId.earliest)
- .readerName(workerConfig.getWorkerId() + "-function-metadata-manager")
- .create();
-
- this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this, reader);
+ this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this,
+ pulsarClient.newReader(), this.workerConfig, this.errorNotifier);
// read all existing messages
this.setInitializePhase(true);
- while (reader.hasMessageAvailable()) {
- this.functionMetaDataTopicTailer.processRequest(reader.readNext());
+ while (this.functionMetaDataTopicTailer.getReader().hasMessageAvailable()) {
+ this.functionMetaDataTopicTailer.processRequest(this.functionMetaDataTopicTailer.getReader().readNext());
}
this.setInitializePhase(false);
// schedule functions if necessary
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
index b7108e9..55a67a6 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
@@ -19,78 +19,92 @@
package org.apache.pulsar.functions.worker;
import java.io.IOException;
-import java.util.function.Function;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.proto.Request.ServiceRequest;
@Slf4j
public class FunctionMetaDataTopicTailer
- implements java.util.function.Consumer<Message<byte[]>>, Function<Throwable, Void>, AutoCloseable {
+ implements Runnable, AutoCloseable {
private final FunctionMetaDataManager functionMetaDataManager;
+ @Getter
private final Reader<byte[]> reader;
+ private final Thread readerThread;
+ private volatile boolean running;
+ private ErrorNotifier errorNotifier;
public FunctionMetaDataTopicTailer(FunctionMetaDataManager functionMetaDataManager,
- Reader<byte[]> reader)
+ ReaderBuilder readerBuilder, WorkerConfig workerConfig,
+ ErrorNotifier errorNotifier)
throws PulsarClientException {
this.functionMetaDataManager = functionMetaDataManager;
- this.reader = reader;
+ this.reader = readerBuilder
+ .topic(workerConfig.getFunctionMetadataTopic())
+ .startMessageId(MessageId.earliest)
+ .readerName(workerConfig.getWorkerId() + "-function-metadata-manager")
+ .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-metadata-manager")
+ .create();
+ readerThread = new Thread(this);
+ readerThread.setName("function-metadata-tailer-thread");
+ this.errorNotifier = errorNotifier;
}
public void start() {
- receiveOne();
+ running = true;
+ readerThread.start();
}
- private void receiveOne() {
- reader.readNextAsync()
- .thenAccept(this)
- .exceptionally(this);
+ @Override
+ public void run() {
+ while(running) {
+ try {
+ Message<byte[]> msg = reader.readNext();
+ processRequest(msg);
+ } catch (Throwable th) {
+ if (running) {
+ log.error("Encountered error in metadata tailer", th);
+ // trigger fatal error
+ running = false;
+ errorNotifier.triggerError(th);
+ } else {
+ if (!(th instanceof InterruptedException || th.getCause() instanceof InterruptedException)) {
+ log.warn("Encountered error when metadata tailer is not running", th);
+ }
+ return;
+ }
+ }
+ }
}
@Override
public void close() {
log.info("Stopping function metadata tailer");
try {
- reader.close();
+ running = false;
+ if (readerThread != null && readerThread.isAlive()) {
+ readerThread.interrupt();
+ }
+ if (reader != null) {
+ reader.close();
+ }
} catch (IOException e) {
log.error("Failed to stop function metadata tailer", e);
}
- log.info("Stopped function function metadata tailer");
+ log.info("Stopped function metadata tailer");
}
- public void processRequest(Message<byte[]> msg) {
- ServiceRequest serviceRequest;
-
- try {
- serviceRequest = ServiceRequest.parseFrom(msg.getData());
- } catch (IOException e) {
- log.error("Received bad service request at message {}", msg.getMessageId(), e);
- // TODO: find a better way to handle bad request
- throw new RuntimeException(e);
- }
+ public void processRequest(Message<byte[]> msg) throws IOException {
+ ServiceRequest serviceRequest = ServiceRequest.parseFrom(msg.getData());
if (log.isDebugEnabled()) {
log.debug("Received Service Request: {}", serviceRequest);
}
-
this.functionMetaDataManager.processRequest(msg.getMessageId(), serviceRequest);
}
-
- @Override
- public void accept(Message<byte[]> msg) {
-
- processRequest(msg);
- // receive next request
- receiveOne();
- }
-
- @Override
- public Void apply(Throwable cause) {
- log.error("Failed to retrieve messages from function state topic", cause);
- // TODO: find a better way to handle consumer functions
- throw new RuntimeException(cause);
- }
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index ec2c44b..e3ea170 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -169,7 +169,7 @@ public class WorkerService {
//create function meta data manager
this.functionMetaDataManager = new FunctionMetaDataManager(
- this.workerConfig, this.schedulerManager, this.client);
+ this.workerConfig, this.schedulerManager, this.client, errorNotifier);
this.connectorsManager = new ConnectorsManager(workerConfig);
this.functionsManager = new FunctionsManager(workerConfig);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
index da6852c..20f30a9 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
@@ -63,7 +63,7 @@ public class FunctionMetaDataManagerTest {
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(new WorkerConfig(),
mock(SchedulerManager.class),
- mockPulsarClient()));
+ mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
Map<String, Function.FunctionMetaData> functionMetaDataMap1 = new HashMap<>();
Function.FunctionMetaData f1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
@@ -104,7 +104,7 @@ public class FunctionMetaDataManagerTest {
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
mock(SchedulerManager.class),
- mockPulsarClient()));
+ mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")).build();
@@ -135,7 +135,7 @@ public class FunctionMetaDataManagerTest {
functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
mock(SchedulerManager.class),
- mockPulsarClient()));
+ mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
Map<String, Function.FunctionMetaData> functionMetaDataMap = new HashMap<>();
Function.FunctionMetaData m2 = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
@@ -177,7 +177,7 @@ public class FunctionMetaDataManagerTest {
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
mock(SchedulerManager.class),
- mockPulsarClient()));
+ mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
Map<String, Function.FunctionMetaData> functionMetaDataMap1 = new HashMap<>();
Function.FunctionMetaData f1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
@@ -232,7 +232,7 @@ public class FunctionMetaDataManagerTest {
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
mock(SchedulerManager.class),
- mockPulsarClient()));
+ mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
.setNamespace("namespace-1").setTenant("tenant-1")).setVersion(version).build();
@@ -271,7 +271,7 @@ public class FunctionMetaDataManagerTest {
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
mock(SchedulerManager.class),
- mockPulsarClient()));
+ mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
Mockito.doNothing().when(functionMetaDataManager).processUpdate(any(Request.ServiceRequest.class));
Mockito.doNothing().when(functionMetaDataManager).proccessDeregister(any(Request.ServiceRequest.class));
@@ -309,7 +309,7 @@ public class FunctionMetaDataManagerTest {
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
schedulerManager,
- mockPulsarClient()));
+ mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
// worker has no record of function
Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder()
@@ -337,7 +337,7 @@ public class FunctionMetaDataManagerTest {
functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
schedulerManager,
- mockPulsarClient()));
+ mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
Function.FunctionMetaData m3 = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
@@ -386,7 +386,7 @@ public class FunctionMetaDataManagerTest {
functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
schedulerManager,
- mockPulsarClient()));
+ mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
Function.FunctionMetaData m4 = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
@@ -423,7 +423,7 @@ public class FunctionMetaDataManagerTest {
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
schedulerManager,
- mockPulsarClient()));
+ mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
// worker has no record of function
Function.FunctionMetaData test = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-2")
@@ -450,7 +450,7 @@ public class FunctionMetaDataManagerTest {
functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
schedulerManager,
- mockPulsarClient()));
+ mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
functionMetaDataManager.setFunctionMetaData(test);
Function.FunctionMetaData m2 = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
@@ -477,7 +477,7 @@ public class FunctionMetaDataManagerTest {
functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
schedulerManager,
- mockPulsarClient()));
+ mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
functionMetaDataManager.setFunctionMetaData(test);
Function.FunctionMetaData m3 = Function.FunctionMetaData.newBuilder()
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailerTest.java
index bad0fff..b84af4b 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailerTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.functions.worker;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -25,14 +26,18 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.functions.proto.Request.ServiceRequest;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
import org.apache.pulsar.functions.worker.request.ServiceRequestUtils;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
@@ -45,13 +50,20 @@ public class FunctionMetaDataTopicTailerTest {
private static final String TEST_NAME = "test-fmt";
private final Reader reader;
+ private final ReaderBuilder readerBuilder;
private final FunctionMetaDataManager fsm;
private final FunctionMetaDataTopicTailer fsc;
public FunctionMetaDataTopicTailerTest() throws Exception {
this.reader = mock(Reader.class);
+ this.readerBuilder = mock(ReaderBuilder.class);
+ when(readerBuilder.topic(anyString())).thenReturn(readerBuilder);
+ when(readerBuilder.startMessageId(any(MessageId.class))).thenReturn(readerBuilder);
+ when(readerBuilder.readerName(anyString())).thenReturn(readerBuilder);
+ when(readerBuilder.subscriptionRolePrefix(anyString())).thenReturn(readerBuilder);
+ when(readerBuilder.create()).thenReturn(reader);
this.fsm = mock(FunctionMetaDataManager.class);
- this.fsc = new FunctionMetaDataTopicTailer(fsm, reader);
+ this.fsc = new FunctionMetaDataTopicTailer(fsm, readerBuilder, new WorkerConfig(), ErrorNotifier.getDefaultImpl() );
}
@AfterMethod
@@ -67,18 +79,25 @@ public class FunctionMetaDataTopicTailerTest {
Message msg = mock(Message.class);
when(msg.getData()).thenReturn(request.toByteArray());
-
- CompletableFuture<Message> receiveFuture = CompletableFuture.completedFuture(msg);
- when(reader.readNextAsync())
- .thenReturn(receiveFuture)
- .thenReturn(new CompletableFuture<>());
+ CountDownLatch readLatch = new CountDownLatch(1);
+ CountDownLatch processLatch = new CountDownLatch(1);
+ when(reader.readNext()).thenReturn(msg).then(new Answer<Message>() {
+ public Message answer(InvocationOnMock invocation) {
+ try {
+ readLatch.countDown();
+ processLatch.await();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ }
+ });
fsc.start();
- // wait for receive future to complete
- receiveFuture.thenApply(Function.identity()).get();
+ readLatch.await();
- verify(reader, times(2)).readNextAsync();
+ verify(reader, times(2)).readNext();
verify(fsm, times(1)).processRequest(any(), any(ServiceRequest.class));
}
}