You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2020/06/08 05:13:19 UTC

[pulsar] branch master updated: Add ability for function services to notify errors (#7187)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4beebb3  Add ability for function services to notify errors (#7187)
4beebb3 is described below

commit 4beebb31a05afb0e310e7deed7ea1aac17b9521b
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Sun Jun 7 22:13:05 2020 -0700

    Add ability for function services to notify errors (#7187)
    
    * Add ability for function services to notify errors
---
 .../org/apache/pulsar/broker/PulsarService.java    |   4 +-
 .../worker/PulsarWorkerAssignmentTest.java         |   2 +-
 .../pulsar/functions/worker/ErrorNotifier.java     |  56 +++++++
 .../functions/worker/FunctionAssignmentTailer.java |  20 ++-
 .../worker/FunctionMetaDataTopicTailer.java        |   6 +-
 .../functions/worker/FunctionRuntimeManager.java   |  12 +-
 .../functions/worker/FunctionWorkerStarter.java    |   3 +-
 .../org/apache/pulsar/functions/worker/Worker.java |  19 ++-
 .../pulsar/functions/worker/WorkerService.java     |  12 +-
 .../worker/FunctionRuntimeManagerTest.java         | 172 +++++++++++++++++++--
 .../functions/worker/MembershipManagerTest.java    |  13 +-
 11 files changed, 276 insertions(+), 43 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index b2d5b7a..4848b41 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -116,6 +116,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.compaction.Compactor;
 import org.apache.pulsar.compaction.TwoPhaseCompactor;
+import org.apache.pulsar.functions.worker.ErrorNotifier;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.WorkerUtils;
@@ -1265,7 +1266,8 @@ public class PulsarService implements AutoCloseable {
                 throw ioe;
             }
             LOG.info("Function worker service setup completed");
-            functionWorkerService.get().start(dlogURI, authenticationService, authorizationService);
+            // TODO figure out how to handle errors from function worker service
+            functionWorkerService.get().start(dlogURI, authenticationService, authorizationService, new ErrorNotifier());
             LOG.info("Function worker service started");
         }
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
index 70ab543..26fef7c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -287,7 +287,7 @@ public class PulsarWorkerAssignmentTest {
         final URI dlUri = functionsWorkerService.getDlogUri();
         functionsWorkerService.stop();
         functionsWorkerService = new WorkerService(workerConfig);
-        functionsWorkerService.start(dlUri, new AuthenticationService(PulsarConfigurationLoader.convertFrom(workerConfig)), null);
+        functionsWorkerService.start(dlUri, new AuthenticationService(PulsarConfigurationLoader.convertFrom(workerConfig)), null, new ErrorNotifier());
         final FunctionRuntimeManager runtimeManager2 = functionsWorkerService.getFunctionRuntimeManager();
         retryStrategically((test) -> {
             try {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ErrorNotifier.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ErrorNotifier.java
new file mode 100644
index 0000000..9a51943
--- /dev/null
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ErrorNotifier.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class ErrorNotifier implements Serializable, AutoCloseable {
+  // Hands a fatal error from any thread to the thread(s) blocked in waitForError().
+  private static final long serialVersionUID = 1L;
+  // Most recently reported error; stays null until triggerError() is called.
+  private final AtomicReference<Throwable> error = new AtomicReference<>();
+  // Set to false by close(); waitForError() then returns without throwing.
+  private volatile boolean isRunning;
+
+  public ErrorNotifier() {
+    isRunning = true;
+  }
+  // Records th as the current error and wakes every thread blocked in waitForError().
+  public synchronized void triggerError(Throwable th) {
+    error.set(th);
+    this.notifyAll(); // notify() would wake only one waiter and leave the others blocked
+  }
+  // Blocks until an error is reported (thrown wrapped in Exception) or close() is called (returns normally).
+  public synchronized void waitForError() throws Exception {
+    while (isRunning && error.get() == null) {
+      this.wait();
+    }
+
+    if (isRunning) {
+      throw new Exception(error.get());
+    }
+  }
+  // Stops the notifier and releases all waiters; they return without an exception.
+  @Override
+  public synchronized void close() {
+    isRunning = false;
+    this.notifyAll(); // release every waiter, not just one
+  }
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
index 98d5745..40cc581 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
@@ -39,7 +39,11 @@ public class FunctionAssignmentTailer implements AutoCloseable {
 
     private final Thread tailerThread;
     
-    public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, ReaderBuilder readerBuilder, WorkerConfig workerConfig) throws PulsarClientException {
+    public FunctionAssignmentTailer(
+            FunctionRuntimeManager functionRuntimeManager,
+            ReaderBuilder readerBuilder,
+            WorkerConfig workerConfig,
+            ErrorNotifier errorNotifier) throws PulsarClientException {
         this.functionRuntimeManager = functionRuntimeManager;
         
         this.reader = readerBuilder
@@ -49,21 +53,21 @@ public class FunctionAssignmentTailer implements AutoCloseable {
           .readCompacted(true)
           .startMessageId(MessageId.earliest)
           .create();
-
+        
         this.tailerThread = new Thread(() -> {
             while(isRunning) {
                 try {
                     Message<byte[]> msg = reader.readNext();
                     processAssignment(msg);
-                } catch (Exception e) {
+                } catch (Throwable th) {
                     if (isRunning) {
-                        log.error("Encountered error in assignment tailer", e);
-
+                        log.error("Encountered error in assignment tailer", th);
                         // trigger fatal error
-                        // TODO add mechanism to notify main thread
+                        isRunning = false;
+                        errorNotifier.triggerError(th);
                     } else {
-                        if (!(e instanceof InterruptedException)) {
-                            log.warn("Encountered error when assignment tailer is not running", e);
+                        if (!(th instanceof InterruptedException)) {
+                            log.warn("Encountered error when assignment tailer is not running", th);
                         }
                     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
index 4bcaaf7..b7108e9 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
@@ -53,13 +53,13 @@ public class FunctionMetaDataTopicTailer
 
     @Override
     public void close() {
-        log.info("Stopping function state consumer");
+        log.info("Stopping function metadata tailer");
         try {
             reader.close();
         } catch (IOException e) {
-            log.error("Failed to stop function state consumer", e);
+            log.error("Failed to stop function metadata tailer", e);
         }
-        log.info("Stopped function state consumer");
+        log.info("Stopped function metadata tailer");
     }
 
     public void processRequest(Message<byte[]> msg) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 0b8e509..bc1ca53 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -129,10 +129,11 @@ public class FunctionRuntimeManager implements AutoCloseable{
 
     private final FunctionMetaDataManager functionMetaDataManager;
 
-
+    private final ErrorNotifier errorNotifier;
+    
     public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerService, Namespace dlogNamespace,
                                   MembershipManager membershipManager, ConnectorsManager connectorsManager, FunctionsManager functionsManager,
-                                  FunctionMetaDataManager functionMetaDataManager) throws Exception {
+                                  FunctionMetaDataManager functionMetaDataManager, ErrorNotifier errorNotifier) throws Exception {
         this.workerConfig = workerConfig;
         this.workerService = workerService;
         this.functionAdmin = workerService.getFunctionAdmin();
@@ -200,6 +201,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
 
         this.membershipManager = membershipManager;
         this.functionMetaDataManager = functionMetaDataManager;
+        this.errorNotifier = errorNotifier;
     }
 
     /**
@@ -210,7 +212,11 @@ public class FunctionRuntimeManager implements AutoCloseable{
     public void initialize() {
         log.info("/** Initializing Runtime Manager **/");
         try {
-            this.functionAssignmentTailer = new FunctionAssignmentTailer(this, this.getWorkerService().getClient().newReader(), workerConfig);
+            this.functionAssignmentTailer = new FunctionAssignmentTailer(
+                    this,
+                    this.getWorkerService().getClient().newReader(),
+                    this.workerConfig,
+                    this.errorNotifier);
             // start init phase
             this.isInitializePhase = true;
             // read all existing messages
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java
index ab5332b..90ee762 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java
@@ -65,8 +65,7 @@ public class FunctionWorkerStarter {
         final Worker worker = new Worker(workerConfig);
         try {
             worker.start();
-        }catch(Exception e){
-            log.error("Failed to start function worker", e);
+        } catch (Throwable th) {
             worker.stop();
             System.exit(-1);
         }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
index 6245e59..4c53537 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
@@ -57,19 +57,28 @@ public class Worker {
             new DefaultThreadFactory("zk-cache-callback"));
     private GlobalZooKeeperCache globalZkCache;
     private ConfigurationCacheService configurationCacheService;
+    private final ErrorNotifier errorNotifier;
 
     public Worker(WorkerConfig workerConfig) {
         this.workerConfig = workerConfig;
         this.workerService = new WorkerService(workerConfig);
+        this.errorNotifier = new ErrorNotifier();
     }
 
     protected void start() throws Exception {
-        URI dlogUri = initialize(this.workerConfig);
+        URI dlogUri = initialize(workerConfig);
 
-        workerService.start(dlogUri, getAuthenticationService(), getAuthorizationService());
-        this.server = new WorkerServer(workerService);
-        this.server.start();
-        log.info("Start worker server on port {}...", this.workerConfig.getWorkerPort());
+        workerService.start(dlogUri, getAuthenticationService(), getAuthorizationService(), errorNotifier);
+        server = new WorkerServer(workerService);
+        server.start();
+        log.info("/** Started worker server on port={} **/", this.workerConfig.getWorkerPort());
+        
+        try {
+            errorNotifier.waitForError();
+        } catch (Throwable th) {
+            log.error("!-- Fatal error encountered. Worker will exit now. --!", th);
+            throw th;
+        }
     }
 
     private static URI initialize(WorkerConfig workerConfig)
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index edaade6..ec2c44b 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -85,7 +85,8 @@ public class WorkerService {
 
     public void start(URI dlogUri,
                       AuthenticationService authenticationService,
-                      AuthorizationService authorizationService) throws InterruptedException {
+                      AuthorizationService authorizationService,
+                      ErrorNotifier errorNotifier) throws InterruptedException {
         log.info("Starting worker {}...", workerConfig.getWorkerId());
 
         try {
@@ -182,7 +183,14 @@ public class WorkerService {
 
             // create function runtime manager
             this.functionRuntimeManager = new FunctionRuntimeManager(
-                this.workerConfig, this, this.dlogNamespace, this.membershipManager, connectorsManager, functionsManager, functionMetaDataManager);
+                    this.workerConfig,
+                    this,
+                    this.dlogNamespace,
+                    this.membershipManager,
+                    connectorsManager,
+                    functionsManager,
+                    functionMetaDataManager,
+                    errorNotifier);
 
             // Setting references to managers in scheduler
             this.schedulerManager.setFunctionMetaDataManager(this.functionMetaDataManager);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index d266ae0..429e728 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -47,7 +47,9 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.any;
@@ -55,6 +57,7 @@ import static org.mockito.Mockito.anyBoolean;
 import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.argThat;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
@@ -102,7 +105,8 @@ public class FunctionRuntimeManagerTest {
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class),
                 mock(FunctionsManager.class),
-                mock(FunctionMetaDataManager.class)));
+                mock(FunctionMetaDataManager.class),
+                mock(ErrorNotifier.class)));
         FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
         doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
         doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
@@ -185,7 +189,8 @@ public class FunctionRuntimeManagerTest {
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class),
                 mock(FunctionsManager.class),
-                mock(FunctionMetaDataManager.class)));
+                mock(FunctionMetaDataManager.class),
+                mock(ErrorNotifier.class)));
         FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
         doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
         doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
@@ -271,7 +276,8 @@ public class FunctionRuntimeManagerTest {
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class),
                 mock(FunctionsManager.class),
-                mock(FunctionMetaDataManager.class));
+                mock(FunctionMetaDataManager.class),
+                mock(ErrorNotifier.class));
         FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
         doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
         doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
@@ -401,7 +407,8 @@ public class FunctionRuntimeManagerTest {
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class),
                 mock(FunctionsManager.class),
-                mock(FunctionMetaDataManager.class));
+                mock(FunctionMetaDataManager.class),
+                mock(ErrorNotifier.class));
         FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
         doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
         doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
@@ -476,6 +483,131 @@ public class FunctionRuntimeManagerTest {
         assertEquals(functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0"), functionRuntimeInfo);
     }
 
+    @Test(timeOut = 10000)
+    public void testErrorNotifier() throws Exception { // checks that an exception from processAssignment is reported via ErrorNotifier.triggerError
+        WorkerConfig workerConfig = new WorkerConfig();
+        workerConfig.setWorkerId("worker-1");
+        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(
+                ObjectMapperFactory.getThreadLocal().convertValue(
+                        new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
+        workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+        workerConfig.setStateStorageServiceUrl("foo");
+        workerConfig.setFunctionAssignmentTopicName("assignments");
+        // two function definitions, each assigned to worker-1 as instance 0
+        Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
+                Function.FunctionDetails.newBuilder()
+                        .setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();
+
+        Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
+                Function.FunctionDetails.newBuilder()
+                        .setTenant("test-tenant").setNamespace("test-namespace").setName("func-2")).build();
+
+        Function.Assignment assignment1 = Function.Assignment.newBuilder()
+                .setWorkerId("worker-1")
+                .setInstance(Function.Instance.newBuilder()
+                        .setFunctionMetaData(function1).setInstanceId(0).build())
+                .build();
+        Function.Assignment assignment2 = Function.Assignment.newBuilder()
+                .setWorkerId("worker-1")
+                .setInstance(Function.Instance.newBuilder()
+                        .setFunctionMetaData(function2).setInstanceId(0).build())
+                .build();
+        // queue that feeds the mocked reader, plus spy messages whose payloads are the serialized assignments
+        ArrayBlockingQueue<Message<byte[]>> messageList = new ArrayBlockingQueue<>(2);
+        PulsarApi.MessageMetadata.Builder msgMetadataBuilder = PulsarApi.MessageMetadata.newBuilder();
+        Message message1 = spy(new MessageImpl("foo", MessageId.latest.toString(),
+                new HashMap<>(), Unpooled.copiedBuffer(assignment1.toByteArray()), null, msgMetadataBuilder));
+        doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance())).when(message1).getKey();
+
+        Message message2 = spy(new MessageImpl("foo", MessageId.latest.toString(),
+                new HashMap<>(), Unpooled.copiedBuffer(assignment2.toByteArray()), null, msgMetadataBuilder));
+        doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment2.getInstance())).when(message2).getKey();
+
+        PulsarClient pulsarClient = mock(PulsarClient.class);
+
+        Reader<byte[]> reader = mock(Reader.class);
+
+        // readNext() waits on the queue for up to 10 seconds; poll() yields null if nothing arrives in time
+        when(reader.readNext()).thenAnswer(new Answer<Message<byte[]>>() {
+            @Override
+            public Message<byte[]> answer(InvocationOnMock invocationOnMock) throws Throwable {
+                return messageList.poll(10, TimeUnit.SECONDS);
+            }
+        });
+        // readNextAsync() hands back a fresh future that this stub never completes
+        when(reader.readNextAsync()).thenAnswer(new Answer<CompletableFuture<Message<byte[]>>>() {
+            @Override
+            public CompletableFuture<Message<byte[]>> answer(InvocationOnMock invocationOnMock) throws Throwable {
+                return new CompletableFuture<>();
+            }
+        });
+        // hasMessageAvailable() reports whether the queue still holds a message
+        when(reader.hasMessageAvailable()).thenAnswer(new Answer<Boolean>() {
+            @Override
+            public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable {
+                return !messageList.isEmpty();
+            }
+        });
+        // every stubbed builder method returns the same mock, so the fluent chain ends in create() -> reader
+        ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
+        doReturn(readerBuilder).when(pulsarClient).newReader();
+        doReturn(readerBuilder).when(readerBuilder).topic(anyString());
+        doReturn(readerBuilder).when(readerBuilder).readerName(anyString());
+        doReturn(readerBuilder).when(readerBuilder).subscriptionRolePrefix(anyString());
+        doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
+        doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
+        doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
+
+        doReturn(reader).when(readerBuilder).create();
+        WorkerService workerService = mock(WorkerService.class);
+        doReturn(pulsarClient).when(workerService).getClient();
+        doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
+        // real notifier wrapped in a spy so that calls to triggerError() can be counted
+        ErrorNotifier errorNotifier = spy(new ErrorNotifier());
+
+        // test new assignment add functions
+        FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
+                workerConfig,
+                workerService,
+                mock(Namespace.class),
+                mock(MembershipManager.class),
+                mock(ConnectorsManager.class),
+                mock(FunctionsManager.class),
+                mock(FunctionMetaDataManager.class),
+                errorNotifier));
+        FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
+        doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
+        doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
+        doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class));
+        functionRuntimeManager.setFunctionActioner(functionActioner);
+
+        functionRuntimeManager.initialize();
+
+        // verify no errors occurred
+        verify(errorNotifier, times(0)).triggerError(any());
+
+        messageList.add(message1);
+
+        functionRuntimeManager.start();
+        // still no error expected right after start() has been called
+        verify(errorNotifier, times(0)).triggerError(any());
+
+        // trigger an error to be thrown
+        doThrow(new RuntimeException("test")).when(functionRuntimeManager).processAssignment(any());
+
+        messageList.add(message2);
+        // waitForError() throws the reported error wrapped in an Exception, so the cause carries the message
+        try {
+            errorNotifier.waitForError();
+        } catch (Exception e) {
+            assertEquals(e.getCause().getMessage(), "test");
+        }
+        verify(errorNotifier, times(1)).triggerError(any());
+
+        functionRuntimeManager.close();
+    }
+
     @Test
     public void testRuntimeManagerInitialize() throws Exception {
         WorkerConfig workerConfig = new WorkerConfig();
@@ -573,6 +705,8 @@ public class FunctionRuntimeManagerTest {
         doReturn(pulsarClient).when(workerService).getClient();
         doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
 
+        ErrorNotifier errorNotifier = mock(ErrorNotifier.class);
+
         // test new assignment add functions
         FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager(
                 workerConfig,
@@ -581,7 +715,8 @@ public class FunctionRuntimeManagerTest {
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class),
                 mock(FunctionsManager.class),
-                mock(FunctionMetaDataManager.class));
+                mock(FunctionMetaDataManager.class),
+                errorNotifier);
         FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
         doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
         doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
@@ -604,6 +739,9 @@ public class FunctionRuntimeManagerTest {
                 new FunctionRuntimeInfo().setFunctionInstance(
                         Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0)
                                 .build()));
+
+        // verify no errors occurred
+        verify(errorNotifier, times(0)).triggerError(any());
     }
 
     @Test
@@ -649,7 +787,8 @@ public class FunctionRuntimeManagerTest {
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class),
                 mock(FunctionsManager.class),
-                mock(FunctionMetaDataManager.class));
+                mock(FunctionMetaDataManager.class),
+                mock(ErrorNotifier.class));
         functionRuntimeManager.setFunctionActioner(functionActioner);
 
         Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder()
