You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2020/06/06 00:51:16 UTC

[pulsar] branch master updated: FunctionAssignmentTailer should use its own thread (#7180)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new eaf828d  FunctionAssignmentTailer should use its own thread (#7180)
eaf828d is described below

commit eaf828d2243ce09f2e04a0eea2802214b593dabc
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Fri Jun 5 17:50:59 2020 -0700

    FunctionAssignmentTailer should use its own thread (#7180)
    
    * FunctionAssignmentTailer should use its own thread
    
    Co-authored-by: Jerry Peng <je...@splunk.com>
---
 .../functions/worker/FunctionAssignmentTailer.java | 114 ++++++++++-----------
 .../functions/worker/FunctionRuntimeManager.java   |  12 +--
 .../worker/FunctionRuntimeManagerTest.java         |   1 +
 3 files changed, 60 insertions(+), 67 deletions(-)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
index bac3128..98d5745 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
@@ -18,56 +18,85 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import java.io.IOException;
-import java.util.function.Function;
-
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 
-import lombok.extern.slf4j.Slf4j;
+import java.io.IOException;
 
 @Slf4j
-public class FunctionAssignmentTailer
-    implements java.util.function.Consumer<Message<byte[]>>, Function<Throwable, Void>, AutoCloseable {
+public class FunctionAssignmentTailer implements AutoCloseable {
 
     private final FunctionRuntimeManager functionRuntimeManager;
+    @Getter
     private final Reader<byte[]> reader;
-    private boolean closed = false;
+    private volatile boolean isRunning = false;
 
-    public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, Reader<byte[]> reader) {
+    private final Thread tailerThread;
+    
+    public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, ReaderBuilder readerBuilder, WorkerConfig workerConfig) throws PulsarClientException {
         this.functionRuntimeManager = functionRuntimeManager;
-        this.reader = reader;
-    }
+        
+        this.reader = readerBuilder
+          .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-runtime-manager")
+          .readerName(workerConfig.getWorkerId() + "-function-runtime-manager")
+          .topic(workerConfig.getFunctionAssignmentTopic())
+          .readCompacted(true)
+          .startMessageId(MessageId.earliest)
+          .create();
 
-    public void start() {
-        receiveOne();
+        this.tailerThread = new Thread(() -> {
+            while(isRunning) {
+                try {
+                    Message<byte[]> msg = reader.readNext();
+                    processAssignment(msg);
+                } catch (Exception e) {
+                    if (isRunning) {
+                        log.error("Encountered error in assignment tailer", e);
+
+                        // trigger fatal error
+                        // TODO add mechanism to notify main thread
+                    } else {
+                        if (!(e instanceof InterruptedException)) {
+                            log.warn("Encountered error when assignment tailer is not running", e);
+                        }
+                    }
+
+                }
+            }
+        });
+        this.tailerThread.setName("assignment-tailer-thread");
     }
 
-    private void receiveOne() {
-        reader.readNextAsync()
-                .thenAccept(this)
-                .exceptionally(this);
+    public void start() {
+        isRunning = true;
+        tailerThread.start();
     }
 
     @Override
     public void close() {
-        if (closed) {
-            return;
-        }
-        log.info("Stopping function state consumer");
+        log.info("Stopping function assignment tailer");
         try {
-            closed = true;
-            reader.close();
+            isRunning = false;
+            if (tailerThread != null && tailerThread.isAlive()) {
+                tailerThread.interrupt();
+            }
+            if (reader != null) {
+                reader.close();
+            }
         } catch (IOException e) {
-            log.error("Failed to stop function state consumer", e);
+            log.error("Failed to stop function assignment tailer", e);
         }
-        log.info("Stopped function state consumer");
+        log.info("Stopped function assignment tailer");
     }
 
     public void processAssignment(Message<byte[]> msg) {
+
         if(msg.getData()==null || (msg.getData().length==0)) {
             log.info("Received assignment delete: {}", msg.getKey());
             this.functionRuntimeManager.deleteAssignment(msg.getKey());
@@ -76,42 +105,11 @@ public class FunctionAssignmentTailer
             try {
                 assignment = Assignment.parseFrom(msg.getData());
             } catch (IOException e) {
-                log.error("[{}] Received bad assignment update at message {}", reader.getTopic(), msg.getMessageId(),
-                        e);
-                // TODO: find a better way to handle bad request
+                log.error("[{}] Received bad assignment update at message {}", reader.getTopic(), msg.getMessageId(), e);
                 throw new RuntimeException(e);
             }
             log.info("Received assignment update: {}", assignment);
             this.functionRuntimeManager.processAssignment(assignment);
         }
     }
-
-    @Override
-    public void accept(Message<byte[]> msg) {
-        processAssignment(msg);
-        // receive next request
-        receiveOne();
-    }
-
-    @Override
-    public Void apply(Throwable cause) {
-        Throwable realCause = FutureUtil.unwrapCompletionException(cause);
-        if (realCause instanceof AlreadyClosedException) {
-            // if reader is closed because tailer is closed, ignore the exception
-            if (closed) {
-                // ignore
-                return null;
-            } else {
-                log.error("Reader of assignment update topic is closed unexpectedly", cause);
-                throw new RuntimeException(
-                    "Reader of assignment update topic is closed unexpectedly",
-                    cause
-                );
-            }
-        } else {
-            log.error("Failed to retrieve messages from assignment update topic", cause);
-            // TODO: find a better way to handle consumer functions
-            throw new RuntimeException(cause);
-        }
-    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 7d17ef3..0b8e509 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -210,17 +210,12 @@ public class FunctionRuntimeManager implements AutoCloseable{
     public void initialize() {
         log.info("/** Initializing Runtime Manager **/");
         try {
-            Reader<byte[]> reader = this.getWorkerService().getClient().newReader()
-                    .readerName(workerConfig.getWorkerId() + "-function-runtime-manager")
-                    .topic(this.getWorkerConfig().getFunctionAssignmentTopic()).readCompacted(true)
-                    .startMessageId(MessageId.earliest).create();
-
-            this.functionAssignmentTailer = new FunctionAssignmentTailer(this, reader);
+            this.functionAssignmentTailer = new FunctionAssignmentTailer(this, this.getWorkerService().getClient().newReader(), workerConfig);
             // start init phase
             this.isInitializePhase = true;
             // read all existing messages
-            while (reader.hasMessageAvailable()) {
-                this.functionAssignmentTailer.processAssignment(reader.readNext());
+            while (this.functionAssignmentTailer.getReader().hasMessageAvailable()) {
+                this.functionAssignmentTailer.processAssignment(this.functionAssignmentTailer.getReader().readNext());
             }
             // init phase is done
             this.isInitializePhase = false;
@@ -244,7 +239,6 @@ public class FunctionRuntimeManager implements AutoCloseable{
      */
     public void start() {
         log.info("/** Starting Function Runtime Manager **/");
-        log.info("Starting function assignment tailer...");
         this.functionAssignmentTailer.start();
     }
 
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 4230304..d266ae0 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -563,6 +563,7 @@ public class FunctionRuntimeManagerTest {
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
         doReturn(readerBuilder).when(readerBuilder).readerName(anyString());
+        doReturn(readerBuilder).when(readerBuilder).subscriptionRolePrefix(anyString());
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
         doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());