You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/06/09 23:58:48 UTC

[pulsar] branch master updated: Have metadata tailer use its own thread for processing (#7211)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e64d951  Have metadata tailer use its own thread for processing (#7211)
e64d951 is described below

commit e64d951bb6002e07a64674a7373d945d3e224bb6
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Tue Jun 9 16:58:32 2020 -0700

    Have metadata tailer use its own thread for processing (#7211)
    
    * Have metadata tailer use its own thread for processing
    
    * Merged with master
    
    * Address comments
    
    * Address comments
    
    Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
 .../functions/worker/FunctionMetaDataManager.java  | 18 ++---
 .../worker/FunctionMetaDataTopicTailer.java        | 90 +++++++++++++---------
 .../pulsar/functions/worker/WorkerService.java     |  2 +-
 .../worker/FunctionMetaDataManagerTest.java        | 24 +++---
 .../worker/FunctionMetaDataTopicTailerTest.java    | 37 ++++++---
 5 files changed, 101 insertions(+), 70 deletions(-)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index 2ab913b..80f577d 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -60,6 +60,7 @@ public class FunctionMetaDataManager implements AutoCloseable {
     private final SchedulerManager schedulerManager;
     private final WorkerConfig workerConfig;
     private final PulsarClient pulsarClient;
+    private final ErrorNotifier errorNotifier;
 
     private FunctionMetaDataTopicTailer functionMetaDataTopicTailer;
 
@@ -69,12 +70,14 @@ public class FunctionMetaDataManager implements AutoCloseable {
 
     public FunctionMetaDataManager(WorkerConfig workerConfig,
                                    SchedulerManager schedulerManager,
-                                   PulsarClient pulsarClient) throws PulsarClientException {
+                                   PulsarClient pulsarClient,
+                                   ErrorNotifier errorNotifier) throws PulsarClientException {
         this.workerConfig = workerConfig;
         this.pulsarClient = pulsarClient;
         this.serviceRequestManager = getServiceRequestManager(
                 this.pulsarClient, this.workerConfig.getFunctionMetadataTopic());
         this.schedulerManager = schedulerManager;
+        this.errorNotifier = errorNotifier;
     }
 
     /**
@@ -88,17 +91,12 @@ public class FunctionMetaDataManager implements AutoCloseable {
     public void initialize() {
         log.info("/** Initializing Function Metadata Manager **/");
         try {
-            Reader<byte[]> reader = pulsarClient.newReader()
-                    .topic(this.workerConfig.getFunctionMetadataTopic())
-                    .startMessageId(MessageId.earliest)
-                    .readerName(workerConfig.getWorkerId() + "-function-metadata-manager")
-                    .create();
-
-            this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this, reader);
+            this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this,
+                    pulsarClient.newReader(), this.workerConfig, this.errorNotifier);
             // read all existing messages
             this.setInitializePhase(true);
-            while (reader.hasMessageAvailable()) {
-                this.functionMetaDataTopicTailer.processRequest(reader.readNext());
+            while (this.functionMetaDataTopicTailer.getReader().hasMessageAvailable()) {
+                this.functionMetaDataTopicTailer.processRequest(this.functionMetaDataTopicTailer.getReader().readNext());
             }
             this.setInitializePhase(false);
             // schedule functions if necessary
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
index b7108e9..55a67a6 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
@@ -19,78 +19,92 @@
 package org.apache.pulsar.functions.worker;
 
 import java.io.IOException;
-import java.util.function.Function;
 
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.functions.proto.Request.ServiceRequest;
 
 @Slf4j
 public class FunctionMetaDataTopicTailer
-        implements java.util.function.Consumer<Message<byte[]>>, Function<Throwable, Void>, AutoCloseable {
+        implements Runnable, AutoCloseable {
 
     private final FunctionMetaDataManager functionMetaDataManager;
+    @Getter
     private final Reader<byte[]> reader;
+    private final Thread readerThread;
+    private volatile boolean running;
+    private ErrorNotifier errorNotifier;
 
     public FunctionMetaDataTopicTailer(FunctionMetaDataManager functionMetaDataManager,
-                                       Reader<byte[]> reader)
+                                       ReaderBuilder readerBuilder, WorkerConfig workerConfig,
+                                       ErrorNotifier errorNotifier)
             throws PulsarClientException {
         this.functionMetaDataManager = functionMetaDataManager;
-        this.reader = reader;
+        this.reader = readerBuilder
+                .topic(workerConfig.getFunctionMetadataTopic())
+                .startMessageId(MessageId.earliest)
+                .readerName(workerConfig.getWorkerId() + "-function-metadata-manager")
+                .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-metadata-manager")
+                .create();
+        readerThread = new Thread(this);
+        readerThread.setName("function-metadata-tailer-thread");
+        this.errorNotifier = errorNotifier;
     }
 
     public void start() {
-        receiveOne();
+        running = true;
+        readerThread.start();
     }
 
-    private void receiveOne() {
-        reader.readNextAsync()
-                .thenAccept(this)
-                .exceptionally(this);
+    @Override
+    public void run() {
+        while(running) {
+            try {
+                Message<byte[]> msg = reader.readNext();
+                processRequest(msg);
+            } catch (Throwable th) {
+                if (running) {
+                    log.error("Encountered error in metadata tailer", th);
+                    // trigger fatal error
+                    running = false;
+                    errorNotifier.triggerError(th);
+                } else {
+                    if (!(th instanceof InterruptedException || th.getCause() instanceof InterruptedException)) {
+                        log.warn("Encountered error when metadata tailer is not running", th);
+                    }
+                    return;
+                }
+            }
+        }
     }
 
     @Override
     public void close() {
         log.info("Stopping function metadata tailer");
         try {
-            reader.close();
+            running = false;
+            if (readerThread != null && readerThread.isAlive()) {
+                readerThread.interrupt();
+            }
+            if (reader != null) {
+                reader.close();
+            }
         } catch (IOException e) {
             log.error("Failed to stop function metadata tailer", e);
         }
-        log.info("Stopped function function metadata tailer");
+        log.info("Stopped function metadata tailer");
     }
 
-    public void processRequest(Message<byte[]> msg) {
-        ServiceRequest serviceRequest;
-
-        try {
-            serviceRequest = ServiceRequest.parseFrom(msg.getData());
-        } catch (IOException e) {
-            log.error("Received bad service request at message {}", msg.getMessageId(), e);
-            // TODO: find a better way to handle bad request
-            throw new RuntimeException(e);
-        }
+    public void processRequest(Message<byte[]> msg) throws IOException {
+        ServiceRequest serviceRequest = ServiceRequest.parseFrom(msg.getData());
         if (log.isDebugEnabled()) {
             log.debug("Received Service Request: {}", serviceRequest);
         }
-
         this.functionMetaDataManager.processRequest(msg.getMessageId(), serviceRequest);
     }
-
-    @Override
-    public void accept(Message<byte[]> msg) {
-
-        processRequest(msg);
-        // receive next request
-        receiveOne();
-    }
-
-    @Override
-    public Void apply(Throwable cause) {
-        log.error("Failed to retrieve messages from function state topic", cause);
-        // TODO: find a better way to handle consumer functions
-        throw new RuntimeException(cause);
-    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index ec2c44b..e3ea170 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -169,7 +169,7 @@ public class WorkerService {
 
             //create function meta data manager
             this.functionMetaDataManager = new FunctionMetaDataManager(
-                    this.workerConfig, this.schedulerManager, this.client);
+                    this.workerConfig, this.schedulerManager, this.client, errorNotifier);
 
             this.connectorsManager = new ConnectorsManager(workerConfig);
             this.functionsManager = new FunctionsManager(workerConfig);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
index da6852c..20f30a9 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
@@ -63,7 +63,7 @@ public class FunctionMetaDataManagerTest {
         FunctionMetaDataManager functionMetaDataManager = spy(
                 new FunctionMetaDataManager(new WorkerConfig(),
                         mock(SchedulerManager.class),
-                        mockPulsarClient()));
+                        mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
 
         Map<String, Function.FunctionMetaData> functionMetaDataMap1 = new HashMap<>();
         Function.FunctionMetaData f1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
@@ -104,7 +104,7 @@ public class FunctionMetaDataManagerTest {
         FunctionMetaDataManager functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         mock(SchedulerManager.class),
-                        mockPulsarClient()));
+                        mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
         Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder()
                 .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")).build();
 
@@ -135,7 +135,7 @@ public class FunctionMetaDataManagerTest {
         functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         mock(SchedulerManager.class),
-                        mockPulsarClient()));
+                        mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
         Map<String, Function.FunctionMetaData> functionMetaDataMap = new HashMap<>();
         Function.FunctionMetaData m2 = Function.FunctionMetaData.newBuilder()
                 .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
@@ -177,7 +177,7 @@ public class FunctionMetaDataManagerTest {
         FunctionMetaDataManager functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         mock(SchedulerManager.class),
-                        mockPulsarClient()));
+                        mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
 
         Map<String, Function.FunctionMetaData> functionMetaDataMap1 = new HashMap<>();
         Function.FunctionMetaData f1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
@@ -232,7 +232,7 @@ public class FunctionMetaDataManagerTest {
         FunctionMetaDataManager functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         mock(SchedulerManager.class),
-                        mockPulsarClient()));
+                        mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
         Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder()
                 .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
                         .setNamespace("namespace-1").setTenant("tenant-1")).setVersion(version).build();
@@ -271,7 +271,7 @@ public class FunctionMetaDataManagerTest {
         FunctionMetaDataManager functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         mock(SchedulerManager.class),
-                        mockPulsarClient()));
+                        mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
 
         Mockito.doNothing().when(functionMetaDataManager).processUpdate(any(Request.ServiceRequest.class));
         Mockito.doNothing().when(functionMetaDataManager).proccessDeregister(any(Request.ServiceRequest.class));
@@ -309,7 +309,7 @@ public class FunctionMetaDataManagerTest {
         FunctionMetaDataManager functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         schedulerManager,
-                        mockPulsarClient()));
+                        mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
 
         // worker has no record of function
         Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder()
@@ -337,7 +337,7 @@ public class FunctionMetaDataManagerTest {
         functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         schedulerManager,
-                        mockPulsarClient()));
+                        mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
 
         Function.FunctionMetaData m3 = Function.FunctionMetaData.newBuilder()
                 .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
@@ -386,7 +386,7 @@ public class FunctionMetaDataManagerTest {
         functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         schedulerManager,
-                        mockPulsarClient()));
+                        mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
 
         Function.FunctionMetaData m4 = Function.FunctionMetaData.newBuilder()
                 .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