@@ -753,7 +892,8 @@ public class FunctionRuntimeManagerTest {
                     mock(MembershipManager.class),
                     mock(ConnectorsManager.class),
                     mock(FunctionsManager.class),
-                    mock(FunctionMetaDataManager.class));
+                    mock(FunctionMetaDataManager.class),
+                    mock(ErrorNotifier.class));
 
             fail();
         } catch (Exception e) {
@@ -777,7 +917,8 @@ public class FunctionRuntimeManagerTest {
                     mock(MembershipManager.class),
                     mock(ConnectorsManager.class),
                     mock(FunctionsManager.class),
-                    mock(FunctionMetaDataManager.class));
+                    mock(FunctionMetaDataManager.class),
+                    mock(ErrorNotifier.class));
 
             fail();
         } catch (Exception e) {
@@ -801,7 +942,8 @@ public class FunctionRuntimeManagerTest {
                     mock(MembershipManager.class),
                     mock(ConnectorsManager.class),
                     mock(FunctionsManager.class),
-                    mock(FunctionMetaDataManager.class));
+                    mock(FunctionMetaDataManager.class),
+                    mock(ErrorNotifier.class));
 
             fail();
         } catch (Exception e) {
@@ -825,7 +967,8 @@ public class FunctionRuntimeManagerTest {
                     mock(MembershipManager.class),
                     mock(ConnectorsManager.class),
                     mock(FunctionsManager.class),
-                    mock(FunctionMetaDataManager.class));
+                    mock(FunctionMetaDataManager.class),
+                    mock(ErrorNotifier.class));
 
             assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ThreadRuntimeFactory.class);
         } catch (Exception e) {
@@ -854,7 +997,8 @@ public class FunctionRuntimeManagerTest {
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class),
                 mock(FunctionsManager.class),
-                mock(FunctionMetaDataManager.class));
+                mock(FunctionMetaDataManager.class),
+                mock(ErrorNotifier.class));
 
         assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), KubernetesRuntimeFactory.class);
         KubernetesRuntimeFactory kubernetesRuntimeFactory = (KubernetesRuntimeFactory) functionRuntimeManager.getRuntimeFactory();
