You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/06/19 21:40:40 UTC

[pulsar] branch master updated: Fix leader/scheduler assignment processing lag problem (#7237)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 68877f8  Fix leader/scheduler assignment processing lag problem (#7237)
68877f8 is described below

commit 68877f8947ce5ed28f09152cbf91ea0b0ecafb87
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Fri Jun 19 14:40:26 2020 -0700

    Fix leader/scheduler assignment processing lag problem (#7237)
    
    * Fix leader/scheduler assignment processing lag problem
    
    * add license header
    
    * adding more comments
    
    * improving impl
    
    * fixing bugs
    
    * improving impl
    
    * fixing tests
    
    * adding comments
    
    * add more testing
    
    * addressing comments
    
    * cleaning up
    
    * refactoring implementation
    
    * addressing comments
    
    Co-authored-by: Jerry Peng <je...@splunk.com>
---
 .../org/apache/pulsar/PulsarStandaloneStarter.java |   2 +-
 .../worker/ClusterServiceCoordinator.java          |   9 +-
 .../functions/worker/FunctionAssignmentTailer.java | 170 ++++++---
 .../functions/worker/FunctionMetaDataManager.java  |  13 +-
 .../worker/FunctionMetaDataTopicTailer.java        |   4 +-
 .../functions/worker/FunctionRuntimeManager.java   |  52 ++-
 .../pulsar/functions/worker/LeaderService.java     | 134 +++++++
 .../pulsar/functions/worker/MembershipManager.java |  75 +---
 .../pulsar/functions/worker/SchedulerManager.java  | 232 +++++++----
 .../pulsar/functions/worker/WorkerService.java     | 157 +++++---
 .../pulsar/functions/worker/WorkerUtils.java       |  16 +
 .../worker/ClusterServiceCoordinatorTest.java      |  14 +-
 .../worker/FunctionAssignmentTailerTest.java       | 422 +++++++++++++++++++++
 .../worker/FunctionRuntimeManagerTest.java         | 128 -------
 .../pulsar/functions/worker/LeaderServiceTest.java | 152 ++++++++
 .../functions/worker/MembershipManagerTest.java    |  43 ---
 .../functions/worker/SchedulerManagerTest.java     | 120 ++++--
 17 files changed, 1256 insertions(+), 487 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
index e5e9b45..9da388b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
@@ -102,7 +102,7 @@ public class PulsarStandaloneStarter extends PulsarStandalone {
                         bkEnsemble.stop();
                     }
                 } catch (Exception e) {
-                    log.error("Shutdown failed: {}", e.getMessage());
+                    log.error("Shutdown failed: {}", e.getMessage(), e);
                 }
             }
         });
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java
index 419f65a..c2bde9d 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java
@@ -48,11 +48,11 @@ public class ClusterServiceCoordinator implements AutoCloseable {
     private final String workerId;
     private final Map<String, TimerTaskInfo> tasks = new HashMap<>();
     private final ScheduledExecutorService executor;
-    private final MembershipManager membershipManager;
+    private final LeaderService leaderService;
 
-    public ClusterServiceCoordinator(String workerId, MembershipManager membershipManager) {
+    public ClusterServiceCoordinator(String workerId, LeaderService leaderService) {
         this.workerId = workerId;
-        this.membershipManager = membershipManager;
+        this.leaderService = leaderService;
         this.executor = Executors.newSingleThreadScheduledExecutor(
             new ThreadFactoryBuilder().setNameFormat("cluster-service-coordinator-timer").build());
     }
@@ -62,11 +62,12 @@ public class ClusterServiceCoordinator implements AutoCloseable {
     }
 
     public void start() {
+        log.info("/** Starting cluster service coordinator **/");
         for (Map.Entry<String, TimerTaskInfo> entry : this.tasks.entrySet()) {
             TimerTaskInfo timerTaskInfo = entry.getValue();
             String taskName = entry.getKey();
             this.executor.scheduleAtFixedRate(() -> {
-                boolean isLeader = membershipManager.isLeader();
+                boolean isLeader = leaderService.isLeader();
                 if (isLeader) {
                     try {
                         timerTaskInfo.getTask().run();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
index 3038044..e1d9c1f 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
@@ -25,95 +25,153 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
-import org.apache.pulsar.functions.proto.Function.Assignment;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
+/**
+ * This class is responsible for reading assignments from the 'assignments' functions internal topic.
+ * Only the functions worker leader writes to the topic, while the other workers read from it.
+ * When a worker becomes the leader, it reads to the end of the assignments topic and closes its reader on the topic.
+ * Then the worker, as the new leader, will be in charge of computing new assignments when necessary.
+ * The leader does not need to listen to the assignments topic because it can just update its in-memory assignments map directly
+ * after it computes a new schedule.  When a worker loses leadership, it will start reading from the assignments topic again.
+ */
 @Slf4j
 public class FunctionAssignmentTailer implements AutoCloseable {
 
     private final FunctionRuntimeManager functionRuntimeManager;
-    @Getter
-    private final Reader<byte[]> reader;
+    private final ReaderBuilder readerBuilder;
+    private final WorkerConfig workerConfig;
+    private final ErrorNotifier errorNotifier;
+    private Reader<byte[]> reader;
     private volatile boolean isRunning = false;
+    private volatile boolean exitOnEndOfTopic = false;
+    private CompletableFuture<Void> exitFuture;
+    private Thread tailerThread;
 
-    private final Thread tailerThread;
+    @Getter
+    private MessageId lastMessageId = null;
     
     public FunctionAssignmentTailer(
             FunctionRuntimeManager functionRuntimeManager,
             ReaderBuilder readerBuilder,
             WorkerConfig workerConfig,
-            ErrorNotifier errorNotifier) throws PulsarClientException {
+            ErrorNotifier errorNotifier) {
         this.functionRuntimeManager = functionRuntimeManager;
-        
-        this.reader = readerBuilder
-          .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-runtime-manager")
-          .readerName(workerConfig.getWorkerId() + "-function-runtime-manager")
-          .topic(workerConfig.getFunctionAssignmentTopic())
-          .readCompacted(true)
-          .startMessageId(MessageId.earliest)
-          .create();
-        
-        this.tailerThread = new Thread(() -> {
-            while(isRunning) {
-                try {
-                    Message<byte[]> msg = reader.readNext();
-                    processAssignment(msg);
-                } catch (Throwable th) {
-                    if (isRunning) {
-                        log.error("Encountered error in assignment tailer", th);
-                        // trigger fatal error
-                        isRunning = false;
-                        errorNotifier.triggerError(th);
-                    } else {
-                        if (!(th instanceof InterruptedException || th.getCause() instanceof InterruptedException)) {
-                            log.warn("Encountered error when assignment tailer is not running", th);
-                        }
-                    }
+        this.exitFuture = new CompletableFuture<>();
+        this.readerBuilder = readerBuilder;
+        this.workerConfig = workerConfig;
+        this.errorNotifier = errorNotifier;
+    }
 
-                }
+    public synchronized CompletableFuture<Void> triggerReadToTheEndAndExit() {
+        exitOnEndOfTopic = true;
+        return this.exitFuture;
+    }
+
+    public void startFromMessage(MessageId startMessageId) throws PulsarClientException {
+        if (!isRunning) {
+            isRunning = true;
+            if (reader == null) {
+                reader = createReader(startMessageId);
             }
-        });
-        this.tailerThread.setName("assignment-tailer-thread");
+            if (tailerThread == null || !tailerThread.isAlive()) {
+                tailerThread = getTailerThread();
+            }
+            exitFuture = new CompletableFuture<>();
+            tailerThread.start();
+        }
     }
 
-    public void start() {
-        isRunning = true;
-        tailerThread.start();
+    public synchronized void start() throws PulsarClientException {
+        MessageId startMessageId = lastMessageId == null ? MessageId.earliest : lastMessageId;
+        startFromMessage(startMessageId);
     }
 
     @Override
-    public void close() {
-        log.info("Stopping function assignment tailer");
+    public synchronized void close() {
+        log.info("Closing function assignment tailer");
         try {
             isRunning = false;
-            if (tailerThread != null && tailerThread.isAlive()) {
-                tailerThread.interrupt();
+
+            if (tailerThread != null) {
+                while (true) {
+                    tailerThread.interrupt();
+
+                    try {
+                        tailerThread.join(5000, 0);
+                    } catch (InterruptedException e) {
+                        log.warn("Waiting for assignment tailer thread to stop is interrupted", e);
+                    }
+
+                    if (tailerThread.isAlive()) {
+                        log.warn("Assignment tailer thread is still alive.  Will attempt to interrupt again.");
+                    } else {
+                        break;
+                    }
+                }
+                tailerThread = null;
             }
             if (reader != null) {
                 reader.close();
+                reader = null;
             }
+
+            exitFuture = null;
+            exitOnEndOfTopic = false;
+            
         } catch (IOException e) {
             log.error("Failed to stop function assignment tailer", e);
         }
-        log.info("Stopped function assignment tailer");
+    }
+    
+    private Reader<byte[]> createReader(MessageId startMessageId) throws PulsarClientException {
+        log.info("Assignment tailer will start reading from message id {}", startMessageId);
+
+        return WorkerUtils.createReader(
+                readerBuilder,
+                workerConfig.getWorkerId() + "-function-assignment-tailer",
+                workerConfig.getFunctionAssignmentTopic(),
+                startMessageId);
     }
 
-    public void processAssignment(Message<byte[]> msg) {
-
-        if(msg.getData()==null || (msg.getData().length==0)) {
-            log.info("Received assignment delete: {}", msg.getKey());
-            this.functionRuntimeManager.deleteAssignment(msg.getKey());
-        } else {
-            Assignment assignment;
-            try {
-                assignment = Assignment.parseFrom(msg.getData());
-            } catch (IOException e) {
-                log.error("[{}] Received bad assignment update at message {}", reader.getTopic(), msg.getMessageId(), e);
-                throw new RuntimeException(e);
+    private Thread getTailerThread() {
+        Thread t = new Thread(() -> {
+            while (isRunning) {
+                try {
+                    Message<byte[]> msg = reader.readNext(5, TimeUnit.SECONDS);
+                    if (msg == null) {
+                        if (exitOnEndOfTopic && !reader.hasMessageAvailable()) {
+                            break;
+                        }
+                    } else {
+                        functionRuntimeManager.processAssignmentMessage(msg);
+                        // keep track of last message id
+                        lastMessageId = msg.getMessageId();
+                    }
+                } catch (Throwable th) {
+                    if (isRunning) {
+                        log.error("Encountered error in assignment tailer", th);
+                        // trigger fatal error
+                        isRunning = false;
+                        errorNotifier.triggerError(th);
+                    } else {
+                        if (!(th instanceof InterruptedException || th.getCause() instanceof InterruptedException)) {
+                            log.warn("Encountered error when assignment tailer is not running", th);
+                        }
+                    }
+                }
             }
-            log.info("Received assignment update: {}", assignment);
-            this.functionRuntimeManager.processAssignment(assignment);
-        }
+            log.info("tailer thread exiting");
+            exitFuture.complete(null);
+        });
+        t.setName("assignment-tailer-thread");
+        return t;
+    }
+
+    Thread getThread() {
+        return tailerThread;
     }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index 80f577d..6496f6e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -89,7 +89,6 @@ public class FunctionMetaDataManager implements AutoCloseable {
      * 1. Consume all existing function meta data upon start to establish existing state
      */
     public void initialize() {
-        log.info("/** Initializing Function Metadata Manager **/");
         try {
             this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this,
                     pulsarClient.newReader(), this.workerConfig, this.errorNotifier);
@@ -99,16 +98,20 @@ public class FunctionMetaDataManager implements AutoCloseable {
                 this.functionMetaDataTopicTailer.processRequest(this.functionMetaDataTopicTailer.getReader().readNext());
             }
             this.setInitializePhase(false);
-            // schedule functions if necessary
-            this.schedulerManager.schedule();
-            // start function metadata tailer
-            this.functionMetaDataTopicTailer.start();
+            
 
         } catch (Exception e) {
             log.error("Failed to initialize meta data store", e);
             throw new RuntimeException(e);
         }
     }
+    
+    public void start() {
+        // schedule functions if necessary
+        this.schedulerManager.schedule();
+        // start function metadata tailer
+        this.functionMetaDataTopicTailer.start();
+    }
 
     /**
      * Get the function metadata for a function
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
index 55a67a6..f0626ce 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
@@ -48,8 +48,8 @@ public class FunctionMetaDataTopicTailer
         this.reader = readerBuilder
                 .topic(workerConfig.getFunctionMetadataTopic())
                 .startMessageId(MessageId.earliest)
-                .readerName(workerConfig.getWorkerId() + "-function-metadata-manager")
-                .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-metadata-manager")
+                .readerName(workerConfig.getWorkerId() + "-function-metadata-tailer")
+                .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-metadata-tailer")
                 .create();
         readerThread = new Thread(this);
         readerThread.setName("function-metadata-tailer-thread");
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index bc1ca53..bdde8a8 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -26,6 +26,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.common.functions.WorkerInfo;
@@ -53,6 +54,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriBuilder;
+import java.io.IOException;
 import java.net.URI;
 import java.util.*;
 import java.util.Map.Entry;
@@ -109,8 +111,6 @@ public class FunctionRuntimeManager implements AutoCloseable{
     @Getter
     final WorkerConfig workerConfig;
 
-    private FunctionAssignmentTailer functionAssignmentTailer;
-
     @Setter
     @Getter
     private FunctionActioner functionActioner;
@@ -130,7 +130,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
     private final FunctionMetaDataManager functionMetaDataManager;
 
     private final ErrorNotifier errorNotifier;
-    
+
     public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerService, Namespace dlogNamespace,
                                   MembershipManager membershipManager, ConnectorsManager connectorsManager, FunctionsManager functionsManager,
                                   FunctionMetaDataManager functionMetaDataManager, ErrorNotifier errorNotifier) throws Exception {
@@ -210,21 +210,23 @@ public class FunctionRuntimeManager implements AutoCloseable{
      * 2. After current assignments are read, assignments belonging to this worker will be processed
      */
     public void initialize() {
-        log.info("/** Initializing Runtime Manager **/");
         try {
-            this.functionAssignmentTailer = new FunctionAssignmentTailer(
-                    this,
-                    this.getWorkerService().getClient().newReader(),
-                    this.workerConfig,
-                    this.errorNotifier);
+            Reader<byte[]> reader = WorkerUtils.createReader(
+                    workerService.getClient().newReader(),
+                    workerConfig.getWorkerId() + "-function-assignment-initialize",
+                    workerConfig.getFunctionAssignmentTopic(),
+                    MessageId.earliest);
+
             // start init phase
             this.isInitializePhase = true;
             // read all existing messages
-            while (this.functionAssignmentTailer.getReader().hasMessageAvailable()) {
-                this.functionAssignmentTailer.processAssignment(this.functionAssignmentTailer.getReader().readNext());
+            while (reader.hasMessageAvailable()) {
+                processAssignmentMessage(reader.readNext());
             }
             // init phase is done
             this.isInitializePhase = false;
+            // close reader
+            reader.close();
             // realize existing assignments
             Map<String, Assignment> assignmentMap = workerIdToAssignments.get(this.workerConfig.getWorkerId());
             if (assignmentMap != null) {
@@ -243,14 +245,6 @@ public class FunctionRuntimeManager implements AutoCloseable{
     /**
      * Starts the function runtime manager
      */
-    public void start() {
-        log.info("/** Starting Function Runtime Manager **/");
-        this.functionAssignmentTailer.start();
-    }
-
-    /**
-     * Public methods
-     */
 
     /**
      * Get current assignments
@@ -611,6 +605,25 @@ public class FunctionRuntimeManager implements AutoCloseable{
         return functionStats.calculateOverall();
     }
 
+
+    public synchronized void processAssignmentMessage(Message<byte[]> msg) {
+
+        if(msg.getData()==null || (msg.getData().length==0)) {
+            log.info("Received assignment delete: {}", msg.getKey());
+            deleteAssignment(msg.getKey());
+        } else {
+            Assignment assignment;
+            try {
+                assignment = Assignment.parseFrom(msg.getData());
+            } catch (IOException e) {
+                log.error("[{}] Received bad assignment update at message {}", msg.getMessageId(), e);
+                throw new RuntimeException(e);
+            }
+            log.info("Received assignment update: {}", assignment);
+            processAssignment(assignment);
+        }
+    }
+
     /**
      * Process an assignment update from the assignment topic
      * @param newAssignment the assignment
@@ -835,7 +848,6 @@ public class FunctionRuntimeManager implements AutoCloseable{
 
     @Override
     public void close() throws Exception {
-        this.functionAssignmentTailer.close();
 
         stopAllOwnedFunctions();
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
new file mode 100644
index 0000000..3d7ec92
--- /dev/null
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@Slf4j
+public class LeaderService implements AutoCloseable, ConsumerEventListener {
+
+    private final String consumerName;
+    private final FunctionAssignmentTailer functionAssignmentTailer;
+    private final ErrorNotifier errorNotifier;
+    private final SchedulerManager schedulerManager;
+    private ConsumerImpl<byte[]> consumer;
+    private final WorkerConfig workerConfig;
+    private final PulsarClient pulsarClient;
+    private final AtomicBoolean isLeader = new AtomicBoolean(false);
+
+    static final String COORDINATION_TOPIC_SUBSCRIPTION = "participants";
+
+    private static String WORKER_IDENTIFIER = "id";
+    
+    public LeaderService(WorkerService workerService,
+                         PulsarClient pulsarClient,
+                         FunctionAssignmentTailer functionAssignmentTailer,
+                         SchedulerManager schedulerManager,
+                         ErrorNotifier errorNotifier) {
+        this.workerConfig = workerService.getWorkerConfig();
+        this.pulsarClient = pulsarClient;
+        this.functionAssignmentTailer = functionAssignmentTailer;
+        this.schedulerManager = schedulerManager;
+        this.errorNotifier = errorNotifier;
+        consumerName = String.format(
+                "%s:%s:%d",
+                workerConfig.getWorkerId(),
+                workerConfig.getWorkerHostname(),
+                workerConfig.getWorkerPort()
+        );
+
+    }
+
+    public void start() throws PulsarClientException {
+        // the leader service uses a `coordination` topic for leader election.
+        // we don't produce any messages into this topic, we only use the `failover` subscription
+        // to elect an active consumer as the leader worker. The leader worker will be responsible
+        // for scheduling snapshots for FMT and doing task assignment.
+        consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
+                .topic(workerConfig.getClusterCoordinationTopic())
+                .subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION)
+                .subscriptionType(SubscriptionType.Failover)
+                .consumerEventListener(this)
+                .property(WORKER_IDENTIFIER, consumerName)
+                .consumerName(consumerName)
+                .subscribe();
+    }
+
+    @Override
+    public synchronized void becameActive(Consumer<?> consumer, int partitionId) {
+        if (isLeader.compareAndSet(false, true)) {
+            log.info("Worker {} became the leader.", consumerName);
+            try {
+                // trigger a read to the end of the topic and then exit,
+                // since the leader can just update its in-memory assignments cache directly
+                functionAssignmentTailer.triggerReadToTheEndAndExit().get();
+                functionAssignmentTailer.close();
+
+                // make sure scheduler is initialized because this worker
+                // is the leader and may need to start computing and writing assignments
+                schedulerManager.initialize();
+            } catch (Throwable th) {
+                log.error("Encountered error when initializing to become leader", th);
+                errorNotifier.triggerError(th);
+            }
+        }
+    }
+
+    @Override
+    public synchronized void becameInactive(Consumer<?> consumer, int partitionId) {
+        if (isLeader.compareAndSet(true, false)) {
+            log.info("Worker {} lost the leadership.", consumerName);
+            // when a worker has lost leadership it needs to start reading from the assignment topic again
+            try {
+                // stop scheduler manager since we are not the leader anymore
+                // will block if a schedule invocation is in process
+                schedulerManager.close();
+
+                // start reading from the assignment topic, beginning at the last message published by the scheduler
+                if (schedulerManager.getLastMessageProduced() == null) {
+                    functionAssignmentTailer.start();
+                } else {
+                    functionAssignmentTailer.startFromMessage(schedulerManager.getLastMessageProduced());
+                }
+            } catch (Throwable th) {
+                log.error("Encountered error in routine when worker lost leadership", th);
+                errorNotifier.triggerError(th);
+            }
+        }
+    }
+
+    public boolean isLeader() {
+        return isLeader.get();
+    }
+
+    @Override
+    public void close() throws PulsarClientException {
+        if (consumer != null) {
+            consumer.close();
+        }
+    }
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
index 50a2f97..749f3a3 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
@@ -54,14 +54,10 @@ import static org.apache.pulsar.functions.worker.SchedulerManager.checkHeartBeat
  * A simple implementation of leader election using a pulsar topic.
  */
 @Slf4j
-public class MembershipManager implements AutoCloseable, ConsumerEventListener {
+public class MembershipManager implements AutoCloseable {
 
-    private final String consumerName;
-    private final ConsumerImpl<byte[]> consumer;
     private final WorkerConfig workerConfig;
     private PulsarAdmin pulsarAdmin;
-    private final CompletableFuture<Void> firstConsumerEventFuture;
-    private final AtomicBoolean isLeader = new AtomicBoolean();
 
     static final String COORDINATION_TOPIC_SUBSCRIPTION = "participants";
 
@@ -72,50 +68,9 @@ public class MembershipManager implements AutoCloseable, ConsumerEventListener {
     @VisibleForTesting
     Map<Function.Instance, Long> unsignedFunctionDurations = new HashMap<>();
 
-    MembershipManager(WorkerService service, PulsarClient client, PulsarAdmin pulsarAdmin)
-            throws PulsarClientException {
-        this.workerConfig = service.getWorkerConfig();
+    MembershipManager(WorkerService workerService, PulsarClient pulsarClient, PulsarAdmin pulsarAdmin) {
+        this.workerConfig = workerService.getWorkerConfig();
         this.pulsarAdmin = pulsarAdmin;
-        consumerName = String.format(
-            "%s:%s:%d",
-            workerConfig.getWorkerId(),
-            workerConfig.getWorkerHostname(),
-            workerConfig.getWorkerPort()
-        );
-        firstConsumerEventFuture = new CompletableFuture<>();
-        // the membership manager is using a `coordination` topic for leader election.
-        // we don't produce any messages into this topic, we only use the `failover` subscription
-        // to elect an active consumer as the leader worker. The leader worker will be responsible
-        // for scheduling snapshots for FMT and doing task assignment.
-        consumer = (ConsumerImpl<byte[]>) client.newConsumer()
-                .topic(workerConfig.getClusterCoordinationTopic())
-                .subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION)
-                .subscriptionType(SubscriptionType.Failover)
-                .consumerEventListener(this)
-                .property(WORKER_IDENTIFIER, consumerName)
-                .subscribe();
-        
-        isLeader.set(checkLeader(service, consumer.getConsumerName()));
-    }
-
-    @Override
-    public void becameActive(Consumer<?> consumer, int partitionId) {
-        firstConsumerEventFuture.complete(null);
-        if (isLeader.compareAndSet(false, true)) {
-            log.info("Worker {} became the leader.", consumerName);
-        }
-    }
-
-    @Override
-    public void becameInactive(Consumer<?> consumer, int partitionId) {
-        firstConsumerEventFuture.complete(null);
-        if (isLeader.compareAndSet(true, false)) {
-            log.info("Worker {} lost the leadership.", consumerName);
-        }
-    }
-
-    public boolean isLeader() {
-        return isLeader.get();
     }
 
     public List<WorkerInfo> getCurrentMembership() {
@@ -163,8 +118,8 @@ public class MembershipManager implements AutoCloseable, ConsumerEventListener {
     }
 
     @Override
-    public void close() throws PulsarClientException {
-        consumer.close();
+    public void close() {
+
     }
 
     public void checkFailures(FunctionMetaDataManager functionMetaDataManager,
@@ -274,24 +229,4 @@ public class MembershipManager implements AutoCloseable, ConsumerEventListener {
             schedulerManager.schedule();
         }
     }
-
-    /**
-     * Private methods
-     */
-
-    private boolean checkLeader(WorkerService service, String consumerName) {
-        try {
-            TopicStats stats = service.getBrokerAdmin().topics()
-                    .getStats(service.getWorkerConfig().getClusterCoordinationTopic());
-            String activeConsumerName = stats != null
-                    && stats.subscriptions.get(COORDINATION_TOPIC_SUBSCRIPTION) != null
-                            ? stats.subscriptions.get(COORDINATION_TOPIC_SUBSCRIPTION).activeConsumerName
-                            : null;
-            return consumerName != null && consumerName.equalsIgnoreCase(activeConsumerName);
-        } catch (Exception e) {
-            log.warn("Failed to check leader {}", e.getMessage());
-        }
-        return false;
-    }
-    
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index 9c93443..33213a8 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -18,52 +18,74 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
 import org.apache.pulsar.functions.proto.Function.Instance;
 import org.apache.pulsar.functions.utils.Actions;
-import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.worker.scheduler.IScheduler;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-
-import lombok.Setter;
-import lombok.extern.slf4j.Slf4j;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
 
 @Slf4j
+/**
+ * The scheduler manager is used to compute scheduling of function instances
+ * Only the leader computes new schedulings and writes assignments to the assignment topic
+ * The lifecycle of this class is the following:
+ *  1. When worker becomes leader, this class will be initialized
+ *  2. When worker loses leadership, this class will be closed which also closes the worker's producer to the assignments topic
+ */
 public class SchedulerManager implements AutoCloseable {
 
     private final WorkerConfig workerConfig;
+    private final ErrorNotifier errorNotifier;
+    private ThreadPoolExecutor executorService;
+    private final PulsarClient pulsarClient;
 
     @Setter
     private FunctionMetaDataManager functionMetaDataManager;
 
     @Setter
+    private LeaderService leaderService;
+
+    @Setter
     private MembershipManager membershipManager;
 
     @Setter
@@ -71,27 +93,36 @@ public class SchedulerManager implements AutoCloseable {
 
     private final IScheduler scheduler;
 
-    private final Producer<byte[]> producer;
+    private Producer<byte[]> producer;
 
-    private final ScheduledExecutorService executorService;
+    private ScheduledExecutorService scheduledExecutorService;
     
     private final PulsarAdmin admin;
-    
+
+    @Getter
+    private Lock schedulerLock = new ReentrantLock(true);
+
+    private volatile boolean isRunning = false;
+
     AtomicBoolean isCompactionNeeded = new AtomicBoolean(false);
     private static final long DEFAULT_ADMIN_API_BACKOFF_SEC = 60; 
     public static final String HEARTBEAT_TENANT = "pulsar-function";
     public static final String HEARTBEAT_NAMESPACE = "heartbeat";
 
-    public SchedulerManager(WorkerConfig workerConfig, PulsarClient pulsarClient, PulsarAdmin admin, ScheduledExecutorService executor) {
+    @Getter
+    private MessageId lastMessageProduced = null;
+
+    public SchedulerManager(WorkerConfig workerConfig,
+                            PulsarClient pulsarClient,
+                            PulsarAdmin admin,
+                            ErrorNotifier errorNotifier) {
         this.workerConfig = workerConfig;
+        this.pulsarClient = pulsarClient;
         this.admin = admin;
         this.scheduler = Reflections.createInstance(workerConfig.getSchedulerClassName(), IScheduler.class,
                 Thread.currentThread().getContextClassLoader());
-
-        this.producer = createProducer(pulsarClient, workerConfig);
-        this.executorService = executor;
+        this.errorNotifier = errorNotifier;
         
-        scheduleCompaction(executor, workerConfig.getTopicCompactionFrequencySec());
     }
 
     private static Producer<byte[]> createProducer(PulsarClient client, WorkerConfig config) {
@@ -136,26 +167,58 @@ public class SchedulerManager implements AutoCloseable {
         return producer.get();
     }
 
+    public synchronized void initialize() {
+        if (!isRunning) {
+            log.info("Initializing scheduler manager");
+            executorService = new ThreadPoolExecutor(1, 5, 0L, TimeUnit.MILLISECONDS,
+                    new LinkedBlockingQueue<>(5));
+            executorService.setThreadFactory(new ThreadFactoryBuilder().setNameFormat("worker-scheduler-%d").build());
+            scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("worker-assignment-topic-compactor"));
+            scheduleCompaction(this.scheduledExecutorService, workerConfig.getTopicCompactionFrequencySec());
+
+            producer = createProducer(pulsarClient, workerConfig);
+            isRunning = true;
+            lastMessageProduced = null;
+        }
+    }
+
     public Future<?> schedule() {
-        return executorService.submit(() -> {
-            synchronized (SchedulerManager.this) {
-                boolean isLeader = membershipManager.isLeader();
-                if (isLeader) {
-                    try {
-                        invokeScheduler();
-                    } catch (Exception e) {
-                        log.warn("Failed to invoke scheduler", e);
-                        throw e;
+        if (!leaderService.isLeader()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        // make sure we are initialized before scheduling
+        initialize();
+
+        try {
+            return executorService.submit(() -> {
+                try {
+                    schedulerLock.lock();
+
+                    boolean isLeader = leaderService.isLeader();
+                    if (isLeader) {
+                        try {
+                            invokeScheduler();
+                        } catch (Throwable th) {
+                            log.error("Encountered error when invoking scheduler", th);
+                            errorNotifier.triggerError(th);
+                        }
                     }
+                } finally {
+                    schedulerLock.unlock();
                 }
-            }
-        });
+            });
+        } catch (RejectedExecutionException e) {
+            // task queue is full so just ignore
+            log.debug("Rejected task to invoke scheduler since task queue is already full");
+            return CompletableFuture.completedFuture(null);
+        }
     }
 
     private void scheduleCompaction(ScheduledExecutorService executor, long scheduleFrequencySec) {
         if (executor != null) {
             executor.scheduleWithFixedDelay(() -> {
-                if (membershipManager.isLeader() && isCompactionNeeded.get()) {
+                if (leaderService.isLeader() && isCompactionNeeded.get()) {
                     compactAssignmentTopic();
                     isCompactionNeeded.set(false);
                 }
@@ -164,20 +227,21 @@ public class SchedulerManager implements AutoCloseable {
     }
     
     @VisibleForTesting
-    public void invokeScheduler() {
+    void invokeScheduler() {
         
-        Set<String> currentMembership = this.membershipManager.getCurrentMembership()
+        Set<String> currentMembership = membershipManager.getCurrentMembership()
                 .stream().map(workerInfo -> workerInfo.getWorkerId()).collect(Collectors.toSet());
 
-        List<FunctionMetaData> allFunctions = this.functionMetaDataManager.getAllFunctionMetaData();
+        List<FunctionMetaData> allFunctions = functionMetaDataManager.getAllFunctionMetaData();
         Map<String, Function.Instance> allInstances = computeAllInstances(allFunctions, functionRuntimeManager.getRuntimeFactory().externallyManaged());
-        Map<String, Map<String, Assignment>> workerIdToAssignments = this.functionRuntimeManager
+        Map<String, Map<String, Assignment>> workerIdToAssignments = functionRuntimeManager
                 .getCurrentAssignments();
 
         //delete assignments of functions and instances that don't exist anymore
         Iterator<Map.Entry<String, Map<String, Assignment>>> it = workerIdToAssignments.entrySet().iterator();
         while (it.hasNext()) {
             Map.Entry<String, Map<String, Assignment>> workerIdToAssignmentEntry = it.next();
+            String workerId = workerIdToAssignmentEntry.getKey();
             Map<String, Assignment> functionMap = workerIdToAssignmentEntry.getValue();
 
             // remove instances that don't exist anymore
@@ -185,7 +249,14 @@ public class SchedulerManager implements AutoCloseable {
                 String fullyQualifiedInstanceId = entry.getKey();
                 boolean deleted = !allInstances.containsKey(fullyQualifiedInstanceId);
                 if (deleted) {
-                    publishNewAssignment(entry.getValue().toBuilder().build(), true);
+                    Assignment assignment = entry.getValue();
+                    MessageId messageId = publishNewAssignment(assignment.toBuilder().build(), true);
+                    
+                    // Directly update in memory assignment cache since I am leader
+                    log.info("Deleting assignment: {}", assignment);
+                    functionRuntimeManager.deleteAssignment(fullyQualifiedInstanceId);
+                    // update message id associated with current view of assignments map
+                    lastMessageProduced = messageId;
                 }
                 return deleted;
             });
@@ -198,11 +269,18 @@ public class SchedulerManager implements AutoCloseable {
 
                 if (!assignment.getInstance().equals(instance)) {
                     functionMap.put(fullyQualifiedInstanceId, assignment.toBuilder().setInstance(instance).build());
-                    publishNewAssignment(assignment.toBuilder().setInstance(instance).build().toBuilder().build(), false);
+                    Assignment newAssignment = assignment.toBuilder().setInstance(instance).build().toBuilder().build();
+                    MessageId messageId = publishNewAssignment(newAssignment, false);
+
+                    // Directly update in memory assignment cache since I am leader
+                    log.info("Updating assignment: {}", assignment);
+                    functionRuntimeManager.processAssignment(newAssignment);
+                    // update message id associated with current view of assignments map
+                    lastMessageProduced = messageId;
+                }
+                if (functionMap.isEmpty()) {
+                    it.remove();
                 }
-            }
-            if (functionMap.isEmpty()) {
-                it.remove();
             }
         }
 
@@ -222,10 +300,10 @@ public class SchedulerManager implements AutoCloseable {
                 .flatMap(stringMapEntry -> stringMapEntry.getValue().values().stream())
                 .collect(Collectors.toList());
 
-        Pair<List<Function.Instance>, List<Assignment>> unassignedInstances = this.getUnassignedFunctionInstances(workerIdToAssignments,
-                allInstances);
+        Pair<List<Function.Instance>, List<Assignment>> unassignedInstances 
+                = getUnassignedFunctionInstances(workerIdToAssignments, allInstances);
 
-        List<Assignment> assignments = this.scheduler.schedule(unassignedInstances.getLeft(), currentAssignments, currentMembership);
+        List<Assignment> assignments = scheduler.schedule(unassignedInstances.getLeft(), currentAssignments, currentMembership);
         assignments.addAll(unassignedInstances.getRight());
 
         if (log.isDebugEnabled()) {
@@ -235,37 +313,43 @@ public class SchedulerManager implements AutoCloseable {
         isCompactionNeeded.set(!assignments.isEmpty());
 
         for(Assignment assignment : assignments) {
-            publishNewAssignment(assignment, false);
+            MessageId messageId = publishNewAssignment(assignment, false);
+
+            // Directly update in memory assignment cache since I am leader
+            log.info("Adding assignment: {}", assignment);
+            functionRuntimeManager.processAssignment(assignment);
+            // update message id associated with current view of assignments map
+            lastMessageProduced = messageId;
         }
         
     }
 
-    public void compactAssignmentTopic() {
+    private void compactAssignmentTopic() {
         if (this.admin != null) {
             try {
                 this.admin.topics().triggerCompaction(workerConfig.getFunctionAssignmentTopic());
             } catch (PulsarAdminException e) {
                 log.error("Failed to trigger compaction", e);
-                executorService.schedule(() -> compactAssignmentTopic(), DEFAULT_ADMIN_API_BACKOFF_SEC,
+                scheduledExecutorService.schedule(() -> compactAssignmentTopic(), DEFAULT_ADMIN_API_BACKOFF_SEC,
                         TimeUnit.SECONDS);
             }
         }
     }
 
-    private void publishNewAssignment(Assignment assignment, boolean deleted) {
+    private MessageId publishNewAssignment(Assignment assignment, boolean deleted) {
         try {
             String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
             // publish empty message with instance-id key so, compactor can delete and skip delivery of this instance-id
             // message
-            producer.newMessage().key(fullyQualifiedInstanceId)
-                    .value(deleted ? "".getBytes() : assignment.toByteArray()).sendAsync().get();
+            return producer.newMessage().key(fullyQualifiedInstanceId)
+                    .value(deleted ? "".getBytes() : assignment.toByteArray()).send();
         } catch (Exception e) {
             log.error("Failed to {} assignment update {}", assignment, deleted ? "send" : "deleted", e);
             throw new RuntimeException(e);
         }
     }
 
-    public static Map<String, Function.Instance> computeAllInstances(List<FunctionMetaData> allFunctions,
+    private static Map<String, Function.Instance> computeAllInstances(List<FunctionMetaData> allFunctions,
                                                                      boolean externallyManagedRuntime) {
         Map<String, Function.Instance> functionInstances = new HashMap<>();
         for (FunctionMetaData functionMetaData : allFunctions) {
@@ -276,7 +360,7 @@ public class SchedulerManager implements AutoCloseable {
         return functionInstances;
     }
 
-    public static List<Function.Instance> computeInstances(FunctionMetaData functionMetaData,
+    static List<Function.Instance> computeInstances(FunctionMetaData functionMetaData,
                                                            boolean externallyManagedRuntime) {
         List<Function.Instance> functionInstances = new LinkedList<>();
         if (!externallyManagedRuntime) {
@@ -323,15 +407,35 @@ public class SchedulerManager implements AutoCloseable {
     }
 
     @Override
-    public void close() {
+    public synchronized void close() {
+        log.info("Closing scheduler manager");
         try {
-            this.producer.close();
-        } catch (PulsarClientException e) {
-            log.warn("Failed to shutdown scheduler manager assignment producer", e);
+            // make sure we are not closing while a scheduling is being calculated
+            schedulerLock.lock();
+
+            isRunning = false;
+
+            if (scheduledExecutorService != null) {
+                scheduledExecutorService.shutdown();
+            }
+
+            if (executorService != null) {
+                executorService.shutdown();
+            }
+
+            if (producer != null) {
+                try {
+                    producer.close();
+                } catch (PulsarClientException e) {
+                    log.warn("Failed to shutdown scheduler manager assignment producer", e);
+                }
+            }
+        } finally {
+            schedulerLock.unlock();
         }
     }
     
-    public static String checkHeartBeatFunction(Instance funInstance) {
+    static String checkHeartBeatFunction(Instance funInstance) {
         if (funInstance.getFunctionMetaData() != null
                 && funInstance.getFunctionMetaData().getFunctionDetails() != null) {
             FunctionDetails funDetails = funInstance.getFunctionMetaData().getFunctionDetails();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index e3ea170..532bc32 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -70,15 +70,15 @@ public class WorkerService {
     private PulsarAdmin brokerAdmin;
     private PulsarAdmin functionAdmin;
     private final MetricsGenerator metricsGenerator;
-    private final ScheduledExecutorService executor;
     @VisibleForTesting
     private URI dlogUri;
+    private LeaderService leaderService;
+    private FunctionAssignmentTailer functionAssignmentTailer;
 
     public WorkerService(WorkerConfig workerConfig) {
         this.workerConfig = workerConfig;
         this.statsUpdater = Executors
                 .newSingleThreadScheduledExecutor(new DefaultThreadFactory("worker-stats-updater"));
-        this.executor = Executors.newScheduledThreadPool(10, new DefaultThreadFactory("pulsar-worker"));
         this.metricsGenerator = new MetricsGenerator(this.statsUpdater, workerConfig);
     }
 
@@ -87,7 +87,7 @@ public class WorkerService {
                       AuthenticationService authenticationService,
                       AuthorizationService authorizationService,
                       ErrorNotifier errorNotifier) throws InterruptedException {
-        log.info("Starting worker {}...", workerConfig.getWorkerId());
+        log.info("/** Starting worker id={} **/", workerConfig.getWorkerId());
 
         try {
             log.info("Worker Configs: {}", new ObjectMapper().writerWithDefaultPrettyPrinter()
@@ -98,13 +98,13 @@ public class WorkerService {
 
         try {
             // create the dlog namespace for storing function packages
-            this.dlogUri = dlogUri;
+            dlogUri = dlogUri;
             DistributedLogConfiguration dlogConf = WorkerUtils.getDlogConf(workerConfig);
             try {
                 this.dlogNamespace = NamespaceBuilder.newBuilder()
                         .conf(dlogConf)
                         .clientId("function-worker-" + workerConfig.getWorkerId())
-                        .uri(this.dlogUri)
+                        .uri(dlogUri)
                         .build();
             } catch (Exception e) {
                 log.error("Failed to initialize dlog namespace {} for storing function packages",
@@ -146,7 +146,7 @@ public class WorkerService {
                     workerConfig.getTlsTrustCertsFilePath(), workerConfig.isTlsAllowInsecureConnection(),
                     workerConfig.isTlsHostnameVerificationEnable());
 
-                this.client = WorkerUtils.getPulsarClient(this.workerConfig.getPulsarServiceUrl(),
+                this.client = WorkerUtils.getPulsarClient(workerConfig.getPulsarServiceUrl(),
                         workerConfig.getClientAuthenticationPlugin(),
                         workerConfig.getClientAuthenticationParameters(),
                         workerConfig.isUseTls(), pulsarClientTlsTrustCertsFilePath,
@@ -156,16 +156,14 @@ public class WorkerService {
 
                 this.functionAdmin = WorkerUtils.getPulsarAdminClient(functionWebServiceUrl);
 
-                this.client = WorkerUtils.getPulsarClient(this.workerConfig.getPulsarServiceUrl());
+                this.client = WorkerUtils.getPulsarClient(workerConfig.getPulsarServiceUrl());
             }
-            log.info("Created Pulsar client");
 
             brokerAdmin.topics().createNonPartitionedTopic(workerConfig.getFunctionAssignmentTopic());
             brokerAdmin.topics().createNonPartitionedTopic(workerConfig.getClusterCoordinationTopic());
             brokerAdmin.topics().createNonPartitionedTopic(workerConfig.getFunctionMetadataTopic());
             //create scheduler manager
-            this.schedulerManager = new SchedulerManager(this.workerConfig, this.client, this.brokerAdmin,
-                    this.executor);
+            this.schedulerManager = new SchedulerManager(workerConfig, client, brokerAdmin, errorNotifier);
 
             //create function meta data manager
             this.functionMetaDataManager = new FunctionMetaDataManager(
@@ -179,52 +177,89 @@ public class WorkerService {
             if (!brokerAdmin.topics().getSubscriptions(coordinationTopic).contains(MembershipManager.COORDINATION_TOPIC_SUBSCRIPTION)) {
                 brokerAdmin.topics().createSubscription(coordinationTopic, MembershipManager.COORDINATION_TOPIC_SUBSCRIPTION, MessageId.earliest);
             }
-            this.membershipManager = new MembershipManager(this, this.client, this.brokerAdmin);
+            this.membershipManager = new MembershipManager(this, client, brokerAdmin);
+
 
             // create function runtime manager
             this.functionRuntimeManager = new FunctionRuntimeManager(
-                    this.workerConfig,
+                    workerConfig,
                     this,
-                    this.dlogNamespace,
-                    this.membershipManager,
+                    dlogNamespace,
+                    membershipManager,
                     connectorsManager,
                     functionsManager,
                     functionMetaDataManager,
                     errorNotifier);
 
-            // Setting references to managers in scheduler
-            this.schedulerManager.setFunctionMetaDataManager(this.functionMetaDataManager);
-            this.schedulerManager.setFunctionRuntimeManager(this.functionRuntimeManager);
-            this.schedulerManager.setMembershipManager(this.membershipManager);
+
+            // initialize function assignment tailer that reads from the assignment topic
+            this.functionAssignmentTailer = new FunctionAssignmentTailer(
+                    functionRuntimeManager,
+                    client.newReader(),
+                    workerConfig,
+                    errorNotifier);
 
             // initialize function metadata manager
-            this.functionMetaDataManager.initialize();
+            log.info("/** Initializing Metdata Manager **/");
+            functionMetaDataManager.initialize();
 
             // initialize function runtime manager
-            this.functionRuntimeManager.initialize();
+            log.info("/** Initializing Runtime Manager **/");
+            functionRuntimeManager.initialize();
+
+            this.leaderService = new LeaderService(this,
+                    client,
+                    functionAssignmentTailer,
+                    schedulerManager,
+                    errorNotifier);
+
+            // Setting references to managers in scheduler
+            schedulerManager.setFunctionMetaDataManager(functionMetaDataManager);
+            schedulerManager.setFunctionRuntimeManager(functionRuntimeManager);
+            schedulerManager.setMembershipManager(membershipManager);
+            schedulerManager.setLeaderService(leaderService);
 
             this.authenticationService = authenticationService;
 
             this.authorizationService = authorizationService;
 
-            // Starting cluster services
-            log.info("Start cluster services...");
-            this.clusterServiceCoordinator = new ClusterServiceCoordinator(
-                    this.workerConfig.getWorkerId(),
-                    membershipManager);
-
-            this.clusterServiceCoordinator.addTask("membership-monitor",
-                    this.workerConfig.getFailureCheckFreqMs(),
-                    () -> membershipManager.checkFailures(
-                            functionMetaDataManager, functionRuntimeManager, schedulerManager));
+            // Start function assignment tailer
+            log.info("/** Starting Function Assignment Tailer **/");
+            functionAssignmentTailer.start();
 
-            this.clusterServiceCoordinator.start();
+            log.info("/** Start Leader Service **/");
+            leaderService.start();
+            
+            // start function metadata manager
+            log.info("/** Starting Metdata Manager **/");
+            functionMetaDataManager.start();
 
-            // Start function runtime manager
-            this.functionRuntimeManager.start();
+            // Starting cluster services
+            this.clusterServiceCoordinator = new ClusterServiceCoordinator(
+                    workerConfig.getWorkerId(),
+                    leaderService);
+
+            clusterServiceCoordinator.addTask("membership-monitor",
+                    workerConfig.getFailureCheckFreqMs(),
+                    () -> {
+                        // computing a new schedule and checking for failures cannot happen concurrently
+                        // both paths of code modify internally cached assignments map in function runtime manager
+                        try {
+                            schedulerManager.getSchedulerLock().lock();
+                            membershipManager.checkFailures(
+                                    functionMetaDataManager, functionRuntimeManager, schedulerManager);
+                        } finally {
+                            schedulerManager.getSchedulerLock().unlock();
+                        }
+                    });
+
+            log.info("/** Starting Cluster Service Coordinator **/");
+            clusterServiceCoordinator.start();
 
             // indicate function worker service is done initializing
             this.isInitialized = true;
+
+            log.info("/** Started worker id={} **/", workerConfig.getWorkerId());
         } catch (Throwable t) {
             log.error("Error Starting up in worker", t);
             throw new RuntimeException(t);
@@ -239,18 +274,20 @@ public class WorkerService {
                 log.warn("Failed to close function metadata manager", e);
             }
         }
-        if (null != functionRuntimeManager) {
+
+        if (null != functionAssignmentTailer) {
             try {
-                functionRuntimeManager.close();
+                functionAssignmentTailer.close();
             } catch (Exception e) {
-                log.warn("Failed to close function runtime manager", e);
+                log.warn("Failed to close function assignment tailer", e);
             }
         }
-        if (null != client) {
+
+        if (null != functionRuntimeManager) {
             try {
-                client.close();
-            } catch (PulsarClientException e) {
-                log.warn("Failed to close pulsar client", e);
+                functionRuntimeManager.close();
+            } catch (Exception e) {
+                log.warn("Failed to close function runtime manager", e);
             }
         }
 
@@ -259,38 +296,46 @@ public class WorkerService {
         }
 
         if (null != membershipManager) {
-            try {
-                membershipManager.close();
-            } catch (PulsarClientException e) {
-                log.warn("Failed to close membership manager", e);
-            }
+            membershipManager.close();
         }
 
         if (null != schedulerManager) {
             schedulerManager.close();
         }
 
-        if (null != this.brokerAdmin) {
-            this.brokerAdmin.close();
+        if (null != leaderService) {
+            try {
+                leaderService.close();
+            } catch (PulsarClientException e) {
+                log.warn("Failed to close leader service", e);
+            }
+        }
+
+        if (null != client) {
+            try {
+                client.close();
+            } catch (PulsarClientException e) {
+                log.warn("Failed to close pulsar client", e);
+            }
         }
 
-        if (null != this.functionAdmin) {
-            this.functionAdmin.close();
+        if (null != brokerAdmin) {
+            brokerAdmin.close();
         }
 
-        if (null != this.stateStoreAdminClient) {
-            this.stateStoreAdminClient.close();
+        if (null != functionAdmin) {
+            functionAdmin.close();
         }
 
-        if (null != this.dlogNamespace) {
-            this.dlogNamespace.close();
+        if (null != stateStoreAdminClient) {
+            stateStoreAdminClient.close();
         }
 
-        if(this.executor != null) {
-            this.executor.shutdown();
+        if (null != dlogNamespace) {
+            dlogNamespace.close();
         }
 
-        if (this.statsUpdater != null) {
+        if (statsUpdater != null) {
             statsUpdater.shutdownNow();
         }
     }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index 5cad320..fe1f11d 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -33,8 +33,11 @@ import org.apache.distributedlog.metadata.DLMetadata;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.common.conf.InternalConfigurationData;
 import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.functions.proto.Function;
@@ -315,4 +318,17 @@ public final class WorkerUtils {
 
         return false;
     }
+
+    public static Reader<byte[]> createReader(ReaderBuilder readerBuilder,
+                                              String readerName,
+                                              String topic,
+                                              MessageId startMessageId) throws PulsarClientException {
+        return readerBuilder
+                .subscriptionRolePrefix(readerName)
+                .readerName(readerName)
+                .topic(topic)
+                .readCompacted(true)
+                .startMessageId(startMessageId)
+                .create();
+    }
 }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinatorTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinatorTest.java
index 763465b..a9e33da 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinatorTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinatorTest.java
@@ -52,7 +52,7 @@ public class ClusterServiceCoordinatorTest {
         return new org.powermock.modules.testng.PowerMockObjectFactory();
     }
 
-    private MembershipManager membershipManager;
+    private LeaderService leaderService;
     private ClusterServiceCoordinator coordinator;
     private ScheduledExecutorService mockExecutor;
     private MockExecutorController mockExecutorController;
@@ -70,8 +70,8 @@ public class ClusterServiceCoordinatorTest {
                 any(ThreadFactory.class))
         ).thenReturn(mockExecutor);
 
-        this.membershipManager = mock(MembershipManager.class);
-        this.coordinator = new ClusterServiceCoordinator("test-coordinator", membershipManager);
+        this.leaderService = mock(LeaderService.class);
+        this.coordinator = new ClusterServiceCoordinator("test-coordinator", leaderService);
     }
 
 
@@ -94,18 +94,18 @@ public class ClusterServiceCoordinatorTest {
             .scheduleAtFixedRate(any(Runnable.class), eq(interval), eq(interval), eq(TimeUnit.MILLISECONDS));
 
         // when task is executed, it is the leader
-        when(membershipManager.isLeader()).thenReturn(true);
+        when(leaderService.isLeader()).thenReturn(true);
         mockExecutorController.advance(Duration.ofMillis(interval));
 
-        verify(membershipManager, times(1)).isLeader();
+        verify(leaderService, times(1)).isLeader();
         verify(mockTask, times(1)).run();
 
         // when task is executed, it is not the leader
-        when(membershipManager.isLeader()).thenReturn(false);
+        when(leaderService.isLeader()).thenReturn(false);
         mockExecutorController.advance(Duration.ofMillis(interval));
 
         // `isLeader` is called twice, however the task is only executed once (when it was leader)
-        verify(membershipManager, times(2)).isLeader();
+        verify(leaderService, times(2)).isLeader();
         verify(mockTask, times(1)).run();
     }
 
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailerTest.java
new file mode 100644
index 0000000..28eb06a
--- /dev/null
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailerTest.java
@@ -0,0 +1,422 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import io.netty.buffer.Unpooled;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
+import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+
+@Slf4j
+public class FunctionAssignmentTailerTest {
+
+    @Test(timeOut = 10000)
+    public void testErrorNotifier() throws Exception {
+        WorkerConfig workerConfig = new WorkerConfig();
+        workerConfig.setWorkerId("worker-1");
+        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(
+                ObjectMapperFactory.getThreadLocal().convertValue(
+                        new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
+        workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+        workerConfig.setStateStorageServiceUrl("foo");
+        workerConfig.setFunctionAssignmentTopicName("assignments");
+
+        Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
+                Function.FunctionDetails.newBuilder()
+                        .setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();
+
+        Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
+                Function.FunctionDetails.newBuilder()
+                        .setTenant("test-tenant").setNamespace("test-namespace").setName("func-2")).build();
+
+        Function.Assignment assignment1 = Function.Assignment.newBuilder()
+                .setWorkerId("worker-1")
+                .setInstance(Function.Instance.newBuilder()
+                        .setFunctionMetaData(function1).setInstanceId(0).build())
+                .build();
+        Function.Assignment assignment2 = Function.Assignment.newBuilder()
+                .setWorkerId("worker-1")
+                .setInstance(Function.Instance.newBuilder()
+                        .setFunctionMetaData(function2).setInstanceId(0).build())
+                .build();
+
+        ArrayBlockingQueue<Message<byte[]>> messageList = new ArrayBlockingQueue<>(2);
+        PulsarApi.MessageMetadata.Builder msgMetadataBuilder = PulsarApi.MessageMetadata.newBuilder();
+        Message message1 = spy(new MessageImpl("foo", MessageId.latest.toString(),
+                new HashMap<>(), Unpooled.copiedBuffer(assignment1.toByteArray()), null, msgMetadataBuilder));
+        doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance())).when(message1).getKey();
+
+        Message message2 = spy(new MessageImpl("foo", MessageId.latest.toString(),
+                new HashMap<>(), Unpooled.copiedBuffer(assignment2.toByteArray()), null, msgMetadataBuilder));
+        doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment2.getInstance())).when(message2).getKey();
+
+        PulsarClient pulsarClient = mock(PulsarClient.class);
+
+        Reader<byte[]> reader = mock(Reader.class);
+
+        when(reader.readNext(anyInt(), any())).thenAnswer(new Answer<Message<byte[]>>() {
+            @Override
+            public Message<byte[]> answer(InvocationOnMock invocationOnMock) throws Throwable {
+                return messageList.poll(10, TimeUnit.SECONDS);
+            }
+        });
+
+        when(reader.readNextAsync()).thenAnswer(new Answer<CompletableFuture<Message<byte[]>>>() {
+            @Override
+            public CompletableFuture<Message<byte[]>> answer(InvocationOnMock invocationOnMock) throws Throwable {
+                return new CompletableFuture<>();
+            }
+        });
+
+        when(reader.hasMessageAvailable()).thenAnswer(new Answer<Boolean>() {
+            @Override
+            public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable {
+                return !messageList.isEmpty();
+            }
+        });
+
+        ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
+        doReturn(readerBuilder).when(pulsarClient).newReader();
+        doReturn(readerBuilder).when(readerBuilder).topic(anyString());
+        doReturn(readerBuilder).when(readerBuilder).readerName(anyString());
+        doReturn(readerBuilder).when(readerBuilder).subscriptionRolePrefix(anyString());
+        doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
+        doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
+        doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
+
+        doReturn(reader).when(readerBuilder).create();
+        WorkerService workerService = mock(WorkerService.class);
+        doReturn(pulsarClient).when(workerService).getClient();
+        doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
+
+        ErrorNotifier errorNotifier = spy(ErrorNotifier.getDefaultImpl());
+
+        // mock the runtime manager whose processAssignmentMessage() the tailer is expected to invoke
+        FunctionRuntimeManager functionRuntimeManager = mock(FunctionRuntimeManager.class);
+
+        FunctionAssignmentTailer functionAssignmentTailer = spy(new FunctionAssignmentTailer(functionRuntimeManager, readerBuilder, workerConfig, errorNotifier));
+
+        functionAssignmentTailer.start();
+
+        // verify no errors occurred
+        verify(errorNotifier, times(0)).triggerError(any());
+
+        messageList.add(message1);
+
+        verify(errorNotifier, times(0)).triggerError(any());
+
+        // trigger an error to be thrown
+        doThrow(new RuntimeException("test")).when(functionRuntimeManager).processAssignmentMessage(any());
+
+        messageList.add(message2);
+
+        try {
+            errorNotifier.waitForError();
+        } catch (Exception e) {
+            assertEquals(e.getCause().getMessage(), "test");
+        }
+        verify(errorNotifier, times(1)).triggerError(any());
+
+        functionAssignmentTailer.close();
+    }
+
+    @Test(timeOut = 10000)
+    public void testProcessingAssignments() throws Exception {
+        WorkerConfig workerConfig = new WorkerConfig();
+        workerConfig.setWorkerId("worker-1");
+        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(
+                ObjectMapperFactory.getThreadLocal().convertValue(
+                        new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
+        workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+        workerConfig.setStateStorageServiceUrl("foo");
+        workerConfig.setFunctionAssignmentTopicName("assignments");
+
+        Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
+                Function.FunctionDetails.newBuilder()
+                        .setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();
+
+        Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
+                Function.FunctionDetails.newBuilder()
+                        .setTenant("test-tenant").setNamespace("test-namespace").setName("func-2")).build();
+
+        Function.Assignment assignment1 = Function.Assignment.newBuilder()
+                .setWorkerId("worker-1")
+                .setInstance(Function.Instance.newBuilder()
+                        .setFunctionMetaData(function1).setInstanceId(0).build())
+                .build();
+        Function.Assignment assignment2 = Function.Assignment.newBuilder()
+                .setWorkerId("worker-1")
+                .setInstance(Function.Instance.newBuilder()
+                        .setFunctionMetaData(function2).setInstanceId(0).build())
+                .build();
+
+        ArrayBlockingQueue<Message<byte[]>> messageList = new ArrayBlockingQueue<>(2);
+
+        MessageId messageId1 = new MessageIdImpl(1, 1, -1);
+        MessageId messageId2 = new MessageIdImpl(1, 2, -1);
+
+        PulsarApi.MessageMetadata.Builder msgMetadataBuilder = PulsarApi.MessageMetadata.newBuilder();
+        Message message1 = spy(new MessageImpl("foo", messageId1.toString(),
+                new HashMap<>(), Unpooled.copiedBuffer(assignment1.toByteArray()), null, msgMetadataBuilder));
+        doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance())).when(message1).getKey();
+
+        Message message2 = spy(new MessageImpl("foo", messageId2.toString(),
+                new HashMap<>(), Unpooled.copiedBuffer(assignment2.toByteArray()), null, msgMetadataBuilder));
+        doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment2.getInstance())).when(message2).getKey();
+
+        PulsarClient pulsarClient = mock(PulsarClient.class);
+
+        Reader<byte[]> reader = mock(Reader.class);
+
+        when(reader.readNext(anyInt(), any())).thenAnswer(new Answer<Message<byte[]>>() {
+            @Override
+            public Message<byte[]> answer(InvocationOnMock invocationOnMock) throws Throwable {
+                return messageList.poll(10, TimeUnit.SECONDS);
+            }
+        });
+
+        when(reader.readNextAsync()).thenAnswer(new Answer<CompletableFuture<Message<byte[]>>>() {
+            @Override
+            public CompletableFuture<Message<byte[]>> answer(InvocationOnMock invocationOnMock) throws Throwable {
+                return new CompletableFuture<>();
+            }
+        });
+
+        when(reader.hasMessageAvailable()).thenAnswer(new Answer<Boolean>() {
+            @Override
+            public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable {
+                return !messageList.isEmpty();
+            }
+        });
+
+        ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
+        doReturn(readerBuilder).when(pulsarClient).newReader();
+        doReturn(readerBuilder).when(readerBuilder).topic(anyString());
+        doReturn(readerBuilder).when(readerBuilder).readerName(anyString());
+        doReturn(readerBuilder).when(readerBuilder).subscriptionRolePrefix(anyString());
+        doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
+        doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
+        doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
+
+        doReturn(reader).when(readerBuilder).create();
+        WorkerService workerService = mock(WorkerService.class);
+        doReturn(pulsarClient).when(workerService).getClient();
+        doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
+
+        ErrorNotifier errorNotifier = spy(ErrorNotifier.getDefaultImpl());
+
+        // mock the runtime manager whose processAssignmentMessage() the tailer is expected to invoke
+        FunctionRuntimeManager functionRuntimeManager = mock(FunctionRuntimeManager.class);
+
+        FunctionAssignmentTailer functionAssignmentTailer = spy(new FunctionAssignmentTailer(functionRuntimeManager, readerBuilder, workerConfig, errorNotifier));
+
+        functionAssignmentTailer.start();
+
+        messageList.add(message1);
+        for (int i = 0; i < 10; i++) {
+            try {
+                verify(functionRuntimeManager, times(1)).processAssignmentMessage(eq(message1));
+                break;
+            } catch (org.mockito.exceptions.verification.WantedButNotInvoked e) {
+                if (i == 9) {
+                    throw e;
+                }
+            }
+            Thread.sleep(200);
+        }
+
+        messageList.add(message2);
+        for (int i = 0; i < 10; i++) {
+            try {
+                verify(functionRuntimeManager, times(1)).processAssignmentMessage(eq(message2));
+                break;
+            } catch (org.mockito.exceptions.verification.WantedButNotInvoked e) {
+                if (i == 9) {
+                    throw e;
+                }
+            }
+            Thread.sleep(200);
+        }
+
+        Assert.assertEquals(functionAssignmentTailer.getLastMessageId(), message2.getMessageId());
+        functionAssignmentTailer.close();
+    }
+
+    @Test(timeOut = 10000)
+    public void testTriggerReadToTheEndAndExit() throws Exception {
+        WorkerConfig workerConfig = new WorkerConfig();
+        workerConfig.setWorkerId("worker-1");
+        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(
+                ObjectMapperFactory.getThreadLocal().convertValue(
+                        new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
+        workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+        workerConfig.setStateStorageServiceUrl("foo");
+        workerConfig.setFunctionAssignmentTopicName("assignments");
+
+        Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
+                Function.FunctionDetails.newBuilder()
+                        .setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();
+
+        Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
+                Function.FunctionDetails.newBuilder()
+                        .setTenant("test-tenant").setNamespace("test-namespace").setName("func-2")).build();
+
+        Function.Assignment assignment1 = Function.Assignment.newBuilder()
+                .setWorkerId("worker-1")
+                .setInstance(Function.Instance.newBuilder()
+                        .setFunctionMetaData(function1).setInstanceId(0).build())
+                .build();
+        Function.Assignment assignment2 = Function.Assignment.newBuilder()
+                .setWorkerId("worker-1")
+                .setInstance(Function.Instance.newBuilder()
+                        .setFunctionMetaData(function2).setInstanceId(0).build())
+                .build();
+
+        ArrayBlockingQueue<Message<byte[]>> messageList = new ArrayBlockingQueue<>(2);
+
+        MessageId messageId1 = new MessageIdImpl(1, 1, -1);
+        MessageId messageId2 = new MessageIdImpl(1, 2, -1);
+
+        PulsarApi.MessageMetadata.Builder msgMetadataBuilder = PulsarApi.MessageMetadata.newBuilder();
+        Message message1 = spy(new MessageImpl("foo", messageId1.toString(),
+                new HashMap<>(), Unpooled.copiedBuffer(assignment1.toByteArray()), null, msgMetadataBuilder));
+        doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance())).when(message1).getKey();
+
+        Message message2 = spy(new MessageImpl("foo", messageId2.toString(),
+                new HashMap<>(), Unpooled.copiedBuffer(assignment2.toByteArray()), null, msgMetadataBuilder));
+        doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment2.getInstance())).when(message2).getKey();
+
+        PulsarClient pulsarClient = mock(PulsarClient.class);
+
+        Reader<byte[]> reader = mock(Reader.class);
+
+        when(reader.readNext(anyInt(), any())).thenAnswer(new Answer<Message<byte[]>>() {
+            @Override
+            public Message<byte[]> answer(InvocationOnMock invocationOnMock) throws Throwable {
+                return messageList.poll(10, TimeUnit.MILLISECONDS);
+            }
+        });
+
+        when(reader.readNextAsync()).thenAnswer(new Answer<CompletableFuture<Message<byte[]>>>() {
+            @Override
+            public CompletableFuture<Message<byte[]>> answer(InvocationOnMock invocationOnMock) throws Throwable {
+                return new CompletableFuture<>();
+            }
+        });
+
+        when(reader.hasMessageAvailable()).thenAnswer(new Answer<Boolean>() {
+            @Override
+            public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable {
+                return !messageList.isEmpty();
+            }
+        });
+
+        ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
+        doReturn(readerBuilder).when(pulsarClient).newReader();
+        doReturn(readerBuilder).when(readerBuilder).topic(anyString());
+        doReturn(readerBuilder).when(readerBuilder).readerName(anyString());
+        doReturn(readerBuilder).when(readerBuilder).subscriptionRolePrefix(anyString());
+        doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
+        doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
+        doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
+
+        doReturn(reader).when(readerBuilder).create();
+        WorkerService workerService = mock(WorkerService.class);
+        doReturn(pulsarClient).when(workerService).getClient();
+        doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
+
+        ErrorNotifier errorNotifier = spy(ErrorNotifier.getDefaultImpl());
+
+        // mock the runtime manager whose processAssignmentMessage() the tailer is expected to invoke
+        FunctionRuntimeManager functionRuntimeManager = mock(FunctionRuntimeManager.class);
+
+        FunctionAssignmentTailer functionAssignmentTailer = spy(new FunctionAssignmentTailer(functionRuntimeManager, readerBuilder, workerConfig, errorNotifier));
+
+        functionAssignmentTailer.start();
+
+        messageList.add(message1);
+        for (int i = 0; i < 10; i++) {
+            try {
+                verify(functionRuntimeManager, times(1)).processAssignmentMessage(eq(message1));
+                break;
+            } catch (org.mockito.exceptions.verification.WantedButNotInvoked e) {
+                if (i == 9) {
+                    throw e;
+                }
+            }
+            Thread.sleep(200);
+        }
+
+        functionAssignmentTailer.triggerReadToTheEndAndExit().get();
+        for (int i = 0; i < 10; i++) {
+            if(!functionAssignmentTailer.getThread().isAlive()) {
+                break;
+            }
+
+            if (i == 9) {
+                Assert.assertFalse(functionAssignmentTailer.getThread().isAlive());
+            }
+            Thread.sleep(200);
+        }
+
+        messageList.add(message2);
+        Assert.assertEquals(functionAssignmentTailer.getLastMessageId(), message1.getMessageId());
+
+        functionAssignmentTailer.close();
+    }
+}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index d486707..54b47f5 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -47,9 +47,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
 
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.any;
@@ -57,7 +55,6 @@ import static org.mockito.Mockito.anyBoolean;
 import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.argThat;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
@@ -483,131 +480,6 @@ public class FunctionRuntimeManagerTest {
         assertEquals(functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0"), functionRuntimeInfo);
     }
 
-    @Test(timeOut = 10000)
-    public void testErrorNotifier() throws Exception {
-        WorkerConfig workerConfig = new WorkerConfig();
-        workerConfig.setWorkerId("worker-1");
-        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
-        workerConfig.setFunctionRuntimeFactoryConfigs(
-                ObjectMapperFactory.getThreadLocal().convertValue(
-                        new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
-        workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
-        workerConfig.setStateStorageServiceUrl("foo");
-        workerConfig.setFunctionAssignmentTopicName("assignments");
-
-        Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
-                Function.FunctionDetails.newBuilder()
-                        .setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();
-
-        Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
-                Function.FunctionDetails.newBuilder()
-                        .setTenant("test-tenant").setNamespace("test-namespace").setName("func-2")).build();
-
-        Function.Assignment assignment1 = Function.Assignment.newBuilder()
-                .setWorkerId("worker-1")
-                .setInstance(Function.Instance.newBuilder()
-                        .setFunctionMetaData(function1).setInstanceId(0).build())
-                .build();
-        Function.Assignment assignment2 = Function.Assignment.newBuilder()
-                .setWorkerId("worker-1")
-                .setInstance(Function.Instance.newBuilder()
-                        .setFunctionMetaData(function2).setInstanceId(0).build())
-                .build();
-
-        ArrayBlockingQueue<Message<byte[]>> messageList = new ArrayBlockingQueue<>(2);
-        PulsarApi.MessageMetadata.Builder msgMetadataBuilder = PulsarApi.MessageMetadata.newBuilder();
-        Message message1 = spy(new MessageImpl("foo", MessageId.latest.toString(),
-                new HashMap<>(), Unpooled.copiedBuffer(assignment1.toByteArray()), null, msgMetadataBuilder));
-        doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance())).when(message1).getKey();
-
-        Message message2 = spy(new MessageImpl("foo", MessageId.latest.toString(),
-                new HashMap<>(), Unpooled.copiedBuffer(assignment2.toByteArray()), null, msgMetadataBuilder));
-        doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment2.getInstance())).when(message2).getKey();
-
-        PulsarClient pulsarClient = mock(PulsarClient.class);
-
-        Reader<byte[]> reader = mock(Reader.class);
-
-
-        when(reader.readNext()).thenAnswer(new Answer<Message<byte[]>>() {
-            @Override
-            public Message<byte[]> answer(InvocationOnMock invocationOnMock) throws Throwable {
-                return messageList.poll(10, TimeUnit.SECONDS);
-            }
-        });
-
-        when(reader.readNextAsync()).thenAnswer(new Answer<CompletableFuture<Message<byte[]>>>() {
-            @Override
-            public CompletableFuture<Message<byte[]>> answer(InvocationOnMock invocationOnMock) throws Throwable {
-                return new CompletableFuture<>();
-            }
-        });
-
-        when(reader.hasMessageAvailable()).thenAnswer(new Answer<Boolean>() {
-            @Override
-            public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable {
-                return !messageList.isEmpty();
-            }
-        });
-
-        ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
-        doReturn(readerBuilder).when(pulsarClient).newReader();
-        doReturn(readerBuilder).when(readerBuilder).topic(anyString());
-        doReturn(readerBuilder).when(readerBuilder).readerName(anyString());
-        doReturn(readerBuilder).when(readerBuilder).subscriptionRolePrefix(anyString());
-        doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
-        doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
-        doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
-
-        doReturn(reader).when(readerBuilder).create();
-        WorkerService workerService = mock(WorkerService.class);
-        doReturn(pulsarClient).when(workerService).getClient();
-        doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
-
-        ErrorNotifier errorNotifier = spy(ErrorNotifier.getDefaultImpl());
-
-        // test new assignment add functions
-        FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
-                workerConfig,
-                workerService,
-                mock(Namespace.class),
-                mock(MembershipManager.class),
-                mock(ConnectorsManager.class),
-                mock(FunctionsManager.class),
-                mock(FunctionMetaDataManager.class),
-                errorNotifier));
-        FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
-        doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
-        doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
-        doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class));
-        functionRuntimeManager.setFunctionActioner(functionActioner);
-
-        functionRuntimeManager.initialize();
-
-        // verify no errors occured
-        verify(errorNotifier, times(0)).triggerError(any());
-
-        messageList.add(message1);
-
-        functionRuntimeManager.start();
-
-        verify(errorNotifier, times(0)).triggerError(any());
-
-        // trigger an error to be thrown
-        doThrow(new RuntimeException("test")).when(functionRuntimeManager).processAssignment(any());
-
-        messageList.add(message2);
-
-        try {
-            errorNotifier.waitForError();
-        } catch (Exception e) {
-            assertEquals(e.getCause().getMessage(), "test");
-        }
-        verify(errorNotifier, times(1)).triggerError(any());
-
-        functionRuntimeManager.close();
-    }
-
     @Test
     public void testRuntimeManagerInitialize() throws Exception {
         WorkerConfig workerConfig = new WorkerConfig();
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java
new file mode 100644
index 0000000..445498a
--- /dev/null
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link LeaderService}: leadership changes are simulated by firing the
+ * {@link ConsumerEventListener} captured from the mocked consumer builder.
+ */
+public class LeaderServiceTest {
+
+    private final WorkerConfig workerConfig;
+    private LeaderService leaderService;
+    private PulsarClientImpl mockClient;
+    // listener registered on the mocked consumer builder; tests use it to fire leadership events
+    private AtomicReference<ConsumerEventListener> listenerHolder;
+    private ConsumerImpl<byte[]> mockConsumer;
+    private FunctionAssignmentTailer functionAssignmentTailer;
+    private SchedulerManager schedulerManager;
+
+    public LeaderServiceTest() {
+        this.workerConfig = new WorkerConfig();
+        workerConfig.setWorkerId("worker-1");
+        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(
+                ObjectMapperFactory.getThreadLocal().convertValue(
+                        new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
+        workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+        workerConfig.setStateStorageServiceUrl("foo");
+        workerConfig.setWorkerPort(1234);
+    }
+
+    /** Builds fresh mocks and a started {@link LeaderService} spy before every test method. */
+    @BeforeMethod
+    public void setup() throws PulsarClientException {
+        mockClient = mock(PulsarClientImpl.class);
+
+        mockConsumer = mock(ConsumerImpl.class);
+        ConsumerBuilder<byte[]> mockConsumerBuilder = mock(ConsumerBuilder.class);
+
+        // every fluent builder call returns the same mock so the builder chain can be completed
+        when(mockConsumerBuilder.topic(anyString())).thenReturn(mockConsumerBuilder);
+        when(mockConsumerBuilder.subscriptionName(anyString())).thenReturn(mockConsumerBuilder);
+        when(mockConsumerBuilder.subscriptionType(any(SubscriptionType.class))).thenReturn(mockConsumerBuilder);
+        when(mockConsumerBuilder.property(anyString(), anyString())).thenReturn(mockConsumerBuilder);
+        when(mockConsumerBuilder.consumerName(anyString())).thenReturn(mockConsumerBuilder);
+
+        when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer);
+        WorkerService workerService = mock(WorkerService.class);
+        doReturn(workerConfig).when(workerService).getWorkerConfig();
+
+        // capture the listener so the tests can trigger becameActive / becameInactive themselves
+        listenerHolder = new AtomicReference<>();
+        when(mockConsumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenAnswer(invocationOnMock -> {
+            ConsumerEventListener listener = invocationOnMock.getArgument(0);
+            listenerHolder.set(listener);
+            return mockConsumerBuilder;
+        });
+
+        when(mockClient.newConsumer()).thenReturn(mockConsumerBuilder);
+
+        schedulerManager = mock(SchedulerManager.class);
+
+        functionAssignmentTailer = mock(FunctionAssignmentTailer.class);
+        when(functionAssignmentTailer.triggerReadToTheEndAndExit()).thenReturn(CompletableFuture.completedFuture(null));
+
+        leaderService = spy(new LeaderService(workerService, mockClient, functionAssignmentTailer, schedulerManager, ErrorNotifier.getDefaultImpl()));
+        leaderService.start();
+    }
+
+    /** After losing leadership the tailer must be restarted from the scheduler's last produced message. */
+    @Test
+    public void testLeaderService() throws Exception {
+        MessageId messageId = new MessageIdImpl(1, 2, -1);
+        when(schedulerManager.getLastMessageProduced()).thenReturn(messageId);
+
+        gainThenLoseLeadership();
+
+        verify(functionAssignmentTailer, times(1)).startFromMessage(messageId);
+        verify(schedulerManager, times(1)).close();
+    }
+
+    /** When getLastMessageProduced() returns null the tailer must be restarted through plain start(). */
+    @Test
+    public void testLeaderServiceNoNewScheduling() throws Exception {
+        when(schedulerManager.getLastMessageProduced()).thenReturn(null);
+
+        gainThenLoseLeadership();
+
+        verify(functionAssignmentTailer, times(1)).start();
+        verify(schedulerManager, times(1)).close();
+    }
+
+    // shared scenario: start as follower, become leader (checking the hand-over calls), then lose leadership
+    private void gainThenLoseLeadership() throws Exception {
+        assertFalse(leaderService.isLeader());
+        verify(mockClient, times(1)).newConsumer();
+
+        listenerHolder.get().becameActive(mockConsumer, 0);
+        assertTrue(leaderService.isLeader());
+        verify(functionAssignmentTailer, times(1)).triggerReadToTheEndAndExit();
+        verify(functionAssignmentTailer, times(1)).close();
+        verify(schedulerManager, times(1)).initialize();
+
+        listenerHolder.get().becameInactive(mockConsumer, 0);
+        assertFalse(leaderService.isLeader());
+    }
+}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 7acb3f7..71fe6b2 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -28,14 +28,11 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
 
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -71,46 +68,6 @@ public class MembershipManagerTest {
         workerConfig.setStateStorageServiceUrl("foo");
     }
 
-    @Test
-    public void testConsumerEventListener() throws Exception {
-        PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
-        PulsarAdmin mockAdmin = mock(PulsarAdmin.class);
-
-        ConsumerImpl<byte[]> mockConsumer = mock(ConsumerImpl.class);
-        ConsumerBuilder<byte[]> mockConsumerBuilder = mock(ConsumerBuilder.class);
-
-        when(mockConsumerBuilder.topic(anyString())).thenReturn(mockConsumerBuilder);
-        when(mockConsumerBuilder.subscriptionName(anyString())).thenReturn(mockConsumerBuilder);
-        when(mockConsumerBuilder.subscriptionType(any(SubscriptionType.class))).thenReturn(mockConsumerBuilder);
-        when(mockConsumerBuilder.property(anyString(), anyString())).thenReturn(mockConsumerBuilder);
-
-        when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer);
-        WorkerService workerService = mock(WorkerService.class);
-        doReturn(workerConfig).when(workerService).getWorkerConfig();
-
-        AtomicReference<ConsumerEventListener> listenerHolder = new AtomicReference<>();
-        when(mockConsumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenAnswer(invocationOnMock -> {
-
-            ConsumerEventListener listener = invocationOnMock.getArgument(0);
-            listenerHolder.set(listener);
-
-            return mockConsumerBuilder;
-        });
-
-        when(mockClient.newConsumer()).thenReturn(mockConsumerBuilder);
-
-        MembershipManager membershipManager = spy(new MembershipManager(workerService, mockClient, mockAdmin));
-        assertFalse(membershipManager.isLeader());
-        verify(mockClient, times(1))
-            .newConsumer();
-
-        listenerHolder.get().becameActive(mockConsumer, 0);
-        assertTrue(membershipManager.isLeader());
-
-        listenerHolder.get().becameInactive(mockConsumer, 0);
-        assertFalse(membershipManager.isLeader());
-    }
-
     private static PulsarClient mockPulsarClient() throws PulsarClientException {
         PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
 
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index fafed1f..e72b81e 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.functions.worker;
 
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.InvalidProtocolBufferException;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.prometheus.client.CollectorRegistry;
@@ -38,6 +39,7 @@ import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler;
+import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
 import org.mockito.invocation.Invocation;
 import org.testng.Assert;
@@ -47,18 +49,29 @@ import org.testng.annotations.Test;
 
 import java.lang.reflect.Method;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyBoolean;
 import static org.mockito.Mockito.anyInt;
@@ -68,6 +81,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
@@ -83,6 +97,8 @@ public class SchedulerManagerTest {
     private Producer producer;
     private TypedMessageBuilder<byte[]> message;
     private ScheduledExecutorService executor;
+    private LeaderService leaderService;
+    private ErrorNotifier errorNotifier;
 
     @BeforeMethod
     public void setup() {
@@ -123,13 +139,16 @@ public class SchedulerManagerTest {
 
         this.executor = Executors
                 .newSingleThreadScheduledExecutor(new DefaultThreadFactory("worker-test"));
-        schedulerManager = spy(new SchedulerManager(workerConfig, pulsarClient, null, executor));
+        errorNotifier = spy(ErrorNotifier.getDefaultImpl());
+        schedulerManager = spy(new SchedulerManager(workerConfig, pulsarClient, null, errorNotifier));
         functionRuntimeManager = mock(FunctionRuntimeManager.class);
         functionMetaDataManager = mock(FunctionMetaDataManager.class);
         membershipManager = mock(MembershipManager.class);
+        leaderService = mock(LeaderService.class);
         schedulerManager.setFunctionMetaDataManager(functionMetaDataManager);
         schedulerManager.setFunctionRuntimeManager(functionRuntimeManager);
         schedulerManager.setMembershipManager(membershipManager);
+        schedulerManager.setLeaderService(leaderService);
     }
 
     @AfterMethod
@@ -171,16 +190,17 @@ public class SchedulerManagerTest {
         doReturn(workerInfoList).when(membershipManager).getCurrentMembership();
 
         // i am not leader
-        doReturn(false).when(membershipManager).isLeader();
+        doReturn(false).when(leaderService).isLeader();
         callSchedule();
         verify(producer, times(0)).sendAsync(any());
 
         // i am leader
-        doReturn(true).when(membershipManager).isLeader();
+        doReturn(true).when(leaderService).isLeader();
         callSchedule();
         List<Invocation> invocations = getMethodInvocationDetails(schedulerManager,
-                SchedulerManager.class.getMethod("invokeScheduler"));
+                SchedulerManager.class.getDeclaredMethod("invokeScheduler"));
         Assert.assertEquals(invocations.size(), 1);
+        verify(errorNotifier, times(0)).triggerError(any());
     }
 
     @Test
@@ -217,12 +237,13 @@ public class SchedulerManagerTest {
         doReturn(workerInfoList).when(membershipManager).getCurrentMembership();
 
         // i am leader
-        doReturn(true).when(membershipManager).isLeader();
+        doReturn(true).when(leaderService).isLeader();
 
         callSchedule();
 
-        List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
+        List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("send"));
         Assert.assertEquals(invocations.size(), 0);
+        verify(errorNotifier, times(0)).triggerError(any());
     }
 
     @Test
@@ -264,11 +285,11 @@ public class SchedulerManagerTest {
         doReturn(workerInfoList).when(membershipManager).getCurrentMembership();
 
         // i am leader
-        doReturn(true).when(membershipManager).isLeader();
+        doReturn(true).when(leaderService).isLeader();
 
         callSchedule();
 
-        List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
+        List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("send"));
         Assert.assertEquals(invocations.size(), 1);
         invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value",
                 Object.class));
@@ -283,6 +304,8 @@ public class SchedulerManagerTest {
                 .build();
         Assert.assertEquals(assignment2, assignments);
 
+        // make sure we also directly added the assignment to in memory assignment cache in function runtime manager
+        verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2));
     }
 
     @Test
@@ -334,20 +357,21 @@ public class SchedulerManagerTest {
         doReturn(workerInfoList).when(membershipManager).getCurrentMembership();
 
         // i am leader
-        doReturn(true).when(membershipManager).isLeader();
+        doReturn(true).when(leaderService).isLeader();
 
         callSchedule();
 
-        List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
+        List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("send"));
         Assert.assertEquals(invocations.size(), 1);
         invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value",
                 Object.class));
         byte[] send = (byte[]) invocations.get(0).getRawArguments()[0];
-        Assignment assignments = Assignment.parseFrom(send);
-
-        log.info("assignments: {}", assignments);
 
+        // a delete-assignment message should carry only the key (the fully qualified instance id) and an empty payload
         Assert.assertEquals(0, send.length);
+
+        // make sure we also directly deleted the assignment from the in memory assignment cache in function runtime manager
+        verify(functionRuntimeManager, times(1)).deleteAssignment(eq(FunctionCommon.getFullyQualifiedInstanceId(assignment2.getInstance())));
     }
 
     @Test
@@ -391,11 +415,11 @@ public class SchedulerManagerTest {
         doReturn(workerInfoList).when(membershipManager).getCurrentMembership();
 
         // i am leader
-        doReturn(true).when(membershipManager).isLeader();
+        doReturn(true).when(leaderService).isLeader();
 
         callSchedule();
 
-        List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
+        List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("send"));
         Assert.assertEquals(invocations.size(), 1);
         invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value",
                 Object.class));
@@ -443,7 +467,7 @@ public class SchedulerManagerTest {
 
         callSchedule();
 
-        invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
+        invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("send"));
         Assert.assertEquals(invocations.size(), 4);
         invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value",
                 Object.class));
@@ -502,18 +526,26 @@ public class SchedulerManagerTest {
         doReturn(workerInfoList).when(membershipManager).getCurrentMembership();
 
         // i am leader
-        doReturn(true).when(membershipManager).isLeader();
+        doReturn(true).when(leaderService).isLeader();
 
         callSchedule();
 
-        List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
+        List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("send"));
         Assert.assertEquals(invocations.size(), 3);
         invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value",
                 Object.class));
-        byte[] send = (byte[]) invocations.get(0).getRawArguments()[0];
-        Assignment assignments = Assignment.parseFrom(send);
 
-        log.info("assignments: {}", assignments);
+        for (int i = 0; i < invocations.size(); i++) {
+            Invocation invocation = invocations.get(i);
+            byte[] send = (byte[]) invocation.getRawArguments()[0];
+            Assignment assignment = Assignment.parseFrom(send);
+            Assignment expectedAssignment = Function.Assignment.newBuilder()
+                    .setWorkerId("worker-1")
+                    .setInstance(Function.Instance.newBuilder()
+                            .setFunctionMetaData(function2).setInstanceId(i).build())
+                    .build();
+            Assert.assertEquals(assignment, expectedAssignment);
+        }
 
         Set<Assignment> allAssignments = Sets.newHashSet();
         invocations.forEach(invocation -> {
@@ -544,6 +576,12 @@ public class SchedulerManagerTest {
         assertTrue(allAssignments.contains(assignment2_2));
         assertTrue(allAssignments.contains(assignment2_3));
 
+        // make sure we also directly add the assignment to the in memory assignment cache in function runtime manager
+        verify(functionRuntimeManager, times(3)).processAssignment(any());
+        verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2_1));
+        verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2_2));
+        verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2_3));
+
         // updating assignments
         currentAssignments.get("worker-1").put(FunctionCommon.getFullyQualifiedInstanceId(assignment2_1.getInstance()), assignment2_1);
         currentAssignments.get("worker-1").put(FunctionCommon.getFullyQualifiedInstanceId(assignment2_2.getInstance()), assignment2_2);
@@ -568,7 +606,7 @@ public class SchedulerManagerTest {
 
         callSchedule();
 
-        invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
+        invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("send"));
         Assert.assertEquals(invocations.size(), 6);
         invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value",
                 Object.class));
@@ -583,6 +621,14 @@ public class SchedulerManagerTest {
         });
 
         assertTrue(allAssignments2.contains(assignment2Scaled));
+
+        // make sure both removed instances were also deleted directly from the in memory assignment cache in function runtime manager
+        verify(functionRuntimeManager, times(2)).deleteAssignment(anyString());
+        verify(functionRuntimeManager, times(1)).deleteAssignment(eq(FunctionCommon.getFullyQualifiedInstanceId(assignment2_2.getInstance())));
+        verify(functionRuntimeManager, times(1)).deleteAssignment(eq(FunctionCommon.getFullyQualifiedInstanceId(assignment2_3.getInstance())));
+
+        verify(functionRuntimeManager, times(4)).processAssignment(any());
+        verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2Scaled));
     }
 
     @Test
@@ -621,11 +667,11 @@ public class SchedulerManagerTest {
         doReturn(workerInfoList).when(membershipManager).getCurrentMembership();
 
         // i am leader
-        doReturn(true).when(membershipManager).isLeader();
+        doReturn(true).when(leaderService).isLeader();
 
         callSchedule();
 
-        List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
+        List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("send"));
         Assert.assertEquals(invocations.size(), 2);
         invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value",
                 Object.class));
@@ -683,7 +729,7 @@ public class SchedulerManagerTest {
         doReturn(workerInfoList).when(membershipManager).getCurrentMembership();
 
         // i am leader
-        doReturn(true).when(membershipManager).isLeader();
+        doReturn(true).when(leaderService).isLeader();
 
         callSchedule();
 
@@ -703,7 +749,7 @@ public class SchedulerManagerTest {
                         .setFunctionMetaData(function2).setInstanceId(2).build())
                 .build();
 
-        List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
+        List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("send"));
         Assert.assertEquals(invocations.size(), 3);
         invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value",
                 Object.class));
@@ -722,6 +768,12 @@ public class SchedulerManagerTest {
         assertTrue(allAssignments.contains(assignment2_2));
         assertTrue(allAssignments.contains(assignment2_3));
 
+        // make sure we also directly add the assignment to the in memory assignment cache in function runtime manager
+        verify(functionRuntimeManager, times(3)).processAssignment(any());
+        verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2_1));
+        verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2_2));
+        verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2_3));
+
         // updating assignments
         currentAssignments.get("worker-1").put(FunctionCommon.getFullyQualifiedInstanceId(assignment2_1.getInstance()), assignment2_1);
         currentAssignments.get("worker-1").put(FunctionCommon.getFullyQualifiedInstanceId(assignment2_2.getInstance()), assignment2_2);
@@ -757,7 +809,7 @@ public class SchedulerManagerTest {
 
         callSchedule();
 
-        invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
+        invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("send"));
         Assert.assertEquals(invocations.size(), 6);
         invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value",
                 Object.class));
@@ -774,10 +826,16 @@ public class SchedulerManagerTest {
         assertTrue(allAssignments2.contains(assignment2Updated1));
         assertTrue(allAssignments2.contains(assignment2Updated2));
         assertTrue(allAssignments2.contains(assignment2Updated3));
+
+        // make sure we also directly updated the assignment to the in memory assignment cache in function runtime manager
+        verify(functionRuntimeManager, times(6)).processAssignment(any());
+        verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2Updated1));
+        verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2Updated2));
+        verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2Updated3));
     }
 
     @Test
-    public void testAssignmentWorkerDoesNotExist() throws InterruptedException, NoSuchMethodException, TimeoutException, ExecutionException, InvalidProtocolBufferException {
+    public void testAssignmentWorkerDoesNotExist() throws InterruptedException, NoSuchMethodException, TimeoutException, ExecutionException {
         List<Function.FunctionMetaData> functionMetaDataList = new LinkedList<>();
         long version = 5;
         Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder()
@@ -827,15 +885,15 @@ public class SchedulerManagerTest {
         doReturn(workerInfoList).when(membershipManager).getCurrentMembership();
 
         // i am leader
-        doReturn(true).when(membershipManager).isLeader();
+        doReturn(true).when(leaderService).isLeader();
 
         callSchedule();
 
-        List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
+        List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("send"));
         Assert.assertEquals(invocations.size(), 0);
     }
 
-    private void callSchedule() throws NoSuchMethodException, InterruptedException,
+    private void callSchedule() throws InterruptedException,
             TimeoutException, ExecutionException {
         Future<?> complete = schedulerManager.schedule();