@@ -423,7 +423,7 @@ public class FunctionMetaDataManagerTest {
         FunctionMetaDataManager functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         schedulerManager,
-                        mockPulsarClient()));
+                        mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
         // worker has no record of function
         Function.FunctionMetaData test = Function.FunctionMetaData.newBuilder()
                 .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-2")
@@ -450,7 +450,7 @@ public class FunctionMetaDataManagerTest {
         functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         schedulerManager,
-                        mockPulsarClient()));
+                        mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
         functionMetaDataManager.setFunctionMetaData(test);
         Function.FunctionMetaData m2 = Function.FunctionMetaData.newBuilder()
                 .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
@@ -477,7 +477,7 @@ public class FunctionMetaDataManagerTest {
         functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         schedulerManager,
-                        mockPulsarClient()));
+                        mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
         functionMetaDataManager.setFunctionMetaData(test);
 
         Function.FunctionMetaData m3 = Function.FunctionMetaData.newBuilder()
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailerTest.java
index bad0fff..b84af4b 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.functions.worker;
 
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -25,14 +26,18 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.function.Function;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.functions.proto.Request.ServiceRequest;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
 import org.apache.pulsar.functions.worker.request.ServiceRequestUtils;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
 
@@ -45,13 +50,20 @@ public class FunctionMetaDataTopicTailerTest {
     private static final String TEST_NAME = "test-fmt";
 
     private final Reader reader;
+    private final ReaderBuilder readerBuilder;
     private final FunctionMetaDataManager fsm;
     private final FunctionMetaDataTopicTailer fsc;
 
     public FunctionMetaDataTopicTailerTest() throws Exception {
         this.reader = mock(Reader.class);
+        this.readerBuilder = mock(ReaderBuilder.class);
+        when(readerBuilder.topic(anyString())).thenReturn(readerBuilder);
+        when(readerBuilder.startMessageId(any(MessageId.class))).thenReturn(readerBuilder);
+        when(readerBuilder.readerName(anyString())).thenReturn(readerBuilder);
+        when(readerBuilder.subscriptionRolePrefix(anyString())).thenReturn(readerBuilder);
+        when(readerBuilder.create()).thenReturn(reader);
         this.fsm = mock(FunctionMetaDataManager.class);
-        this.fsc = new FunctionMetaDataTopicTailer(fsm, reader);
+        this.fsc = new FunctionMetaDataTopicTailer(fsm, readerBuilder, new WorkerConfig(), ErrorNotifier.getDefaultImpl() );
     }
 
     @AfterMethod
@@ -67,18 +79,25 @@ public class FunctionMetaDataTopicTailerTest {
 
         Message msg = mock(Message.class);
         when(msg.getData()).thenReturn(request.toByteArray());
-
-        CompletableFuture<Message> receiveFuture = CompletableFuture.completedFuture(msg);
-        when(reader.readNextAsync())
-            .thenReturn(receiveFuture)
-            .thenReturn(new CompletableFuture<>());
+        CountDownLatch readLatch = new CountDownLatch(1);
+        CountDownLatch processLatch = new CountDownLatch(1);
+        when(reader.readNext()).thenReturn(msg).then(new Answer<Message>() {
+            public Message answer(InvocationOnMock invocation) {
+                try {
+                    readLatch.countDown();
+                    processLatch.await();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+                return null;
+            }
+        });
 
         fsc.start();
 
-        // wait for receive future to complete
-        receiveFuture.thenApply(Function.identity()).get();
+        readLatch.await();
 
-        verify(reader, times(2)).readNextAsync();
+        verify(reader, times(2)).readNext();
         verify(fsm, times(1)).processRequest(any(), any(ServiceRequest.class));
     }
 }