@@ -882,7 +1026,8 @@ public class FunctionRuntimeManagerTest {
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class),
                 mock(FunctionsManager.class),
-                mock(FunctionMetaDataManager.class));
+                mock(FunctionMetaDataManager.class),
+                mock(ErrorNotifier.class));
 
         assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ProcessRuntimeFactory.class);
         ProcessRuntimeFactory processRuntimeFactory = (ProcessRuntimeFactory) functionRuntimeManager.getRuntimeFactory();
@@ -906,7 +1051,8 @@ public class FunctionRuntimeManagerTest {
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class),
                 mock(FunctionsManager.class),
-                mock(FunctionMetaDataManager.class));
+                mock(FunctionMetaDataManager.class),
+                mock(ErrorNotifier.class));
 
         assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ThreadRuntimeFactory.class);
         ThreadRuntimeFactory threadRuntimeFactory = (ThreadRuntimeFactory) functionRuntimeManager.getRuntimeFactory();
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 14d1044..7acb3f7 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -155,7 +155,8 @@ public class MembershipManagerTest {
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class),
                 mock(FunctionsManager.class),
-                functionMetaDataManager));
+                functionMetaDataManager,
+                mock(ErrorNotifier.class)));
         MembershipManager membershipManager = spy(new MembershipManager(workerService, pulsarClient, pulsarAdmin));
 
         List<WorkerInfo> workerInfoList = new LinkedList<>();
