You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/06/26 14:49:12 UTC

[pulsar] branch master updated: Re-work Function MetaDataManager to make all metadata writes only by the leader (#7255)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c83a656  Re-work Function MetaDataManager to make all metadata writes only by the leader (#7255)
c83a656 is described below

commit c83a6563f9e0c71d7a5fd62fdece3e29128bbae0
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Fri Jun 26 07:49:02 2020 -0700

    Re-work Function MetaDataManager to make all metadata writes only by the leader (#7255)
    
    * Function workers re-direct call update requests to the leader
    
    * Fixed test
    
    * tests pass
    
    * Working version
    
    * Fix test
    
    * Short circuit update
    
    * Fix test
    
    * Fix test
    
    * Fix tests
    
    * Added one more catch
    
    * Added one more catch
    
    * Separated internal and external errors
    
    * Fix test
    
    * Address feedback
    
    * Do not expose updateOnLeader to functions
    
    * hide api
    
    * hide api
    
    * removed duplicate comments
    
    * Do leadership changes in function metadata manager
    
    * make the function sync
    
    * Added more comments
    
    * Throw error
    
    * Changed name
    
    * address comments
    
    * Deleted unused classes
    
    * Rework metadata manager
    
    * Working
    
    * Fix test
    
    * A better way for test
    
    * Address feedback
    
    Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
 .../pulsar/broker/admin/impl/FunctionsBase.java    |  22 +
 .../worker/PulsarFunctionE2ESecurityTest.java      |   2 +-
 .../worker/PulsarFunctionPublishTest.java          |   1 +
 .../client/admin/internal/FunctionsImpl.java       |  48 +++
 .../functions/utils/FunctionMetaDataUtils.java     |   4 +-
 .../functions/utils/FunctionMetaDataUtilsTest.java |   4 +-
 .../functions/worker/FunctionMetaDataManager.java  | 356 ++++++++--------
 .../worker/FunctionMetaDataTopicTailer.java        |  71 +++-
 .../pulsar/functions/worker/LeaderService.java     |   5 +
 .../pulsar/functions/worker/WorkerService.java     |   1 +
 .../functions/worker/request/RequestResult.java    |  40 --
 .../worker/request/ServiceRequestInfo.java         |  46 ---
 .../worker/request/ServiceRequestManager.java      |  53 ---
 .../worker/request/ServiceRequestUtils.java        |  61 ---
 .../functions/worker/rest/api/ComponentImpl.java   |  97 ++---
 .../functions/worker/rest/api/FunctionsImpl.java   |  60 ++-
 .../functions/worker/rest/api/SinksImpl.java       |   4 +-
 .../functions/worker/rest/api/SourcesImpl.java     |   4 +-
 .../worker/rest/api/v3/FunctionsApiV3Resource.java |  22 +
 .../worker/FunctionMetaDataManagerTest.java        | 449 ++++++---------------
 .../worker/FunctionMetaDataTopicTailerTest.java    |  16 +-
 .../worker/request/ServiceRequestManagerTest.java  |  70 ----
 .../rest/api/v2/FunctionApiV2ResourceTest.java     | 122 ++----
 .../rest/api/v3/FunctionApiV3ResourceTest.java     | 118 ++----
 .../worker/rest/api/v3/SinkApiV3ResourceTest.java  | 107 ++---
 .../rest/api/v3/SourceApiV3ResourceTest.java       | 105 ++---
 26 files changed, 699 insertions(+), 1189 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 0529c04..f77528a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -47,6 +47,7 @@ import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 import javax.ws.rs.core.StreamingOutput;
 import java.io.IOException;
 import java.io.InputStream;
@@ -692,4 +693,25 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
     public List<ConnectorDefinition> getConnectorsList() throws IOException {
         return functions.getListOfConnectors();
     }
+
+    @PUT
+    @ApiOperation(value = "Updates a Pulsar Function on the worker leader", hidden = true)
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have super-user permissions"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 307, message = "Redirecting to the worker leader"),
+            @ApiResponse(code = 200, message = "Pulsar Function successfully updated")
+    })
+    @Path("/leader/{tenant}/{namespace}/{functionName}")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public void updateFunctionOnWorkerLeader(final @PathParam("tenant") String tenant,
+                                                 final @PathParam("namespace") String namespace,
+                                                 final @PathParam("functionName") String functionName,
+                                                 final @FormDataParam("functionMetaData") InputStream uploadedInputStream,
+                                                 final @FormDataParam("delete") boolean delete) {
+
+        functions.updateFunctionOnWorkerLeader(tenant, namespace, functionName, uploadedInputStream,
+                delete, uri.getRequestUri(), clientAppId());
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
index 48c40f3..03f9bd7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
@@ -201,7 +201,7 @@ public class PulsarFunctionE2ESecurityTest {
         superUserAdmin.tenants().createTenant(TENANT2, propAdmin);
         superUserAdmin.namespaces().createNamespace( TENANT2 + "/" + NAMESPACE);
 
-        Thread.sleep(100);
+        functionWorkerService.get().getLeaderService().waitLeaderInit();
     }
 
     @AfterMethod
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
index cce8ce8..90882bb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
@@ -206,6 +206,7 @@ public class PulsarFunctionPublishTest {
         System.setProperty(JAVA_INSTANCE_JAR_PROPERTY,
                 FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath());
 
+        functionWorkerService.get().getLeaderService().waitLeaderInit();
     }
 
     @AfterMethod
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 0e081d4..577f635 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -63,6 +63,7 @@ import org.asynchttpclient.AsyncHttpClient;
 import org.asynchttpclient.HttpResponseBodyPart;
 import org.asynchttpclient.HttpResponseStatus;
 import org.asynchttpclient.RequestBuilder;
+import org.asynchttpclient.request.body.multipart.ByteArrayPart;
 import org.asynchttpclient.request.body.multipart.FilePart;
 import org.asynchttpclient.request.body.multipart.StringPart;
 import org.glassfish.jersey.media.multipart.FormDataBodyPart;
@@ -984,4 +985,51 @@ public class FunctionsImpl extends ComponentResource implements Functions {
         }
         return future;
     }
+
+    public void updateOnWorkerLeader(String tenant, String namespace,
+                                     String function, byte[] functionMetaData,
+                                     boolean delete) throws PulsarAdminException {
+        try {
+            updateOnWorkerLeaderAsync(tenant, namespace, function,
+                    functionMetaData, delete).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    public CompletableFuture<Void> updateOnWorkerLeaderAsync(String tenant, String namespace,
+                                                             String function, byte[] functionMetaData,
+                                                             boolean delete) {
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+        try {
+            RequestBuilder builder =
+                    put(functions.path("leader").path(tenant).path(namespace)
+                            .path(function).getUri().toASCIIString())
+                            .addBodyPart(new ByteArrayPart("functionMetaData", functionMetaData))
+                    .addBodyPart(new StringPart("delete", Boolean.toString(delete)));
+
+            asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build())
+                    .toCompletableFuture()
+                    .thenAccept(response -> {
+                        if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
+                            future.completeExceptionally(
+                                    getApiException(Response
+                                            .status(response.getStatusCode())
+                                            .entity(response.getResponseBody())
+                                            .build()));
+                        } else {
+                            future.complete(null);
+                        }
+                    });
+
+        } catch (Exception e) {
+            future.completeExceptionally(e);
+        }
+        return future;
+    }
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionMetaDataUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionMetaDataUtils.java
index 530b887..46766ef 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionMetaDataUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionMetaDataUtils.java
@@ -66,8 +66,8 @@ public class FunctionMetaDataUtils {
         return builder.build();
     }
 
