You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2020/06/06 00:51:16 UTC
[pulsar] branch master updated: FunctionAssignmentTailer should use
its own thread (#7180)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new eaf828d FunctionAssignmentTailer should use its own thread (#7180)
eaf828d is described below
commit eaf828d2243ce09f2e04a0eea2802214b593dabc
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Fri Jun 5 17:50:59 2020 -0700
FunctionAssignmentTailer should use its own thread (#7180)
* FunctionAssignmentTailer should use its own thread
Co-authored-by: Jerry Peng <je...@splunk.com>
---
.../functions/worker/FunctionAssignmentTailer.java | 114 ++++++++++-----------
.../functions/worker/FunctionRuntimeManager.java | 12 +--
.../worker/FunctionRuntimeManagerTest.java | 1 +
3 files changed, 60 insertions(+), 67 deletions(-)
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
index bac3128..98d5745 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
@@ -18,56 +18,85 @@
*/
package org.apache.pulsar.functions.worker;
-import java.io.IOException;
-import java.util.function.Function;
-
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.functions.proto.Function.Assignment;
-import lombok.extern.slf4j.Slf4j;
+import java.io.IOException;
@Slf4j
-public class FunctionAssignmentTailer
- implements java.util.function.Consumer<Message<byte[]>>, Function<Throwable, Void>, AutoCloseable {
+public class FunctionAssignmentTailer implements AutoCloseable {
private final FunctionRuntimeManager functionRuntimeManager;
+ @Getter
private final Reader<byte[]> reader;
- private boolean closed = false;
+ private volatile boolean isRunning = false;
- public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, Reader<byte[]> reader) {
+ private final Thread tailerThread;
+
+ public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, ReaderBuilder readerBuilder, WorkerConfig workerConfig) throws PulsarClientException {
this.functionRuntimeManager = functionRuntimeManager;
- this.reader = reader;
- }
+
+ this.reader = readerBuilder
+ .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-runtime-manager")
+ .readerName(workerConfig.getWorkerId() + "-function-runtime-manager")
+ .topic(workerConfig.getFunctionAssignmentTopic())
+ .readCompacted(true)
+ .startMessageId(MessageId.earliest)
+ .create();
- public void start() {
- receiveOne();
+ this.tailerThread = new Thread(() -> {
+ while(isRunning) {
+ try {
+ Message<byte[]> msg = reader.readNext();
+ processAssignment(msg);
+ } catch (Exception e) {
+ if (isRunning) {
+ log.error("Encountered error in assignment tailer", e);
+
+ // trigger fatal error
+ // TODO add mechanism to notify main thread
+ } else {
+ if (!(e instanceof InterruptedException)) {
+ log.warn("Encountered error when assignment tailer is not running", e);
+ }
+ }
+
+ }
+ }
+ });
+ this.tailerThread.setName("assignment-tailer-thread");
}
- private void receiveOne() {
- reader.readNextAsync()
- .thenAccept(this)
- .exceptionally(this);
+ public void start() {
+ isRunning = true;
+ tailerThread.start();
}
@Override
public void close() {
- if (closed) {
- return;
- }
- log.info("Stopping function state consumer");
+ log.info("Stopping function assignment tailer");
try {
- closed = true;
- reader.close();
+ isRunning = false;
+ if (tailerThread != null && tailerThread.isAlive()) {
+ tailerThread.interrupt();
+ }
+ if (reader != null) {
+ reader.close();
+ }
} catch (IOException e) {
- log.error("Failed to stop function state consumer", e);
+ log.error("Failed to stop function assignment tailer", e);
}
- log.info("Stopped function state consumer");
+ log.info("Stopped function assignment tailer");
}
public void processAssignment(Message<byte[]> msg) {
+
if(msg.getData()==null || (msg.getData().length==0)) {
log.info("Received assignment delete: {}", msg.getKey());
this.functionRuntimeManager.deleteAssignment(msg.getKey());
@@ -76,42 +105,11 @@ public class FunctionAssignmentTailer
try {
assignment = Assignment.parseFrom(msg.getData());
} catch (IOException e) {
- log.error("[{}] Received bad assignment update at message {}", reader.getTopic(), msg.getMessageId(),
- e);
- // TODO: find a better way to handle bad request
+ log.error("[{}] Received bad assignment update at message {}", reader.getTopic(), msg.getMessageId(), e);
throw new RuntimeException(e);
}
log.info("Received assignment update: {}", assignment);
this.functionRuntimeManager.processAssignment(assignment);
}
}
-
- @Override
- public void accept(Message<byte[]> msg) {
- processAssignment(msg);
- // receive next request
- receiveOne();
- }
-
- @Override
- public Void apply(Throwable cause) {
- Throwable realCause = FutureUtil.unwrapCompletionException(cause);
- if (realCause instanceof AlreadyClosedException) {
- // if reader is closed because tailer is closed, ignore the exception
- if (closed) {
- // ignore
- return null;
- } else {
- log.error("Reader of assignment update topic is closed unexpectedly", cause);
- throw new RuntimeException(
- "Reader of assignment update topic is closed unexpectedly",
- cause
- );
- }
- } else {
- log.error("Failed to retrieve messages from assignment update topic", cause);
- // TODO: find a better way to handle consumer functions
- throw new RuntimeException(cause);
- }
- }
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 7d17ef3..0b8e509 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -210,17 +210,12 @@ public class FunctionRuntimeManager implements AutoCloseable{
public void initialize() {
log.info("/** Initializing Runtime Manager **/");
try {
- Reader<byte[]> reader = this.getWorkerService().getClient().newReader()
- .readerName(workerConfig.getWorkerId() + "-function-runtime-manager")
- .topic(this.getWorkerConfig().getFunctionAssignmentTopic()).readCompacted(true)
- .startMessageId(MessageId.earliest).create();
-
- this.functionAssignmentTailer = new FunctionAssignmentTailer(this, reader);
+ this.functionAssignmentTailer = new FunctionAssignmentTailer(this, this.getWorkerService().getClient().newReader(), workerConfig);
// start init phase
this.isInitializePhase = true;
// read all existing messages
- while (reader.hasMessageAvailable()) {
- this.functionAssignmentTailer.processAssignment(reader.readNext());
+ while (this.functionAssignmentTailer.getReader().hasMessageAvailable()) {
+ this.functionAssignmentTailer.processAssignment(this.functionAssignmentTailer.getReader().readNext());
}
// init phase is done
this.isInitializePhase = false;
@@ -244,7 +239,6 @@ public class FunctionRuntimeManager implements AutoCloseable{
*/
public void start() {
log.info("/** Starting Function Runtime Manager **/");
- log.info("Starting function assignment tailer...");
this.functionAssignmentTailer.start();
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 4230304..d266ae0 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -563,6 +563,7 @@ public class FunctionRuntimeManagerTest {
doReturn(readerBuilder).when(pulsarClient).newReader();
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
doReturn(readerBuilder).when(readerBuilder).readerName(anyString());
+ doReturn(readerBuilder).when(readerBuilder).subscriptionRolePrefix(anyString());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());