You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2020/06/08 05:13:19 UTC
[pulsar] branch master updated: Add ability for function services to notify errors (#7187)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4beebb3 Add ability for function services to notify errors (#7187)
4beebb3 is described below
commit 4beebb31a05afb0e310e7deed7ea1aac17b9521b
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Sun Jun 7 22:13:05 2020 -0700
Add ability for function services to notify errors (#7187)
* Add ability for function services to notify errors
---
.../org/apache/pulsar/broker/PulsarService.java | 4 +-
.../worker/PulsarWorkerAssignmentTest.java | 2 +-
.../pulsar/functions/worker/ErrorNotifier.java | 56 +++++++
.../functions/worker/FunctionAssignmentTailer.java | 20 ++-
.../worker/FunctionMetaDataTopicTailer.java | 6 +-
.../functions/worker/FunctionRuntimeManager.java | 12 +-
.../functions/worker/FunctionWorkerStarter.java | 3 +-
.../org/apache/pulsar/functions/worker/Worker.java | 19 ++-
.../pulsar/functions/worker/WorkerService.java | 12 +-
.../worker/FunctionRuntimeManagerTest.java | 172 +++++++++++++++++++--
.../functions/worker/MembershipManagerTest.java | 13 +-
11 files changed, 276 insertions(+), 43 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index b2d5b7a..4848b41 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -116,6 +116,7 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.TwoPhaseCompactor;
+import org.apache.pulsar.functions.worker.ErrorNotifier;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.WorkerUtils;
@@ -1265,7 +1266,8 @@ public class PulsarService implements AutoCloseable {
throw ioe;
}
LOG.info("Function worker service setup completed");
- functionWorkerService.get().start(dlogURI, authenticationService, authorizationService);
+ // TODO figure out how to handle errors from function worker service
+ functionWorkerService.get().start(dlogURI, authenticationService, authorizationService, new ErrorNotifier());
LOG.info("Function worker service started");
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
index 70ab543..26fef7c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -287,7 +287,7 @@ public class PulsarWorkerAssignmentTest {
final URI dlUri = functionsWorkerService.getDlogUri();
functionsWorkerService.stop();
functionsWorkerService = new WorkerService(workerConfig);
- functionsWorkerService.start(dlUri, new AuthenticationService(PulsarConfigurationLoader.convertFrom(workerConfig)), null);
+ functionsWorkerService.start(dlUri, new AuthenticationService(PulsarConfigurationLoader.convertFrom(workerConfig)), null, new ErrorNotifier());
final FunctionRuntimeManager runtimeManager2 = functionsWorkerService.getFunctionRuntimeManager();
retryStrategically((test) -> {
try {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ErrorNotifier.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ErrorNotifier.java
new file mode 100644
index 0000000..9a51943
--- /dev/null
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ErrorNotifier.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class ErrorNotifier implements Serializable, AutoCloseable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final AtomicReference<Throwable> error = new AtomicReference<>();
+
+ private volatile boolean isRunning;
+
+ public ErrorNotifier() {
+ isRunning = true;
+ }
+
+ public synchronized void triggerError(Throwable th) {
+ error.set(th);
+ this.notify();
+ }
+
+ public synchronized void waitForError() throws Exception {
+ while (isRunning && error.get() == null) {
+ this.wait();
+ }
+
+ if (isRunning) {
+ throw new Exception(error.get());
+ }
+ }
+
+ @Override
+ public synchronized void close() {
+ isRunning = false;
+ this.notify();
+ }
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
index 98d5745..40cc581 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
@@ -39,7 +39,11 @@ public class FunctionAssignmentTailer implements AutoCloseable {
private final Thread tailerThread;
- public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, ReaderBuilder readerBuilder, WorkerConfig workerConfig) throws PulsarClientException {
+ public FunctionAssignmentTailer(
+ FunctionRuntimeManager functionRuntimeManager,
+ ReaderBuilder readerBuilder,
+ WorkerConfig workerConfig,
+ ErrorNotifier errorNotifier) throws PulsarClientException {
this.functionRuntimeManager = functionRuntimeManager;
this.reader = readerBuilder
@@ -49,21 +53,21 @@ public class FunctionAssignmentTailer implements AutoCloseable {
.readCompacted(true)
.startMessageId(MessageId.earliest)
.create();
-
+
this.tailerThread = new Thread(() -> {
while(isRunning) {
try {
Message<byte[]> msg = reader.readNext();
processAssignment(msg);
- } catch (Exception e) {
+ } catch (Throwable th) {
if (isRunning) {
- log.error("Encountered error in assignment tailer", e);
-
+ log.error("Encountered error in assignment tailer", th);
// trigger fatal error
- // TODO add mechanism to notify main thread
+ isRunning = false;
+ errorNotifier.triggerError(th);
} else {
- if (!(e instanceof InterruptedException)) {
- log.warn("Encountered error when assignment tailer is not running", e);
+ if (!(th instanceof InterruptedException)) {
+ log.warn("Encountered error when assignment tailer is not running", th);
}
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
index 4bcaaf7..b7108e9 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
@@ -53,13 +53,13 @@ public class FunctionMetaDataTopicTailer
@Override
public void close() {
- log.info("Stopping function state consumer");
+ log.info("Stopping function metadata tailer");
try {
reader.close();
} catch (IOException e) {
- log.error("Failed to stop function state consumer", e);
+ log.error("Failed to stop function metadata tailer", e);
}
- log.info("Stopped function state consumer");
+ log.info("Stopped function function metadata tailer");
}
public void processRequest(Message<byte[]> msg) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 0b8e509..bc1ca53 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -129,10 +129,11 @@ public class FunctionRuntimeManager implements AutoCloseable{
private final FunctionMetaDataManager functionMetaDataManager;
-
+ private final ErrorNotifier errorNotifier;
+
public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerService, Namespace dlogNamespace,
MembershipManager membershipManager, ConnectorsManager connectorsManager, FunctionsManager functionsManager,
- FunctionMetaDataManager functionMetaDataManager) throws Exception {
+ FunctionMetaDataManager functionMetaDataManager, ErrorNotifier errorNotifier) throws Exception {
this.workerConfig = workerConfig;
this.workerService = workerService;
this.functionAdmin = workerService.getFunctionAdmin();
@@ -200,6 +201,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
this.membershipManager = membershipManager;
this.functionMetaDataManager = functionMetaDataManager;
+ this.errorNotifier = errorNotifier;
}
/**
@@ -210,7 +212,11 @@ public class FunctionRuntimeManager implements AutoCloseable{
public void initialize() {
log.info("/** Initializing Runtime Manager **/");
try {
- this.functionAssignmentTailer = new FunctionAssignmentTailer(this, this.getWorkerService().getClient().newReader(), workerConfig);
+ this.functionAssignmentTailer = new FunctionAssignmentTailer(
+ this,
+ this.getWorkerService().getClient().newReader(),
+ this.workerConfig,
+ this.errorNotifier);
// start init phase
this.isInitializePhase = true;
// read all existing messages
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java
index ab5332b..90ee762 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java
@@ -65,8 +65,7 @@ public class FunctionWorkerStarter {
final Worker worker = new Worker(workerConfig);
try {
worker.start();
- }catch(Exception e){
- log.error("Failed to start function worker", e);
+ } catch (Throwable th) {
worker.stop();
System.exit(-1);
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
index 6245e59..4c53537 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
@@ -57,19 +57,28 @@ public class Worker {
new DefaultThreadFactory("zk-cache-callback"));
private GlobalZooKeeperCache globalZkCache;
private ConfigurationCacheService configurationCacheService;
+ private final ErrorNotifier errorNotifier;
public Worker(WorkerConfig workerConfig) {
this.workerConfig = workerConfig;
this.workerService = new WorkerService(workerConfig);
+ this.errorNotifier = new ErrorNotifier();
}
protected void start() throws Exception {
- URI dlogUri = initialize(this.workerConfig);
+ URI dlogUri = initialize(workerConfig);
- workerService.start(dlogUri, getAuthenticationService(), getAuthorizationService());
- this.server = new WorkerServer(workerService);
- this.server.start();
- log.info("Start worker server on port {}...", this.workerConfig.getWorkerPort());
+ workerService.start(dlogUri, getAuthenticationService(), getAuthorizationService(), errorNotifier);
+ server = new WorkerServer(workerService);
+ server.start();
+ log.info("/** Started worker server on port={} **/", this.workerConfig.getWorkerPort());
+
+ try {
+ errorNotifier.waitForError();
+ } catch (Throwable th) {
+ log.error("!-- Fatal error encountered. Worker will exit now. --!", th);
+ throw th;
+ }
}
private static URI initialize(WorkerConfig workerConfig)
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index edaade6..ec2c44b 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -85,7 +85,8 @@ public class WorkerService {
public void start(URI dlogUri,
AuthenticationService authenticationService,
- AuthorizationService authorizationService) throws InterruptedException {
+ AuthorizationService authorizationService,
+ ErrorNotifier errorNotifier) throws InterruptedException {
log.info("Starting worker {}...", workerConfig.getWorkerId());
try {
@@ -182,7 +183,14 @@ public class WorkerService {
// create function runtime manager
this.functionRuntimeManager = new FunctionRuntimeManager(
- this.workerConfig, this, this.dlogNamespace, this.membershipManager, connectorsManager, functionsManager, functionMetaDataManager);
+ this.workerConfig,
+ this,
+ this.dlogNamespace,
+ this.membershipManager,
+ connectorsManager,
+ functionsManager,
+ functionMetaDataManager,
+ errorNotifier);
// Setting references to managers in scheduler
this.schedulerManager.setFunctionMetaDataManager(this.functionMetaDataManager);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index d266ae0..429e728 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -47,7 +47,9 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.any;
@@ -55,6 +57,7 @@ import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.argThat;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
@@ -102,7 +105,8 @@ public class FunctionRuntimeManagerTest {
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
- mock(FunctionMetaDataManager.class)));
+ mock(FunctionMetaDataManager.class),
+ mock(ErrorNotifier.class)));
FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
@@ -185,7 +189,8 @@ public class FunctionRuntimeManagerTest {
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
- mock(FunctionMetaDataManager.class)));
+ mock(FunctionMetaDataManager.class),
+ mock(ErrorNotifier.class)));
FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
@@ -271,7 +276,8 @@ public class FunctionRuntimeManagerTest {
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
- mock(FunctionMetaDataManager.class));
+ mock(FunctionMetaDataManager.class),
+ mock(ErrorNotifier.class));
FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
@@ -401,7 +407,8 @@ public class FunctionRuntimeManagerTest {
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
- mock(FunctionMetaDataManager.class));
+ mock(FunctionMetaDataManager.class),
+ mock(ErrorNotifier.class));
FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
@@ -476,6 +483,131 @@ public class FunctionRuntimeManagerTest {
assertEquals(functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0"), functionRuntimeInfo);
}
+ @Test(timeOut = 10000)
+ public void testErrorNotifier() throws Exception {
+ WorkerConfig workerConfig = new WorkerConfig();
+ workerConfig.setWorkerId("worker-1");
+ workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+ ObjectMapperFactory.getThreadLocal().convertValue(
+ new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
+ workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+ workerConfig.setStateStorageServiceUrl("foo");
+ workerConfig.setFunctionAssignmentTopicName("assignments");
+
+ Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
+ Function.FunctionDetails.newBuilder()
+ .setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();
+
+ Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
+ Function.FunctionDetails.newBuilder()
+ .setTenant("test-tenant").setNamespace("test-namespace").setName("func-2")).build();
+
+ Function.Assignment assignment1 = Function.Assignment.newBuilder()
+ .setWorkerId("worker-1")
+ .setInstance(Function.Instance.newBuilder()
+ .setFunctionMetaData(function1).setInstanceId(0).build())
+ .build();
+ Function.Assignment assignment2 = Function.Assignment.newBuilder()
+ .setWorkerId("worker-1")
+ .setInstance(Function.Instance.newBuilder()
+ .setFunctionMetaData(function2).setInstanceId(0).build())
+ .build();
+
+ ArrayBlockingQueue<Message<byte[]>> messageList = new ArrayBlockingQueue<>(2);
+ PulsarApi.MessageMetadata.Builder msgMetadataBuilder = PulsarApi.MessageMetadata.newBuilder();
+ Message message1 = spy(new MessageImpl("foo", MessageId.latest.toString(),
+ new HashMap<>(), Unpooled.copiedBuffer(assignment1.toByteArray()), null, msgMetadataBuilder));
+ doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance())).when(message1).getKey();
+
+ Message message2 = spy(new MessageImpl("foo", MessageId.latest.toString(),
+ new HashMap<>(), Unpooled.copiedBuffer(assignment2.toByteArray()), null, msgMetadataBuilder));
+ doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment2.getInstance())).when(message2).getKey();
+
+ PulsarClient pulsarClient = mock(PulsarClient.class);
+
+ Reader<byte[]> reader = mock(Reader.class);
+
+
+ when(reader.readNext()).thenAnswer(new Answer<Message<byte[]>>() {
+ @Override
+ public Message<byte[]> answer(InvocationOnMock invocationOnMock) throws Throwable {
+ return messageList.poll(10, TimeUnit.SECONDS);
+ }
+ });
+
+ when(reader.readNextAsync()).thenAnswer(new Answer<CompletableFuture<Message<byte[]>>>() {
+ @Override
+ public CompletableFuture<Message<byte[]>> answer(InvocationOnMock invocationOnMock) throws Throwable {
+ return new CompletableFuture<>();
+ }
+ });
+
+ when(reader.hasMessageAvailable()).thenAnswer(new Answer<Boolean>() {
+ @Override
+ public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable {
+ return !messageList.isEmpty();
+ }
+ });
+
+ ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
+ doReturn(readerBuilder).when(pulsarClient).newReader();
+ doReturn(readerBuilder).when(readerBuilder).topic(anyString());
+ doReturn(readerBuilder).when(readerBuilder).readerName(anyString());
+ doReturn(readerBuilder).when(readerBuilder).subscriptionRolePrefix(anyString());
+ doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
+ doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
+ doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
+
+ doReturn(reader).when(readerBuilder).create();
+ WorkerService workerService = mock(WorkerService.class);
+ doReturn(pulsarClient).when(workerService).getClient();
+ doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
+
+ ErrorNotifier errorNotifier = spy(new ErrorNotifier());
+
+ // test new assignment add functions
+ FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
+ workerConfig,
+ workerService,
+ mock(Namespace.class),
+ mock(MembershipManager.class),
+ mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
+ mock(FunctionMetaDataManager.class),
+ errorNotifier));
+ FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
+ doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
+ doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
+ doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class));
+ functionRuntimeManager.setFunctionActioner(functionActioner);
+
+ functionRuntimeManager.initialize();
+
+ // verify no errors occurred
+ verify(errorNotifier, times(0)).triggerError(any());
+
+ messageList.add(message1);
+
+ functionRuntimeManager.start();
+
+ verify(errorNotifier, times(0)).triggerError(any());
+
+ // trigger an error to be thrown
+ doThrow(new RuntimeException("test")).when(functionRuntimeManager).processAssignment(any());
+
+ messageList.add(message2);
+
+ try {
+ errorNotifier.waitForError();
+ } catch (Exception e) {
+ assertEquals(e.getCause().getMessage(), "test");
+ }
+ verify(errorNotifier, times(1)).triggerError(any());
+
+ functionRuntimeManager.close();
+ }
+
@Test
public void testRuntimeManagerInitialize() throws Exception {
WorkerConfig workerConfig = new WorkerConfig();
@@ -573,6 +705,8 @@ public class FunctionRuntimeManagerTest {
doReturn(pulsarClient).when(workerService).getClient();
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
+ ErrorNotifier errorNotifier = mock(ErrorNotifier.class);
+
// test new assignment add functions
FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager(
workerConfig,
@@ -581,7 +715,8 @@ public class FunctionRuntimeManagerTest {
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
- mock(FunctionMetaDataManager.class));
+ mock(FunctionMetaDataManager.class),
+ errorNotifier);
FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
@@ -604,6 +739,9 @@ public class FunctionRuntimeManagerTest {
new FunctionRuntimeInfo().setFunctionInstance(
Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0)
.build()));
+
+ // verify no errors occurred
+ verify(errorNotifier, times(0)).triggerError(any());
}
@Test
@@ -649,7 +787,8 @@ public class FunctionRuntimeManagerTest {
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
- mock(FunctionMetaDataManager.class));
+ mock(FunctionMetaDataManager.class),
+ mock(ErrorNotifier.class));
functionRuntimeManager.setFunctionActioner(functionActioner);
Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder()
@@ -753,7 +892,8 @@ public class FunctionRuntimeManagerTest {
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
- mock(FunctionMetaDataManager.class));
+ mock(FunctionMetaDataManager.class),
+ mock(ErrorNotifier.class));
fail();
} catch (Exception e) {
@@ -777,7 +917,8 @@ public class FunctionRuntimeManagerTest {
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
- mock(FunctionMetaDataManager.class));
+ mock(FunctionMetaDataManager.class),
+ mock(ErrorNotifier.class));
fail();
} catch (Exception e) {
@@ -801,7 +942,8 @@ public class FunctionRuntimeManagerTest {
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
- mock(FunctionMetaDataManager.class));
+ mock(FunctionMetaDataManager.class),
+ mock(ErrorNotifier.class));
fail();
} catch (Exception e) {
@@ -825,7 +967,8 @@ public class FunctionRuntimeManagerTest {
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
- mock(FunctionMetaDataManager.class));
+ mock(FunctionMetaDataManager.class),
+ mock(ErrorNotifier.class));
assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ThreadRuntimeFactory.class);
} catch (Exception e) {
@@ -854,7 +997,8 @@ public class FunctionRuntimeManagerTest {
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
- mock(FunctionMetaDataManager.class));
+ mock(FunctionMetaDataManager.class),
+ mock(ErrorNotifier.class));
assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), KubernetesRuntimeFactory.class);
KubernetesRuntimeFactory kubernetesRuntimeFactory = (KubernetesRuntimeFactory) functionRuntimeManager.getRuntimeFactory();
@@ -882,7 +1026,8 @@ public class FunctionRuntimeManagerTest {
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
- mock(FunctionMetaDataManager.class));
+ mock(FunctionMetaDataManager.class),
+ mock(ErrorNotifier.class));
assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ProcessRuntimeFactory.class);
ProcessRuntimeFactory processRuntimeFactory = (ProcessRuntimeFactory) functionRuntimeManager.getRuntimeFactory();
@@ -906,7 +1051,8 @@ public class FunctionRuntimeManagerTest {
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
- mock(FunctionMetaDataManager.class));
+ mock(FunctionMetaDataManager.class),
+ mock(ErrorNotifier.class));
assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ThreadRuntimeFactory.class);
ThreadRuntimeFactory threadRuntimeFactory = (ThreadRuntimeFactory) functionRuntimeManager.getRuntimeFactory();
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 14d1044..7acb3f7 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -155,7 +155,8 @@ public class MembershipManagerTest {
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
- functionMetaDataManager));
+ functionMetaDataManager,
+ mock(ErrorNotifier.class)));
MembershipManager membershipManager = spy(new MembershipManager(workerService, pulsarClient, pulsarAdmin));
List<WorkerInfo> workerInfoList = new LinkedList<>();
@@ -228,7 +229,8 @@ public class MembershipManagerTest {
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
- functionMetaDataManager));
+ functionMetaDataManager,
+ mock(ErrorNotifier.class)));
MembershipManager membershipManager = spy(new MembershipManager(workerService, mockPulsarClient(), pulsarAdmin));
@@ -316,7 +318,8 @@ public class MembershipManagerTest {
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
- functionMetaDataManager));
+ functionMetaDataManager,
+ mock(ErrorNotifier.class)));
MembershipManager membershipManager = spy(new MembershipManager(workerService, mockPulsarClient(), pulsarAdmin));
List<WorkerInfo> workerInfoList = new LinkedList<>();
@@ -396,7 +399,8 @@ public class MembershipManagerTest {
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
- functionMetaDataManager));
+ functionMetaDataManager,
+ mock(ErrorNotifier.class)));
MembershipManager membershipManager = spy(new MembershipManager(workerService, mockPulsarClient(), pulsarAdmin));
List<WorkerInfo> workerInfoList = new LinkedList<>();
@@ -440,5 +444,4 @@ public class MembershipManagerTest {
verify(functionRuntimeManager, times(0)).removeAssignments(any());
assertEquals(membershipManager.unsignedFunctionDurations.size(), 0);
}
-
}