-    public static Function.FunctionMetaData generateUpdatedMetadata(Function.FunctionMetaData existingMetaData,
-                                                                    Function.FunctionMetaData updatedMetaData) {
+    public static Function.FunctionMetaData incrMetadataVersion(Function.FunctionMetaData existingMetaData,
+                                                                Function.FunctionMetaData updatedMetaData) {
         long version = 0;
         if (existingMetaData != null) {
             version = existingMetaData.getVersion() + 1;
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionMetaDataUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionMetaDataUtilsTest.java
index d708aca..4e09c05 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionMetaDataUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionMetaDataUtilsTest.java
@@ -85,11 +85,11 @@ public class FunctionMetaDataUtilsTest {
                 Function.FunctionDetails.newBuilder().setName("func-1").setParallelism(2)).setVersion(version).build();
         Function.FunctionMetaData updatedMetaData = Function.FunctionMetaData.newBuilder().setFunctionDetails(
                 Function.FunctionDetails.newBuilder().setName("func-1").setParallelism(3)).setVersion(version).build();
-        Function.FunctionMetaData newMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(existingMetaData, updatedMetaData);
+        Function.FunctionMetaData newMetaData = FunctionMetaDataUtils.incrMetadataVersion(existingMetaData, updatedMetaData);
         Assert.assertEquals(newMetaData.getVersion(), version + 1);
         Assert.assertEquals(newMetaData.getFunctionDetails().getParallelism(), 3);
 
-        newMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(null, newMetaData);
+        newMetaData = FunctionMetaDataUtils.incrMetadataVersion(null, newMetaData);
         Assert.assertEquals(newMetaData.getVersion(), 0);
     }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index 0c4779e..795a735 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -19,31 +19,37 @@
 package org.apache.pulsar.functions.worker;
 
 import com.google.common.annotations.VisibleForTesting;
-import lombok.Getter;
-import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
+import lombok.Getter;
+import org.apache.pulsar.client.api.*;
 import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
 import org.apache.pulsar.functions.proto.Request;
-import org.apache.pulsar.functions.utils.FunctionMetaDataUtils;
-import org.apache.pulsar.functions.worker.request.RequestResult;
-import org.apache.pulsar.functions.worker.request.ServiceRequestInfo;
-import org.apache.pulsar.functions.worker.request.ServiceRequestManager;
-import org.apache.pulsar.functions.worker.request.ServiceRequestUtils;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * This class maintains a global state of all function metadata and is responsible for serving function metadata
+ * FunctionMetaDataManager maintains a global state of all function metadata.
+ * It is the system of record for the worker for function metadata.
+ * FunctionMetaDataManager operates in either the leader mode or worker mode.
+ * By default, when you initialize and start manager, it starts in the worker mode.
+ * In the worker mode, the FunctionMetaDataTailer tails the function metadata topic
+ * and updates the in-memory metadata cache.
+ * When the worker becomes a leader, it calls acquireLeadership, through which
+ * the FunctionMetaDataManager switches to leader mode. In the leader mode
+ * the manager first captures an exclusive producer on the metadata topic.
+ * Then it drains the MetaDataTailer to ensure that it has caught up to the last record.
+ * After this point, the worker can update the in-memory state of function metadata
+ * by calling processUpdate/processDeregister methods.
+ * If a worker loses its leadership, it calls giveupLeadership, at which time the
+ * manager closes its exclusive producer and starts its tailer again.
  */
 @Slf4j
 public class FunctionMetaDataManager implements AutoCloseable {
@@ -52,23 +58,20 @@ public class FunctionMetaDataManager implements AutoCloseable {
     @VisibleForTesting
     final Map<String, Map<String, Map<String, FunctionMetaData>>> functionMetaDataMap = new ConcurrentHashMap<>();
 
-    // A map in which the key is the service request id and value is the service request
-    private final Map<String, ServiceRequestInfo> pendingServiceRequests = new ConcurrentHashMap<>();
-
-    private final ServiceRequestManager serviceRequestManager;
     private final SchedulerManager schedulerManager;
     private final WorkerConfig workerConfig;
     private final PulsarClient pulsarClient;
     private final ErrorNotifier errorNotifier;
 
     private FunctionMetaDataTopicTailer functionMetaDataTopicTailer;
-
-    @Setter
-    @Getter
-    boolean isInitializePhase = false;
+    // The producer of the metadata topic when we are the leader.
+    // Note that this variable serves a double duty. A non-null value
+    // implies we are the leader, while a null value means we are not the leader
+    private Producer exclusiveLeaderProducer;
+    private MessageId lastMessageSeen = MessageId.earliest;
 
     @Getter
-    private final CompletableFuture<Void> isInitialized = new CompletableFuture<>();
+    private CompletableFuture<Void> isInitialized = new CompletableFuture<>();
 
     public FunctionMetaDataManager(WorkerConfig workerConfig,
                                    SchedulerManager schedulerManager,
@@ -76,10 +79,9 @@ public class FunctionMetaDataManager implements AutoCloseable {
                                    ErrorNotifier errorNotifier) throws PulsarClientException {
         this.workerConfig = workerConfig;
         this.pulsarClient = pulsarClient;
-        this.serviceRequestManager = getServiceRequestManager(
-                this.pulsarClient, this.workerConfig.getFunctionMetadataTopic());
         this.schedulerManager = schedulerManager;
         this.errorNotifier = errorNotifier;
+        exclusiveLeaderProducer = null;
     }
 
     /**
@@ -87,32 +89,44 @@ public class FunctionMetaDataManager implements AutoCloseable {
      */
 
     /**
-     * Initializes the FunctionMetaDataManager.  Does the following:
-     * 1. Consume all existing function meta data upon start to establish existing state
+     * Initializes the FunctionMetaDataManager.
+     * We create a new reader
      */
-    public void initialize() {
+    public synchronized void initialize() {
         try {
-            this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this,
-                    pulsarClient.newReader(), this.workerConfig, this.errorNotifier);
             // read all existing messages
-            this.setInitializePhase(true);
-            while (this.functionMetaDataTopicTailer.getReader().hasMessageAvailable()) {
-                this.functionMetaDataTopicTailer.processRequest(this.functionMetaDataTopicTailer.getReader().readNext());
+            Reader reader = FunctionMetaDataTopicTailer.createReader(workerConfig, pulsarClient.newReader(), MessageId.earliest);
+            while (reader.hasMessageAvailable()) {
+                processMetaDataTopicMessage(reader.readNext());
             }
-            this.setInitializePhase(false);
-            
             this.isInitialized.complete(null);
         } catch (Exception e) {
             log.error("Failed to initialize meta data store", e);
-            throw new RuntimeException(e);
+            throw new RuntimeException("Failed to initialize Metadata Manager", e);
         }
+        log.info("FunctionMetaData Manager initialization complete");
     }
-    
-    public void start() {
-        // schedule functions if necessary
-        this.schedulerManager.schedule();
-        // start function metadata tailer
-        this.functionMetaDataTopicTailer.start();
+
+    // Starts the tailer if we are in non-leader mode
+    public synchronized void start() {
+        if (exclusiveLeaderProducer == null) {
+            try {
+                // This means that we are in non-leader mode. start function metadata tailer
+                initializeTailer();
+            } catch (PulsarClientException e) {
+                throw new RuntimeException("Could not start MetaData topic tailer", e);
+            }
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (this.functionMetaDataTopicTailer != null) {
+            this.functionMetaDataTopicTailer.close();
+        }
+        if (this.exclusiveLeaderProducer != null) {
+            this.exclusiveLeaderProducer.close();
+        }
     }
 
     /**
@@ -174,89 +188,126 @@ public class FunctionMetaDataManager implements AutoCloseable {
     }
 
     /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
+     * Called by the worker when we are in the leader mode.  In this state, we update our in-memory
+     * data structures and then write to the metadata topic.
+     * @param functionMetaData The function metadata in question
+     * @param delete Is this a delete operation
+     * @throws IllegalStateException if we are not the leader
+     * @throws IllegalArgumentException if the request is out of date.
      */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)
+            throws IllegalStateException, IllegalArgumentException {
+        if (exclusiveLeaderProducer == null) {
+            throw new IllegalStateException("Not the leader");
+        }
+        boolean needsScheduling;
+        if (delete) {
+            needsScheduling = proccessDeregister(functionMetaData);
+        } else {
+            needsScheduling = processUpdate(functionMetaData);
+        }
+        Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
+                .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
+                .setFunctionMetaData(functionMetaData)
+                .setWorkerId(workerConfig.getWorkerId())
+                .setRequestId(UUID.randomUUID().toString())
+                .build();
+        try {
+            lastMessageSeen = exclusiveLeaderProducer.send(serviceRequest.toByteArray());
+        } catch (Exception e) {
+            log.error("Could not write into Function Metadata topic", e);
+            throw new IllegalStateException("Internal Error updating function at the leader", e);
+        }
+        if (needsScheduling) {
+            this.schedulerManager.schedule();
         }
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(existingFunctionMetadata, functionMetaData);
-
-        Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
-
-        return submit(updateRequest);
     }
 
-
     /**
-     * Sends a deregister request to the FMT (Function Metadata Topic) for a function
-     * @param tenant the tenant the function that needs to be deregistered belongs to
-     * @param namespace the namespace the function that needs to be deregistered belongs to
-     * @param functionName the name of the function
-     * @return a completable future of when the deregister has been applied
+     * Called by the leader service when this worker becomes the leader.
+     * We first get exclusive producer on the metadata topic. Next we drain the tailer
+     * to ensure that we have caught up to metadata topic. After which we close the tailer.
+     * Note that this method cannot be synchronized because the tailer might still be processing messages
      */
-    public synchronized CompletableFuture<RequestResult> deregisterFunction(String tenant, String namespace, String functionName) {
-        FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(functionMetaData, functionMetaData);
-
-        Request.ServiceRequest deregisterRequest = ServiceRequestUtils.getDeregisterRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
+    public void acquireLeadership() {
+        log.info("FunctionMetaDataManager becoming leader by creating exclusive producer");
+        FunctionMetaDataTopicTailer tailer = internalAcquireLeadership();
+        // Now that we have created the exclusive producer, wait for reader to get over
+        if (tailer != null) {
+            try {
+                tailer.stopWhenNoMoreMessages().get();
+            } catch (Exception e) {
+                log.error("Error while waiting for metadata tailer thread to finish", e);
+                errorNotifier.triggerError(e);
+            }
+            tailer.close();
+        }
+        log.info("FunctionMetaDataManager done becoming leader");
+    }
 
-        return submit(deregisterRequest);
+    private synchronized FunctionMetaDataTopicTailer internalAcquireLeadership() {
+        if (exclusiveLeaderProducer == null) {
+            try {
+                exclusiveLeaderProducer = pulsarClient.newProducer()
+                        .topic(this.workerConfig.getFunctionMetadataTopic())
+                        .producerName(workerConfig.getWorkerId() + "-leader")
+                        // .type(EXCLUSIVE)
+                        .create();
+            } catch (PulsarClientException e) {
+                log.error("Error creating exclusive producer", e);
+                errorNotifier.triggerError(e);
+            }
+        } else {
+            log.error("Logic Error in FunctionMetaData Manager");
+            errorNotifier.triggerError(new IllegalStateException());
+        }
+        FunctionMetaDataTopicTailer tailer = this.functionMetaDataTopicTailer;
+        this.functionMetaDataTopicTailer = null;
+        return tailer;
     }
 
     /**
-     * Sends a start/stop function request to the FMT (Function Metadata Topic) for a function
-     * @param tenant the tenant the function that needs to be deregistered belongs to
-     * @param namespace the namespace the function that needs to be deregistered belongs to
-     * @param functionName the name of the function
-     * @param instanceId the instanceId of the function, -1 if for all instances
-     * @param start do we need to start or stop
-     * @return a completable future of when the start/stop has been applied
+     * Called by the leader service when we lose leadership. We close the exclusive producer
+     * and start the tailer.
      */
-    public synchronized CompletableFuture<RequestResult> changeFunctionInstanceStatus(String tenant, String namespace, String functionName,
-                                                                                      Integer instanceId, boolean start) {
-        FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.changeFunctionInstanceStatus(functionMetaData, instanceId, start);
-
-        Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
-
-        return submit(updateRequest);
+    public synchronized void giveupLeadership() {
+        log.info("FunctionMetaDataManager giving up leadership by closing exclusive producer");
+        try {
+            exclusiveLeaderProducer.close();
+            exclusiveLeaderProducer = null;
+            initializeTailer();
+        } catch (PulsarClientException e) {
+            log.error("Error closing exclusive producer", e);
+            errorNotifier.triggerError(e);
+        }
     }
 
     /**
-     * Processes a request received from the FMT (Function Metadata Topic)
-     * @param messageId The message id of the request
-     * @param serviceRequest The request
+     * This is called by the MetaData tailer. It updates the in-memory cache.
+     * It eats up any exception thrown by processUpdate/processDeregister since
+     * that's just part of the state machine
+     * @param message The message read from metadata topic that needs to be processed
      */
-    public void processRequest(MessageId messageId, Request.ServiceRequest serviceRequest) {
-
-        // make sure that processing requests don't happen simultaneously
-        synchronized (this) {
+    public void processMetaDataTopicMessage(Message<byte[]> message) throws IOException {
+        try {
+            Request.ServiceRequest serviceRequest = Request.ServiceRequest.parseFrom(message.getData());
+            if (log.isDebugEnabled()) {
+                log.debug("Received Service Request: {}", serviceRequest);
+            }
             switch (serviceRequest.getServiceRequestType()) {
                 case UPDATE:
-                    this.processUpdate(serviceRequest);
+                    this.processUpdate(serviceRequest.getFunctionMetaData());
                     break;
                 case DELETE:
-                    this.proccessDeregister(serviceRequest);
+                    this.proccessDeregister(serviceRequest.getFunctionMetaData());
                     break;
                 default:
                     log.warn("Received request with unrecognized type: {}", serviceRequest);
             }
+        } catch (IllegalArgumentException e) {
+            // Its ok. Nothing much we can do about it
         }
+        lastMessageSeen = message.getMessageId();
     }
 
     /**
@@ -284,48 +335,38 @@ public class FunctionMetaDataManager implements AutoCloseable {
     }
 
     @VisibleForTesting
-    synchronized void proccessDeregister(Request.ServiceRequest deregisterRequest) {
+    synchronized boolean proccessDeregister(FunctionMetaData deregisterRequestFs) throws IllegalArgumentException {
 
-        FunctionMetaData deregisterRequestFs = deregisterRequest.getFunctionMetaData();
         String functionName = deregisterRequestFs.getFunctionDetails().getName();
         String tenant = deregisterRequestFs.getFunctionDetails().getTenant();
         String namespace = deregisterRequestFs.getFunctionDetails().getNamespace();
 
         boolean needsScheduling = false;
 
-        log.debug("Process deregister request: {}", deregisterRequest);
+        log.debug("Process deregister request: {}", deregisterRequestFs);
 
         // Check if we still have this function. Maybe already deleted by someone else
         if (this.containsFunctionMetaData(deregisterRequestFs)) {
             // check if request is outdated
-            if (!isRequestOutdated(deregisterRequest)) {
+            if (!isRequestOutdated(deregisterRequestFs)) {
                 this.functionMetaDataMap.get(tenant).get(namespace).remove(functionName);
-                completeRequest(deregisterRequest, true);
                 needsScheduling = true;
             } else {
                 if (log.isDebugEnabled()) {
                     log.debug("{}/{}/{} Ignoring outdated request version: {}", tenant, namespace, functionName,
-                            deregisterRequest.getFunctionMetaData().getVersion());
+                            deregisterRequestFs.getVersion());
                 }
-                completeRequest(deregisterRequest, false,
-                        "Request ignored because it is out of date. Please try again.");
+                throw new IllegalArgumentException("Delete request ignored because it is out of date. Please try again.");
             }
-        } else {
-            // already deleted so  just complete request
-            completeRequest(deregisterRequest, true);
         }
 
-        if (!this.isInitializePhase() && needsScheduling) {
-            this.schedulerManager.schedule();
-        }
+        return needsScheduling;
     }
 
     @VisibleForTesting
-    synchronized void processUpdate(Request.ServiceRequest updateRequest) {
+    synchronized boolean processUpdate(FunctionMetaData updateRequestFs) throws IllegalArgumentException {
 
-        log.debug("Process update request: {}", updateRequest);
-
-        FunctionMetaData updateRequestFs = updateRequest.getFunctionMetaData();
+        log.debug("Process update request: {}", updateRequestFs);
 
         boolean needsScheduling = false;
 
@@ -334,52 +375,23 @@ public class FunctionMetaDataManager implements AutoCloseable {
             // Since this is the first time worker has seen function, just put it into internal function metadata store
             setFunctionMetaData(updateRequestFs);
             needsScheduling = true;
-            completeRequest(updateRequest, true);
         } else {
             // The request is an update to an existing function since this worker already has a record of this function
             // in its function metadata store
             // Check if request is outdated
-            if (!isRequestOutdated(updateRequest)) {
+            if (!isRequestOutdated(updateRequestFs)) {
                 // update the function metadata
                 setFunctionMetaData(updateRequestFs);
                 needsScheduling = true;
-                completeRequest(updateRequest, true);
             } else {
-                completeRequest(updateRequest, false,
-                        "Request ignored because it is out of date. Please try again.");
+                throw new IllegalArgumentException("Update request ignored because it is out of date. Please try again.");
             }
         }
 
-        if (!this.isInitializePhase() && needsScheduling) {
-            this.schedulerManager.schedule();
-        }
+        return needsScheduling;
     }
 
-    /**
-     * Complete requests that this worker has pending
-     * @param serviceRequest
-     * @param isSuccess
-     * @param message
-     */
-    private void completeRequest(Request.ServiceRequest serviceRequest, boolean isSuccess, String message) {
-        ServiceRequestInfo pendingServiceRequestInfo
-                = this.pendingServiceRequests.getOrDefault(
-                serviceRequest.getRequestId(), null);
-        if (pendingServiceRequestInfo != null) {
-            RequestResult requestResult = new RequestResult();
-            requestResult.setSuccess(isSuccess);
-            requestResult.setMessage(message);
-            pendingServiceRequestInfo.getRequestResultCompletableFuture().complete(requestResult);
-        }
-    }
-
-    private void completeRequest(Request.ServiceRequest serviceRequest, boolean isSuccess) {
-        completeRequest(serviceRequest, isSuccess, null);
-    }
-
-
-    private boolean isRequestOutdated(Request.ServiceRequest serviceRequest) {
-        FunctionMetaData requestFunctionMetaData = serviceRequest.getFunctionMetaData();
+    private boolean isRequestOutdated(FunctionMetaData requestFunctionMetaData) {
         Function.FunctionDetails functionDetails = requestFunctionMetaData.getFunctionDetails();
         FunctionMetaData currentFunctionMetaData = this.functionMetaDataMap.get(functionDetails.getTenant())
                 .get(functionDetails.getNamespace()).get(functionDetails.getName());
@@ -401,44 +413,10 @@ public class FunctionMetaDataManager implements AutoCloseable {
                 .get(functionDetails.getNamespace()).put(functionDetails.getName(), functionMetaData);
     }
 
-    @VisibleForTesting
-    CompletableFuture<RequestResult> submit(Request.ServiceRequest serviceRequest) {
-        ServiceRequestInfo serviceRequestInfo = ServiceRequestInfo.of(serviceRequest);
-        CompletableFuture<MessageId> messageIdCompletableFuture = this.serviceRequestManager.submitRequest(serviceRequest);
-
-        serviceRequestInfo.setCompletableFutureRequestMessageId(messageIdCompletableFuture);
-        CompletableFuture<RequestResult> requestResultCompletableFuture = new CompletableFuture<>();
-
-        serviceRequestInfo.setRequestResultCompletableFuture(requestResultCompletableFuture);
-
-        this.pendingServiceRequests.put(serviceRequestInfo.getServiceRequest().getRequestId(), serviceRequestInfo);
-        
-        messageIdCompletableFuture.exceptionally(ex -> {
-            FunctionDetails metadata = serviceRequest.getFunctionMetaData().getFunctionDetails();
-            log.warn("Failed to submit function metadata for {}/{}/{}-{}", metadata.getTenant(),
-                    metadata.getNamespace(), metadata.getName(), ex.getMessage());
-            serviceRequestInfo.getRequestResultCompletableFuture()
-                    .completeExceptionally(new RuntimeException("Failed to submit function metadata"));
-            return null;
-        });
-
-        return requestResultCompletableFuture;
-    }
-
-    @Override
-    public void close() throws Exception {
-        if (this.functionMetaDataTopicTailer != null) {
-            this.functionMetaDataTopicTailer.close();
-        }
-        if (this.serviceRequestManager != null) {
-            this.serviceRequestManager.close();
-        }
-    }
-
-    private ServiceRequestManager getServiceRequestManager(PulsarClient pulsarClient, String functionMetadataTopic) throws PulsarClientException {
-        return new ServiceRequestManager(pulsarClient.newProducer()
-                .topic(functionMetadataTopic)
-                .producerName(workerConfig.getWorkerId() + "-function-metadata-manager")
-                .create());
+    private void initializeTailer() throws PulsarClientException {
+        this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this,
+                pulsarClient.newReader(), this.workerConfig, lastMessageSeen, this.errorNotifier);
+        this.functionMetaDataTopicTailer.start();
+        log.info("MetaData Manager Tailer started");
     }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
index f0626ce..8b51971 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.functions.worker;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -39,21 +41,20 @@ public class FunctionMetaDataTopicTailer
     private final Thread readerThread;
     private volatile boolean running;
     private ErrorNotifier errorNotifier;
+    private volatile boolean stopOnNoMessageAvailable;
+    private CompletableFuture<Void> exitFuture = new CompletableFuture<>();
 
     public FunctionMetaDataTopicTailer(FunctionMetaDataManager functionMetaDataManager,
                                        ReaderBuilder readerBuilder, WorkerConfig workerConfig,
+                                       MessageId lastMessageSeen,
                                        ErrorNotifier errorNotifier)
             throws PulsarClientException {
         this.functionMetaDataManager = functionMetaDataManager;
-        this.reader = readerBuilder
-                .topic(workerConfig.getFunctionMetadataTopic())
-                .startMessageId(MessageId.earliest)
-                .readerName(workerConfig.getWorkerId() + "-function-metadata-tailer")
-                .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-metadata-tailer")
-                .create();
+        this.reader = createReader(workerConfig, readerBuilder, lastMessageSeen);
         readerThread = new Thread(this);
         readerThread.setName("function-metadata-tailer-thread");
         this.errorNotifier = errorNotifier;
+        stopOnNoMessageAvailable = false;
     }
 
     public void start() {
@@ -63,10 +64,22 @@ public class FunctionMetaDataTopicTailer
 
     @Override
     public void run() {
-        while(running) {
+        while (running) {
+            if (stopOnNoMessageAvailable) {
+                try {
+                    if (!reader.hasMessageAvailable()) {
+                        break;
+                    }
+                } catch (PulsarClientException e) {
+                    log.error("Received exception while testing hasMessageAvailable", e);
+                    errorNotifier.triggerError(e);
+                }
+            }
             try {
-                Message<byte[]> msg = reader.readNext();
-                processRequest(msg);
+                Message<byte[]> msg = reader.readNext(5, TimeUnit.SECONDS);
+                if (msg != null) {
+                    this.functionMetaDataManager.processMetaDataTopicMessage(msg);
+                }
             } catch (Throwable th) {
                 if (running) {
                     log.error("Encountered error in metadata tailer", th);
@@ -77,10 +90,16 @@ public class FunctionMetaDataTopicTailer
                     if (!(th instanceof InterruptedException || th.getCause() instanceof InterruptedException)) {
                         log.warn("Encountered error when metadata tailer is not running", th);
                     }
-                    return;
                 }
             }
         }
+        log.info("metadata tailer thread exiting");
+        exitFuture.complete(null);
+    }
+
+    public CompletableFuture<Void> stopWhenNoMoreMessages() {
+        stopOnNoMessageAvailable = true;
+        return exitFuture;
     }
 
     @Override
@@ -88,23 +107,35 @@ public class FunctionMetaDataTopicTailer
         log.info("Stopping function metadata tailer");
         try {
             running = false;
-            if (readerThread != null && readerThread.isAlive()) {
+            while (true) {
                 readerThread.interrupt();
+                try {
+                    readerThread.join(5000, 0);
+                } catch (InterruptedException e) {
+                    log.warn("Waiting for metadata tailer thread to stop is interrupted", e);
+                }
+
+                if (readerThread.isAlive()) {
+                    log.warn("metadata tailer thread is still alive.  Will attempt to interrupt again.");
+                } else {
+                    break;
+                }
             }
-            if (reader != null) {
-                reader.close();
-            }
+
+            reader.close();
         } catch (IOException e) {
             log.error("Failed to stop function metadata tailer", e);
         }
         log.info("Stopped function metadata tailer");
     }
 
-    public void processRequest(Message<byte[]> msg) throws IOException {
-        ServiceRequest serviceRequest = ServiceRequest.parseFrom(msg.getData());
-        if (log.isDebugEnabled()) {
-            log.debug("Received Service Request: {}", serviceRequest);
-        }
-        this.functionMetaDataManager.processRequest(msg.getMessageId(), serviceRequest);
+    public static Reader createReader(WorkerConfig workerConfig, ReaderBuilder readerBuilder,
+                                      MessageId startMessageId) throws PulsarClientException {
+        return readerBuilder
+                .topic(workerConfig.getFunctionMetadataTopic())
+                .startMessageId(startMessageId)
+                .readerName(workerConfig.getWorkerId() + "-function-metadata-tailer")
+                .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-metadata-tailer")
+                .create();
     }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
index 43faccc..81a1aa1 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
@@ -106,8 +106,12 @@ public class LeaderService implements AutoCloseable, ConsumerEventListener {
                 // is the leader and may need to start computing and writing assignments
                 schedulerManager.initialize();
 
+                functionMetaDataManager.acquireLeadership();
                 // indicate leader initialization is complete
                 leaderInitComplete.set(true);
+                // Once we become the leader, we need to schedule.
+                // This is done after leaderInitComplete is set because schedule waits on it becoming true.
+                schedulerManager.schedule();
             } catch (Throwable th) {
                 log.error("Encountered error when initializing to become leader", th);
                 errorNotifier.triggerError(th);
@@ -131,6 +135,7 @@ public class LeaderService implements AutoCloseable, ConsumerEventListener {
                 } else {
                     functionAssignmentTailer.startFromMessage(schedulerManager.getLastMessageProduced());
                 }
+                functionMetaDataManager.giveupLeadership();
 
                 leaderInitComplete.set(false);
             } catch (Throwable th) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index ede4fc8..35dff9e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -217,6 +217,7 @@ public class WorkerService {
 
             // initialize function runtime manager
             log.info("/** Initializing Runtime Manager **/");
+
             MessageId lastAssignmentMessageId = functionRuntimeManager.initialize();
 
             // Setting references to managers in scheduler
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/request/RequestResult.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/request/RequestResult.java
deleted file mode 100644
index 7cccab1..0000000
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/request/RequestResult.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.worker.request;
-
-import com.google.gson.Gson;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.experimental.Accessors;
-
-@Accessors(chain = true)
-@Getter
-@Setter
-public class RequestResult {
-    private boolean success;
-    private String message;
-
-    public boolean isSuccess() {
-        return success;
-    }
-
-    public String toJson() {
-        return new Gson().toJson(this);
-    }
-}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/request/ServiceRequestInfo.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/request/ServiceRequestInfo.java
deleted file mode 100644
index 9b652d0..0000000
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/request/ServiceRequestInfo.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.worker.request;
-
-import lombok.Data;
-import lombok.Setter;
-import lombok.experimental.Accessors;
-import org.apache.pulsar.client.api.MessageId;
-
-import java.util.concurrent.CompletableFuture;
-
-import org.apache.pulsar.functions.proto.Request.ServiceRequest;
-
-@Data
-@Accessors(chain = true)
-public class ServiceRequestInfo {
-    private final ServiceRequest serviceRequest;
-    @Setter
-    private CompletableFuture<MessageId> completableFutureRequestMessageId;
-    @Setter
-    private CompletableFuture<RequestResult> requestResultCompletableFuture;
-
-    private ServiceRequestInfo(ServiceRequest serviceRequest) {
-        this.serviceRequest = serviceRequest;
-    }
-
-    public static ServiceRequestInfo of (ServiceRequest serviceRequest) {
-        return new ServiceRequestInfo(serviceRequest);
-    }
-}
\ No newline at end of file
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/request/ServiceRequestManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/request/ServiceRequestManager.java
deleted file mode 100644
index 266276b..0000000
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/request/ServiceRequestManager.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.worker.request;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.functions.proto.Request.ServiceRequest;
-
-import java.util.concurrent.CompletableFuture;
-
-@Slf4j
-public class ServiceRequestManager implements AutoCloseable {
-
-    private final Producer producer;
-
-    public ServiceRequestManager(Producer producer) throws PulsarClientException {
-        this.producer = producer;
-    }
-
-    public CompletableFuture<MessageId> submitRequest(ServiceRequest serviceRequest) {
-        if (log.isDebugEnabled()) {
-            log.debug("Submitting Service Request: {}", serviceRequest);
-        }
-        return producer.sendAsync(serviceRequest.toByteArray());
-    }
-
-    @Override
-    public void close() {
-        try {
-            this.producer.close();
-        } catch (PulsarClientException e) {
-            log.warn("Failed to close producer for service request manager", e);
-        }
-    }
-}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/request/ServiceRequestUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/request/ServiceRequestUtils.java
deleted file mode 100644
index b31de0a..0000000
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/request/ServiceRequestUtils.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.worker.request;
-
-import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
-import org.apache.pulsar.functions.proto.Request.ServiceRequest;
-
-import java.util.UUID;
-
-public class ServiceRequestUtils {
-    public static ServiceRequest getServiceRequest(String requestId, String workerId,
-                                                  ServiceRequest.ServiceRequestType serviceRequestType,
-                                                  FunctionMetaData functionMetaData) {
-        ServiceRequest.Builder serviceRequestBuilder
-                = ServiceRequest.newBuilder()
-                .setRequestId(requestId)
-                .setWorkerId(workerId)
-                .setServiceRequestType(serviceRequestType);
-        if (functionMetaData != null) {
-            serviceRequestBuilder.setFunctionMetaData(functionMetaData);
-        }
-        return serviceRequestBuilder.build();
-    }
-
-    public static ServiceRequest getUpdateRequest(String workerId, FunctionMetaData functionMetaData) {
-        return getServiceRequest(
-                UUID.randomUUID().toString(),
-                workerId,
-                ServiceRequest.ServiceRequestType.UPDATE, functionMetaData);
-    }
-
-    public static ServiceRequest getDeregisterRequest(String workerId, FunctionMetaData functionMetaData) {
-        return getServiceRequest(
-                UUID.randomUUID().toString(),
-                workerId,
-                ServiceRequest.ServiceRequestType.DELETE, functionMetaData);
-    }
-
-    public static ServiceRequest getIntializationRequest(String requestId, String workerId) {
-        return getServiceRequest(
-                requestId,
-                workerId,
-                ServiceRequest.ServiceRequestType.INITIALIZE, null);
-    }
-}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index f49971b..2e31f73 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -45,6 +45,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.internal.FunctionsImpl;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
@@ -80,7 +81,6 @@ import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.WorkerUtils;
-import org.apache.pulsar.functions.worker.request.RequestResult;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 
 import java.io.File;
@@ -94,7 +94,6 @@ import java.util.Base64;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -392,24 +391,13 @@ public abstract class ComponentImpl {
             throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
         }
 
-        CompletableFuture<RequestResult> completableFuture = functionMetaDataManager.deregisterFunction(tenant,
-                namespace, componentName);
-
-        RequestResult requestResult = null;
-        try {
-            requestResult = completableFuture.get();
-            if (!requestResult.isSuccess()) {
-                throw new RestException(Status.BAD_REQUEST, requestResult.getMessage());
-            }
-        } catch (ExecutionException e) {
-            log.error("Execution Exception while deregistering {} @ /{}/{}/{}",
-                    ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
-            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getCause().getMessage());
-        } catch (InterruptedException e) {
-            log.error("Interrupted Exception while deregistering {} @ /{}/{}/{}",
-                    ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
-            throw new RestException(Status.REQUEST_TIMEOUT, e.getMessage());
-        }
+        FunctionMetaData newVersionedMetaData = FunctionMetaDataUtils.incrMetadataVersion(functionMetaData, functionMetaData);
+        internalProcessFunctionRequest(newVersionedMetaData.getFunctionDetails().getTenant(),
+                newVersionedMetaData.getFunctionDetails().getNamespace(),
+                newVersionedMetaData.getFunctionDetails().getName(),
+                newVersionedMetaData, true,
+                String.format("Error deleting {} @ /{}/{}/{}",
+                        ComponentTypeUtils.toString(componentType), tenant, namespace, componentName));
 
         // clean up component files stored in BK
         if (!functionMetaData.getPackageLocation().getPackagePath().startsWith(Utils.HTTP) && !functionMetaData.getPackageLocation().getPackagePath().startsWith(Utils.FILE)) {
@@ -534,15 +522,10 @@ public abstract class ComponentImpl {
             throw new RestException(Status.BAD_REQUEST, String.format("Operation not permitted"));
         }
 
-        try {
-            functionMetaDataManager.changeFunctionInstanceStatus(tenant, namespace, componentName,
-                    Integer.parseInt(instanceId), start);
-        } catch (WebApplicationException we) {
-            throw we;
-        } catch (Exception e) {
-            log.error("Failed to start/stop {}: {}/{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, instanceId, e);
-            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
-        }
+        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.changeFunctionInstanceStatus(functionMetaData, Integer.parseInt(instanceId), start);
+        internalProcessFunctionRequest(tenant, namespace, componentName, newFunctionMetaData, false,
+                String.format("Failed to start/stop {}: {}/{}/{}/{}", ComponentTypeUtils.toString(componentType),
+                        tenant, namespace, componentName, instanceId));
     }
 
     public void restartFunctionInstance(final String tenant,
@@ -662,14 +645,9 @@ public abstract class ComponentImpl {
             throw new RestException(Status.BAD_REQUEST, String.format("Operation not permitted"));
         }
 
-        try {
-            functionMetaDataManager.changeFunctionInstanceStatus(tenant, namespace, componentName, -1, start);
-        } catch (WebApplicationException we) {
-            throw we;
-        } catch (Exception e) {
-            log.error("Failed to start/stop {}: {}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
-            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
-        }
+        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.changeFunctionInstanceStatus(functionMetaData, -1, start);
+        internalProcessFunctionRequest(tenant, namespace, componentName, newFunctionMetaData, false,
+                String.format("Failed to start/stop {}: {}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName));
     }
 
     public void restartFunctionInstances(final String tenant,
@@ -880,25 +858,12 @@ public abstract class ComponentImpl {
         return retVals;
     }
 
-    void updateRequest(final FunctionMetaData functionMetaData) {
-
-        // Submit to FMT
-        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
-
-        CompletableFuture<RequestResult> completableFuture = functionMetaDataManager.updateFunction(functionMetaData);
-
-        RequestResult requestResult = null;
-        try {
-            requestResult = completableFuture.get();
-            if (!requestResult.isSuccess()) {
-                throw new RestException(Status.BAD_REQUEST, requestResult.getMessage());
-            }
-        } catch (ExecutionException e) {
-            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
-        } catch (InterruptedException e) {
-            throw new RestException(Status.REQUEST_TIMEOUT, e.getMessage());
-        }
-
+    void updateRequest(FunctionMetaData existingFunctionMetaData, final FunctionMetaData functionMetaData) {
+        // Derive the versioned metadata first (existingFunctionMetaData may be null), then submit it as an update.
+        final FunctionMetaData versioned =
+                FunctionMetaDataUtils.incrMetadataVersion(existingFunctionMetaData, functionMetaData);
+        internalProcessFunctionRequest(versioned.getFunctionDetails().getTenant(), versioned.getFunctionDetails().getNamespace(),
+                versioned.getFunctionDetails().getName(), versioned, false, "Update Failed");
     }
 
     public List<ConnectorDefinition> getListOfConnectors() {
@@ -1564,4 +1529,40 @@ public abstract class ComponentImpl {
             throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
         }
     }
+
+    /**
+     * Applies a function metadata update or delete through the worker leader.
+     *
+     * <p>If this worker is the leader the change is handed directly to the FunctionMetaDataManager;
+     * otherwise it is forwarded to the leader via the function admin client. Every failure is logged
+     * with {@code errorMsg} and rethrown as a RestException.
+     *
+     * @param tenant tenant of the function, used when forwarding to the leader
+     * @param namespace namespace of the function, used when forwarding to the leader
+     * @param functionName name of the function, used when forwarding to the leader
+     * @param functionMetadata metadata to write
+     * @param delete whether the request is a delete rather than an update
+     * @param errorMsg context message logged when the request fails
+     */
+    private void internalProcessFunctionRequest(final String tenant, final String namespace, final String functionName,
+                                                final FunctionMetaData functionMetadata, boolean delete, String errorMsg) {
+        try {
+            if (worker().getLeaderService().isLeader()) {
+                worker().getFunctionMetaDataManager().updateFunctionOnLeader(functionMetadata, delete);
+            } else {
+                FunctionsImpl functions = (FunctionsImpl) worker().getFunctionAdmin().functions();
+                functions.updateOnWorkerLeader(tenant,
+                        namespace, functionName, functionMetadata.toByteArray(), delete);
+            }
+        } catch (PulsarAdminException e) {
+            log.error(errorMsg, e);
+            throw new RestException(e.getStatusCode(), e.getMessage());
+        } catch (IllegalStateException e) {
+            log.error(errorMsg, e);
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        } catch (IllegalArgumentException e) {
+            log.error(errorMsg, e);
+            throw new RestException(Status.BAD_REQUEST, e.getMessage());
+        }
+    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 0e4c4a0..b5c641a 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.UpdateOptions;
 import org.apache.pulsar.common.functions.Utils;
+import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.FunctionStatus;
 import org.apache.pulsar.common.util.RestException;
@@ -45,6 +46,7 @@ import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -226,7 +228,7 @@ public class FunctionsImpl extends ComponentImpl {
             }
 
             functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
-            updateRequest(functionMetaDataBuilder.build());
+            updateRequest(null, functionMetaDataBuilder.build());
         } finally {
             if (componentPackageFile != null && componentPackageFile.exists()) {
                 if (functionPkgUrl == null || !functionPkgUrl.startsWith(Utils.FILE)) {
@@ -417,7 +419,7 @@ public class FunctionsImpl extends ComponentImpl {
 
             functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
 
-            updateRequest(functionMetaDataBuilder.build());
+            updateRequest(existingComponent, functionMetaDataBuilder.build());
         } finally {
             if (componentPackageFile != null && componentPackageFile.exists()) {
                 if ((functionPkgUrl != null && !functionPkgUrl.startsWith(Utils.FILE)) || uploadedInputStream != null) {
@@ -647,6 +649,78 @@ public class FunctionsImpl extends ComponentImpl {
         return functionStatus;
     }
 
+    /**
+     * Applies a serialized FunctionMetaData update or delete on the worker leader.
+     *
+     * <p>When authorization is enabled only super users may call this. If this worker is not the
+     * leader the caller is redirected (307) to the leader's host and port.
+     *
+     * @param tenant tenant of the function
+     * @param namespace namespace of the function
+     * @param functionName name of the function
+     * @param uploadedInputStream serialized FunctionMetaData protobuf
+     * @param delete whether the request is a delete rather than an update
+     * @param uri request URI, reused as the template for the redirect to the leader
+     * @param clientRole role of the caller, checked with isSuperUser when authorization is enabled
+     */
+    public void updateFunctionOnWorkerLeader(final String tenant,
+                               final String namespace,
+                               final String functionName,
+                               final InputStream uploadedInputStream,
+                               final boolean delete,
+                               URI uri,
+                               final String clientRole) {
+
+        if (!isWorkerServiceAvailable()) {
+            throwUnavailableException();
+        }
+
+        if (worker().getWorkerConfig().isAuthorizationEnabled()) {
+            if (!isSuperUser(clientRole)) {
+                log.error("{}/{}/{} Client [{}] is not superuser to update on worker leader {}", tenant, namespace,
+                        functionName, clientRole, ComponentTypeUtils.toString(componentType));
+                throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
+            }
+        }
+
+        if (tenant == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Tenant is not provided");
+        }
+        if (namespace == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Namespace is not provided");
+        }
+        if (functionName == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Function name is not provided");
+        }
+        Function.FunctionMetaData functionMetaData;
+        try {
+            functionMetaData = Function.FunctionMetaData.parseFrom(uploadedInputStream);
+        } catch (IOException e) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Corrupt Function MetaData");
+        }
+
+        // Redirect if we are not the leader
+        if (!worker().getLeaderService().isLeader()) {
+            WorkerInfo workerInfo = worker().getMembershipManager().getLeader();
+            // NOTE(review): assumes getLeader() can return null while no leader is known; answer 503 rather than NPE.
+            if (workerInfo == null) {
+                throw new RestException(Response.Status.SERVICE_UNAVAILABLE, "Leader not yet ready. Please retry again");
+            }
+            URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
+            throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
+        }
+
+        // Its possible that we are not the leader anymore. That will be taken care of by FunctionMetaDataManager
+        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
+        try {
+            functionMetaDataManager.updateFunctionOnLeader(functionMetaData, delete);
+        } catch (IllegalStateException e) {
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        } catch (IllegalArgumentException e) {
+            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
+        }
+    }
+
     private Function.FunctionDetails validateUpdateRequestParams(final String tenant,
                                                                  final String namespace,
                                                                  final String componentName,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
index 095d8e9..a9bc573 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
@@ -223,7 +223,7 @@ public class SinksImpl extends ComponentImpl {
             }
 
             functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
-            updateRequest(functionMetaDataBuilder.build());
+            updateRequest(null, functionMetaDataBuilder.build());
         } finally {
             if (componentPackageFile != null && componentPackageFile.exists()) {
                 if (sinkPkgUrl == null || !sinkPkgUrl.startsWith(Utils.FILE)) {
@@ -417,7 +417,7 @@ public class SinksImpl extends ComponentImpl {
 
             functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
 
-            updateRequest(functionMetaDataBuilder.build());
+            updateRequest(existingComponent, functionMetaDataBuilder.build());
         } finally {
             if (componentPackageFile != null && componentPackageFile.exists()) {
                 if ((sinkPkgUrl != null && !sinkPkgUrl.startsWith(Utils.FILE)) || uploadedInputStream != null) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
index bbe5920..187a7ae 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
@@ -223,7 +223,7 @@ public class SourcesImpl extends ComponentImpl {
             }
 
             functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
-            updateRequest(functionMetaDataBuilder.build());
+            updateRequest(null, functionMetaDataBuilder.build());
         } finally {
             if (componentPackageFile != null && componentPackageFile.exists()) {
                 if (sourcePkgUrl == null || !sourcePkgUrl.startsWith(Utils.FILE)) {
@@ -414,7 +414,7 @@ public class SourcesImpl extends ComponentImpl {
 
             functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
 
-            updateRequest(functionMetaDataBuilder.build());
+            updateRequest(existingComponent, functionMetaDataBuilder.build());
         } finally {
             if (componentPackageFile != null && componentPackageFile.exists()) {
                 if ((sourcePkgUrl != null && !sourcePkgUrl.startsWith(Utils.FILE)) || uploadedInputStream != null) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
index 8e32cda..0fe8c48 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
@@ -44,6 +44,7 @@ import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 import javax.ws.rs.core.StreamingOutput;
 import java.io.IOException;
 import java.io.InputStream;
@@ -366,4 +367,31 @@ public class FunctionsApiV3Resource extends FunctionApiResource {
                                  final @FormDataParam("state") FunctionState stateJson) throws IOException {
         functions.putFunctionState(tenant, namespace, functionName, key, stateJson, clientAppId(), clientAuthData());
     }
+
+    /**
+     * Internal endpoint (hidden from the API docs) that applies a serialized FunctionMetaData
+     * update or delete on the worker leader; delegates to functions.updateFunctionOnWorkerLeader
+     * with the request URI and clientAppId().
+     */
+    @PUT
+    @ApiOperation(value = "Updates a Pulsar Function on the worker leader", hidden = true)
+    @ApiResponses(value = {
+            @ApiResponse(code = 401, message = "The requester doesn't have super-user permissions"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 307, message = "Redirecting to the worker leader"),
+            @ApiResponse(code = 500, message = "Internal server error"),
+            @ApiResponse(code = 200, message = "Pulsar Function successfully updated")
+    })
+    @Path("/leader/{tenant}/{namespace}/{functionName}")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public void updateFunctionOnWorkerLeader(final @PathParam("tenant") String tenant,
+                                                 final @PathParam("namespace") String namespace,
+                                                 final @PathParam("functionName") String functionName,
+                                                 final @FormDataParam("functionMetaData") InputStream uploadedInputStream,
+                                                 final @FormDataParam("delete") boolean delete) {
+
+        functions.updateFunctionOnWorkerLeader(tenant, namespace, functionName, uploadedInputStream,
+                delete, uri.getRequestUri(), clientAppId());
+    }
 }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
index 20f30a9..b5fb0df 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
@@ -20,26 +20,15 @@ package org.apache.pulsar.functions.worker;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.argThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerBuilder;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.*;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Request;
-import org.apache.pulsar.functions.utils.FunctionMetaDataUtils;
-import org.mockito.ArgumentMatcher;
-import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -97,7 +86,7 @@ public class FunctionMetaDataManagerTest {
     }
 
     @Test
-    public void updateFunction() throws PulsarClientException {
+    public void testUpdateIfLeaderFunction() throws PulsarClientException {
 
         WorkerConfig workerConfig = new WorkerConfig();
         workerConfig.setWorkerId("worker-1");
@@ -106,309 +95,167 @@ public class FunctionMetaDataManagerTest {
                         mock(SchedulerManager.class),
                         mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
         Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder()
+                .setVersion(1)
                 .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")).build();
 
-        Mockito.doReturn(null).when(functionMetaDataManager).submit(any(Request.ServiceRequest.class));
-        functionMetaDataManager.updateFunction(m1);
-        verify(functionMetaDataManager, times(1)).submit(any(Request.ServiceRequest.class));
-        verify(functionMetaDataManager).submit(argThat(new ArgumentMatcher<Request.ServiceRequest>() {
-            @Override
-            public boolean matches(Request.ServiceRequest serviceRequest) {
-                if (!serviceRequest.getWorkerId().equals(workerConfig.getWorkerId())) {
-                    return false;
-                }
-                if (!serviceRequest.getServiceRequestType().equals(Request.ServiceRequest.ServiceRequestType.UPDATE)) {
-                    return false;
-                }
-                if (!serviceRequest.getFunctionMetaData().equals(m1)) {
-                    return false;
-                }
-                if (serviceRequest.getFunctionMetaData().getVersion() != 0) {
-                    return false;
-                }
-                return true;
-            }
-        }));
-
-        // already have record
-        long version = 5;
-        functionMetaDataManager = spy(
-                new FunctionMetaDataManager(workerConfig,
-                        mock(SchedulerManager.class),
-                        mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
-        Map<String, Function.FunctionMetaData> functionMetaDataMap = new HashMap<>();
-        Function.FunctionMetaData m2 = Function.FunctionMetaData.newBuilder()
-                .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
-                        .setNamespace("namespace-1").setTenant("tenant-1")).setVersion(version).build();
-        functionMetaDataMap.put("func-1", m2);
-        functionMetaDataManager.functionMetaDataMap.put("tenant-1", new HashMap<>());
-        functionMetaDataManager.functionMetaDataMap.get("tenant-1").put("namespace-1", functionMetaDataMap);
-        Mockito.doReturn(null).when(functionMetaDataManager).submit(any(Request.ServiceRequest.class));
-
-        functionMetaDataManager.updateFunction(m2);
-        verify(functionMetaDataManager, times(1)).submit(any(Request.ServiceRequest.class));
-        verify(functionMetaDataManager).submit(argThat(new ArgumentMatcher<Request.ServiceRequest>() {
-            @Override
-            public boolean matches(Request.ServiceRequest serviceRequest) {
-                if (!serviceRequest.getWorkerId().equals(workerConfig.getWorkerId()))
-                    return false;
-                if (!serviceRequest.getServiceRequestType().equals(
-                        Request.ServiceRequest.ServiceRequestType.UPDATE)) {
-                    return false;
-                }
-                if (!serviceRequest.getFunctionMetaData().getFunctionDetails().equals(m2.getFunctionDetails())) {
-                    return false;
-                }
-                if (serviceRequest.getFunctionMetaData().getVersion() != (version + 1)) {
-                    return false;
-                }
-                return true;
-            }
-        }));
-
-    }
-
-    @Test
-    public void testStopFunction() throws PulsarClientException {
-
-        long version = 5;
-        WorkerConfig workerConfig = new WorkerConfig();
-        workerConfig.setWorkerId("worker-1");
-        FunctionMetaDataManager functionMetaDataManager = spy(
-                new FunctionMetaDataManager(workerConfig,
-                        mock(SchedulerManager.class),
-                        mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
-
-        Map<String, Function.FunctionMetaData> functionMetaDataMap1 = new HashMap<>();
-        Function.FunctionMetaData f1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
-                Function.FunctionDetails.newBuilder().setName("func-1").setParallelism(2)).setVersion(version).build();
-        functionMetaDataMap1.put("func-1", f1);
-
-        Assert.assertTrue(FunctionMetaDataUtils.canChangeState(f1, 0, Function.FunctionState.STOPPED));
-        Assert.assertFalse(FunctionMetaDataUtils.canChangeState(f1, 0, Function.FunctionState.RUNNING));
-        Assert.assertFalse(FunctionMetaDataUtils.canChangeState(f1, 2, Function.FunctionState.STOPPED));
-        Assert.assertFalse(FunctionMetaDataUtils.canChangeState(f1, 2, Function.FunctionState.RUNNING));
-
-        functionMetaDataManager.functionMetaDataMap.put("tenant-1", new HashMap<>());
-        functionMetaDataManager.functionMetaDataMap.get("tenant-1").put("namespace-1", functionMetaDataMap1);
-
-        Mockito.doReturn(null).when(functionMetaDataManager).submit(any(Request.ServiceRequest.class));
-
-        functionMetaDataManager.changeFunctionInstanceStatus("tenant-1", "namespace-1", "func-1", 0, false);
-
-        verify(functionMetaDataManager, times(1)).submit(any(Request.ServiceRequest.class));
-        verify(functionMetaDataManager).submit(argThat(serviceRequest -> {
-            if (!serviceRequest.getWorkerId().equals(workerConfig.getWorkerId()))
-                return false;
-            if (!serviceRequest.getServiceRequestType().equals(
-                    Request.ServiceRequest.ServiceRequestType.UPDATE)) {
-                return false;
-            }
-            if (!serviceRequest.getFunctionMetaData().getFunctionDetails().equals(f1.getFunctionDetails())) {
-                return false;
-            }
-            if (serviceRequest.getFunctionMetaData().getVersion() != (version + 1)) {
-                return false;
-            }
-            Map<Integer, Function.FunctionState> stateMap = serviceRequest.getFunctionMetaData().getInstanceStatesMap();
-            if (stateMap == null || stateMap.isEmpty()) {
-                return false;
-            }
-            if (stateMap.get(1) != Function.FunctionState.RUNNING) {
-                return false;
-            }
-            if (stateMap.get(0) != Function.FunctionState.STOPPED) {
-                return false;
-            }
-            return true;
-        }));
+        // update when you are not the leader
+        try {
+            functionMetaDataManager.updateFunctionOnLeader(m1, false);
+            Assert.fail("Expected IllegalStateException: update attempted while not the leader");
+        } catch (IllegalStateException e) {
+            Assert.assertEquals(e.getMessage(), "Not the leader");
+        }
+
+        // become leader
+        functionMetaDataManager.acquireLeadership();
+        // Now we should be able to really update
+        functionMetaDataManager.updateFunctionOnLeader(m1, false);
+
+        // outdated request: same version submitted a second time
+        try {
+            functionMetaDataManager.updateFunctionOnLeader(m1, false);
+            Assert.fail("Expected IllegalArgumentException: update resubmitted with an unchanged version");
+        } catch (IllegalArgumentException e) {
+            Assert.assertEquals(e.getMessage(), "Update request ignored because it is out of date. Please try again.");
+        }
+        // update with new version
+        m1 = m1.toBuilder().setVersion(2).build();
+        functionMetaDataManager.updateFunctionOnLeader(m1, false);
     }
 
     @Test
     public void deregisterFunction() throws PulsarClientException {
-        long version = 5;
+        SchedulerManager mockedScheduler = mock(SchedulerManager.class);
         WorkerConfig workerConfig = new WorkerConfig();
         workerConfig.setWorkerId("worker-1");
         FunctionMetaDataManager functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
-                        mock(SchedulerManager.class),
+                        mockedScheduler,
                         mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
         Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder()
+                .setVersion(1)
                 .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
-                        .setNamespace("namespace-1").setTenant("tenant-1")).setVersion(version).build();
-        Map<String, Function.FunctionMetaData> functionMetaDataMap = new HashMap<>();
-        functionMetaDataMap.put("func-1", m1);
-        functionMetaDataManager.functionMetaDataMap.put("tenant-1", new HashMap<>());
-        functionMetaDataManager.functionMetaDataMap.get("tenant-1").put("namespace-1", functionMetaDataMap);
-        Mockito.doReturn(null).when(functionMetaDataManager).submit(any(Request.ServiceRequest.class));
-
-        functionMetaDataManager.deregisterFunction("tenant-1", "namespace-1", "func-1");
-
-        verify(functionMetaDataManager, times(1)).submit(any(Request.ServiceRequest.class));
-        verify(functionMetaDataManager).submit(argThat(new ArgumentMatcher<Request.ServiceRequest>() {
-            @Override
-            public boolean matches(Request.ServiceRequest serviceRequest) {
-                if (!serviceRequest.getWorkerId().equals(workerConfig.getWorkerId()))
-                    return false;
-                if (!serviceRequest.getServiceRequestType().equals(
-                        Request.ServiceRequest.ServiceRequestType.DELETE)) {
-                    return false;
-                }
-                if (!serviceRequest.getFunctionMetaData().getFunctionDetails().equals(m1.getFunctionDetails())) {
-                    return false;
-                }
-                if (serviceRequest.getFunctionMetaData().getVersion() != (version + 1)) {
-                    return false;
-                }
-                return true;
-            }
-        }));
+                        .setNamespace("namespace-1").setTenant("tenant-1")).build();
+
+        // Try deleting when you are not the leader
+        try {
+            functionMetaDataManager.updateFunctionOnLeader(m1, true);
+            Assert.fail("Expected IllegalStateException: delete attempted while not the leader");
+        } catch (IllegalStateException e) {
+            Assert.assertEquals(e.getMessage(), "Not the leader");
+        }
+
+        // become leader
+        functionMetaDataManager.acquireLeadership();
+        verify(mockedScheduler, times(0)).schedule();
+        // Now try deleting
+        functionMetaDataManager.updateFunctionOnLeader(m1, true);
+        // make sure schedule was not called because function didn't exist.
+        verify(mockedScheduler, times(0)).schedule();
+
+        // insert function
+        functionMetaDataManager.updateFunctionOnLeader(m1, false);
+        verify(mockedScheduler, times(1)).schedule();
+
+        // outdated request: delete carries the same version as the stored function
+        try {
+            functionMetaDataManager.updateFunctionOnLeader(m1, true);
+            Assert.fail("Expected IllegalArgumentException: delete submitted with an unchanged version");
+        } catch (IllegalArgumentException e) {
+            Assert.assertEquals(e.getMessage(), "Delete request ignored because it is out of date. Please try again.");
+        }
+        verify(mockedScheduler, times(1)).schedule();
+
+        // delete with new version
+        m1 = m1.toBuilder().setVersion(2).build();
+        functionMetaDataManager.updateFunctionOnLeader(m1, true);
+        verify(mockedScheduler, times(2)).schedule();
     }
 
     @Test
-    public void testProcessRequest() throws PulsarClientException {
+    public void testProcessRequest() throws PulsarClientException, IOException {
         WorkerConfig workerConfig = new WorkerConfig();
         FunctionMetaDataManager functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         mock(SchedulerManager.class),
                         mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
 
-        Mockito.doNothing().when(functionMetaDataManager).processUpdate(any(Request.ServiceRequest.class));
-        Mockito.doNothing().when(functionMetaDataManager).proccessDeregister(any(Request.ServiceRequest.class));
+        doReturn(true).when(functionMetaDataManager).processUpdate(any(Function.FunctionMetaData.class));
+        doReturn(true).when(functionMetaDataManager).proccessDeregister(any(Function.FunctionMetaData.class));
 
         Request.ServiceRequest serviceRequest
                 = Request.ServiceRequest.newBuilder().setServiceRequestType(
                         Request.ServiceRequest.ServiceRequestType.UPDATE).build();
-        functionMetaDataManager.processRequest(MessageId.earliest, serviceRequest);
+        Message msg = mock(Message.class);
+        doReturn(serviceRequest.toByteArray()).when(msg).getData();
+        functionMetaDataManager.processMetaDataTopicMessage(msg);
 
         verify(functionMetaDataManager, times(1)).processUpdate
-                (any(Request.ServiceRequest.class));
-        verify(functionMetaDataManager).processUpdate(serviceRequest);
+                (any(Function.FunctionMetaData.class));
+        verify(functionMetaDataManager).processUpdate(serviceRequest.getFunctionMetaData());
 
         serviceRequest
                 = Request.ServiceRequest.newBuilder().setServiceRequestType(
                 Request.ServiceRequest.ServiceRequestType.INITIALIZE).build();
-        functionMetaDataManager.processRequest(MessageId.earliest, serviceRequest);
+        doReturn(serviceRequest.toByteArray()).when(msg).getData();
+        functionMetaDataManager.processMetaDataTopicMessage(msg);
 
         serviceRequest
                 = Request.ServiceRequest.newBuilder().setServiceRequestType(
                 Request.ServiceRequest.ServiceRequestType.DELETE).build();
-        functionMetaDataManager.processRequest(MessageId.earliest, serviceRequest);
+        doReturn(serviceRequest.toByteArray()).when(msg).getData();
+        functionMetaDataManager.processMetaDataTopicMessage(msg);
 
         verify(functionMetaDataManager, times(1)).proccessDeregister(
-                any(Request.ServiceRequest.class));
-        verify(functionMetaDataManager).proccessDeregister(serviceRequest);
+                any(Function.FunctionMetaData.class));
+        verify(functionMetaDataManager).proccessDeregister(serviceRequest.getFunctionMetaData());
     }
 
     @Test
     public void processUpdateTest() throws PulsarClientException {
-        long version = 5;
+        SchedulerManager schedulerManager = mock(SchedulerManager.class);
         WorkerConfig workerConfig = new WorkerConfig();
         workerConfig.setWorkerId("worker-1");
-        SchedulerManager schedulerManager = mock(SchedulerManager.class);
         FunctionMetaDataManager functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         schedulerManager,
                         mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
-
-        // worker has no record of function
         Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder()
+                .setVersion(1)
                 .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
-                        .setNamespace("namespace-1").setTenant("tenant-1")).setVersion(version).build();
-
-        Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
-                .setServiceRequestType(Request.ServiceRequest.ServiceRequestType.UPDATE)
-                .setFunctionMetaData(m1)
-                .setWorkerId("worker-1")
-                .build();
-        functionMetaDataManager.processUpdate(serviceRequest);
+                .setNamespace("namespace-1").setTenant("tenant-1")).build();
+
+        Assert.assertTrue(functionMetaDataManager.processUpdate(m1));
         verify(functionMetaDataManager, times(1))
                 .setFunctionMetaData(any(Function.FunctionMetaData.class));
-        verify(schedulerManager, times(1)).schedule();
+        verify(schedulerManager, times(0)).schedule();
         Assert.assertEquals(m1, functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").get("func-1"));
         Assert.assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").size());
 
-        // worker has record of function
-
-        // request is oudated
-        schedulerManager = mock(SchedulerManager.class);
-        functionMetaDataManager = spy(
-                new FunctionMetaDataManager(workerConfig,
-                        schedulerManager,
-                        mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
-
-        Function.FunctionMetaData m3 = Function.FunctionMetaData.newBuilder()
-                .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
-                        .setNamespace("namespace-1").setTenant("tenant-1")).setVersion(version).build();
-        functionMetaDataManager.setFunctionMetaData(m3);
-        Function.FunctionMetaData outdated = Function.FunctionMetaData.newBuilder()
-                .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
-                        .setNamespace("namespace-1").setTenant("tenant-1")).setVersion(version - 1).build();
-
-        serviceRequest = Request.ServiceRequest.newBuilder()
-                .setServiceRequestType(Request.ServiceRequest.ServiceRequestType.UPDATE)
-                .setFunctionMetaData(outdated)
-                .setWorkerId("worker-1")
-                .build();
-        functionMetaDataManager.processUpdate(serviceRequest);
-
-        Assert.assertEquals(m3, functionMetaDataManager.getFunctionMetaData(
-                "tenant-1", "namespace-1", "func-1"));
+        // outdated request
+        try {
+            functionMetaDataManager.processUpdate(m1);
+            Assert.assertTrue(false);
+        } catch (IllegalArgumentException e) {
+            Assert.assertEquals(e.getMessage(), "Update request ignored because it is out of date. Please try again.");
+        }
         verify(functionMetaDataManager, times(1))
                 .setFunctionMetaData(any(Function.FunctionMetaData.class));
         verify(schedulerManager, times(0)).schedule();
-
-        Function.FunctionMetaData outdated2 = Function.FunctionMetaData.newBuilder()
-                .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
-                        .setNamespace("namespace-1").setTenant("tenant-1")).setVersion(version).build();
-
-        serviceRequest = Request.ServiceRequest.newBuilder()
-                .setServiceRequestType(Request.ServiceRequest.ServiceRequestType.UPDATE)
-                .setFunctionMetaData(outdated2)
-                .setWorkerId("worker-2")
-                .build();
-        functionMetaDataManager.processUpdate(serviceRequest);
-        Assert.assertEquals(m3, functionMetaDataManager.getFunctionMetaData(
-                "tenant-1", "namespace-1", "func-1"));
-        verify(functionMetaDataManager, times(1))
-                .setFunctionMetaData(any(Function.FunctionMetaData.class));
-        verify(schedulerManager, times(0)).schedule();
-
         Assert.assertEquals(m1, functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").get("func-1"));
         Assert.assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").size());
 
-        // schedule
-        schedulerManager = mock(SchedulerManager.class);
-        functionMetaDataManager = spy(
-                new FunctionMetaDataManager(workerConfig,
-                        schedulerManager,
-                        mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
-
-        Function.FunctionMetaData m4 = Function.FunctionMetaData.newBuilder()
-                .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
-                        .setNamespace("namespace-1").setTenant("tenant-1")).setVersion(version).build();
-        functionMetaDataManager.setFunctionMetaData(m4);
-        Function.FunctionMetaData m5 = Function.FunctionMetaData.newBuilder()
-                .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
-                        .setNamespace("namespace-1").setTenant("tenant-1")).setVersion(version + 1).build();
-
-        serviceRequest = Request.ServiceRequest.newBuilder()
-                .setServiceRequestType(Request.ServiceRequest.ServiceRequestType.UPDATE)
-                .setFunctionMetaData(m5)
-                .setWorkerId("worker-2")
-                .build();
-        functionMetaDataManager.processUpdate(serviceRequest);
-
+        // update with new version
+        m1 = m1.toBuilder().setVersion(2).build();
+        Assert.assertTrue(functionMetaDataManager.processUpdate(m1));
         verify(functionMetaDataManager, times(2))
                 .setFunctionMetaData(any(Function.FunctionMetaData.class));
-        verify(schedulerManager, times(1)).schedule();
-
-        Assert.assertEquals(m1.toBuilder().setVersion(version + 1).build(),
-                functionMetaDataManager.functionMetaDataMap.get(
+        verify(schedulerManager, times(0)).schedule();
+        Assert.assertEquals(m1, functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").get("func-1"));
         Assert.assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").size());
@@ -416,90 +263,56 @@ public class FunctionMetaDataManagerTest {
 
     @Test
     public void processDeregister() throws PulsarClientException {
-        long version = 5;
+        SchedulerManager schedulerManager = mock(SchedulerManager.class);
         WorkerConfig workerConfig = new WorkerConfig();
         workerConfig.setWorkerId("worker-1");
-        SchedulerManager schedulerManager = mock(SchedulerManager.class);
         FunctionMetaDataManager functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         schedulerManager,
                         mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
-        // worker has no record of function
-        Function.FunctionMetaData test = Function.FunctionMetaData.newBuilder()
-                .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-2")
-                        .setNamespace("namespace-1").setTenant("tenant-1")).setVersion(version).build();
-        functionMetaDataManager.setFunctionMetaData(test);
         Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder()
+                .setVersion(1)
                 .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
-                        .setNamespace("namespace-1").setTenant("tenant-1")).setVersion(version).build();
-        Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
-                .setServiceRequestType(Request.ServiceRequest.ServiceRequestType.UPDATE)
-                .setFunctionMetaData(m1)
-                .setWorkerId("worker-1")
-                .build();
-        functionMetaDataManager.proccessDeregister(serviceRequest);
+                        .setNamespace("namespace-1").setTenant("tenant-1")).build();
 
+        Assert.assertFalse(functionMetaDataManager.proccessDeregister(m1));
+        verify(functionMetaDataManager, times(0))
+                .setFunctionMetaData(any(Function.FunctionMetaData.class));
         verify(schedulerManager, times(0)).schedule();
-        Assert.assertEquals(test, functionMetaDataManager.functionMetaDataMap.get(
-                "tenant-1").get("namespace-1").get("func-2"));
+        Assert.assertEquals(0, functionMetaDataManager.functionMetaDataMap.size());
+
+        // insert something
+        Assert.assertTrue(functionMetaDataManager.processUpdate(m1));
+        verify(functionMetaDataManager, times(1))
+                .setFunctionMetaData(any(Function.FunctionMetaData.class));
+        verify(schedulerManager, times(0)).schedule();
+        Assert.assertEquals(m1, functionMetaDataManager.functionMetaDataMap.get(
+                "tenant-1").get("namespace-1").get("func-1"));
         Assert.assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").size());
 
-        // function exists but request outdated
-        schedulerManager = mock(SchedulerManager.class);
-        functionMetaDataManager = spy(
-                new FunctionMetaDataManager(workerConfig,
-                        schedulerManager,
-                        mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
-        functionMetaDataManager.setFunctionMetaData(test);
-        Function.FunctionMetaData m2 = Function.FunctionMetaData.newBuilder()
-                .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
-                        .setNamespace("namespace-1").setTenant("tenant-1")).setVersion(version).build();
-        functionMetaDataManager.setFunctionMetaData(m2);
-        serviceRequest = Request.ServiceRequest.newBuilder()
-                .setServiceRequestType(Request.ServiceRequest.ServiceRequestType.UPDATE)
-                .setFunctionMetaData(m2)
-                .setWorkerId("worker-1")
-                .build();
-
-        functionMetaDataManager.proccessDeregister(serviceRequest);
+        // outdated delete request
+        try {
+            functionMetaDataManager.proccessDeregister(m1);
+            Assert.assertTrue(false);
+        } catch (IllegalArgumentException e) {
+            Assert.assertEquals(e.getMessage(), "Delete request ignored because it is out of date. Please try again.");
+        }
+        verify(functionMetaDataManager, times(1))
+                .setFunctionMetaData(any(Function.FunctionMetaData.class));
         verify(schedulerManager, times(0)).schedule();
-
-        Assert.assertEquals(test, functionMetaDataManager.functionMetaDataMap.get(
-                "tenant-1").get("namespace-1").get("func-2"));
-        Assert.assertEquals(m2, functionMetaDataManager.functionMetaDataMap.get(
+        Assert.assertEquals(m1, functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").get("func-1"));
-        Assert.assertEquals(2, functionMetaDataManager.functionMetaDataMap.get(
+        Assert.assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").size());
 
-        // function deleted
-        schedulerManager = mock(SchedulerManager.class);
-        functionMetaDataManager = spy(
-                new FunctionMetaDataManager(workerConfig,
-                        schedulerManager,
-                        mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
-        functionMetaDataManager.setFunctionMetaData(test);
-
-        Function.FunctionMetaData m3 = Function.FunctionMetaData.newBuilder()
-                .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
-                        .setNamespace("namespace-1").setTenant("tenant-1")).setVersion(version ).build();
-        functionMetaDataManager.setFunctionMetaData(m3);
-
-        Function.FunctionMetaData m4 = Function.FunctionMetaData.newBuilder()
-                .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
-                        .setNamespace("namespace-1").setTenant("tenant-1")).setVersion(version +1).build();
-        serviceRequest = Request.ServiceRequest.newBuilder()
-                .setServiceRequestType(Request.ServiceRequest.ServiceRequestType.UPDATE)
-                .setFunctionMetaData(m4)
-                .setWorkerId("worker-1")
-                .build();
-
-        functionMetaDataManager.proccessDeregister(serviceRequest);
-        verify(schedulerManager, times(1)).schedule();
-
-        Assert.assertEquals(test, functionMetaDataManager.functionMetaDataMap.get(
-                "tenant-1").get("namespace-1").get("func-2"));
-        Assert.assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
+        // delete now
+        m1 = m1.toBuilder().setVersion(2).build();
+        Assert.assertTrue(functionMetaDataManager.proccessDeregister(m1));
+        verify(functionMetaDataManager, times(1))
+                .setFunctionMetaData(any(Function.FunctionMetaData.class));
+        verify(schedulerManager, times(0)).schedule();
+        Assert.assertEquals(0, functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").size());
     }
 }
\ No newline at end of file
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailerTest.java
index b84af4b..784b7ed 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.functions.worker;
 
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
@@ -25,9 +26,9 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
-import java.util.function.Function;
+import java.util.concurrent.TimeUnit;
+
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -35,7 +36,6 @@ import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.functions.proto.Request.ServiceRequest;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
-import org.apache.pulsar.functions.worker.request.ServiceRequestUtils;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.testng.annotations.AfterMethod;
@@ -63,7 +63,7 @@ public class FunctionMetaDataTopicTailerTest {
         when(readerBuilder.subscriptionRolePrefix(anyString())).thenReturn(readerBuilder);
         when(readerBuilder.create()).thenReturn(reader);
         this.fsm = mock(FunctionMetaDataManager.class);
-        this.fsc = new FunctionMetaDataTopicTailer(fsm, readerBuilder, new WorkerConfig(), ErrorNotifier.getDefaultImpl() );
+        this.fsc = new FunctionMetaDataTopicTailer(fsm, readerBuilder, new WorkerConfig(), MessageId.earliest, ErrorNotifier.getDefaultImpl() );
     }
 
     @AfterMethod
@@ -75,13 +75,13 @@ public class FunctionMetaDataTopicTailerTest {
     @Test
     public void testUpdate() throws Exception {
 
-        ServiceRequest request = ServiceRequestUtils.getUpdateRequest(TEST_NAME, FunctionMetaData.newBuilder().build());
+        FunctionMetaData request = FunctionMetaData.newBuilder().build();
 
         Message msg = mock(Message.class);
         when(msg.getData()).thenReturn(request.toByteArray());
         CountDownLatch readLatch = new CountDownLatch(1);
         CountDownLatch processLatch = new CountDownLatch(1);
-        when(reader.readNext()).thenReturn(msg).then(new Answer<Message>() {
+        when(reader.readNext(anyInt(), any(TimeUnit.class))).thenReturn(msg).then(new Answer<Message>() {
             public Message answer(InvocationOnMock invocation) {
                 try {
                     readLatch.countDown();
@@ -97,7 +97,7 @@ public class FunctionMetaDataTopicTailerTest {
 
         readLatch.await();
 
-        verify(reader, times(2)).readNext();
-        verify(fsm, times(1)).processRequest(any(), any(ServiceRequest.class));
+        verify(reader, times(2)).readNext(anyInt(), any(TimeUnit.class));
+        verify(fsm, times(1)).processMetaDataTopicMessage(any(Message.class));
     }
 }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/request/ServiceRequestManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/request/ServiceRequestManagerTest.java
deleted file mode 100644
index 501ce1a..0000000
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/request/ServiceRequestManagerTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.worker.request;
-
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertSame;
-
-import java.util.concurrent.CompletableFuture;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.functions.proto.Request.ServiceRequest;
-import org.apache.pulsar.functions.worker.WorkerUtils;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.Test;
-
-/**
- * Unit test of {@link ServiceRequestManager}.
- */
-@PrepareForTest(WorkerUtils.class)
-public class ServiceRequestManagerTest {
-
-    private final Producer producer;
-    private final ServiceRequestManager reqMgr;
-
-    public ServiceRequestManagerTest() throws Exception {
-        this.producer = mock(Producer.class);
-        this.reqMgr = new ServiceRequestManager(producer);
-    }
-
-    @AfterMethod
-    public void tearDown() throws Exception {
-        reqMgr.close();
-        verify(producer, times(1)).close();
-    }
-
-    @Test
-    public void testSubmitRequest() throws Exception {
-        ServiceRequest request = ServiceRequest.newBuilder().build();
-        MessageId msgId = mock(MessageId.class);
-
-        when(producer.sendAsync(any(byte[].class)))
-            .thenReturn(CompletableFuture.completedFuture(msgId));
-
-        CompletableFuture<MessageId> submitFuture = reqMgr.submitRequest(request);
-        assertSame(msgId, submitFuture.get());
-        verify(producer, times(1)).sendAsync(any(byte[].class));
-    }
-
-}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 88142e6..2a50623 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.functions.worker.rest.api.v2;
 
 import static org.apache.pulsar.functions.utils.FunctionCommon.mergeJson;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
@@ -48,7 +49,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
 
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.StreamingOutput;
@@ -56,13 +56,13 @@ import javax.ws.rs.core.StreamingOutput;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.core.config.Configurator;
-import org.apache.pulsar.client.admin.Namespaces;
-import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.Functions;
+import org.apache.pulsar.client.admin.Namespaces;
 import org.apache.pulsar.client.admin.Tenants;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.RestException;
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
@@ -77,12 +77,7 @@ import org.apache.pulsar.functions.proto.Function.SubscriptionType;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.source.TopicSchema;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
-import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
-import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
-import org.apache.pulsar.functions.worker.WorkerConfig;
-import org.apache.pulsar.functions.worker.WorkerService;
-import org.apache.pulsar.functions.worker.WorkerUtils;
-import org.apache.pulsar.functions.worker.request.RequestResult;
+import org.apache.pulsar.functions.worker.*;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImplV2;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
@@ -132,6 +127,7 @@ public class FunctionApiV2ResourceTest {
     private PulsarAdmin mockedPulsarAdmin;
     private Tenants mockedTenants;
     private Namespaces mockedNamespaces;
+    private Functions mockedFunctions;
     private TenantInfo mockedTenantInfo;
     private List<String> namespaceList = new LinkedList<>();
     private FunctionMetaDataManager mockedManager;
@@ -142,6 +138,7 @@ public class FunctionApiV2ResourceTest {
     private InputStream mockedInputStream;
     private FormDataContentDisposition mockedFormData;
     private FunctionMetaData mockedFunctionMetadata;
+    private LeaderService mockedLeaderService;
 
     @BeforeMethod
     public void setup() throws Exception {
@@ -156,20 +153,26 @@ public class FunctionApiV2ResourceTest {
         this.mockedPulsarAdmin = mock(PulsarAdmin.class);
         this.mockedTenants = mock(Tenants.class);
         this.mockedNamespaces = mock(Namespaces.class);
+        this.mockedFunctions = mock(Functions.class);
+        this.mockedLeaderService = mock(LeaderService.class);
         this.mockedFunctionMetadata = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
         namespaceList.add(tenant + "/" + namespace);
 
         this.mockedWorkerService = mock(WorkerService.class);
         when(mockedWorkerService.getFunctionMetaDataManager()).thenReturn(mockedManager);
+        when(mockedWorkerService.getLeaderService()).thenReturn(mockedLeaderService);
         when(mockedWorkerService.getFunctionRuntimeManager()).thenReturn(mockedFunctionRunTimeManager);
         when(mockedFunctionRunTimeManager.getRuntimeFactory()).thenReturn(mockedRuntimeFactory);
         when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace);
         when(mockedWorkerService.isInitialized()).thenReturn(true);
         when(mockedWorkerService.getBrokerAdmin()).thenReturn(mockedPulsarAdmin);
+        when(mockedWorkerService.getFunctionAdmin()).thenReturn(mockedPulsarAdmin);
         when(mockedPulsarAdmin.tenants()).thenReturn(mockedTenants);
         when(mockedPulsarAdmin.namespaces()).thenReturn(mockedNamespaces);
+        when(mockedPulsarAdmin.functions()).thenReturn(mockedFunctions);
         when(mockedTenants.getTenantInfo(any())).thenReturn(mockedTenantInfo);
         when(mockedNamespaces.getNamespaces(any())).thenReturn(namespaceList);
+        when(mockedLeaderService.isLeader()).thenReturn(true);
         when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetadata);
 
         // worker config
@@ -495,11 +498,6 @@ public class FunctionApiV2ResourceTest {
                     any(File.class),
                     any(Namespace.class));
             PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod();
-            RequestResult rr = new RequestResult()
-                    .setSuccess(true)
-                    .setMessage("function registered");
-            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -584,12 +582,6 @@ public class FunctionApiV2ResourceTest {
 
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
 
-            RequestResult rr = new RequestResult()
-                    .setSuccess(true)
-                    .setMessage("function registered");
-            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
             registerDefaultFunction();
         } catch (RestException re) {
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -631,12 +623,8 @@ public class FunctionApiV2ResourceTest {
             PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod();
 
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
-
-            RequestResult rr = new RequestResult()
-                    .setSuccess(false)
-                    .setMessage("function failed to register");
-            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+            doThrow(new IllegalArgumentException("function failed to register"))
+                    .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), anyBoolean());
 
             registerDefaultFunction();
         } catch (RestException re) {
@@ -645,7 +633,7 @@ public class FunctionApiV2ResourceTest {
         }
     }
 
-    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registeration interrupted")
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function registration interrupted")
     public void testRegisterFunctionInterrupted() throws Exception {
         try {
             mockStatic(WorkerUtils.class);
@@ -658,10 +646,8 @@ public class FunctionApiV2ResourceTest {
 
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
 
-            CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
-                    new IOException("Function registeration interrupted"));
-            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
+            doThrow(new IllegalStateException("Function registration interrupted"))
+                    .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), anyBoolean());
             registerDefaultFunction();
         } catch (RestException re) {
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.INTERNAL_SERVER_ERROR);
@@ -674,7 +660,7 @@ public class FunctionApiV2ResourceTest {
     //
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant is not provided")
-    public void testUpdateFunctionMissingTenant() {
+    public void testUpdateFunctionMissingTenant() throws Exception {
         try {
             testUpdateFunctionMissingArguments(
                     null,
@@ -695,7 +681,7 @@ public class FunctionApiV2ResourceTest {
     }
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace is not provided")
-    public void testUpdateFunctionMissingNamespace() {
+    public void testUpdateFunctionMissingNamespace() throws Exception {
         try {
             testUpdateFunctionMissingArguments(
                     tenant,
@@ -716,7 +702,7 @@ public class FunctionApiV2ResourceTest {
     }
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function name is not provided")
-    public void testUpdateFunctionMissingFunctionName() {
+    public void testUpdateFunctionMissingFunctionName() throws Exception {
         try {
             testUpdateFunctionMissingArguments(
                     tenant,
@@ -899,7 +885,7 @@ public class FunctionApiV2ResourceTest {
             String outputSerdeClassName,
             String className,
             Integer parallelism,
-            String expectedError) {
+            String expectedError) throws Exception {
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
         FunctionConfig functionConfig = new FunctionConfig();
@@ -929,12 +915,9 @@ public class FunctionApiV2ResourceTest {
         }
         functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
 
-        if (expectedError == null) {
-            RequestResult rr = new RequestResult()
-                    .setSuccess(true)
-                    .setMessage("function registered");
-            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+        if (expectedError != null) {
+            doThrow(new IllegalArgumentException(expectedError))
+                    .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), anyBoolean());
         }
 
         try {
@@ -1023,12 +1006,6 @@ public class FunctionApiV2ResourceTest {
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
-        RequestResult rr = new RequestResult()
-                .setSuccess(true)
-                .setMessage("function registered");
-        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
         updateDefaultFunction();
     }
 
@@ -1053,11 +1030,6 @@ public class FunctionApiV2ResourceTest {
         functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
-        RequestResult rr = new RequestResult()
-                .setSuccess(true)
-                .setMessage("function registered");
-        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
         try {
             resource.updateFunction(
@@ -1088,11 +1060,8 @@ public class FunctionApiV2ResourceTest {
 
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
-            RequestResult rr = new RequestResult()
-                    .setSuccess(false)
-                    .setMessage("function failed to register");
-            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+            doThrow(new IllegalArgumentException("function failed to register"))
+                    .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), anyBoolean());
 
             updateDefaultFunction();
         } catch (RestException re) {
@@ -1101,7 +1070,7 @@ public class FunctionApiV2ResourceTest {
         }
     }
 
-    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registeration interrupted")
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function registeration interrupted")
     public void testUpdateFunctionInterrupted() throws Exception {
         try {
             mockStatic(WorkerUtils.class);
@@ -1116,9 +1085,8 @@ public class FunctionApiV2ResourceTest {
 
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
-            CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
-                    new IOException("Function registeration interrupted"));
-            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+            doThrow(new IllegalStateException("Function registeration interrupted"))
+                    .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), anyBoolean());
 
             updateDefaultFunction();
         } catch (RestException re) {
@@ -1209,25 +1177,16 @@ public class FunctionApiV2ResourceTest {
     public void testDeregisterFunctionSuccess() {
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
-        RequestResult rr = new RequestResult()
-                .setSuccess(true)
-                .setMessage("function deregistered");
-        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-        when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(requestResult);
-
         deregisterDefaultFunction();
     }
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "function failed to deregister")
-    public void testDeregisterFunctionFailure() {
+    public void testDeregisterFunctionFailure() throws Exception {
         try {
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
-            RequestResult rr = new RequestResult()
-                    .setSuccess(false)
-                    .setMessage("function failed to deregister");
-            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-            when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(requestResult);
+            doThrow(new IllegalArgumentException("function failed to deregister"))
+                    .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), anyBoolean());
 
             deregisterDefaultFunction();
         } catch (RestException re) {
@@ -1237,13 +1196,12 @@ public class FunctionApiV2ResourceTest {
     }
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function deregisteration interrupted")
-    public void testDeregisterFunctionInterrupted() {
+    public void testDeregisterFunctionInterrupted() throws Exception {
         try {
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
-            CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
-                    new IOException("Function deregisteration interrupted"));
-            when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(requestResult);
+            doThrow(new IllegalStateException("Function deregisteration interrupted"))
+                    .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), anyBoolean());
 
             deregisterDefaultFunction();
         }
@@ -1476,10 +1434,6 @@ public class FunctionApiV2ResourceTest {
         String filePackageUrl = "file://" + fileLocation;
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
 
-        RequestResult rr = new RequestResult().setSuccess(true).setMessage("function registered");
-        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
         FunctionConfig functionConfig = new FunctionConfig();
         functionConfig.setTenant(tenant);
         functionConfig.setNamespace(namespace);
@@ -1514,10 +1468,6 @@ public class FunctionApiV2ResourceTest {
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
         when(mockedManager.containsFunction(eq(actualTenant), eq(actualNamespace), eq(actualName))).thenReturn(false);
 
-        RequestResult rr = new RequestResult().setSuccess(true).setMessage("function registered");
-        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
         FunctionConfig functionConfig = new FunctionConfig();
         functionConfig.setTenant(tenant);
         functionConfig.setNamespace(namespace);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index f1128ef..395951a 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -44,7 +44,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 
 import javax.ws.rs.core.Response;
@@ -53,9 +52,10 @@ import javax.ws.rs.core.StreamingOutput;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.core.config.Configurator;
-import org.apache.pulsar.client.admin.Namespaces;
-import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.Functions;
+import org.apache.pulsar.client.admin.Namespaces;
 import org.apache.pulsar.client.admin.Tenants;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -74,15 +74,11 @@ import org.apache.pulsar.functions.proto.Function.SubscriptionType;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.source.TopicSchema;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
-import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
-import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
-import org.apache.pulsar.functions.worker.WorkerConfig;
-import org.apache.pulsar.functions.worker.WorkerService;
-import org.apache.pulsar.functions.worker.WorkerUtils;
-import org.apache.pulsar.functions.worker.request.RequestResult;
+import org.apache.pulsar.functions.worker.*;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
 import org.apache.pulsar.functions.worker.rest.api.v2.FunctionsApiV2Resource;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+import org.mockito.Mockito;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -136,6 +132,7 @@ public class FunctionApiV3ResourceTest {
     private PulsarAdmin mockedPulsarAdmin;
     private Tenants mockedTenants;
     private Namespaces mockedNamespaces;
+    private Functions mockedFunctions;
     private TenantInfo mockedTenantInfo;
     private List<String> namespaceList = new LinkedList<>();
     private FunctionMetaDataManager mockedManager;
@@ -146,6 +143,7 @@ public class FunctionApiV3ResourceTest {
     private InputStream mockedInputStream;
     private FormDataContentDisposition mockedFormData;
     private FunctionMetaData mockedFunctionMetadata;
+    private LeaderService mockedLeaderService;
 
     @BeforeMethod
     public void setup() throws Exception {
@@ -160,20 +158,26 @@ public class FunctionApiV3ResourceTest {
         this.mockedPulsarAdmin = mock(PulsarAdmin.class);
         this.mockedTenants = mock(Tenants.class);
         this.mockedNamespaces = mock(Namespaces.class);
+        this.mockedFunctions = mock(Functions.class);
+        this.mockedLeaderService = mock(LeaderService.class);
         this.mockedFunctionMetadata = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
         namespaceList.add(tenant + "/" + namespace);
 
         this.mockedWorkerService = mock(WorkerService.class);
         when(mockedWorkerService.getFunctionMetaDataManager()).thenReturn(mockedManager);
         when(mockedWorkerService.getFunctionRuntimeManager()).thenReturn(mockedFunctionRunTimeManager);
+        when(mockedWorkerService.getLeaderService()).thenReturn(mockedLeaderService);
         when(mockedFunctionRunTimeManager.getRuntimeFactory()).thenReturn(mockedRuntimeFactory);
         when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace);
         when(mockedWorkerService.isInitialized()).thenReturn(true);
         when(mockedWorkerService.getBrokerAdmin()).thenReturn(mockedPulsarAdmin);
+        when(mockedWorkerService.getFunctionAdmin()).thenReturn(mockedPulsarAdmin);
         when(mockedPulsarAdmin.tenants()).thenReturn(mockedTenants);
         when(mockedPulsarAdmin.namespaces()).thenReturn(mockedNamespaces);
+        when(mockedPulsarAdmin.functions()).thenReturn(mockedFunctions);
         when(mockedTenants.getTenantInfo(any())).thenReturn(mockedTenantInfo);
         when(mockedNamespaces.getNamespaces(any())).thenReturn(namespaceList);
+        when(mockedLeaderService.isLeader()).thenReturn(true);
         when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetadata);
 
         // worker config
@@ -609,12 +613,6 @@ public class FunctionApiV3ResourceTest {
 
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
 
-            RequestResult rr = new RequestResult()
-                .setSuccess(true)
-                .setMessage("function registered");
-            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
             registerDefaultFunction();
         } catch (RestException re) {
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -658,11 +656,8 @@ public class FunctionApiV3ResourceTest {
 
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
 
-            RequestResult rr = new RequestResult()
-                .setSuccess(false)
-                .setMessage("function failed to register");
-            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+            doThrow(new IllegalArgumentException("function failed to register"))
+                    .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
 
             registerDefaultFunction();
         } catch (RestException re) {
@@ -671,7 +666,7 @@ public class FunctionApiV3ResourceTest {
         }
     }
 
-    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registeration interrupted")
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function registration interrupted")
     public void testRegisterFunctionInterrupted() throws Exception {
         try {
             mockStatic(WorkerUtils.class);
@@ -685,9 +680,8 @@ public class FunctionApiV3ResourceTest {
 
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
 
-            CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
-                new IOException("Function registeration interrupted"));
-            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+            doThrow(new IllegalStateException("Function registration interrupted"))
+                    .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
 
             registerDefaultFunction();
         } catch (RestException re) {
@@ -701,7 +695,7 @@ public class FunctionApiV3ResourceTest {
     //
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant is not provided")
-    public void testUpdateFunctionMissingTenant() {
+    public void testUpdateFunctionMissingTenant() throws Exception {
         try {
             testUpdateFunctionMissingArguments(
                 null,
@@ -722,7 +716,7 @@ public class FunctionApiV3ResourceTest {
     }
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace is not provided")
-    public void testUpdateFunctionMissingNamespace() {
+    public void testUpdateFunctionMissingNamespace() throws Exception {
         try {
             testUpdateFunctionMissingArguments(
                 tenant,
@@ -743,7 +737,7 @@ public class FunctionApiV3ResourceTest {
     }
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function name is not provided")
-    public void testUpdateFunctionMissingFunctionName() {
+    public void testUpdateFunctionMissingFunctionName() throws Exception {
         try {
             testUpdateFunctionMissingArguments(
                 tenant,
@@ -926,7 +920,7 @@ public class FunctionApiV3ResourceTest {
             String outputSerdeClassName,
             String className,
             Integer parallelism,
-            String expectedError) {
+            String expectedError) throws Exception {
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
         FunctionConfig functionConfig = new FunctionConfig();
@@ -956,12 +950,9 @@ public class FunctionApiV3ResourceTest {
         }
         functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
 
-        if (expectedError == null) {
-            RequestResult rr = new RequestResult()
-                    .setSuccess(true)
-                    .setMessage("function registered");
-            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+        if (expectedError != null) {
+            doThrow(new IllegalArgumentException(expectedError))
+                    .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
         }
 
         resource.updateFunction(
@@ -1042,12 +1033,6 @@ public class FunctionApiV3ResourceTest {
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
-        RequestResult rr = new RequestResult()
-            .setSuccess(true)
-            .setMessage("function registered");
-        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
         updateDefaultFunction();
     }
 
@@ -1070,11 +1055,6 @@ public class FunctionApiV3ResourceTest {
         functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
-        RequestResult rr = new RequestResult()
-                .setSuccess(true)
-                .setMessage("function registered");
-            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
         resource.updateFunction(
             tenant,
@@ -1101,11 +1081,8 @@ public class FunctionApiV3ResourceTest {
 
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
-            RequestResult rr = new RequestResult()
-                .setSuccess(false)
-                .setMessage("function failed to register");
-            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+            doThrow(new IllegalArgumentException("function failed to register"))
+                    .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
 
             updateDefaultFunction();
         } catch (RestException re) {
@@ -1114,7 +1091,7 @@ public class FunctionApiV3ResourceTest {
         }
     }
 
-    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registeration interrupted")
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function registeration interrupted")
     public void testUpdateFunctionInterrupted() throws Exception {
         try {
             mockStatic(WorkerUtils.class);
@@ -1127,9 +1104,8 @@ public class FunctionApiV3ResourceTest {
 
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
-            CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
-                new IOException("Function registeration interrupted"));
-            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+            doThrow(new IllegalStateException("Function registeration interrupted"))
+                    .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
 
             updateDefaultFunction();
         } catch (RestException re) {
@@ -1220,25 +1196,16 @@ public class FunctionApiV3ResourceTest {
     public void testDeregisterFunctionSuccess() {
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
-        RequestResult rr = new RequestResult()
-            .setSuccess(true)
-            .setMessage("function deregistered");
-        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-        when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(requestResult);
-
         deregisterDefaultFunction();
     }
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "function failed to deregister")
-    public void testDeregisterFunctionFailure() {
+    public void testDeregisterFunctionFailure() throws Exception {
         try {
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
-            RequestResult rr = new RequestResult()
-                .setSuccess(false)
-                .setMessage("function failed to deregister");
-            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-            when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(requestResult);
+            doThrow(new IllegalArgumentException("function failed to deregister"))
+                    .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
 
             deregisterDefaultFunction();
         } catch (RestException re) {
@@ -1248,13 +1215,12 @@ public class FunctionApiV3ResourceTest {
     }
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function deregisteration interrupted")
-    public void testDeregisterFunctionInterrupted() {
+    public void testDeregisterFunctionInterrupted() throws Exception {
         try {
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
-            CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
-                    new IOException("Function deregisteration interrupted"));
-            when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(requestResult);
+            doThrow(new IllegalStateException("Function deregisteration interrupted"))
+                    .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
 
             deregisterDefaultFunction();
         }
@@ -1520,10 +1486,6 @@ public class FunctionApiV3ResourceTest {
         String filePackageUrl = "file://" + fileLocation;
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
 
-        RequestResult rr = new RequestResult().setSuccess(true).setMessage("function registered");
-        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
         FunctionConfig functionConfig = new FunctionConfig();
         functionConfig.setTenant(tenant);
         functionConfig.setNamespace(namespace);
@@ -1553,10 +1515,6 @@ public class FunctionApiV3ResourceTest {
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
         when(mockedManager.containsFunction(eq(actualTenant), eq(actualNamespace), eq(actualName))).thenReturn(false);
 
-        RequestResult rr = new RequestResult().setSuccess(true).setMessage("function registered");
-        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
         FunctionConfig functionConfig = new FunctionConfig();
         functionConfig.setTenant(tenant);
         functionConfig.setNamespace(namespace);
@@ -1580,10 +1538,6 @@ public class FunctionApiV3ResourceTest {
         String filePackageUrl = "file://" + fileLocation;
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
 
-        RequestResult rr = new RequestResult().setSuccess(true).setMessage("function registered");
-        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
         FunctionConfig functionConfig = new FunctionConfig();
         functionConfig.setTenant(tenant);
         functionConfig.setNamespace(namespace);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index a93077c..6b55106 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Lists;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.core.config.Configurator;
+import org.apache.pulsar.client.admin.Functions;
 import org.apache.pulsar.client.admin.Namespaces;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -31,7 +32,6 @@ import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.ClassLoaderUtils;
-import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.RestException;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.instance.InstanceUtils;
@@ -42,15 +42,11 @@ import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.SinkConfigUtils;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
-import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
-import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
-import org.apache.pulsar.functions.worker.WorkerConfig;
-import org.apache.pulsar.functions.worker.WorkerService;
-import org.apache.pulsar.functions.worker.WorkerUtils;
-import org.apache.pulsar.functions.worker.request.RequestResult;
+import org.apache.pulsar.functions.worker.*;
 import org.apache.pulsar.functions.worker.rest.api.SinksImpl;
 import org.apache.pulsar.io.cassandra.CassandraStringSink;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+import org.mockito.Mockito;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -69,7 +65,6 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 
 import static org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.ATLEAST_ONCE;
 import static org.apache.pulsar.functions.source.TopicSchema.DEFAULT_SERDE;
@@ -117,6 +112,7 @@ public class SinkApiV3ResourceTest {
     private PulsarAdmin mockedPulsarAdmin;
     private Tenants mockedTenants;
     private Namespaces mockedNamespaces;
+    private Functions mockedFunctions;
     private TenantInfo mockedTenantInfo;
     private List<String> namespaceList = new LinkedList<>();
     private FunctionMetaDataManager mockedManager;
@@ -127,6 +123,7 @@ public class SinkApiV3ResourceTest {
     private InputStream mockedInputStream;
     private FormDataContentDisposition mockedFormData;
     private FunctionMetaData mockedFunctionMetaData;
+    private LeaderService mockedLeaderService;
 
     @BeforeMethod
     public void setup() throws Exception {
@@ -141,19 +138,25 @@ public class SinkApiV3ResourceTest {
         this.mockedPulsarAdmin = mock(PulsarAdmin.class);
         this.mockedTenants = mock(Tenants.class);
         this.mockedNamespaces = mock(Namespaces.class);
+        this.mockedFunctions = mock(Functions.class);
+        this.mockedLeaderService = mock(LeaderService.class);
         namespaceList.add(tenant + "/" + namespace);
 
         this.mockedWorkerService = mock(WorkerService.class);
         when(mockedWorkerService.getFunctionMetaDataManager()).thenReturn(mockedManager);
+        when(mockedWorkerService.getLeaderService()).thenReturn(mockedLeaderService);
         when(mockedWorkerService.getFunctionRuntimeManager()).thenReturn(mockedFunctionRunTimeManager);
         when(mockedFunctionRunTimeManager.getRuntimeFactory()).thenReturn(mockedRuntimeFactory);
         when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace);
         when(mockedWorkerService.isInitialized()).thenReturn(true);
         when(mockedWorkerService.getBrokerAdmin()).thenReturn(mockedPulsarAdmin);
+        when(mockedWorkerService.getFunctionAdmin()).thenReturn(mockedPulsarAdmin);
         when(mockedPulsarAdmin.tenants()).thenReturn(mockedTenants);
         when(mockedPulsarAdmin.namespaces()).thenReturn(mockedNamespaces);
+        when(mockedPulsarAdmin.functions()).thenReturn(mockedFunctions);
         when(mockedTenants.getTenantInfo(any())).thenReturn(mockedTenantInfo);
         when(mockedNamespaces.getNamespaces(any())).thenReturn(namespaceList);
+        when(mockedLeaderService.isLeader()).thenReturn(true);
 
         URL file = Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME);
         if (file == null)  {
@@ -162,7 +165,6 @@ public class SinkApiV3ResourceTest {
         JAR_FILE_PATH = file.getFile();
         INVALID_JAR_FILE_PATH = Thread.currentThread().getContextClassLoader().getResource(INVALID_JAR_FILE_NAME).getFile();
 
-
         // worker config
         WorkerConfig workerConfig = new WorkerConfig()
             .setWorkerId("test")
@@ -535,12 +537,6 @@ public class SinkApiV3ResourceTest {
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false);
 
-        RequestResult rr = new RequestResult()
-            .setSuccess(true)
-            .setMessage("sink registered");
-        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
         registerDefaultSink();
     }
 
@@ -563,12 +559,6 @@ public class SinkApiV3ResourceTest {
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
         when(mockedManager.containsFunction(eq(actualTenant), eq(actualNamespace), eq(actualName))).thenReturn(false);
 
-        RequestResult rr = new RequestResult()
-                .setSuccess(true)
-                .setMessage("source registered");
-        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
         SinkConfig sinkConfig = new SinkConfig();
         sinkConfig.setTenant(tenant);
         sinkConfig.setNamespace(namespace);
@@ -601,11 +591,8 @@ public class SinkApiV3ResourceTest {
 
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false);
 
-            RequestResult rr = new RequestResult()
-                .setSuccess(false)
-                .setMessage("sink failed to register");
-            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+            doThrow(new IllegalArgumentException("sink failed to register"))
+                    .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
 
             registerDefaultSink();
         } catch (RestException re){
@@ -614,7 +601,7 @@ public class SinkApiV3ResourceTest {
         }
     }
 
-    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registeration interrupted")
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function registration interrupted")
     public void testRegisterSinkInterrupted() throws Exception {
         try {
             mockStatic(WorkerUtils.class);
@@ -628,9 +615,8 @@ public class SinkApiV3ResourceTest {
 
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false);
 
-            CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
-                new IOException("Function registeration interrupted"));
-            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+            doThrow(new IllegalStateException("Function registration interrupted"))
+                    .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
 
             registerDefaultSink();
         } catch (RestException re){
@@ -851,12 +837,9 @@ public class SinkApiV3ResourceTest {
             sinkConfig.setParallelism(parallelism);
         }
 
-        if (expectedError == null) {
-            RequestResult rr = new RequestResult()
-                    .setSuccess(true)
-                    .setMessage("source registered");
-            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+        if (expectedError != null) {
+            doThrow(new IllegalArgumentException(expectedError))
+                    .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
         }
 
         resource.updateSink(
@@ -957,12 +940,6 @@ public class SinkApiV3ResourceTest {
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
 
-        RequestResult rr = new RequestResult()
-            .setSuccess(true)
-            .setMessage("source registered");
-        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
         updateDefaultSink();
     }
 
@@ -1001,12 +978,6 @@ public class SinkApiV3ResourceTest {
         this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
         when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData);
 
-        RequestResult rr = new RequestResult()
-                .setSuccess(true)
-                .setMessage("source registered");
-            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
         resource.updateSink(
             tenant,
             namespace,
@@ -1031,11 +1002,8 @@ public class SinkApiV3ResourceTest {
 
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
 
-            RequestResult rr = new RequestResult()
-                    .setSuccess(false)
-                    .setMessage("sink failed to register");
-            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+            doThrow(new IllegalArgumentException("sink failed to register"))
+                    .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
 
             updateDefaultSink();
         } catch (RestException re) {
@@ -1044,7 +1012,7 @@ public class SinkApiV3ResourceTest {
         }
     }
 
-    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registeration interrupted")
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function registration interrupted")
     public void testUpdateSinkInterrupted() throws Exception {
         try {
             mockStatic(WorkerUtils.class);
@@ -1057,9 +1025,8 @@ public class SinkApiV3ResourceTest {
 
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
 
-            CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
-                    new IOException("Function registeration interrupted"));
-            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+            doThrow(new IllegalStateException("Function registration interrupted"))
+                    .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
 
             updateDefaultSink();
         } catch (RestException re) {
@@ -1136,7 +1103,7 @@ public class SinkApiV3ResourceTest {
     }
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink test-sink doesn't exist")
-    public void testDeregisterNotExistedSink() {
+    public void testDeregisterNotExistedSink()  {
         try {
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false);
             deregisterDefaultSink();
@@ -1147,28 +1114,19 @@ public class SinkApiV3ResourceTest {
     }
 
     @Test
-    public void testDeregisterSinkSuccess() {
+    public void testDeregisterSinkSuccess() throws Exception {
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
-
-        RequestResult rr = new RequestResult()
-            .setSuccess(true)
-            .setMessage("source deregistered");
-        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-        when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(requestResult);
     }
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "sink failed to deregister")
-    public void testDeregisterSinkFailure() {
+    public void testDeregisterSinkFailure() throws Exception {
         try {
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
 
             when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink))).thenReturn(FunctionMetaData.newBuilder().build());
 
-            RequestResult rr = new RequestResult()
-                .setSuccess(false)
-                .setMessage("sink failed to deregister");
-            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-            when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(requestResult);
+            doThrow(new IllegalArgumentException("sink failed to deregister"))
+                    .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
 
             deregisterDefaultSink();
         } catch (RestException re) {
@@ -1178,15 +1136,14 @@ public class SinkApiV3ResourceTest {
     }
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function deregistration interrupted")
-    public void testDeregisterSinkInterrupted() {
+    public void testDeregisterSinkInterrupted() throws Exception {
         try {
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
 
             when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink))).thenReturn(FunctionMetaData.newBuilder().build());
 
-            CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
-                new IOException("Function deregistration interrupted"));
-            when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(requestResult);
+            doThrow(new IllegalStateException("Function deregistration interrupted"))
+                    .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
 
             deregisterDefaultSink();
         } catch (RestException re) {
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index d36c9a9..fe9a117 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -39,13 +39,13 @@ import java.io.InputStream;
 import java.net.URL;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
 
 import javax.ws.rs.core.Response;
 
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.core.config.Configurator;
+import org.apache.pulsar.client.admin.Functions;
 import org.apache.pulsar.client.admin.Namespaces;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -54,7 +54,6 @@ import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.ClassLoaderUtils;
-import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.RestException;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.instance.InstanceUtils;
@@ -69,15 +68,11 @@ import org.apache.pulsar.functions.source.TopicSchema;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.SourceConfigUtils;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
-import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
-import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
-import org.apache.pulsar.functions.worker.WorkerConfig;
-import org.apache.pulsar.functions.worker.WorkerService;
-import org.apache.pulsar.functions.worker.WorkerUtils;
-import org.apache.pulsar.functions.worker.request.RequestResult;
+import org.apache.pulsar.functions.worker.*;
 import org.apache.pulsar.functions.worker.rest.api.SourcesImpl;
 import org.apache.pulsar.io.twitter.TwitterFireHose;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+import org.mockito.Mockito;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -114,6 +109,7 @@ public class SourceApiV3ResourceTest {
     private PulsarAdmin mockedPulsarAdmin;
     private Tenants mockedTenants;
     private Namespaces mockedNamespaces;
+    private Functions mockedFunctions;
     private TenantInfo mockedTenantInfo;
     private List<String> namespaceList = new LinkedList<>();
     private FunctionMetaDataManager mockedManager;
@@ -124,6 +120,7 @@ public class SourceApiV3ResourceTest {
     private InputStream mockedInputStream;
     private FormDataContentDisposition mockedFormData;
     private FunctionMetaData mockedFunctionMetaData;
+    private LeaderService mockedLeaderService;
 
     @BeforeMethod
     public void setup() throws Exception {
@@ -138,19 +135,25 @@ public class SourceApiV3ResourceTest {
         this.mockedPulsarAdmin = mock(PulsarAdmin.class);
         this.mockedTenants = mock(Tenants.class);
         this.mockedNamespaces = mock(Namespaces.class);
+        this.mockedFunctions = mock(Functions.class);
+        this.mockedLeaderService = mock(LeaderService.class);
         namespaceList.add(tenant + "/" + namespace);
 
         this.mockedWorkerService = mock(WorkerService.class);
         when(mockedWorkerService.getFunctionMetaDataManager()).thenReturn(mockedManager);
+        when(mockedWorkerService.getLeaderService()).thenReturn(mockedLeaderService);
         when(mockedWorkerService.getFunctionRuntimeManager()).thenReturn(mockedFunctionRunTimeManager);
         when(mockedFunctionRunTimeManager.getRuntimeFactory()).thenReturn(mockedRuntimeFactory);
         when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace);
         when(mockedWorkerService.isInitialized()).thenReturn(true);
         when(mockedWorkerService.getBrokerAdmin()).thenReturn(mockedPulsarAdmin);
+        when(mockedWorkerService.getFunctionAdmin()).thenReturn(mockedPulsarAdmin);
         when(mockedPulsarAdmin.tenants()).thenReturn(mockedTenants);
         when(mockedPulsarAdmin.namespaces()).thenReturn(mockedNamespaces);
+        when(mockedPulsarAdmin.functions()).thenReturn(mockedFunctions);
         when(mockedTenants.getTenantInfo(any())).thenReturn(mockedTenantInfo);
         when(mockedNamespaces.getNamespaces(any())).thenReturn(namespaceList);
+        when(mockedLeaderService.isLeader()).thenReturn(true);
 
         URL file = Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME);
         if (file == null)  {
@@ -503,12 +506,6 @@ public class SourceApiV3ResourceTest {
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false);
 
-        RequestResult rr = new RequestResult()
-            .setSuccess(true)
-            .setMessage("source registered");
-        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
         registerDefaultSource();
     }
 
@@ -530,12 +527,6 @@ public class SourceApiV3ResourceTest {
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
         when(mockedManager.containsFunction(eq(actualTenant), eq(actualNamespace), eq(actualName))).thenReturn(false);
 
-        RequestResult rr = new RequestResult()
-                .setSuccess(true)
-                .setMessage("source registered");
-        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
         SourceConfig sourceConfig = new SourceConfig();
         sourceConfig.setTenant(tenant);
         sourceConfig.setNamespace(namespace);
@@ -569,11 +560,8 @@ public class SourceApiV3ResourceTest {
 
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false);
 
-            RequestResult rr = new RequestResult()
-                .setSuccess(false)
-                .setMessage("source failed to register");
-            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+            doThrow(new IllegalArgumentException("source failed to register"))
+                    .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
 
             registerDefaultSource();
         } catch (RestException re){
@@ -582,7 +570,7 @@ public class SourceApiV3ResourceTest {
         }
     }
 
-    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registration interrupted")
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function registration interrupted")
     public void testRegisterSourceInterrupted() throws Exception {
         try {
             mockStatic(WorkerUtils.class);
@@ -596,9 +584,8 @@ public class SourceApiV3ResourceTest {
 
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false);
 
-            CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
-                new IOException("Function registration interrupted"));
-            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+            doThrow(new IllegalStateException("Function registration interrupted"))
+                    .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
 
             registerDefaultSource();
         } catch (RestException re){
@@ -870,12 +857,9 @@ public class SourceApiV3ResourceTest {
             sourceConfig.setParallelism(parallelism);
         }
 
-        if (expectedError == null) {
-            RequestResult rr = new RequestResult()
-                    .setSuccess(true)
-                    .setMessage("source registered");
-            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+        if (expectedError != null) {
+            doThrow(new IllegalArgumentException(expectedError))
+                    .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
         }
 
         resource.updateSource(
@@ -972,12 +956,6 @@ public class SourceApiV3ResourceTest {
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
 
-        RequestResult rr = new RequestResult()
-            .setSuccess(true)
-            .setMessage("source registered");
-        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
         updateDefaultSource();
     }
 
@@ -996,6 +974,7 @@ public class SourceApiV3ResourceTest {
         sourceConfig.setClassName(className);
         sourceConfig.setParallelism(parallelism);
 
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
         mockStatic(ConnectorUtils.class);
         doReturn(TwitterFireHose.class.getName()).when(ConnectorUtils.class);
         ConnectorUtils.getIOSourceClass(any(NarClassLoader.class));
@@ -1013,14 +992,6 @@ public class SourceApiV3ResourceTest {
         this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
         when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData);
 
-
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
-        RequestResult rr = new RequestResult()
-                .setSuccess(true)
-                .setMessage("source registered");
-            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
         resource.updateSource(
             tenant,
             namespace,
@@ -1047,11 +1018,8 @@ public class SourceApiV3ResourceTest {
 
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
 
-            RequestResult rr = new RequestResult()
-                    .setSuccess(false)
-                    .setMessage("source failed to register");
-            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+            doThrow(new IllegalArgumentException("source failed to register"))
+                    .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
 
             updateDefaultSource();
         } catch (RestException re){
@@ -1060,7 +1028,7 @@ public class SourceApiV3ResourceTest {
         }
     }
 
-    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registration interrupted")
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function registration interrupted")
     public void testUpdateSourceInterrupted() throws Exception {
         try {
             mockStatic(WorkerUtils.class);
@@ -1074,9 +1042,8 @@ public class SourceApiV3ResourceTest {
 
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
 
-            CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
-                new IOException("Function registration interrupted"));
-            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+            doThrow(new IllegalStateException("Function registration interrupted"))
+                    .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
 
             updateDefaultSource();
         } catch (RestException re){
@@ -1169,27 +1136,18 @@ public class SourceApiV3ResourceTest {
 
         when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(source))).thenReturn(FunctionMetaData.newBuilder().build());
 
-        RequestResult rr = new RequestResult()
-            .setSuccess(true)
-            .setMessage("source deregistered");
-        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-        when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(requestResult);
-
         deregisterDefaultSource();
     }
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "source failed to deregister")
-    public void testDeregisterSourceFailure() {
+    public void testDeregisterSourceFailure() throws Exception {
         try {
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
 
             when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(source))).thenReturn(FunctionMetaData.newBuilder().build());
 
-            RequestResult rr = new RequestResult()
-                .setSuccess(false)
-                .setMessage("source failed to deregister");
-            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-            when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(requestResult);
+            doThrow(new IllegalArgumentException("source failed to deregister"))
+                    .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
 
             deregisterDefaultSource();
         } catch (RestException re){
@@ -1199,15 +1157,14 @@ public class SourceApiV3ResourceTest {
     }
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function deregistration interrupted")
-    public void testDeregisterSourceInterrupted() {
+    public void testDeregisterSourceInterrupted() throws Exception {
         try {
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
 
             when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(source))).thenReturn(FunctionMetaData.newBuilder().build());
 
-            CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
-                new IOException("Function deregistration interrupted"));
-            when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(requestResult);
+            doThrow(new IllegalStateException("Function deregistration interrupted"))
+                    .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
 
             deregisterDefaultSource();
         } catch (RestException re){