You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/06/19 21:40:40 UTC
[pulsar] branch master updated: Fix leader/scheduler assignment
processing lag problem (#7237)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 68877f8 Fix leader/scheduler assignment processing lag problem (#7237)
68877f8 is described below
commit 68877f8947ce5ed28f09152cbf91ea0b0ecafb87
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Fri Jun 19 14:40:26 2020 -0700
Fix leader/scheduler assignment processing lag problem (#7237)
* Fix leader/scheduler assignment processing lag problem
* add license header
* adding more comments
* improving impl
* fixing bugs
* improving impl
* fixing tests
* adding comments
* add more testing
* addressing comments
* cleaning up
* refactoring implementation
* addressing comments
Co-authored-by: Jerry Peng <je...@splunk.com>
---
.../org/apache/pulsar/PulsarStandaloneStarter.java | 2 +-
.../worker/ClusterServiceCoordinator.java | 9 +-
.../functions/worker/FunctionAssignmentTailer.java | 170 ++++++---
.../functions/worker/FunctionMetaDataManager.java | 13 +-
.../worker/FunctionMetaDataTopicTailer.java | 4 +-
.../functions/worker/FunctionRuntimeManager.java | 52 ++-
.../pulsar/functions/worker/LeaderService.java | 134 +++++++
.../pulsar/functions/worker/MembershipManager.java | 75 +---
.../pulsar/functions/worker/SchedulerManager.java | 232 +++++++----
.../pulsar/functions/worker/WorkerService.java | 157 +++++---
.../pulsar/functions/worker/WorkerUtils.java | 16 +
.../worker/ClusterServiceCoordinatorTest.java | 14 +-
.../worker/FunctionAssignmentTailerTest.java | 422 +++++++++++++++++++++
.../worker/FunctionRuntimeManagerTest.java | 128 -------
.../pulsar/functions/worker/LeaderServiceTest.java | 152 ++++++++
.../functions/worker/MembershipManagerTest.java | 43 ---
.../functions/worker/SchedulerManagerTest.java | 120 ++++--
17 files changed, 1256 insertions(+), 487 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
index e5e9b45..9da388b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
@@ -102,7 +102,7 @@ public class PulsarStandaloneStarter extends PulsarStandalone {
bkEnsemble.stop();
}
} catch (Exception e) {
- log.error("Shutdown failed: {}", e.getMessage());
+ log.error("Shutdown failed: {}", e.getMessage(), e);
}
}
});
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java
index 419f65a..c2bde9d 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java
@@ -48,11 +48,11 @@ public class ClusterServiceCoordinator implements AutoCloseable {
private final String workerId;
private final Map<String, TimerTaskInfo> tasks = new HashMap<>();
private final ScheduledExecutorService executor;
- private final MembershipManager membershipManager;
+ private final LeaderService leaderService;
- public ClusterServiceCoordinator(String workerId, MembershipManager membershipManager) {
+ /**
+ * Creates a coordinator that runs its registered timer tasks on a single-threaded scheduled executor.
+ * A registered task only executes on this worker while {@code leaderService} reports this worker as the leader.
+ *
+ * @param workerId id of this worker
+ * @param leaderService used on every timer tick to decide whether this worker is currently the leader
+ */
+ public ClusterServiceCoordinator(String workerId, LeaderService leaderService) {
this.workerId = workerId;
- this.membershipManager = membershipManager;
+ this.leaderService = leaderService;
this.executor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("cluster-service-coordinator-timer").build());
}
@@ -62,11 +62,12 @@ public class ClusterServiceCoordinator implements AutoCloseable {
}
public void start() {
+ log.info("/** Starting cluster service coordinator **/");
for (Map.Entry<String, TimerTaskInfo> entry : this.tasks.entrySet()) {
TimerTaskInfo timerTaskInfo = entry.getValue();
String taskName = entry.getKey();
this.executor.scheduleAtFixedRate(() -> {
- boolean isLeader = membershipManager.isLeader();
+ boolean isLeader = leaderService.isLeader();
if (isLeader) {
try {
timerTaskInfo.getTask().run();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
index 3038044..e1d9c1f 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
@@ -25,95 +25,153 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
-import org.apache.pulsar.functions.proto.Function.Assignment;
import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+/**
+ * This class is responsible for reading assignments from the 'assignments' functions internal topic.
+ * Only functions worker leader writes to the topic while other workers read from the topic.
+ * When a worker become a leader, the worker will read to the end of the assignments topic and close its reader to the topic.
+ * Then the worker and new leader will be in charge of computing new assignments when necessary.
+ * The leader does not need to listen to the assignments topic because it can just update its in memory assignments map directly
+ * after it computes a new scheduling. When a worker loses leadership, the worker is start reading from the assignments topic again.
+ */
@Slf4j
public class FunctionAssignmentTailer implements AutoCloseable {
private final FunctionRuntimeManager functionRuntimeManager;
- @Getter
- private final Reader<byte[]> reader;
+ private final ReaderBuilder readerBuilder;
+ private final WorkerConfig workerConfig;
+ private final ErrorNotifier errorNotifier;
+ private Reader<byte[]> reader;
private volatile boolean isRunning = false;
+ private volatile boolean exitOnEndOfTopic = false;
+ private CompletableFuture<Void> exitFuture;
+ private Thread tailerThread;
- private final Thread tailerThread;
+ @Getter
+ private MessageId lastMessageId = null;
public FunctionAssignmentTailer(
FunctionRuntimeManager functionRuntimeManager,
ReaderBuilder readerBuilder,
WorkerConfig workerConfig,
- ErrorNotifier errorNotifier) throws PulsarClientException {
+ ErrorNotifier errorNotifier) {
this.functionRuntimeManager = functionRuntimeManager;
-
- this.reader = readerBuilder
- .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-runtime-manager")
- .readerName(workerConfig.getWorkerId() + "-function-runtime-manager")
- .topic(workerConfig.getFunctionAssignmentTopic())
- .readCompacted(true)
- .startMessageId(MessageId.earliest)
- .create();
-
- this.tailerThread = new Thread(() -> {
- while(isRunning) {
- try {
- Message<byte[]> msg = reader.readNext();
- processAssignment(msg);
- } catch (Throwable th) {
- if (isRunning) {
- log.error("Encountered error in assignment tailer", th);
- // trigger fatal error
- isRunning = false;
- errorNotifier.triggerError(th);
- } else {
- if (!(th instanceof InterruptedException || th.getCause() instanceof InterruptedException)) {
- log.warn("Encountered error when assignment tailer is not running", th);
- }
- }
+ this.exitFuture = new CompletableFuture<>();
+ this.readerBuilder = readerBuilder;
+ this.workerConfig = workerConfig;
+ this.errorNotifier = errorNotifier;
+ }
- }
+ public synchronized CompletableFuture<Void> triggerReadToTheEndAndExit() {
+ exitOnEndOfTopic = true;
+ return this.exitFuture;
+ }
+
+ public void startFromMessage(MessageId startMessageId) throws PulsarClientException {
+ if (!isRunning) {
+ isRunning = true;
+ if (reader == null) {
+ reader = createReader(startMessageId);
}
- });
- this.tailerThread.setName("assignment-tailer-thread");
+ if (tailerThread == null || !tailerThread.isAlive()) {
+ tailerThread = getTailerThread();
+ }
+ exitFuture = new CompletableFuture<>();
+ tailerThread.start();
+ }
}
- public void start() {
- isRunning = true;
- tailerThread.start();
+ public synchronized void start() throws PulsarClientException {
+ MessageId startMessageId = lastMessageId == null ? MessageId.earliest : lastMessageId;
+ startFromMessage(startMessageId);
}
@Override
- public void close() {
- log.info("Stopping function assignment tailer");
+ public synchronized void close() {
+ log.info("Closing function assignment tailer");
try {
isRunning = false;
- if (tailerThread != null && tailerThread.isAlive()) {
- tailerThread.interrupt();
+
+ if (tailerThread != null) {
+ while (true) {
+ tailerThread.interrupt();
+
+ try {
+ tailerThread.join(5000, 0);
+ } catch (InterruptedException e) {
+ log.warn("Waiting for assignment tailer thread to stop is interrupted", e);
+ }
+
+ if (tailerThread.isAlive()) {
+ log.warn("Assignment tailer thread is still alive. Will attempt to interrupt again.");
+ } else {
+ break;
+ }
+ }
+ tailerThread = null;
}
if (reader != null) {
reader.close();
+ reader = null;
}
+
+ exitFuture = null;
+ exitOnEndOfTopic = false;
+
} catch (IOException e) {
log.error("Failed to stop function assignment tailer", e);
}
- log.info("Stopped function assignment tailer");
+ }
+
+ private Reader<byte[]> createReader(MessageId startMessageId) throws PulsarClientException {
+ log.info("Assignment tailer will start reading from message id {}", startMessageId);
+
+ return WorkerUtils.createReader(
+ readerBuilder,
+ workerConfig.getWorkerId() + "-function-assignment-tailer",
+ workerConfig.getFunctionAssignmentTopic(),
+ startMessageId);
}
- public void processAssignment(Message<byte[]> msg) {
-
- if(msg.getData()==null || (msg.getData().length==0)) {
- log.info("Received assignment delete: {}", msg.getKey());
- this.functionRuntimeManager.deleteAssignment(msg.getKey());
- } else {
- Assignment assignment;
- try {
- assignment = Assignment.parseFrom(msg.getData());
- } catch (IOException e) {
- log.error("[{}] Received bad assignment update at message {}", reader.getTopic(), msg.getMessageId(), e);
- throw new RuntimeException(e);
+ private Thread getTailerThread() {
+ Thread t = new Thread(() -> {
+ while (isRunning) {
+ try {
+ Message<byte[]> msg = reader.readNext(5, TimeUnit.SECONDS);
+ if (msg == null) {
+ if (exitOnEndOfTopic && !reader.hasMessageAvailable()) {
+ break;
+ }
+ } else {
+ functionRuntimeManager.processAssignmentMessage(msg);
+ // keep track of last message id
+ lastMessageId = msg.getMessageId();
+ }
+ } catch (Throwable th) {
+ if (isRunning) {
+ log.error("Encountered error in assignment tailer", th);
+ // trigger fatal error
+ isRunning = false;
+ errorNotifier.triggerError(th);
+ } else {
+ if (!(th instanceof InterruptedException || th.getCause() instanceof InterruptedException)) {
+ log.warn("Encountered error when assignment tailer is not running", th);
+ }
+ }
+ }
}
- log.info("Received assignment update: {}", assignment);
- this.functionRuntimeManager.processAssignment(assignment);
- }
+ log.info("tailer thread exiting");
+ exitFuture.complete(null);
+ });
+ t.setName("assignment-tailer-thread");
+ return t;
+ }
+
+ Thread getThread() {
+ return tailerThread;
}
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index 80f577d..6496f6e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -89,7 +89,6 @@ public class FunctionMetaDataManager implements AutoCloseable {
* 1. Consume all existing function meta data upon start to establish existing state
*/
public void initialize() {
- log.info("/** Initializing Function Metadata Manager **/");
try {
this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this,
pulsarClient.newReader(), this.workerConfig, this.errorNotifier);
@@ -99,16 +98,20 @@ public class FunctionMetaDataManager implements AutoCloseable {
this.functionMetaDataTopicTailer.processRequest(this.functionMetaDataTopicTailer.getReader().readNext());
}
this.setInitializePhase(false);
- // schedule functions if necessary
- this.schedulerManager.schedule();
- // start function metadata tailer
- this.functionMetaDataTopicTailer.start();
+
} catch (Exception e) {
log.error("Failed to initialize meta data store", e);
throw new RuntimeException(e);
}
}
+
+ /**
+ * Starts normal operation: triggers a scheduling pass, then starts the function metadata topic tailer.
+ * NOTE(review): assumes initialize() has already run, because initialize() is where
+ * functionMetaDataTopicTailer is created — confirm callers guarantee this ordering.
+ */
+ public void start() {
+ // schedule functions if necessary
+ this.schedulerManager.schedule();
+ // start function metadata tailer
+ this.functionMetaDataTopicTailer.start();
+ }
/**
* Get the function metadata for a function
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
index 55a67a6..f0626ce 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
@@ -48,8 +48,8 @@ public class FunctionMetaDataTopicTailer
this.reader = readerBuilder
.topic(workerConfig.getFunctionMetadataTopic())
.startMessageId(MessageId.earliest)
- .readerName(workerConfig.getWorkerId() + "-function-metadata-manager")
- .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-metadata-manager")
+ .readerName(workerConfig.getWorkerId() + "-function-metadata-tailer")
+ .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-metadata-tailer")
.create();
readerThread = new Thread(this);
readerThread.setName("function-metadata-tailer-thread");
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index bc1ca53..bdde8a8 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -26,6 +26,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.common.functions.WorkerInfo;
@@ -53,6 +54,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
+import java.io.IOException;
import java.net.URI;
import java.util.*;
import java.util.Map.Entry;
@@ -109,8 +111,6 @@ public class FunctionRuntimeManager implements AutoCloseable{
@Getter
final WorkerConfig workerConfig;
- private FunctionAssignmentTailer functionAssignmentTailer;
-
@Setter
@Getter
private FunctionActioner functionActioner;
@@ -130,7 +130,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
private final FunctionMetaDataManager functionMetaDataManager;
private final ErrorNotifier errorNotifier;
-
+
public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerService, Namespace dlogNamespace,
MembershipManager membershipManager, ConnectorsManager connectorsManager, FunctionsManager functionsManager,
FunctionMetaDataManager functionMetaDataManager, ErrorNotifier errorNotifier) throws Exception {
@@ -210,21 +210,23 @@ public class FunctionRuntimeManager implements AutoCloseable{
* 2. After current assignments are read, assignments belonging to this worker will be processed
*/
public void initialize() {
- log.info("/** Initializing Runtime Manager **/");
try {
- this.functionAssignmentTailer = new FunctionAssignmentTailer(
- this,
- this.getWorkerService().getClient().newReader(),
- this.workerConfig,
- this.errorNotifier);
+ Reader<byte[]> reader = WorkerUtils.createReader(
+ workerService.getClient().newReader(),
+ workerConfig.getWorkerId() + "-function-assignment-initialize",
+ workerConfig.getFunctionAssignmentTopic(),
+ MessageId.earliest);
+
// start init phase
this.isInitializePhase = true;
// read all existing messages
- while (this.functionAssignmentTailer.getReader().hasMessageAvailable()) {
- this.functionAssignmentTailer.processAssignment(this.functionAssignmentTailer.getReader().readNext());
+ while (reader.hasMessageAvailable()) {
+ processAssignmentMessage(reader.readNext());
}
// init phase is done
this.isInitializePhase = false;
+ // close reader
+ reader.close();
// realize existing assignments
Map<String, Assignment> assignmentMap = workerIdToAssignments.get(this.workerConfig.getWorkerId());
if (assignmentMap != null) {
@@ -243,14 +245,6 @@ public class FunctionRuntimeManager implements AutoCloseable{
/**
* Starts the function runtime manager
*/
- public void start() {
- log.info("/** Starting Function Runtime Manager **/");
- this.functionAssignmentTailer.start();
- }
-
- /**
- * Public methods
- */
/**
* Get current assignments
@@ -611,6 +605,25 @@ public class FunctionRuntimeManager implements AutoCloseable{
return functionStats.calculateOverall();
}
+
+ public synchronized void processAssignmentMessage(Message<byte[]> msg) {
+
+ if(msg.getData()==null || (msg.getData().length==0)) {
+ log.info("Received assignment delete: {}", msg.getKey());
+ deleteAssignment(msg.getKey());
+ } else {
+ Assignment assignment;
+ try {
+ assignment = Assignment.parseFrom(msg.getData());
+ } catch (IOException e) {
+ log.error("[{}] Received bad assignment update at message {}", msg.getMessageId(), e);
+ throw new RuntimeException(e);
+ }
+ log.info("Received assignment update: {}", assignment);
+ processAssignment(assignment);
+ }
+ }
+
/**
* Process an assignment update from the assignment topic
* @param newAssignment the assignment
@@ -835,7 +848,6 @@ public class FunctionRuntimeManager implements AutoCloseable{
@Override
public void close() throws Exception {
- this.functionAssignmentTailer.close();
stopAllOwnedFunctions();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
new file mode 100644
index 0000000..3d7ec92
--- /dev/null
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Tracks whether this worker is the functions worker leader.
+ * Leadership is decided by a failover subscription on the cluster coordination topic: the active consumer
+ * of that subscription is the leader. On gaining leadership, the worker drains and closes its assignment
+ * tailer and initializes the scheduler. On losing leadership, it closes the scheduler and resumes tailing
+ * the assignment topic.
+ */
+@Slf4j
+public class LeaderService implements AutoCloseable, ConsumerEventListener {
+
+ private final String consumerName;
+ private final FunctionAssignmentTailer functionAssignmentTailer;
+ private final ErrorNotifier errorNotifier;
+ private final SchedulerManager schedulerManager;
+ // kept as the public Consumer interface: subscribe() is not guaranteed to return ConsumerImpl
+ // (e.g. a partitioned coordination topic yields a multi-topics consumer), so casting is unsafe
+ private Consumer<byte[]> consumer;
+ private final WorkerConfig workerConfig;
+ private final PulsarClient pulsarClient;
+ private final AtomicBoolean isLeader = new AtomicBoolean(false);
+
+ static final String COORDINATION_TOPIC_SUBSCRIPTION = "participants";
+
+ private static final String WORKER_IDENTIFIER = "id";
+
+ public LeaderService(WorkerService workerService,
+ PulsarClient pulsarClient,
+ FunctionAssignmentTailer functionAssignmentTailer,
+ SchedulerManager schedulerManager,
+ ErrorNotifier errorNotifier) {
+ this.workerConfig = workerService.getWorkerConfig();
+ this.pulsarClient = pulsarClient;
+ this.functionAssignmentTailer = functionAssignmentTailer;
+ this.schedulerManager = schedulerManager;
+ this.errorNotifier = errorNotifier;
+ consumerName = String.format(
+ "%s:%s:%d",
+ workerConfig.getWorkerId(),
+ workerConfig.getWorkerHostname(),
+ workerConfig.getWorkerPort()
+ );
+
+ }
+
+ /**
+ * Subscribes to the coordination topic so this worker takes part in leader election.
+ *
+ * @throws PulsarClientException if the subscription cannot be created
+ */
+ public void start() throws PulsarClientException {
+ // the leaders service is using a `coordination` topic for leader election.
+ // we don't produce any messages into this topic, we only use the `failover` subscription
+ // to elect an active consumer as the leader worker. The leader worker will be responsible
+ // for scheduling snapshots for FMT and doing task assignment.
+ consumer = pulsarClient.newConsumer()
+ .topic(workerConfig.getClusterCoordinationTopic())
+ .subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION)
+ .subscriptionType(SubscriptionType.Failover)
+ .consumerEventListener(this)
+ .property(WORKER_IDENTIFIER, consumerName)
+ .consumerName(consumerName)
+ .subscribe();
+ }
+
+ /**
+ * Called when this worker's consumer becomes the active consumer, i.e. the leader.
+ * Blocks until the assignment tailer has read to the end of the topic; any failure is reported to the error notifier.
+ */
+ @Override
+ public synchronized void becameActive(Consumer<?> consumer, int partitionId) {
+ if (isLeader.compareAndSet(false, true)) {
+ log.info("Worker {} became the leader.", consumerName);
+ try {
+ // trigger read to the end of the topic and exit
+ // Since the leader can just update its in memory assignments cache directly
+ functionAssignmentTailer.triggerReadToTheEndAndExit().get();
+ functionAssignmentTailer.close();
+
+ // make sure scheduler is initialized because this worker
+ // is the leader and may need to start computing and writing assignments
+ schedulerManager.initialize();
+ } catch (Throwable th) {
+ log.error("Encountered error when initializing to become leader", th);
+ errorNotifier.triggerError(th);
+ }
+ }
+ }
+
+ /**
+ * Called when this worker's consumer stops being the active consumer, i.e. it lost leadership.
+ * Closes the scheduler manager and restarts the assignment tailer; any failure is reported to the error notifier.
+ */
+ @Override
+ public synchronized void becameInactive(Consumer<?> consumer, int partitionId) {
+ if (isLeader.compareAndSet(true, false)) {
+ log.info("Worker {} lost the leadership.", consumerName);
+ // when a worker has lost leadership it needs to start reading from the assignment topic again
+ try {
+ // stop scheduler manager since we are not the leader anymore
+ // will block if a schedule invocation is in process
+ schedulerManager.close();
+
+ // starting reading from assignment topic from the last published message of the scheduler
+ if (schedulerManager.getLastMessageProduced() == null) {
+ functionAssignmentTailer.start();
+ } else {
+ functionAssignmentTailer.startFromMessage(schedulerManager.getLastMessageProduced());
+ }
+ } catch (Throwable th) {
+ log.error("Encountered error in routine when worker lost leadership", th);
+ errorNotifier.triggerError(th);
+ }
+ }
+ }
+
+ /**
+ * @return whether this worker currently considers itself the leader
+ */
+ public boolean isLeader() {
+ return isLeader.get();
+ }
+
+ /**
+ * Closes the coordination-topic consumer, if one was created by {@link #start()}.
+ */
+ @Override
+ public void close() throws PulsarClientException {
+ if (consumer != null) {
+ consumer.close();
+ }
+ }
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
index 50a2f97..749f3a3 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
@@ -54,14 +54,10 @@ import static org.apache.pulsar.functions.worker.SchedulerManager.checkHeartBeat
* A simple implementation of leader election using a pulsar topic.
*/
@Slf4j
-public class MembershipManager implements AutoCloseable, ConsumerEventListener {
+public class MembershipManager implements AutoCloseable {
- private final String consumerName;
- private final ConsumerImpl<byte[]> consumer;
private final WorkerConfig workerConfig;
private PulsarAdmin pulsarAdmin;
- private final CompletableFuture<Void> firstConsumerEventFuture;
- private final AtomicBoolean isLeader = new AtomicBoolean();
static final String COORDINATION_TOPIC_SUBSCRIPTION = "participants";
@@ -72,50 +68,9 @@ public class MembershipManager implements AutoCloseable, ConsumerEventListener {
@VisibleForTesting
Map<Function.Instance, Long> unsignedFunctionDurations = new HashMap<>();
- MembershipManager(WorkerService service, PulsarClient client, PulsarAdmin pulsarAdmin)
- throws PulsarClientException {
- this.workerConfig = service.getWorkerConfig();
+ /**
+ * Stores the worker config and the admin client used by this membership manager.
+ * NOTE(review): {@code pulsarClient} is no longer used by this constructor; the coordination-topic
+ * consumer it used to create is now subscribed by LeaderService. Kept presumably for caller compatibility — confirm.
+ */
+ MembershipManager(WorkerService workerService, PulsarClient pulsarClient, PulsarAdmin pulsarAdmin) {
+ this.workerConfig = workerService.getWorkerConfig();
this.pulsarAdmin = pulsarAdmin;
- consumerName = String.format(
- "%s:%s:%d",
- workerConfig.getWorkerId(),
- workerConfig.getWorkerHostname(),
- workerConfig.getWorkerPort()
- );
- firstConsumerEventFuture = new CompletableFuture<>();
- // the membership manager is using a `coordination` topic for leader election.
- // we don't produce any messages into this topic, we only use the `failover` subscription
- // to elect an active consumer as the leader worker. The leader worker will be responsible
- // for scheduling snapshots for FMT and doing task assignment.
- consumer = (ConsumerImpl<byte[]>) client.newConsumer()
- .topic(workerConfig.getClusterCoordinationTopic())
- .subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION)
- .subscriptionType(SubscriptionType.Failover)
- .consumerEventListener(this)
- .property(WORKER_IDENTIFIER, consumerName)
- .subscribe();
-
- isLeader.set(checkLeader(service, consumer.getConsumerName()));
- }
-
- @Override
- public void becameActive(Consumer<?> consumer, int partitionId) {
- firstConsumerEventFuture.complete(null);
- if (isLeader.compareAndSet(false, true)) {
- log.info("Worker {} became the leader.", consumerName);
- }
- }
-
- @Override
- public void becameInactive(Consumer<?> consumer, int partitionId) {
- firstConsumerEventFuture.complete(null);
- if (isLeader.compareAndSet(true, false)) {
- log.info("Worker {} lost the leadership.", consumerName);
- }
- }
-
- public boolean isLeader() {
- return isLeader.get();
}
public List<WorkerInfo> getCurrentMembership() {
@@ -163,8 +118,8 @@ public class MembershipManager implements AutoCloseable, ConsumerEventListener {
}
@Override
- public void close() throws PulsarClientException {
- consumer.close();
+ public void close() {
+ // nothing to release: this class no longer owns the coordination-topic consumer
+
}
public void checkFailures(FunctionMetaDataManager functionMetaDataManager,
@@ -274,24 +229,4 @@ public class MembershipManager implements AutoCloseable, ConsumerEventListener {
schedulerManager.schedule();
}
}
-
- /**
- * Private methods
- */
-
- private boolean checkLeader(WorkerService service, String consumerName) {
- try {
- TopicStats stats = service.getBrokerAdmin().topics()
- .getStats(service.getWorkerConfig().getClusterCoordinationTopic());
- String activeConsumerName = stats != null
- && stats.subscriptions.get(COORDINATION_TOPIC_SUBSCRIPTION) != null
- ? stats.subscriptions.get(COORDINATION_TOPIC_SUBSCRIPTION).activeConsumerName
- : null;
- return consumerName != null && consumerName.equalsIgnoreCase(activeConsumerName);
- } catch (Exception e) {
- log.warn("Failed to check leader {}", e.getMessage());
- }
- return false;
- }
-
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index 9c93443..33213a8 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -18,52 +18,74 @@
*/
package org.apache.pulsar.functions.worker;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.Assignment;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
import org.apache.pulsar.functions.proto.Function.Instance;
import org.apache.pulsar.functions.utils.Actions;
-import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.scheduler.IScheduler;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-
-import lombok.Setter;
-import lombok.extern.slf4j.Slf4j;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
@Slf4j
+/**
+ * The scheduler manager is used to compute scheduling of function instances.
+ * Only the leader computes new schedulings and writes assignments to the assignment topic.
+ * The lifecycle of this class is the following:
+ * 1. When a worker becomes the leader, this class will be initialized.
+ * 2. When a worker loses leadership, this class will be closed, which also closes the worker's producer to the assignments topic.
+ */
public class SchedulerManager implements AutoCloseable {
private final WorkerConfig workerConfig;
+ private final ErrorNotifier errorNotifier;
+ private ThreadPoolExecutor executorService;
+ private final PulsarClient pulsarClient;
@Setter
private FunctionMetaDataManager functionMetaDataManager;
@Setter
+ private LeaderService leaderService;
+
+ @Setter
private MembershipManager membershipManager;
@Setter
@@ -71,27 +93,36 @@ public class SchedulerManager implements AutoCloseable {
private final IScheduler scheduler;
- private final Producer<byte[]> producer;
+ private Producer<byte[]> producer;
- private final ScheduledExecutorService executorService;
+ private ScheduledExecutorService scheduledExecutorService;
private final PulsarAdmin admin;
-
+
+ // Serializes scheduling rounds with other code that mutates the cached assignments.
+ // Fair (ReentrantLock(true)): waiting threads acquire it in arrival order.
+ @Getter
+ private final Lock schedulerLock = new ReentrantLock(true);
+
+ // Set by initialize(), cleared by close(); initialize() only builds resources while false.
+ private volatile boolean isRunning = false;
+
AtomicBoolean isCompactionNeeded = new AtomicBoolean(false);
private static final long DEFAULT_ADMIN_API_BACKOFF_SEC = 60;
public static final String HEARTBEAT_TENANT = "pulsar-function";
public static final String HEARTBEAT_NAMESPACE = "heartbeat";
- public SchedulerManager(WorkerConfig workerConfig, PulsarClient pulsarClient, PulsarAdmin admin, ScheduledExecutorService executor) {
+ // Id of the last assignment message this leader published. It is written on the scheduler
+ // thread and exposed through the generated getter (presumably read by other components on
+ // other threads), so it is volatile to guarantee visibility.
+ @Getter
+ private volatile MessageId lastMessageProduced = null;
+
+ /**
+ * Creates a scheduler manager. No producer or executor is created here; they are created
+ * by {@link #initialize()}.
+ */
+ public SchedulerManager(WorkerConfig workerConfig,
+ PulsarClient pulsarClient,
+ PulsarAdmin admin,
+ ErrorNotifier errorNotifier) {
this.workerConfig = workerConfig;
+ this.pulsarClient = pulsarClient;
this.admin = admin;
this.scheduler = Reflections.createInstance(workerConfig.getSchedulerClassName(), IScheduler.class,
Thread.currentThread().getContextClassLoader());
-
- this.producer = createProducer(pulsarClient, workerConfig);
- this.executorService = executor;
+ this.errorNotifier = errorNotifier;
- scheduleCompaction(executor, workerConfig.getTopicCompactionFrequencySec());
}
private static Producer<byte[]> createProducer(PulsarClient client, WorkerConfig config) {
@@ -136,26 +167,58 @@ public class SchedulerManager implements AutoCloseable {
return producer.get();
}
+ /**
+ * Creates the producer to the assignment topic and the executors used for scheduling and
+ * topic compaction, unless the manager is already running. {@link #schedule()} calls this
+ * before every scheduling round, so a manager that was closed is re-initialized on demand.
+ */
+ public synchronized void initialize() {
+ if (!isRunning) {
+ log.info("Initializing scheduler manager");
+ // Create the producer first. If createProducer throws, no executor (and no periodic
+ // compaction task) has been started yet, so nothing leaks and the next call retries cleanly.
+ producer = createProducer(pulsarClient, workerConfig);
+
+ // Bounded pool and queue: submissions beyond capacity are rejected, and schedule() ignores them.
+ executorService = new ThreadPoolExecutor(1, 5, 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(5));
+ executorService.setThreadFactory(new ThreadFactoryBuilder().setNameFormat("worker-scheduler-%d").build());
+ scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("worker-assignment-topic-compactor"));
+ scheduleCompaction(this.scheduledExecutorService, workerConfig.getTopicCompactionFrequencySec());
+
+ isRunning = true;
+ lastMessageProduced = null;
+ }
+ }
+
+ /**
+ * Asynchronously computes and publishes a new schedule if this worker is the leader.
+ *
+ * @return the future of the submitted scheduling task, or an already-completed future when this
+ * worker is not the leader or the task was rejected by the bounded scheduler executor
+ */
public Future<?> schedule() {
- return executorService.submit(() -> {
- synchronized (SchedulerManager.this) {
- boolean isLeader = membershipManager.isLeader();
- if (isLeader) {
- try {
- invokeScheduler();
- } catch (Exception e) {
- log.warn("Failed to invoke scheduler", e);
- throw e;
+ if (!leaderService.isLeader()) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ // make sure we are initialized before scheduling
+ initialize();
+
+ try {
+ return executorService.submit(() -> {
+ // Acquire before entering try so finally never unlocks a lock this thread does not hold.
+ schedulerLock.lock();
+ try {
+ // Re-check under the lock. While this task was queued, leadership may have been lost,
+ // or close() may have run (it takes the same lock and clears isRunning). Running after
+ // close() would publish through the already-closed producer and trip the error notifier.
+ boolean isLeader = leaderService.isLeader();
+ if (isLeader && isRunning) {
+ try {
+ invokeScheduler();
+ } catch (Throwable th) {
+ log.error("Encountered error when invoking scheduler", th);
+ errorNotifier.triggerError(th);
+ }
}
+ } finally {
+ schedulerLock.unlock();
}
- }
- });
+ });
+ } catch (RejectedExecutionException e) {
+ // task queue is full so just ignore
+ log.debug("Rejected task to invoke scheduler since task queue is already full");
+ return CompletableFuture.completedFuture(null);
+ }
}
private void scheduleCompaction(ScheduledExecutorService executor, long scheduleFrequencySec) {
if (executor != null) {
executor.scheduleWithFixedDelay(() -> {
- if (membershipManager.isLeader() && isCompactionNeeded.get()) {
+ if (leaderService.isLeader() && isCompactionNeeded.get()) {
compactAssignmentTopic();
isCompactionNeeded.set(false);
}
@@ -164,20 +227,21 @@ public class SchedulerManager implements AutoCloseable {
}
+ /**
+ * Computes and publishes assignment changes. It deletes assignments of instances that no longer
+ * exist, republishes assignments whose instance metadata changed, and then asks the configured
+ * IScheduler to place unassigned instances. Each published change is also applied directly to the
+ * FunctionRuntimeManager's in-memory assignments and recorded in lastMessageProduced.
+ * Within this class it is invoked by schedule() while holding schedulerLock.
+ */
@VisibleForTesting
- public void invokeScheduler() {
+ void invokeScheduler() {
- Set<String> currentMembership = this.membershipManager.getCurrentMembership()
+ Set<String> currentMembership = membershipManager.getCurrentMembership()
.stream().map(workerInfo -> workerInfo.getWorkerId()).collect(Collectors.toSet());
- List<FunctionMetaData> allFunctions = this.functionMetaDataManager.getAllFunctionMetaData();
+ List<FunctionMetaData> allFunctions = functionMetaDataManager.getAllFunctionMetaData();
Map<String, Function.Instance> allInstances = computeAllInstances(allFunctions, functionRuntimeManager.getRuntimeFactory().externallyManaged());
- Map<String, Map<String, Assignment>> workerIdToAssignments = this.functionRuntimeManager
+ Map<String, Map<String, Assignment>> workerIdToAssignments = functionRuntimeManager
.getCurrentAssignments();
//delete assignments of functions and instances that don't exist anymore
Iterator<Map.Entry<String, Map<String, Assignment>>> it = workerIdToAssignments.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, Map<String, Assignment>> workerIdToAssignmentEntry = it.next();
+ String workerId = workerIdToAssignmentEntry.getKey();
Map<String, Assignment> functionMap = workerIdToAssignmentEntry.getValue();
// remove instances that don't exist anymore
@@ -185,7 +249,14 @@ public class SchedulerManager implements AutoCloseable {
String fullyQualifiedInstanceId = entry.getKey();
boolean deleted = !allInstances.containsKey(fullyQualifiedInstanceId);
if (deleted) {
- publishNewAssignment(entry.getValue().toBuilder().build(), true);
+ Assignment assignment = entry.getValue();
+ MessageId messageId = publishNewAssignment(assignment.toBuilder().build(), true);
+
+ // Directly update in memory assignment cache since I am leader
+ log.info("Deleting assignment: {}", assignment);
+ functionRuntimeManager.deleteAssignment(fullyQualifiedInstanceId);
+ // update message id associated with current view of assignments map
+ lastMessageProduced = messageId;
}
return deleted;
});
@@ -198,11 +269,18 @@ public class SchedulerManager implements AutoCloseable {
if (!assignment.getInstance().equals(instance)) {
functionMap.put(fullyQualifiedInstanceId, assignment.toBuilder().setInstance(instance).build());
- publishNewAssignment(assignment.toBuilder().setInstance(instance).build().toBuilder().build(), false);
+ Assignment newAssignment = assignment.toBuilder().setInstance(instance).build();
+ MessageId messageId = publishNewAssignment(newAssignment, false);
+
+ // Directly update in memory assignment cache since I am leader
+ log.info("Updating assignment: {}", newAssignment);
+ functionRuntimeManager.processAssignment(newAssignment);
+ // update message id associated with current view of assignments map
+ lastMessageProduced = messageId;
}
}
+ // Drop workers left without assignments. This check must follow the per-instance loop:
+ // inside that loop functionMap always holds at least the entry being iterated.
if (functionMap.isEmpty()) {
it.remove();
}
}
@@ -222,10 +300,10 @@ public class SchedulerManager implements AutoCloseable {
.flatMap(stringMapEntry -> stringMapEntry.getValue().values().stream())
.collect(Collectors.toList());
- Pair<List<Function.Instance>, List<Assignment>> unassignedInstances = this.getUnassignedFunctionInstances(workerIdToAssignments,
- allInstances);
+ Pair<List<Function.Instance>, List<Assignment>> unassignedInstances
+ = getUnassignedFunctionInstances(workerIdToAssignments, allInstances);
- List<Assignment> assignments = this.scheduler.schedule(unassignedInstances.getLeft(), currentAssignments, currentMembership);
+ List<Assignment> assignments = scheduler.schedule(unassignedInstances.getLeft(), currentAssignments, currentMembership);
assignments.addAll(unassignedInstances.getRight());
if (log.isDebugEnabled()) {
@@ -235,37 +313,43 @@ public class SchedulerManager implements AutoCloseable {
isCompactionNeeded.set(!assignments.isEmpty());
for(Assignment assignment : assignments) {
- publishNewAssignment(assignment, false);
+ MessageId messageId = publishNewAssignment(assignment, false);
+
+ // Directly update in memory assignment cache since I am leader
+ log.info("Adding assignment: {}", assignment);
+ functionRuntimeManager.processAssignment(assignment);
+ // update message id associated with current view of assignments map
+ lastMessageProduced = messageId;
}
}
- public void compactAssignmentTopic() {
+ /**
+ * Triggers compaction of the function assignment topic through the admin client.
+ * Does nothing when no admin client is available. On a PulsarAdminException it schedules
+ * itself again on the compaction executor after DEFAULT_ADMIN_API_BACKOFF_SEC seconds, so a
+ * failure keeps being retried.
+ * NOTE(review): once close() has shut that executor down, the retry schedule() call throws
+ * RejectedExecutionException. Confirm this is acceptable.
+ */
+ private void compactAssignmentTopic() {
if (this.admin != null) {
try {
this.admin.topics().triggerCompaction(workerConfig.getFunctionAssignmentTopic());
} catch (PulsarAdminException e) {
log.error("Failed to trigger compaction", e);
- executorService.schedule(() -> compactAssignmentTopic(), DEFAULT_ADMIN_API_BACKOFF_SEC,
+ scheduledExecutorService.schedule(() -> compactAssignmentTopic(), DEFAULT_ADMIN_API_BACKOFF_SEC,
TimeUnit.SECONDS);
}
}
}
- private void publishNewAssignment(Assignment assignment, boolean deleted) {
+ /**
+ * Synchronously publishes an assignment to the assignment topic, keyed by its fully qualified
+ * instance id. A deletion is published as an empty value so the compactor can drop the key.
+ *
+ * @param assignment the assignment to publish, or whose key should be deleted
+ * @param deleted true to publish an empty (deletion) value instead of the serialized assignment
+ * @return the id of the published message
+ * @throws RuntimeException wrapping any failure to send
+ */
+ private MessageId publishNewAssignment(Assignment assignment, boolean deleted) {
try {
String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
// publish empty message with instance-id key so, compactor can delete and skip delivery of this instance-id
// message
- producer.newMessage().key(fullyQualifiedInstanceId)
- .value(deleted ? "".getBytes() : assignment.toByteArray()).sendAsync().get();
+ return producer.newMessage().key(fullyQualifiedInstanceId)
+ .value(deleted ? new byte[0] : assignment.toByteArray()).send();
} catch (Exception e) {
- log.error("Failed to {} assignment update {}", assignment, deleted ? "send" : "deleted", e);
+ log.error("Failed to {} assignment update {}", deleted ? "delete" : "send", assignment, e);
throw new RuntimeException(e);
}
}
- public static Map<String, Function.Instance> computeAllInstances(List<FunctionMetaData> allFunctions,
+ /**
+ * Collects the instances of every function in allFunctions into a single map.
+ * NOTE(review): keys appear to be fully qualified instance ids, since invokeScheduler looks
+ * them up with containsKey(fullyQualifiedInstanceId). Confirm.
+ * NOTE(review): visibility was narrowed from public to private; confirm no external caller relies on it.
+ */
+ private static Map<String, Function.Instance> computeAllInstances(List<FunctionMetaData> allFunctions,
boolean externallyManagedRuntime) {
Map<String, Function.Instance> functionInstances = new HashMap<>();
for (FunctionMetaData functionMetaData : allFunctions) {
@@ -276,7 +360,7 @@ public class SchedulerManager implements AutoCloseable {
return functionInstances;
}
- public static List<Function.Instance> computeInstances(FunctionMetaData functionMetaData,
+ static List<Function.Instance> computeInstances(FunctionMetaData functionMetaData,
boolean externallyManagedRuntime) {
List<Function.Instance> functionInstances = new LinkedList<>();
if (!externallyManagedRuntime) {
@@ -323,15 +407,35 @@ public class SchedulerManager implements AutoCloseable {
}
+ /**
+ * Shuts down the scheduling and compaction executors and closes the assignment producer.
+ * Waits for any in-progress scheduling round by taking schedulerLock first. The executors are
+ * shut down with shutdown(), which does not wait for termination.
+ */
@Override
- public void close() {
+ public synchronized void close() {
+ log.info("Closing scheduler manager");
+ // make sure we are not closing while a scheduling is being calculated.
+ // Acquire before the try so the finally block only releases a lock that is actually held.
+ schedulerLock.lock();
try {
- this.producer.close();
- } catch (PulsarClientException e) {
- log.warn("Failed to shutdown scheduler manager assignment producer", e);
+ // lets a later initialize() recreate the producer and executors
+ isRunning = false;
+
+ if (scheduledExecutorService != null) {
+ scheduledExecutorService.shutdown();
+ }
+
+ if (executorService != null) {
+ executorService.shutdown();
+ }
+
+ if (producer != null) {
+ try {
+ producer.close();
+ } catch (PulsarClientException e) {
+ log.warn("Failed to shutdown scheduler manager assignment producer", e);
+ }
+ }
+ } finally {
+ schedulerLock.unlock();
}
}
- public static String checkHeartBeatFunction(Instance funInstance) {
+ static String checkHeartBeatFunction(Instance funInstance) {
if (funInstance.getFunctionMetaData() != null
&& funInstance.getFunctionMetaData().getFunctionDetails() != null) {
FunctionDetails funDetails = funInstance.getFunctionMetaData().getFunctionDetails();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index e3ea170..532bc32 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -70,15 +70,15 @@ public class WorkerService {
private PulsarAdmin brokerAdmin;
private PulsarAdmin functionAdmin;
private final MetricsGenerator metricsGenerator;
- private final ScheduledExecutorService executor;
@VisibleForTesting
private URI dlogUri;
+ // Both are created during startup; the leader service is given the assignment tailer and scheduler manager.
+ private LeaderService leaderService;
+ private FunctionAssignmentTailer functionAssignmentTailer;
+ /**
+ * Creates the worker service. Only the stats-updater executor and the metrics generator are
+ * created here; clients, managers and services are created later during startup.
+ */
public WorkerService(WorkerConfig workerConfig) {
this.workerConfig = workerConfig;
this.statsUpdater = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("worker-stats-updater"));
- this.executor = Executors.newScheduledThreadPool(10, new DefaultThreadFactory("pulsar-worker"));
this.metricsGenerator = new MetricsGenerator(this.statsUpdater, workerConfig);
}
@@ -87,7 +87,7 @@ public class WorkerService {
AuthenticationService authenticationService,
AuthorizationService authorizationService,
ErrorNotifier errorNotifier) throws InterruptedException {
- log.info("Starting worker {}...", workerConfig.getWorkerId());
+ log.info("/** Starting worker id={} **/", workerConfig.getWorkerId());
try {
log.info("Worker Configs: {}", new ObjectMapper().writerWithDefaultPrettyPrinter()
@@ -98,13 +98,13 @@ public class WorkerService {
try {
// create the dlog namespace for storing function packages
+ // Keep the explicit "this.": the field is shadowed here by a same-named parameter/local, and
+ // "dlogUri = dlogUri" would be a no-op self-assignment that leaves the field null.
this.dlogUri = dlogUri;
DistributedLogConfiguration dlogConf = WorkerUtils.getDlogConf(workerConfig);
try {
this.dlogNamespace = NamespaceBuilder.newBuilder()
.conf(dlogConf)
.clientId("function-worker-" + workerConfig.getWorkerId())
- .uri(this.dlogUri)
+ .uri(dlogUri)
.build();
} catch (Exception e) {
log.error("Failed to initialize dlog namespace {} for storing function packages",
@@ -146,7 +146,7 @@ public class WorkerService {
workerConfig.getTlsTrustCertsFilePath(), workerConfig.isTlsAllowInsecureConnection(),
workerConfig.isTlsHostnameVerificationEnable());
- this.client = WorkerUtils.getPulsarClient(this.workerConfig.getPulsarServiceUrl(),
+ this.client = WorkerUtils.getPulsarClient(workerConfig.getPulsarServiceUrl(),
workerConfig.getClientAuthenticationPlugin(),
workerConfig.getClientAuthenticationParameters(),
workerConfig.isUseTls(), pulsarClientTlsTrustCertsFilePath,
@@ -156,16 +156,14 @@ public class WorkerService {
this.functionAdmin = WorkerUtils.getPulsarAdminClient(functionWebServiceUrl);
- this.client = WorkerUtils.getPulsarClient(this.workerConfig.getPulsarServiceUrl());
+ this.client = WorkerUtils.getPulsarClient(workerConfig.getPulsarServiceUrl());
}
- log.info("Created Pulsar client");
brokerAdmin.topics().createNonPartitionedTopic(workerConfig.getFunctionAssignmentTopic());
brokerAdmin.topics().createNonPartitionedTopic(workerConfig.getClusterCoordinationTopic());
brokerAdmin.topics().createNonPartitionedTopic(workerConfig.getFunctionMetadataTopic());
//create scheduler manager
- this.schedulerManager = new SchedulerManager(this.workerConfig, this.client, this.brokerAdmin,
- this.executor);
+ // No executor is passed any more; errorNotifier receives failures raised while scheduling.
+ this.schedulerManager = new SchedulerManager(workerConfig, client, brokerAdmin, errorNotifier);
//create function meta data manager
this.functionMetaDataManager = new FunctionMetaDataManager(
@@ -179,52 +177,89 @@ public class WorkerService {
if (!brokerAdmin.topics().getSubscriptions(coordinationTopic).contains(MembershipManager.COORDINATION_TOPIC_SUBSCRIPTION)) {
brokerAdmin.topics().createSubscription(coordinationTopic, MembershipManager.COORDINATION_TOPIC_SUBSCRIPTION, MessageId.earliest);
}
- this.membershipManager = new MembershipManager(this, this.client, this.brokerAdmin);
+ this.membershipManager = new MembershipManager(this, client, brokerAdmin);
+
// create function runtime manager
this.functionRuntimeManager = new FunctionRuntimeManager(
- this.workerConfig,
+ workerConfig,
this,
- this.dlogNamespace,
- this.membershipManager,
+ dlogNamespace,
+ membershipManager,
connectorsManager,
functionsManager,
functionMetaDataManager,
errorNotifier);
- // Setting references to managers in scheduler
- this.schedulerManager.setFunctionMetaDataManager(this.functionMetaDataManager);
- this.schedulerManager.setFunctionRuntimeManager(this.functionRuntimeManager);
- this.schedulerManager.setMembershipManager(this.membershipManager);
+
+ // initialize function assignment tailer that reads from the assignment topic
+ this.functionAssignmentTailer = new FunctionAssignmentTailer(
+ functionRuntimeManager,
+ client.newReader(),
+ workerConfig,
+ errorNotifier);
// initialize function metadata manager
- this.functionMetaDataManager.initialize();
+ log.info("/** Initializing Metadata Manager **/");
+ functionMetaDataManager.initialize();
// initialize function runtime manager
- this.functionRuntimeManager.initialize();
+ log.info("/** Initializing Runtime Manager **/");
+ functionRuntimeManager.initialize();
+
+ this.leaderService = new LeaderService(this,
+ client,
+ functionAssignmentTailer,
+ schedulerManager,
+ errorNotifier);
+
+ // Setting references to managers in scheduler
+ schedulerManager.setFunctionMetaDataManager(functionMetaDataManager);
+ schedulerManager.setFunctionRuntimeManager(functionRuntimeManager);
+ schedulerManager.setMembershipManager(membershipManager);
+ schedulerManager.setLeaderService(leaderService);
this.authenticationService = authenticationService;
this.authorizationService = authorizationService;
- // Starting cluster services
- log.info("Start cluster services...");
- this.clusterServiceCoordinator = new ClusterServiceCoordinator(
- this.workerConfig.getWorkerId(),
- membershipManager);
-
- this.clusterServiceCoordinator.addTask("membership-monitor",
- this.workerConfig.getFailureCheckFreqMs(),
- () -> membershipManager.checkFailures(
- functionMetaDataManager, functionRuntimeManager, schedulerManager));
+ // Start function assignment tailer
+ log.info("/** Starting Function Assignment Tailer **/");
+ functionAssignmentTailer.start();
- this.clusterServiceCoordinator.start();
+ log.info("/** Start Leader Service **/");
+ leaderService.start();
+
+ // start function metadata manager
+ log.info("/** Starting Metadata Manager **/");
+ functionMetaDataManager.start();
- // Start function runtime manager
- this.functionRuntimeManager.start();
+ // Starting cluster services
+ this.clusterServiceCoordinator = new ClusterServiceCoordinator(
+ workerConfig.getWorkerId(),
+ leaderService);
+
+ clusterServiceCoordinator.addTask("membership-monitor",
+ workerConfig.getFailureCheckFreqMs(),
+ () -> {
+ // computing a new schedule and checking for failures cannot happen concurrently
+ // both paths of code modify internally cached assignments map in function runtime manager
+ // acquire before the try so the finally block only releases a lock that is actually held
+ schedulerManager.getSchedulerLock().lock();
+ try {
+ membershipManager.checkFailures(
+ functionMetaDataManager, functionRuntimeManager, schedulerManager);
+ } finally {
+ schedulerManager.getSchedulerLock().unlock();
+ }
+ });
+
+ log.info("/** Starting Cluster Service Coordinator **/");
+ clusterServiceCoordinator.start();
// indicate function worker service is done initializing
this.isInitialized = true;
+
+ log.info("/** Started worker id={} **/", workerConfig.getWorkerId());
} catch (Throwable t) {
log.error("Error Starting up in worker", t);
throw new RuntimeException(t);
@@ -239,18 +274,20 @@ public class WorkerService {
log.warn("Failed to close function metadata manager", e);
}
}
- if (null != functionRuntimeManager) {
+
+ // The tailer was constructed with functionRuntimeManager, so it is stopped before that manager is closed.
+ if (null != functionAssignmentTailer) {
try {
- functionRuntimeManager.close();
+ functionAssignmentTailer.close();
} catch (Exception e) {
- log.warn("Failed to close function runtime manager", e);
+ log.warn("Failed to close function assignment tailer", e);
}
}
- if (null != client) {
+
+ if (null != functionRuntimeManager) {
try {
- client.close();
- } catch (PulsarClientException e) {
- log.warn("Failed to close pulsar client", e);
+ functionRuntimeManager.close();
+ } catch (Exception e) {
+ log.warn("Failed to close function runtime manager", e);
}
}
@@ -259,38 +296,46 @@ public class WorkerService {
}
if (null != membershipManager) {
- try {
- membershipManager.close();
- } catch (PulsarClientException e) {
- log.warn("Failed to close membership manager", e);
- }
+ // NOTE(review): this call is no longer guarded by try/catch. If close() can throw an unchecked
+ // exception, every resource released below is skipped. Confirm.
+ membershipManager.close();
}
if (null != schedulerManager) {
schedulerManager.close();
}
- if (null != this.brokerAdmin) {
- this.brokerAdmin.close();
+ // The leader service was built with the shared pulsar client, so it is closed before the client.
+ if (null != leaderService) {
+ try {
+ leaderService.close();
+ } catch (PulsarClientException e) {
+ log.warn("Failed to close leader service", e);
+ }
+ }
+
+ if (null != client) {
+ try {
+ client.close();
+ } catch (PulsarClientException e) {
+ log.warn("Failed to close pulsar client", e);
+ }
}
- if (null != this.functionAdmin) {
- this.functionAdmin.close();
+ if (null != brokerAdmin) {
+ brokerAdmin.close();
}
- if (null != this.stateStoreAdminClient) {
- this.stateStoreAdminClient.close();
+ if (null != functionAdmin) {
+ functionAdmin.close();
}
- if (null != this.dlogNamespace) {
- this.dlogNamespace.close();
+ if (null != stateStoreAdminClient) {
+ stateStoreAdminClient.close();
}
- if(this.executor != null) {
- this.executor.shutdown();
+ if (null != dlogNamespace) {
+ dlogNamespace.close();
}
- if (this.statsUpdater != null) {
+ if (statsUpdater != null) {
statsUpdater.shutdownNow();
}
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index 5cad320..fe1f11d 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -33,8 +33,11 @@ import org.apache.distributedlog.metadata.DLMetadata;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.functions.proto.Function;
@@ -315,4 +318,17 @@ public final class WorkerUtils {
return false;
}
+
+ /**
+ * Opens a reader on the compacted view of a topic, positioned at the given message id.
+ * The same name is used both as the reader name and as the subscription role prefix.
+ *
+ * @param readerBuilder builder used to configure and create the reader
+ * @param readerName reader name, also used as the subscription role prefix
+ * @param topic topic to read from
+ * @param startMessageId position to start reading from
+ * @return the created reader
+ * @throws PulsarClientException if the reader cannot be created
+ */
+ public static Reader<byte[]> createReader(ReaderBuilder readerBuilder,
+ String readerName,
+ String topic,
+ MessageId startMessageId) throws PulsarClientException {
+ ReaderBuilder configured = readerBuilder.subscriptionRolePrefix(readerName);
+ configured = configured.readerName(readerName);
+ configured = configured.topic(topic);
+ configured = configured.readCompacted(true);
+ configured = configured.startMessageId(startMessageId);
+ return configured.create();
+ }
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinatorTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinatorTest.java
index 763465b..a9e33da 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinatorTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinatorTest.java
@@ -52,7 +52,7 @@ public class ClusterServiceCoordinatorTest {
return new org.powermock.modules.testng.PowerMockObjectFactory();
}
- private MembershipManager membershipManager;
+ // The coordinator is now constructed with a LeaderService, which answers isLeader().
+ private LeaderService leaderService;
private ClusterServiceCoordinator coordinator;
private ScheduledExecutorService mockExecutor;
private MockExecutorController mockExecutorController;
@@ -70,8 +70,8 @@ public class ClusterServiceCoordinatorTest {
any(ThreadFactory.class))
).thenReturn(mockExecutor);
- this.membershipManager = mock(MembershipManager.class);
- this.coordinator = new ClusterServiceCoordinator("test-coordinator", membershipManager);
+ // Mocked so each test controls leadership through when(leaderService.isLeader()).
+ this.leaderService = mock(LeaderService.class);
+ this.coordinator = new ClusterServiceCoordinator("test-coordinator", leaderService);
}
@@ -94,18 +94,18 @@ public class ClusterServiceCoordinatorTest {
.scheduleAtFixedRate(any(Runnable.class), eq(interval), eq(interval), eq(TimeUnit.MILLISECONDS));
// when task is executed, it is the leader
- when(membershipManager.isLeader()).thenReturn(true);
+ when(leaderService.isLeader()).thenReturn(true);
mockExecutorController.advance(Duration.ofMillis(interval));
- verify(membershipManager, times(1)).isLeader();
+ verify(leaderService, times(1)).isLeader();
verify(mockTask, times(1)).run();
// when task is executed, it is not the leader
- when(membershipManager.isLeader()).thenReturn(false);
+ when(leaderService.isLeader()).thenReturn(false);
mockExecutorController.advance(Duration.ofMillis(interval));
// `isLeader` is called twice, however the task is only executed once (when it was leader)
- verify(membershipManager, times(2)).isLeader();
+ verify(leaderService, times(2)).isLeader();
verify(mockTask, times(1)).run();
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailerTest.java
new file mode 100644
index 0000000..28eb06a
--- /dev/null
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailerTest.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import io.netty.buffer.Unpooled;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
+import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Tests for {@link FunctionAssignmentTailer}. The tailer reads from a mocked {@link Reader} backed by an
+ * in-memory queue and hands each message to a mocked {@link FunctionRuntimeManager}.
+ */
+@Slf4j
+public class FunctionAssignmentTailerTest {
+
+    /** Upper bound, in milliseconds, on waiting for work done by the tailer thread. */
+    private static final long WAIT_TIMEOUT_MS = 2000;
+
+    @Test(timeOut = 10000)
+    public void testErrorNotifier() throws Exception {
+        WorkerConfig workerConfig = newWorkerConfig();
+        Message<byte[]> message1 = newAssignmentMessage(newAssignment("func-1"), MessageId.latest);
+        Message<byte[]> message2 = newAssignmentMessage(newAssignment("func-2"), MessageId.latest);
+
+        BlockingQueue<Message<byte[]>> messageList = new ArrayBlockingQueue<>(2);
+        ReaderBuilder readerBuilder = newMockReaderBuilder(messageList, TimeUnit.SECONDS.toMillis(10));
+        ErrorNotifier errorNotifier = spy(ErrorNotifier.getDefaultImpl());
+        FunctionRuntimeManager functionRuntimeManager = mock(FunctionRuntimeManager.class);
+
+        FunctionAssignmentTailer functionAssignmentTailer = spy(new FunctionAssignmentTailer(
+                functionRuntimeManager, readerBuilder, workerConfig, errorNotifier));
+        try {
+            functionAssignmentTailer.start();
+
+            // verify no errors occurred
+            verify(errorNotifier, times(0)).triggerError(any());
+
+            messageList.add(message1);
+            // Wait until message1 has been processed before installing the throwing stub below; otherwise
+            // the tailer thread could read message1 after (or while) the stub is installed, and the test
+            // would no longer show that message1 was processed without error.
+            verify(functionRuntimeManager, timeout(WAIT_TIMEOUT_MS).times(1))
+                    .processAssignmentMessage(eq(message1));
+            verify(errorNotifier, times(0)).triggerError(any());
+
+            // trigger an error to be thrown
+            doThrow(new RuntimeException("test")).when(functionRuntimeManager).processAssignmentMessage(any());
+
+            messageList.add(message2);
+
+            try {
+                errorNotifier.waitForError();
+                // Without this, the test would pass silently if waitForError() returned normally.
+                Assert.fail("waitForError() should rethrow the error raised while processing message2");
+            } catch (Exception e) {
+                assertEquals(e.getCause().getMessage(), "test");
+            }
+            verify(errorNotifier, times(1)).triggerError(any());
+        } finally {
+            functionAssignmentTailer.close();
+        }
+    }
+
+    @Test(timeOut = 10000)
+    public void testProcessingAssignments() throws Exception {
+        WorkerConfig workerConfig = newWorkerConfig();
+        Message<byte[]> message1 = newAssignmentMessage(newAssignment("func-1"), new MessageIdImpl(1, 1, -1));
+        Message<byte[]> message2 = newAssignmentMessage(newAssignment("func-2"), new MessageIdImpl(1, 2, -1));
+
+        BlockingQueue<Message<byte[]>> messageList = new ArrayBlockingQueue<>(2);
+        ReaderBuilder readerBuilder = newMockReaderBuilder(messageList, TimeUnit.SECONDS.toMillis(10));
+        FunctionRuntimeManager functionRuntimeManager = mock(FunctionRuntimeManager.class);
+
+        FunctionAssignmentTailer functionAssignmentTailer = spy(new FunctionAssignmentTailer(
+                functionRuntimeManager, readerBuilder, workerConfig, ErrorNotifier.getDefaultImpl()));
+        try {
+            functionAssignmentTailer.start();
+
+            messageList.add(message1);
+            verify(functionRuntimeManager, timeout(WAIT_TIMEOUT_MS).times(1))
+                    .processAssignmentMessage(eq(message1));
+
+            messageList.add(message2);
+            verify(functionRuntimeManager, timeout(WAIT_TIMEOUT_MS).times(1))
+                    .processAssignmentMessage(eq(message2));
+
+            Assert.assertEquals(functionAssignmentTailer.getLastMessageId(), message2.getMessageId());
+        } finally {
+            functionAssignmentTailer.close();
+        }
+    }
+
+    @Test(timeOut = 10000)
+    public void testTriggerReadToTheEndAndExit() throws Exception {
+        WorkerConfig workerConfig = newWorkerConfig();
+        Message<byte[]> message1 = newAssignmentMessage(newAssignment("func-1"), new MessageIdImpl(1, 1, -1));
+        Message<byte[]> message2 = newAssignmentMessage(newAssignment("func-2"), new MessageIdImpl(1, 2, -1));
+
+        BlockingQueue<Message<byte[]>> messageList = new ArrayBlockingQueue<>(2);
+        // Unlike the other tests, readNext only waits 10 ms for a message before returning null.
+        ReaderBuilder readerBuilder = newMockReaderBuilder(messageList, 10);
+        FunctionRuntimeManager functionRuntimeManager = mock(FunctionRuntimeManager.class);
+
+        FunctionAssignmentTailer functionAssignmentTailer = spy(new FunctionAssignmentTailer(
+                functionRuntimeManager, readerBuilder, workerConfig, ErrorNotifier.getDefaultImpl()));
+        try {
+            functionAssignmentTailer.start();
+
+            messageList.add(message1);
+            verify(functionRuntimeManager, timeout(WAIT_TIMEOUT_MS).times(1))
+                    .processAssignmentMessage(eq(message1));
+
+            functionAssignmentTailer.triggerReadToTheEndAndExit().get();
+            Thread tailerThread = functionAssignmentTailer.getThread();
+            tailerThread.join(WAIT_TIMEOUT_MS);
+            Assert.assertFalse(tailerThread.isAlive());
+
+            // message2 arrives after the tailer exited, so the last message id is expected to stay message1's.
+            messageList.add(message2);
+            Assert.assertEquals(functionAssignmentTailer.getLastMessageId(), message1.getMessageId());
+        } finally {
+            functionAssignmentTailer.close();
+        }
+    }
+
+    /** Builds the worker configuration shared by every test in this class. */
+    private static WorkerConfig newWorkerConfig() {
+        WorkerConfig workerConfig = new WorkerConfig();
+        workerConfig.setWorkerId("worker-1");
+        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(
+                ObjectMapperFactory.getThreadLocal().convertValue(
+                        new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
+        workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+        workerConfig.setStateStorageServiceUrl("foo");
+        workerConfig.setFunctionAssignmentTopicName("assignments");
+        return workerConfig;
+    }
+
+    /** Builds an assignment of instance 0 of {@code test-tenant/test-namespace/<functionName>} to worker-1. */
+    private static Function.Assignment newAssignment(String functionName) {
+        Function.FunctionMetaData functionMetaData = Function.FunctionMetaData.newBuilder().setFunctionDetails(
+                Function.FunctionDetails.newBuilder()
+                        .setTenant("test-tenant").setNamespace("test-namespace").setName(functionName)).build();
+        return Function.Assignment.newBuilder()
+                .setWorkerId("worker-1")
+                .setInstance(Function.Instance.newBuilder()
+                        .setFunctionMetaData(functionMetaData).setInstanceId(0).build())
+                .build();
+    }
+
+    /**
+     * Wraps {@code assignment} in a spied message with id {@code messageId} whose key is the assignment's
+     * fully qualified instance id.
+     */
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    private static Message<byte[]> newAssignmentMessage(Function.Assignment assignment, MessageId messageId) {
+        Message message = spy(new MessageImpl("foo", messageId.toString(), new HashMap<>(),
+                Unpooled.copiedBuffer(assignment.toByteArray()), null, PulsarApi.MessageMetadata.newBuilder()));
+        doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance())).when(message).getKey();
+        return message;
+    }
+
+    /**
+     * Returns a mocked {@link ReaderBuilder} whose {@code create()} yields a mocked reader over
+     * {@code messages}: {@code readNext(int, TimeUnit)} ignores its arguments and waits up to
+     * {@code pollTimeoutMs} for a message (returning {@code null} on timeout), {@code readNextAsync()}
+     * returns a future that never completes, and {@code hasMessageAvailable()} reports whether the queue
+     * is non-empty.
+     */
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    private static ReaderBuilder newMockReaderBuilder(BlockingQueue<Message<byte[]>> messages,
+                                                      long pollTimeoutMs) throws Exception {
+        Reader<byte[]> reader = mock(Reader.class);
+        when(reader.readNext(anyInt(), any()))
+                .thenAnswer(invocation -> messages.poll(pollTimeoutMs, TimeUnit.MILLISECONDS));
+        when(reader.readNextAsync()).thenAnswer(invocation -> new CompletableFuture<>());
+        when(reader.hasMessageAvailable()).thenAnswer(invocation -> !messages.isEmpty());
+
+        ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
+        doReturn(readerBuilder).when(readerBuilder).topic(anyString());
+        doReturn(readerBuilder).when(readerBuilder).readerName(anyString());
+        doReturn(readerBuilder).when(readerBuilder).subscriptionRolePrefix(anyString());
+        doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
+        doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
+        doReturn(reader).when(readerBuilder).create();
+        return readerBuilder;
+    }
+}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index d486707..54b47f5 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -47,9 +47,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.any;
@@ -57,7 +55,6 @@ import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.argThat;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
@@ -483,131 +480,6 @@ public class FunctionRuntimeManagerTest {
assertEquals(functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0"), functionRuntimeInfo);
}
- @Test(timeOut = 10000)
- public void testErrorNotifier() throws Exception {
- WorkerConfig workerConfig = new WorkerConfig();
- workerConfig.setWorkerId("worker-1");
- workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
- workerConfig.setFunctionRuntimeFactoryConfigs(
- ObjectMapperFactory.getThreadLocal().convertValue(
- new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
- workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
- workerConfig.setStateStorageServiceUrl("foo");
- workerConfig.setFunctionAssignmentTopicName("assignments");
-
- Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
- Function.FunctionDetails.newBuilder()
- .setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();
-
- Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
- Function.FunctionDetails.newBuilder()
- .setTenant("test-tenant").setNamespace("test-namespace").setName("func-2")).build();
-
- Function.Assignment assignment1 = Function.Assignment.newBuilder()
- .setWorkerId("worker-1")
- .setInstance(Function.Instance.newBuilder()
- .setFunctionMetaData(function1).setInstanceId(0).build())
- .build();
- Function.Assignment assignment2 = Function.Assignment.newBuilder()
- .setWorkerId("worker-1")
- .setInstance(Function.Instance.newBuilder()
- .setFunctionMetaData(function2).setInstanceId(0).build())
- .build();
-
- ArrayBlockingQueue<Message<byte[]>> messageList = new ArrayBlockingQueue<>(2);
- PulsarApi.MessageMetadata.Builder msgMetadataBuilder = PulsarApi.MessageMetadata.newBuilder();
- Message message1 = spy(new MessageImpl("foo", MessageId.latest.toString(),
- new HashMap<>(), Unpooled.copiedBuffer(assignment1.toByteArray()), null, msgMetadataBuilder));
- doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance())).when(message1).getKey();
-
- Message message2 = spy(new MessageImpl("foo", MessageId.latest.toString(),
- new HashMap<>(), Unpooled.copiedBuffer(assignment2.toByteArray()), null, msgMetadataBuilder));
- doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment2.getInstance())).when(message2).getKey();
-
- PulsarClient pulsarClient = mock(PulsarClient.class);
-
- Reader<byte[]> reader = mock(Reader.class);
-
-
- when(reader.readNext()).thenAnswer(new Answer<Message<byte[]>>() {
- @Override
- public Message<byte[]> answer(InvocationOnMock invocationOnMock) throws Throwable {
- return messageList.poll(10, TimeUnit.SECONDS);
- }
- });
-
- when(reader.readNextAsync()).thenAnswer(new Answer<CompletableFuture<Message<byte[]>>>() {
- @Override
- public CompletableFuture<Message<byte[]>> answer(InvocationOnMock invocationOnMock) throws Throwable {
- return new CompletableFuture<>();
- }
- });
-
- when(reader.hasMessageAvailable()).thenAnswer(new Answer<Boolean>() {
- @Override
- public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable {
- return !messageList.isEmpty();
- }
- });
-
- ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
- doReturn(readerBuilder).when(pulsarClient).newReader();
- doReturn(readerBuilder).when(readerBuilder).topic(anyString());
- doReturn(readerBuilder).when(readerBuilder).readerName(anyString());
- doReturn(readerBuilder).when(readerBuilder).subscriptionRolePrefix(anyString());
- doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
- doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
- doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
-
- doReturn(reader).when(readerBuilder).create();
- WorkerService workerService = mock(WorkerService.class);
- doReturn(pulsarClient).when(workerService).getClient();
- doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
-
- ErrorNotifier errorNotifier = spy(ErrorNotifier.getDefaultImpl());
-
- // test new assignment add functions
- FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
- workerConfig,
- workerService,
- mock(Namespace.class),
- mock(MembershipManager.class),
- mock(ConnectorsManager.class),
- mock(FunctionsManager.class),
- mock(FunctionMetaDataManager.class),
- errorNotifier));
- FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
- doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
- doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
- doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class));
- functionRuntimeManager.setFunctionActioner(functionActioner);
-
- functionRuntimeManager.initialize();
-
- // verify no errors occured
- verify(errorNotifier, times(0)).triggerError(any());
-
- messageList.add(message1);
-
- functionRuntimeManager.start();
-
- verify(errorNotifier, times(0)).triggerError(any());
-
- // trigger an error to be thrown
- doThrow(new RuntimeException("test")).when(functionRuntimeManager).processAssignment(any());
-
- messageList.add(message2);
-
- try {
- errorNotifier.waitForError();
- } catch (Exception e) {
- assertEquals(e.getCause().getMessage(), "test");
- }
- verify(errorNotifier, times(1)).triggerError(any());
-
- functionRuntimeManager.close();
- }
-
@Test
public void testRuntimeManagerInitialize() throws Exception {
WorkerConfig workerConfig = new WorkerConfig();
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java
new file mode 100644
index 0000000..445498a
--- /dev/null
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Tests for {@link LeaderService}. Leadership changes are simulated by invoking the
+ * {@link ConsumerEventListener} captured from the mocked consumer builder; the assignment tailer and
+ * scheduler manager are mocks so their interactions can be verified.
+ */
+public class LeaderServiceTest {
+
+    private final WorkerConfig workerConfig;
+    private LeaderService leaderService;
+    private PulsarClientImpl mockClient;
+    AtomicReference<ConsumerEventListener> listenerHolder;
+    private ConsumerImpl mockConsumer;
+    private FunctionAssignmentTailer functionAssignmentTailer;
+    private SchedulerManager schedulerManager;
+
+    public LeaderServiceTest() {
+        WorkerConfig config = new WorkerConfig();
+        config.setWorkerId("worker-1");
+        config.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+        config.setFunctionRuntimeFactoryConfigs(ObjectMapperFactory.getThreadLocal().convertValue(
+                new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
+        config.setPulsarServiceUrl("pulsar://localhost:6650");
+        config.setStateStorageServiceUrl("foo");
+        config.setWorkerPort(1234);
+        this.workerConfig = config;
+    }
+
+    @BeforeMethod
+    public void setup() throws PulsarClientException {
+        mockClient = mock(PulsarClientImpl.class);
+        mockConsumer = mock(ConsumerImpl.class);
+
+        // Fluent consumer builder: the stubbed setters return the builder and subscribe() yields mockConsumer.
+        ConsumerBuilder<byte[]> consumerBuilder = mock(ConsumerBuilder.class);
+        when(consumerBuilder.topic(anyString())).thenReturn(consumerBuilder);
+        when(consumerBuilder.subscriptionName(anyString())).thenReturn(consumerBuilder);
+        when(consumerBuilder.subscriptionType(any(SubscriptionType.class))).thenReturn(consumerBuilder);
+        when(consumerBuilder.property(anyString(), anyString())).thenReturn(consumerBuilder);
+        when(consumerBuilder.consumerName(anyString())).thenReturn(consumerBuilder);
+        when(consumerBuilder.subscribe()).thenReturn(mockConsumer);
+
+        // Capture the listener handed to consumerEventListener(...) so the tests can fire
+        // becameActive/becameInactive on it directly.
+        listenerHolder = new AtomicReference<>();
+        when(consumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenAnswer(invocation -> {
+            listenerHolder.set(invocation.getArgument(0));
+            return consumerBuilder;
+        });
+        when(mockClient.newConsumer()).thenReturn(consumerBuilder);
+
+        WorkerService workerService = mock(WorkerService.class);
+        doReturn(workerConfig).when(workerService).getWorkerConfig();
+
+        schedulerManager = mock(SchedulerManager.class);
+        functionAssignmentTailer = mock(FunctionAssignmentTailer.class);
+        when(functionAssignmentTailer.triggerReadToTheEndAndExit())
+                .thenReturn(CompletableFuture.completedFuture(null));
+
+        leaderService = spy(new LeaderService(workerService, mockClient, functionAssignmentTailer,
+                schedulerManager, ErrorNotifier.getDefaultImpl()));
+        leaderService.start();
+    }
+
+    @Test
+    public void testLeaderService() throws Exception {
+        MessageId messageId = new MessageIdImpl(1, 2, -1);
+        when(schedulerManager.getLastMessageProduced()).thenReturn(messageId);
+
+        acquireLeadership();
+        releaseLeadership();
+
+        // getLastMessageProduced() returns an id, so the tailer is expected to resume from that message.
+        verify(functionAssignmentTailer, times(1)).startFromMessage(messageId);
+        verify(schedulerManager, times(1)).close();
+    }
+
+    @Test
+    public void testLeaderServiceNoNewScheduling() throws Exception {
+        when(schedulerManager.getLastMessageProduced()).thenReturn(null);
+
+        acquireLeadership();
+        releaseLeadership();
+
+        // getLastMessageProduced() returns null, so the tailer is expected to be restarted via start().
+        verify(functionAssignmentTailer, times(1)).start();
+        verify(schedulerManager, times(1)).close();
+    }
+
+    /**
+     * Asserts the service starts out as a non-leader, fires becameActive on the captured listener, and
+     * then verifies it is leader, that triggerReadToTheEndAndExit() and close() were called on the
+     * tailer, and that the scheduler manager was initialized.
+     */
+    private void acquireLeadership() throws Exception {
+        assertFalse(leaderService.isLeader());
+        verify(mockClient, times(1)).newConsumer();
+
+        listenerHolder.get().becameActive(mockConsumer, 0);
+        assertTrue(leaderService.isLeader());
+
+        verify(functionAssignmentTailer, times(1)).triggerReadToTheEndAndExit();
+        verify(functionAssignmentTailer, times(1)).close();
+        verify(schedulerManager, times(1)).initialize();
+    }
+
+    /** Fires becameInactive on the captured listener and asserts the service is no longer leader. */
+    private void releaseLeadership() throws Exception {
+        listenerHolder.get().becameInactive(mockConsumer, 0);
+        assertFalse(leaderService.isLeader());
+    }
+}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 7acb3f7..71fe6b2 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -28,14 +28,11 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -71,46 +68,6 @@ public class MembershipManagerTest {
workerConfig.setStateStorageServiceUrl("foo");
}
- @Test
- public void testConsumerEventListener() throws Exception {
- PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
- PulsarAdmin mockAdmin = mock(PulsarAdmin.class);
-
- ConsumerImpl<byte[]> mockConsumer = mock(ConsumerImpl.class);
- ConsumerBuilder<byte[]> mockConsumerBuilder = mock(ConsumerBuilder.class);
-
- when(mockConsumerBuilder.topic(anyString())).thenReturn(mockConsumerBuilder);
- when(mockConsumerBuilder.subscriptionName(anyString())).thenReturn(mockConsumerBuilder);
- when(mockConsumerBuilder.subscriptionType(any(SubscriptionType.class))).thenReturn(mockConsumerBuilder);
- when(mockConsumerBuilder.property(anyString(), anyString())).thenReturn(mockConsumerBuilder);
-
- when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer);
- WorkerService workerService = mock(WorkerService.class);
- doReturn(workerConfig).when(workerService).getWorkerConfig();
-
- AtomicReference<ConsumerEventListener> listenerHolder = new AtomicReference<>();
- when(mockConsumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenAnswer(invocationOnMock -> {
-
- ConsumerEventListener listener = invocationOnMock.getArgument(0);
- listenerHolder.set(listener);
-
- return mockConsumerBuilder;
- });
-
- when(mockClient.newConsumer()).thenReturn(mockConsumerBuilder);
-
- MembershipManager membershipManager = spy(new MembershipManager(workerService, mockClient, mockAdmin));
- assertFalse(membershipManager.isLeader());
- verify(mockClient, times(1))
- .newConsumer();
-
- listenerHolder.get().becameActive(mockConsumer, 0);
- assertTrue(membershipManager.isLeader());
-
- listenerHolder.get().becameInactive(mockConsumer, 0);
- assertFalse(membershipManager.isLeader());
- }
-
private static PulsarClient mockPulsarClient() throws PulsarClientException {
PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index fafed1f..e72b81e 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.functions.worker;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.prometheus.client.CollectorRegistry;
@@ -38,6 +39,7 @@ import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler;
+import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
import org.mockito.invocation.Invocation;
import org.testng.Assert;
@@ -47,18 +49,29 @@ import org.testng.annotations.Test;
import java.lang.reflect.Method;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.anyInt;
@@ -68,6 +81,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
@@ -83,6 +97,8 @@ public class SchedulerManagerTest {
private Producer producer;
private TypedMessageBuilder<byte[]> message;
private ScheduledExecutorService executor;
+ private LeaderService leaderService;
+ private ErrorNotifier errorNotifier;
@BeforeMethod
public void setup() {
@@ -123,13 +139,16 @@ public class SchedulerManagerTest {
this.executor = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("worker-test"));
- schedulerManager = spy(new SchedulerManager(workerConfig, pulsarClient, null, executor));
+ errorNotifier = spy(ErrorNotifier.getDefaultImpl());
+ schedulerManager = spy(new SchedulerManager(workerConfig, pulsarClient, null, errorNotifier));
functionRuntimeManager = mock(FunctionRuntimeManager.class);
functionMetaDataManager = mock(FunctionMetaDataManager.class);
membershipManager = mock(MembershipManager.class);
+ leaderService = mock(LeaderService.class);
schedulerManager.setFunctionMetaDataManager(functionMetaDataManager);
schedulerManager.setFunctionRuntimeManager(functionRuntimeManager);
schedulerManager.setMembershipManager(membershipManager);
+ schedulerManager.setLeaderService(leaderService);
}
@AfterMethod
@@ -171,16 +190,17 @@ public class SchedulerManagerTest {
doReturn(workerInfoList).when(membershipManager).getCurrentMembership();
// i am not leader
- doReturn(false).when(membershipManager).isLeader();
+ doReturn(false).when(leaderService).isLeader();
callSchedule();
verify(producer, times(0)).sendAsync(any());
// i am leader
- doReturn(true).when(membershipManager).isLeader();
+ doReturn(true).when(leaderService).isLeader();
callSchedule();
List<Invocation> invocations = getMethodInvocationDetails(schedulerManager,
- SchedulerManager.class.getMethod("invokeScheduler"));
+ SchedulerManager.class.getDeclaredMethod("invokeScheduler"));
Assert.assertEquals(invocations.size(), 1);
+ verify(errorNotifier, times(0)).triggerError(any());
}
@Test
@@ -217,12 +237,13 @@ public class SchedulerManagerTest {
doReturn(workerInfoList).when(membershipManager).getCurrentMembership();
// i am leader
- doReturn(true).when(membershipManager).isLeader();
+ doReturn(true).when(leaderService).isLeader();
callSchedule();
- List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
+ List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("send"));
Assert.assertEquals(invocations.size(), 0);
+ verify(errorNotifier, times(0)).triggerError(any());
}
@Test
@@ -264,11 +285,11 @@ public class SchedulerManagerTest {
doReturn(workerInfoList).when(membershipManager).getCurrentMembership();
// i am leader
- doReturn(true).when(membershipManager).isLeader();
+ doReturn(true).when(leaderService).isLeader();
callSchedule();
- List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
+ List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("send"));
Assert.assertEquals(invocations.size(), 1);
invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value",
Object.class));
@@ -283,6 +304,8 @@ public class SchedulerManagerTest {
.build();
Assert.assertEquals(assignment2, assignments);
+ // make sure we also directly added the assignment to in memory assignment cache in function runtime manager
+ verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2));
}
@Test
@@ -334,20 +357,21 @@ public class SchedulerManagerTest {
doReturn(workerInfoList).when(membershipManager).getCurrentMembership();
// i am leader
- doReturn(true).when(membershipManager).isLeader();
+ doReturn(true).when(leaderService).isLeader();
callSchedule();
- List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
+ List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("send"));
Assert.assertEquals(invocations.size(), 1);
invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value",
Object.class));
byte[] send = (byte[]) invocations.get(0).getRawArguments()[0];
- Assignment assignments = Assignment.parseFrom(send);
-
- log.info("assignments: {}", assignments);
+ // a delete is published with a zero-length (non-null) value instead of a serialized Assignment; presumably keyed by the fully qualified instance id
Assert.assertEquals(0, send.length);
+
+ // make sure we also directly deleted the assignment from the in memory assignment cache in function runtime manager
+ verify(functionRuntimeManager, times(1)).deleteAssignment(eq(FunctionCommon.getFullyQualifiedInstanceId(assignment2.getInstance())));
}
@Test
@@ -391,11 +415,11 @@ public class SchedulerManagerTest {
doReturn(workerInfoList).when(membershipManager).getCurrentMembership();
// i am leader
- doReturn(true).when(membershipManager).isLeader();
+ doReturn(true).when(leaderService).isLeader();
callSchedule();
- List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
+ List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("send"));
Assert.assertEquals(invocations.size(), 1);
invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value",
Object.class));
@@ -443,7 +467,7 @@ public class SchedulerManagerTest {
callSchedule();
- invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
+ invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("send"));
Assert.assertEquals(invocations.size(), 4);
invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value",
Object.class));
@@ -502,18 +526,26 @@ public class SchedulerManagerTest {
doReturn(workerInfoList).when(membershipManager).getCurrentMembership();
// i am leader
- doReturn(true).when(membershipManager).isLeader();
+ doReturn(true).when(leaderService).isLeader();
callSchedule();
- List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
+ List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("send"));
Assert.assertEquals(invocations.size(), 3);
invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value",
Object.class));
- byte[] send = (byte[]) invocations.get(0).getRawArguments()[0];
- Assignment assignments = Assignment.parseFrom(send);
- log.info("assignments: {}", assignments);
+ for (int i = 0; i < invocations.size(); i++) {
+ Invocation invocation = invocations.get(i);
+ byte[] send = (byte[]) invocation.getRawArguments()[0];
+ Assignment assignment = Assignment.parseFrom(send);
+ Assignment expectedAssignment = Function.Assignment.newBuilder()
+ .setWorkerId("worker-1")
+ .setInstance(Function.Instance.newBuilder()
+ .setFunctionMetaData(function2).setInstanceId(i).build())
+ .build();
+ Assert.assertEquals(assignment, expectedAssignment);
+ }
Set<Assignment> allAssignments = Sets.newHashSet();
invocations.forEach(invocation -> {
@@ -544,6 +576,12 @@ public class SchedulerManagerTest {
assertTrue(allAssignments.contains(assignment2_2));
assertTrue(allAssignments.contains(assignment2_3));
+ // make sure we also directly add the assignment to the in memory assignment cache in function runtime manager
+ verify(functionRuntimeManager, times(3)).processAssignment(any());
+ verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2_1));
+ verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2_2));
+ verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2_3));
+
// updating assignments
currentAssignments.get("worker-1").put(FunctionCommon.getFullyQualifiedInstanceId(assignment2_1.getInstance()), assignment2_1);
currentAssignments.get("worker-1").put(FunctionCommon.getFullyQualifiedInstanceId(assignment2_2.getInstance()), assignment2_2);
@@ -568,7 +606,7 @@ public class SchedulerManagerTest {
callSchedule();
- invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
+ invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("send"));
Assert.assertEquals(invocations.size(), 6);
invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value",
Object.class));
@@ -583,6 +621,14 @@ public class SchedulerManagerTest {
});
assertTrue(allAssignments2.contains(assignment2Scaled));
+
+ // make sure we also directly removed both scaled-away assignments (assignment2_2 and assignment2_3) from the in memory assignment cache in function runtime manager
+ verify(functionRuntimeManager, times(2)).deleteAssignment(anyString());
+ verify(functionRuntimeManager, times(1)).deleteAssignment(eq(FunctionCommon.getFullyQualifiedInstanceId(assignment2_2.getInstance())));
+ verify(functionRuntimeManager, times(1)).deleteAssignment(eq(FunctionCommon.getFullyQualifiedInstanceId(assignment2_3.getInstance())));
+ // 3 processAssignment calls from the first schedule plus 1 for the re-assigned assignment2Scaled
+ verify(functionRuntimeManager, times(4)).processAssignment(any());
+ verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2Scaled));
}
@Test
@@ -621,11 +667,11 @@ public class SchedulerManagerTest {
doReturn(workerInfoList).when(membershipManager).getCurrentMembership();
// i am leader
- doReturn(true).when(membershipManager).isLeader();
+ doReturn(true).when(leaderService).isLeader();
callSchedule();
- List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
+ List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("send"));
Assert.assertEquals(invocations.size(), 2);
invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value",
Object.class));
@@ -683,7 +729,7 @@ public class SchedulerManagerTest {
doReturn(workerInfoList).when(membershipManager).getCurrentMembership();
// i am leader
- doReturn(true).when(membershipManager).isLeader();
+ doReturn(true).when(leaderService).isLeader();
callSchedule();
@@ -703,7 +749,7 @@ public class SchedulerManagerTest {
.setFunctionMetaData(function2).setInstanceId(2).build())
.build();
- List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
+ List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("send"));
Assert.assertEquals(invocations.size(), 3);
invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value",
Object.class));
@@ -722,6 +768,12 @@ public class SchedulerManagerTest {
assertTrue(allAssignments.contains(assignment2_2));
assertTrue(allAssignments.contains(assignment2_3));
+ // make sure we also directly add the assignment to the in memory assignment cache in function runtime manager
+ verify(functionRuntimeManager, times(3)).processAssignment(any());
+ verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2_1));
+ verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2_2));
+ verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2_3));
+
// updating assignments
currentAssignments.get("worker-1").put(FunctionCommon.getFullyQualifiedInstanceId(assignment2_1.getInstance()), assignment2_1);
currentAssignments.get("worker-1").put(FunctionCommon.getFullyQualifiedInstanceId(assignment2_2.getInstance()), assignment2_2);
@@ -757,7 +809,7 @@ public class SchedulerManagerTest {
callSchedule();
- invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
+ invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("send"));
Assert.assertEquals(invocations.size(), 6);
invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value",
Object.class));
@@ -774,10 +826,16 @@ public class SchedulerManagerTest {
assertTrue(allAssignments2.contains(assignment2Updated1));
assertTrue(allAssignments2.contains(assignment2Updated2));
assertTrue(allAssignments2.contains(assignment2Updated3));
+
+ // make sure we also directly updated the assignment to the in memory assignment cache in function runtime manager
+ verify(functionRuntimeManager, times(6)).processAssignment(any());
+ verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2Updated1));
+ verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2Updated2));
+ verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2Updated3));
}
@Test
- public void testAssignmentWorkerDoesNotExist() throws InterruptedException, NoSuchMethodException, TimeoutException, ExecutionException, InvalidProtocolBufferException {
+ public void testAssignmentWorkerDoesNotExist() throws InterruptedException, NoSuchMethodException, TimeoutException, ExecutionException {
List<Function.FunctionMetaData> functionMetaDataList = new LinkedList<>();
long version = 5;
Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder()
@@ -827,15 +885,15 @@ public class SchedulerManagerTest {
doReturn(workerInfoList).when(membershipManager).getCurrentMembership();
// i am leader
- doReturn(true).when(membershipManager).isLeader();
+ doReturn(true).when(leaderService).isLeader();
callSchedule();
- List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
+ List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("send"));
Assert.assertEquals(invocations.size(), 0);
}
- private void callSchedule() throws NoSuchMethodException, InterruptedException,
+ private void callSchedule() throws InterruptedException,
TimeoutException, ExecutionException {
Future<?> complete = schedulerManager.schedule();