@@ -228,7 +229,8 @@ public class MembershipManagerTest {
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class),
                 mock(FunctionsManager.class),
-                functionMetaDataManager));
+                functionMetaDataManager,
+                mock(ErrorNotifier.class)));
 
         MembershipManager membershipManager = spy(new MembershipManager(workerService, mockPulsarClient(), pulsarAdmin));
 
@@ -316,7 +318,8 @@ public class MembershipManagerTest {
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class),
                 mock(FunctionsManager.class),
-                functionMetaDataManager));
+                functionMetaDataManager,
+                mock(ErrorNotifier.class)));
         MembershipManager membershipManager = spy(new MembershipManager(workerService, mockPulsarClient(), pulsarAdmin));
 
         List<WorkerInfo> workerInfoList = new LinkedList<>();
@@ -396,7 +399,8 @@ public class MembershipManagerTest {
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class),
                 mock(FunctionsManager.class),
-                functionMetaDataManager));
+                functionMetaDataManager,
+                mock(ErrorNotifier.class)));
         MembershipManager membershipManager = spy(new MembershipManager(workerService, mockPulsarClient(), pulsarAdmin));
 
         List<WorkerInfo> workerInfoList = new LinkedList<>();
@@ -440,5 +444,4 @@ public class MembershipManagerTest {
         verify(functionRuntimeManager, times(0)).removeAssignments(any());
         assertEquals(membershipManager.unsignedFunctionDurations.size(), 0);
     }
-
 }