You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2020/06/23 17:20:46 UTC

[pulsar] branch master updated: Various fixes and optimizations for processing assignments in function worker (#7338)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new df0d189  Various fixes and optimizations for processing assignments in function worker (#7338)
df0d189 is described below

commit df0d189d9076745d8f2d97fc54d7fe967ee38528
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Tue Jun 23 10:20:30 2020 -0700

    Various fixes and optimizations for processing assignments in function worker (#7338)
    
    1. FunctionRuntimeManager initialize() should return messageId of last read message which is the position assignment tailer should start reading from
    2. Inefficient use of data structures in processAssignment()
    3. Wait for leader to finish init routine before allowing scheduler to compute new assignments
    4. Start leader service early so that the worker joins the worker membership sooner and its assignments don't get re-scheduled because initialize() routines for function runtime manager and metadata manager may take a while
    
    Co-authored-by: Jerry Peng <je...@splunk.com>
---
 .../pulsar/functions/worker/FunctionActioner.java  |  1 -
 .../functions/worker/FunctionMetaDataManager.java  |  6 ++-
 .../functions/worker/FunctionRuntimeManager.java   | 47 ++++++++++++++-----
 .../pulsar/functions/worker/LeaderService.java     | 31 ++++++++++++-
 .../pulsar/functions/worker/SchedulerManager.java  |  5 +-
 .../pulsar/functions/worker/WorkerService.java     | 27 ++++++-----
 .../worker/FunctionRuntimeManagerTest.java         | 53 ++++++++++++----------
 .../pulsar/functions/worker/LeaderServiceTest.java | 49 ++++++++++++++------
 .../functions/worker/SchedulerManagerTest.java     | 45 +++++++-----------
 9 files changed, 168 insertions(+), 96 deletions(-)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index ccf00d5..b43ce0a 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -284,7 +284,6 @@ public class FunctionActioner {
         FunctionDetails details = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails();
         String fqfn = FunctionCommon.getFullyQualifiedName(details);
         log.info("{}-{} Terminating function...", fqfn,functionRuntimeInfo.getFunctionInstance().getInstanceId());
-        FunctionDetails funcDetails = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails();
 
         if (functionRuntimeInfo.getRuntimeSpawner() != null) {
             functionRuntimeInfo.getRuntimeSpawner().close();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index 6496f6e..0c4779e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -25,7 +25,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
@@ -68,6 +67,9 @@ public class FunctionMetaDataManager implements AutoCloseable {
     @Getter
     boolean isInitializePhase = false;
 
+    @Getter
+    private final CompletableFuture<Void> isInitialized = new CompletableFuture<>();
+
     public FunctionMetaDataManager(WorkerConfig workerConfig,
                                    SchedulerManager schedulerManager,
                                    PulsarClient pulsarClient,
@@ -99,7 +101,7 @@ public class FunctionMetaDataManager implements AutoCloseable {
             }
             this.setInitializePhase(false);
             
-
+            this.isInitialized.complete(null);
         } catch (Exception e) {
             log.error("Failed to initialize meta data store", e);
             throw new RuntimeException(e);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index bdde8a8..c8fb8fd 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.functions.auth.FunctionAuthProvider;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.proto.Function;
@@ -47,20 +48,28 @@ import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsPro
 import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.FunctionInstanceId;
-import org.apache.pulsar.common.util.Reflections;
 
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
-import javax.ws.rs.core.UriBuilder;
 import java.io.IOException;
 import java.net.URI;
-import java.util.*;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.UriBuilder;
+
 /**
  * This class managers all aspects of functions assignments and running of function assignments for this worker
  */
@@ -127,6 +136,9 @@ public class FunctionRuntimeManager implements AutoCloseable{
 
     boolean isInitializePhase = false;
 
+    @Getter
+    private final CompletableFuture<Void> isInitialized = new CompletableFuture<>();
+
     private final FunctionMetaDataManager functionMetaDataManager;
 
     private final ErrorNotifier errorNotifier;
@@ -208,8 +220,10 @@ public class FunctionRuntimeManager implements AutoCloseable{
      * Initializes the FunctionRuntimeManager.  Does the following:
      * 1. Consume all existing assignments to establish existing/latest set of assignments
      * 2. After current assignments are read, assignments belonging to this worker will be processed
+     *
+     * @return the message id of the message processed during init phase
      */
-    public void initialize() {
+    public MessageId initialize() {
         try {
             Reader<byte[]> reader = WorkerUtils.createReader(
                     workerService.getClient().newReader(),
@@ -219,9 +233,13 @@ public class FunctionRuntimeManager implements AutoCloseable{
 
             // start init phase
             this.isInitializePhase = true;
+            // keep track of the last message read
+            MessageId lastMessageRead = MessageId.earliest;
             // read all existing messages
             while (reader.hasMessageAvailable()) {
-                processAssignmentMessage(reader.readNext());
+                Message<byte[]> message = reader.readNext();
+                lastMessageRead = message.getMessageId();
+                processAssignmentMessage(message);
             }
             // init phase is done
             this.isInitializePhase = false;
@@ -236,6 +254,9 @@ public class FunctionRuntimeManager implements AutoCloseable{
                     }
                 }
             }
+            // complete future to indicate initialization is complete
+            isInitialized.complete(null);
+            return lastMessageRead;
         } catch (Exception e) {
             log.error("Failed to initialize function runtime manager: {}", e.getMessage(), e);
             throw new RuntimeException(e);
@@ -630,12 +651,14 @@ public class FunctionRuntimeManager implements AutoCloseable{
      */
     public synchronized void processAssignment(Assignment newAssignment) {
 
-        Map<String, Assignment> existingAssignmentMap = new HashMap<>();
+        boolean exists = false;
         for (Map<String, Assignment> entry : this.workerIdToAssignments.values()) {
-            existingAssignmentMap.putAll(entry);
+            if (entry.containsKey(FunctionCommon.getFullyQualifiedInstanceId(newAssignment.getInstance()))) {
+                exists = true;
+            }
         }
 
-        if (existingAssignmentMap.containsKey(FunctionCommon.getFullyQualifiedInstanceId(newAssignment.getInstance()))) {
+        if (exists) {
             updateAssignment(newAssignment);
         } else {
             addAssignment(newAssignment);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
index 3d7ec92..43faccc 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
@@ -35,24 +35,31 @@ public class LeaderService implements AutoCloseable, ConsumerEventListener {
     private final FunctionAssignmentTailer functionAssignmentTailer;
     private final ErrorNotifier errorNotifier;
     private final SchedulerManager schedulerManager;
+    private final FunctionRuntimeManager functionRuntimeManager;
+    private final FunctionMetaDataManager functionMetaDataManager;
     private ConsumerImpl<byte[]> consumer;
     private final WorkerConfig workerConfig;
     private final PulsarClient pulsarClient;
     private final AtomicBoolean isLeader = new AtomicBoolean(false);
+    private final AtomicBoolean leaderInitComplete = new AtomicBoolean(false);
 
     static final String COORDINATION_TOPIC_SUBSCRIPTION = "participants";
 
     private static String WORKER_IDENTIFIER = "id";
-    
+
     public LeaderService(WorkerService workerService,
                          PulsarClient pulsarClient,
                          FunctionAssignmentTailer functionAssignmentTailer,
                          SchedulerManager schedulerManager,
+                         FunctionRuntimeManager functionRuntimeManager,
+                         FunctionMetaDataManager functionMetaDataManager,
                          ErrorNotifier errorNotifier) {
         this.workerConfig = workerService.getWorkerConfig();
         this.pulsarClient = pulsarClient;
         this.functionAssignmentTailer = functionAssignmentTailer;
         this.schedulerManager = schedulerManager;
+        this.functionRuntimeManager = functionRuntimeManager;
+        this.functionMetaDataManager = functionMetaDataManager;
         this.errorNotifier = errorNotifier;
         consumerName = String.format(
                 "%s:%s:%d",
@@ -83,6 +90,13 @@ public class LeaderService implements AutoCloseable, ConsumerEventListener {
         if (isLeader.compareAndSet(false, true)) {
             log.info("Worker {} became the leader.", consumerName);
             try {
+
+                // Wait for worker to be initialized.
+                // We need to do this because LeaderService is started
+                // before FunctionMetadataManager and FunctionRuntimeManager is done initializing
+                functionMetaDataManager.getIsInitialized().get();
+                functionRuntimeManager.getIsInitialized().get();
+
                 // trigger read to the end of the topic and exit
                 // Since the leader can just update its in memory assignments cache directly
                 functionAssignmentTailer.triggerReadToTheEndAndExit().get();
@@ -91,6 +105,9 @@ public class LeaderService implements AutoCloseable, ConsumerEventListener {
                 // make sure scheduler is initialized because this worker
                 // is the leader and may need to start computing and writing assignments
                 schedulerManager.initialize();
+
+                // indicate leader initialization is complete
+                leaderInitComplete.set(true);
             } catch (Throwable th) {
                 log.error("Encountered error when initializing to become leader", th);
                 errorNotifier.triggerError(th);
@@ -114,6 +131,8 @@ public class LeaderService implements AutoCloseable, ConsumerEventListener {
                 } else {
                     functionAssignmentTailer.startFromMessage(schedulerManager.getLastMessageProduced());
                 }
+
+                leaderInitComplete.set(false);
             } catch (Throwable th) {
                 log.error("Encountered error in routine when worker lost leadership", th);
                 errorNotifier.triggerError(th);
@@ -125,6 +144,16 @@ public class LeaderService implements AutoCloseable, ConsumerEventListener {
         return isLeader.get();
     }
 
+    public void waitLeaderInit() {
+        while (!leaderInitComplete.get()) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
     @Override
     public void close() throws PulsarClientException {
         if (consumer != null) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index 33213a8..0b8eeb2 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -187,8 +187,9 @@ public class SchedulerManager implements AutoCloseable {
             return CompletableFuture.completedFuture(null);
         }
 
-        // make sure we are initialized before scheduling
-        initialize();
+        // make sure leader is done initializing before starting to compute new assignments
+        // scheduler mananger will also be initialized during that process
+        leaderService.waitLeaderInit();
 
         try {
             return executorService.submit(() -> {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 532bc32..56a2538 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -179,7 +179,6 @@ public class WorkerService {
             }
             this.membershipManager = new MembershipManager(this, client, brokerAdmin);
 
-
             // create function runtime manager
             this.functionRuntimeManager = new FunctionRuntimeManager(
                     workerConfig,
@@ -199,19 +198,26 @@ public class WorkerService {
                     workerConfig,
                     errorNotifier);
 
+            // Start worker early in the worker service init process so that functions don't get re-assigned because
+            // initialize operations of FunctionRuntimeManager and FunctionMetadataManger might take a while
+            this.leaderService = new LeaderService(this,
+              client,
+              functionAssignmentTailer,
+              schedulerManager,
+              functionRuntimeManager,
+              functionMetaDataManager,
+              errorNotifier);
+
+            log.info("/** Start Leader Service **/");
+            leaderService.start();
+
             // initialize function metadata manager
             log.info("/** Initializing Metdata Manager **/");
             functionMetaDataManager.initialize();
 
             // initialize function runtime manager
             log.info("/** Initializing Runtime Manager **/");
-            functionRuntimeManager.initialize();
-
-            this.leaderService = new LeaderService(this,
-                    client,
-                    functionAssignmentTailer,
-                    schedulerManager,
-                    errorNotifier);
+            MessageId lastAssignmentMessageId = functionRuntimeManager.initialize();
 
             // Setting references to managers in scheduler
             schedulerManager.setFunctionMetaDataManager(functionMetaDataManager);
@@ -225,10 +231,7 @@ public class WorkerService {
 
             // Start function assignment tailer
             log.info("/** Starting Function Assignment Tailer **/");
-            functionAssignmentTailer.start();
-
-            log.info("/** Start Leader Service **/");
-            leaderService.start();
+            functionAssignmentTailer.startFromMessage(lastAssignmentMessageId);
             
             // start function metadata manager
             log.info("/** Starting Metdata Manager **/");
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 54b47f5..c7e8e41 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -18,6 +18,24 @@
  */
 package org.apache.pulsar.functions.worker;
 
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
 import io.netty.buffer.Unpooled;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.api.namespace.Namespace;
@@ -27,16 +45,17 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntime;
 import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory;
+import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactoryConfig;
 import org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory;
-import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
 import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
-import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactoryConfig;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -49,24 +68,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyBoolean;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.argThat;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
 @Slf4j
 public class FunctionRuntimeManagerTest {
 
@@ -519,16 +520,20 @@ public class FunctionRuntimeManagerTest {
 
         List<Message<byte[]>> messageList = new LinkedList<>();
         PulsarApi.MessageMetadata.Builder msgMetadataBuilder = PulsarApi.MessageMetadata.newBuilder();
-        Message message1 = spy(new MessageImpl("foo", MessageId.latest.toString(),
+
+        MessageId messageId1 = new MessageIdImpl(0, 1, -1);
+        Message message1 = spy(new MessageImpl("foo", messageId1.toString(),
                 new HashMap<>(), Unpooled.copiedBuffer(assignment1.toByteArray()), null, msgMetadataBuilder));
         doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance())).when(message1).getKey();
 
-        Message message2 = spy(new MessageImpl("foo", MessageId.latest.toString(),
+        MessageId messageId2 = new MessageIdImpl(0, 2, -1);
+        Message message2 = spy(new MessageImpl("foo", messageId2.toString(),
                 new HashMap<>(), Unpooled.copiedBuffer(assignment2.toByteArray()), null, msgMetadataBuilder));
         doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment2.getInstance())).when(message2).getKey();
 
         // delete function2
-        Message message3 = spy(new MessageImpl("foo", MessageId.latest.toString(),
+        MessageId messageId3 = new MessageIdImpl(0, 3, -1);
+        Message message3 = spy(new MessageImpl("foo", messageId3.toString(),
                 new HashMap<>(), Unpooled.copiedBuffer("".getBytes()), null, msgMetadataBuilder));
         doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment3.getInstance())).when(message3).getKey();
 
@@ -595,7 +600,7 @@ public class FunctionRuntimeManagerTest {
         doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class));
         functionRuntimeManager.setFunctionActioner(functionActioner);
 
-        functionRuntimeManager.initialize();
+        assertEquals(functionRuntimeManager.initialize(), messageId3);
 
         assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1);
         verify(functionActioner, times(1)).startFunction(any(FunctionRuntimeInfo.class));
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java
index 445498a..2bfaf81 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java
@@ -18,6 +18,17 @@
  */
 package org.apache.pulsar.functions.worker;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.ConsumerEventListener;
 import org.apache.pulsar.client.api.MessageId;
@@ -36,17 +47,6 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
-
 public class LeaderServiceTest {
 
     private final WorkerConfig workerConfig;
@@ -56,6 +56,11 @@ public class LeaderServiceTest {
     private ConsumerImpl mockConsumer;
     private FunctionAssignmentTailer functionAssignmentTailer;
     private SchedulerManager schedulerManager;
+    private FunctionRuntimeManager functionRuntimeManager;
+    private FunctionMetaDataManager functionMetadataManager;
+    private CompletableFuture metadataManagerInitFuture;
+    private CompletableFuture runtimeManagerInitFuture;
+    private CompletableFuture readToTheEndAndExitFuture;
 
     public LeaderServiceTest() {
         this.workerConfig = new WorkerConfig();
@@ -99,11 +104,21 @@ public class LeaderServiceTest {
 
         schedulerManager = mock(SchedulerManager.class);
 
-
         functionAssignmentTailer = mock(FunctionAssignmentTailer.class);
-        when(functionAssignmentTailer.triggerReadToTheEndAndExit()).thenReturn(CompletableFuture.completedFuture(null));
+        readToTheEndAndExitFuture = mock(CompletableFuture.class);
+        when(functionAssignmentTailer.triggerReadToTheEndAndExit()).thenReturn(readToTheEndAndExitFuture);
+
+        functionRuntimeManager = mock(FunctionRuntimeManager.class);
+        functionMetadataManager = mock(FunctionMetaDataManager.class);
 
-        leaderService = spy(new LeaderService(workerService, mockClient, functionAssignmentTailer, schedulerManager, ErrorNotifier.getDefaultImpl()));
+        metadataManagerInitFuture = mock(CompletableFuture.class);
+        runtimeManagerInitFuture = mock(CompletableFuture.class);
+
+        when(functionMetadataManager.getIsInitialized()).thenReturn(metadataManagerInitFuture);
+        when(functionRuntimeManager.getIsInitialized()).thenReturn(runtimeManagerInitFuture);
+
+        leaderService = spy(new LeaderService(workerService, mockClient, functionAssignmentTailer, schedulerManager,
+          functionRuntimeManager, functionMetadataManager,  ErrorNotifier.getDefaultImpl()));
         leaderService.start();
     }
 
@@ -118,6 +133,11 @@ public class LeaderServiceTest {
         listenerHolder.get().becameActive(mockConsumer, 0);
         assertTrue(leaderService.isLeader());
 
+        verify(functionMetadataManager, times(1)).getIsInitialized();
+        verify(metadataManagerInitFuture, times(1)).get();
+        verify(functionRuntimeManager, times(1)).getIsInitialized();
+        verify(runtimeManagerInitFuture, times(1)).get();
+
         verify(functionAssignmentTailer, times(1)).triggerReadToTheEndAndExit();
         verify(functionAssignmentTailer, times(1)).close();
         verify(schedulerManager, times((1))).initialize();
@@ -140,6 +160,7 @@ public class LeaderServiceTest {
         assertTrue(leaderService.isLeader());
 
         verify(functionAssignmentTailer, times(1)).triggerReadToTheEndAndExit();
+        verify(readToTheEndAndExitFuture, times(1)).get();
         verify(functionAssignmentTailer, times(1)).close();
         verify(schedulerManager, times((1))).initialize();
 
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index e72b81e..ab59cfd 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -18,8 +18,21 @@
  */
 package org.apache.pulsar.functions.worker;
 
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.InvalidProtocolBufferException;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.prometheus.client.CollectorRegistry;
@@ -34,12 +47,11 @@ import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.Assignment;
-import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
 import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler;
-import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
 import org.mockito.invocation.Invocation;
 import org.testng.Assert;
@@ -49,42 +61,17 @@ import org.testng.annotations.Test;
 
 import java.lang.reflect.Method;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static org.mockito.ArgumentMatchers.argThat;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyBoolean;
-import static org.mockito.Mockito.anyInt;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
 
 @Slf4j
 public class SchedulerManagerTest {
@@ -895,6 +882,8 @@ public class SchedulerManagerTest {
 
     private void callSchedule() throws InterruptedException,
             TimeoutException, ExecutionException {
+
+        schedulerManager.initialize();
         Future<?> complete = schedulerManager.schedule();
 
         complete.get(30, TimeUnit.SECONDS);