Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/06/12 02:44:01 UTC

[GitHub] [pulsar] srkukarni opened a new pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

srkukarni opened a new pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255


   
   *(If this PR fixes a github issue, please add `Fixes #<xyz>`.)*
   
   Fixes #<xyz>
   
   *(or if this PR is one task of a github issue, please add `Master Issue: #<xyz>` to link to the master issue.)*
   
   Master Issue: #<xyz>
   
   ### Motivation
   Currently the Function Metadata topic is not compacted. In a long-running system with enough function submissions/updates/state changes, the startup lag for workers reading the topic from the beginning therefore grows linearly.
   However, the current mechanism for writing to the Function Metadata topic does not lend itself to compaction. All workers write into the topic and only one of them wins (it need not be the last one). Because compaction retains only the latest message for each key, it could keep a write that actually lost.
   This PR makes a first stab at simplifying the current workflow. Now, upon a function submission/update/state change, a worker simply passes the request to the leader. The leader is the arbiter of what goes in (just as it is today) and is the only one writing to the function metadata topic. The rest of the workers continue to tail the topic to receive the appropriate updates. The leader does not run the tailer; instead, it directly updates its in-memory state when it writes to the metadata topic.
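The compaction argument above can be made concrete with a small in-memory model. This is a sketch under stated assumptions, not Pulsar code: a topic is an ordered list of keyed records, compaction keeps only the last record per key (the retention rule of Pulsar topic compaction), and a simplified conflict rule (a record is applied only if its version is higher than the one already applied) stands in for the real merge logic. `MetadataTopicSketch`, `replay`, `compact` and `leaderWrite` are hypothetical names. With many writers, the record that survives compaction can be one that lost the race; with a single validating leader, only accepted records reach the topic, so replaying the compacted topic yields the leader's state.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model: why a compacted metadata topic needs a single writer. */
public class MetadataTopicSketch {

    /** One metadata record: function name (the compaction key), version and payload. */
    static final class Entry {
        final String key;
        final long version;
        final String value;

        Entry(String key, long version, String value) {
            this.key = key;
            this.version = version;
            this.value = value;
        }
    }

    /** Replays records in order; a record is applied only if its version is newer. */
    static Map<String, Entry> replay(List<Entry> topic) {
        Map<String, Entry> state = new LinkedHashMap<>();
        for (Entry e : topic) {
            Entry current = state.get(e.key);
            if (current == null || e.version > current.version) {
                state.put(e.key, e);
            }
            // otherwise the record lost the race and is ignored
        }
        return state;
    }

    /** Compaction: keep only the last record written for each key. */
    static List<Entry> compact(List<Entry> topic) {
        Map<String, Entry> last = new LinkedHashMap<>();
        for (Entry e : topic) {
            last.put(e.key, e);
        }
        return new ArrayList<>(last.values());
    }

    /** Leader-side write: validate against in-memory state, append only accepted records. */
    static boolean leaderWrite(Map<String, Entry> leaderState, List<Entry> topic, Entry request) {
        Entry current = leaderState.get(request.key);
        if (current != null && request.version <= current.version) {
            return false; // stale request: rejected and never written to the topic
        }
        leaderState.put(request.key, request);
        topic.add(request);
        return true;
    }

    static String valueOf(Map<String, Entry> state, String key) {
        Entry e = state.get(key);
        return e == null ? null : e.value;
    }

    public static void main(String[] args) {
        // Old model: every worker appends; two workers race to write version 1.
        List<Entry> multiWriter = List.of(
                new Entry("f", 0, "A"), new Entry("f", 1, "B"), new Entry("f", 1, "C"));
        System.out.println(valueOf(replay(multiWriter), "f"));          // first writer won
        System.out.println(valueOf(replay(compact(multiWriter)), "f")); // loser survived compaction

        // New model: only the leader appends, after validating.
        Map<String, Entry> leaderState = new LinkedHashMap<>();
        List<Entry> leaderTopic = new ArrayList<>();
        leaderWrite(leaderState, leaderTopic, new Entry("f", 0, "A"));
        leaderWrite(leaderState, leaderTopic, new Entry("f", 1, "B"));
        leaderWrite(leaderState, leaderTopic, new Entry("f", 1, "C")); // rejected
        System.out.println(valueOf(replay(compact(leaderTopic)), "f"));
    }
}
```

Running `main` prints `B`, `C`, `B`: the multi-writer topic loses its agreed-upon value after compaction, while the leader-written topic does not.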
   
   ### Modifications
   
   *Describe the modifications you've done.*
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (10MB)*
     - *Extended integration test for recovery after broker failure*
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API: (yes / no)
     - The schema: (yes / no / don't know)
     - The default values of configurations: (yes / no)
     - The wire protocol: (yes / no)
     - The rest endpoints: (yes / no)
     - The admin cli options: (yes / no)
     - Anything that affects deployment: (yes / no / don't know)
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
     - If a feature is not applicable for documentation, explain why?
     - If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r440346471



##########
File path: pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
##########
@@ -984,4 +985,53 @@ public void putFunctionState(String tenant, String namespace, String function, F
         }
         return future;
     }
+
+    @Override
+    public void updateOnWorkerLeader(String tenant, String namespace,
+                                     String function, byte[] functionMetaData,
+                                     boolean delete) throws PulsarAdminException {
+        try {
+            updateOnWorkerLeaderAsync(tenant, namespace, function,
+                    functionMetaData, delete).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> updateOnWorkerLeaderAsync(String tenant, String namespace,

Review comment:
       Same comment about exposing these internal APIs to end users
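The synchronous `updateOnWorkerLeader` in the diff uses the usual blocking-wrapper pattern: wait on the async future with a timeout, restore the interrupt flag on `InterruptedException`, and unwrap the `ExecutionException`. One detail worth a second look is the unchecked cast `(PulsarAdminException) e.getCause()`, which would throw a `ClassCastException` if the future ever failed with a different exception type. A minimal, dependency-free sketch of a defensive variant follows; `AdminException` is a hypothetical stand-in for `PulsarAdminException`, and `getWithTimeout` is not a Pulsar method.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BlockingWrapperSketch {

    /** Hypothetical stand-in for PulsarAdminException. */
    static class AdminException extends Exception {
        AdminException(String message) { super(message); }
        AdminException(Throwable cause) { super(cause); }
    }

    /** Blocks on an async admin call, translating every failure into AdminException. */
    static <T> T getWithTimeout(CompletableFuture<T> future, long timeoutMs) throws AdminException {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            // Rethrow the original exception when it already has the expected type,
            // otherwise wrap it instead of risking a ClassCastException.
            if (cause instanceof AdminException) {
                throw (AdminException) cause;
            }
            throw new AdminException(cause);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag for callers
            throw new AdminException(e);
        } catch (TimeoutException e) {
            throw new AdminException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(getWithTimeout(CompletableFuture.completedFuture("ok"), 1000));
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("Not the leader"));
        try {
            getWithTimeout(failed, 1000);
        } catch (AdminException e) {
            System.out.println(e.getCause().getMessage());
        }
    }
}
```

The two lines printed by `main` are `ok` and `Not the leader`.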







[GitHub] [pulsar] srkukarni commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

srkukarni commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r440413018



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -168,68 +151,78 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
         return containsFunctionMetaData(tenant, namespace, functionName);
     }
 
-    /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
-     */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)
+            throws IllegalStateException, IllegalArgumentException {
+        if (exclusiveLeaderProducer == null) {
+            throw new IllegalStateException("Not the leader");
+        }
+        boolean needsScheduling;
+        if (delete) {
+            needsScheduling = proccessDeregister(functionMetaData);
+        } else {
+            needsScheduling = processUpdate(functionMetaData);
+        }
+        Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
+                .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
+                .setFunctionMetaData(functionMetaData)
+                .setWorkerId(workerConfig.getWorkerId())
+                .setRequestId(UUID.randomUUID().toString())
+                .build();
+        try {
+            exclusiveLeaderProducer.send(serviceRequest.toByteArray());

Review comment:
       That is a follow-on change. I wanted this PR to focus only on the changed mechanism for handling updates.







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r440403279



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -168,68 +151,78 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
         return containsFunctionMetaData(tenant, namespace, functionName);
     }
 
-    /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
-     */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)

Review comment:
       Currently, when there are concurrent modifications, the worker fails requests. That no longer seems to be the case: all requests are accepted and 200s are returned to the user, but internally some requests are just silently ignored.
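One way to surface concurrent modifications as errors rather than silent drops, in line with the `@throws IllegalArgumentException if the request is out of date` contract in a later revision of this method, is an optimistic version check on the leader. A minimal sketch, assuming each request carries the version the caller last read; `VersionCheckSketch`, `Meta` and the `expectedVersion` parameter are hypothetical and do not mirror the actual `FunctionMetaData` handling.

```java
import java.util.HashMap;
import java.util.Map;

public class VersionCheckSketch {

    /** Hypothetical stored metadata: a payload plus a monotonically increasing version. */
    static final class Meta {
        final String payload;
        final long version;

        Meta(String payload, long version) {
            this.payload = payload;
            this.version = version;
        }
    }

    private final Map<String, Meta> store = new HashMap<>();

    /**
     * Applies an update only if the caller saw the latest version.
     * @param expectedVersion version the caller read before editing, or -1 for a new function
     * @return the version assigned to the stored metadata
     * @throws IllegalArgumentException if the request is out of date
     */
    synchronized long update(String name, String payload, long expectedVersion) {
        Meta current = store.get(name);
        long currentVersion = current == null ? -1 : current.version;
        if (expectedVersion != currentVersion) {
            // Reject instead of silently ignoring, so the caller gets an error it can act on.
            throw new IllegalArgumentException("Stale update for " + name + ": expected version "
                    + expectedVersion + " but current is " + currentVersion);
        }
        long next = currentVersion + 1;
        store.put(name, new Meta(payload, next));
        return next;
    }

    public static void main(String[] args) {
        VersionCheckSketch leader = new VersionCheckSketch();
        long v0 = leader.update("f", "A", -1); // create: version 0
        leader.update("f", "B", v0);           // first concurrent editor wins: version 1
        try {
            leader.update("f", "C", v0);       // second editor, also based on version 0, loses
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The second editor's request is rejected with an `IllegalArgumentException`, which the REST layer could translate into an error response for the user instead of a 200.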







[GitHub] [pulsar] srkukarni commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

srkukarni commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r444333061



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -171,68 +160,83 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
         return containsFunctionMetaData(tenant, namespace, functionName);
     }
 
-    /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
-     */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)

Review comment:
       Added more comments to this class.







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r445346075



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -174,89 +181,124 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
     }
 
     /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
+     * Called by the worker when we are in the leader mode.  In this state, we update our in-memory
+     * data structures and then write to the metadata topic.
+     * @param functionMetaData The function metadata in question
+     * @param delete Is this a delete operation
+     * @throws IllegalStateException if we are not the leader
+     * @throws IllegalArgumentException if the request is out of date.
      */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)
+            throws IllegalStateException, IllegalArgumentException {
+        if (exclusiveLeaderProducer == null) {
+            throw new IllegalStateException("Not the leader");
+        }
+        boolean needsScheduling;
+        if (delete) {
+            needsScheduling = proccessDeregister(functionMetaData);
+        } else {
+            needsScheduling = processUpdate(functionMetaData);
+        }
+        Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
+                .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
+                .setFunctionMetaData(functionMetaData)
+                .setWorkerId(workerConfig.getWorkerId())
+                .setRequestId(UUID.randomUUID().toString())
+                .build();
+        try {
+            lastMessageSeen = exclusiveLeaderProducer.send(serviceRequest.toByteArray());
+        } catch (Exception e) {

Review comment:
       We should return the error to the worker making the call to the leader; otherwise that worker might have to wait for a timeout. I think we should just return an error so the user can retry. There is no guarantee that restarting the worker or electing another leader will solve the issue, since all the workers have the same configuration. Restarting can also be heavy, and I would prefer to minimize forced restarts as much as possible.
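The suggestion above amounts to propagating the write failure synchronously to the worker that called the leader, so it fails fast instead of waiting for a timeout, and letting the caller retry. A minimal sketch under that assumption; `MetadataWriter` is a hypothetical stand-in for the exclusive producer and the retry loop is illustrative. One deliberate difference from the diff: the sketch writes to the topic before touching in-memory state, so a failed write leaves the two consistent without needing a restart. That ordering is a choice of this sketch, not what the PR does.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class PropagateWriteErrorSketch {

    /** Hypothetical stand-in for the leader's exclusive producer. */
    interface MetadataWriter {
        void send(byte[] payload) throws Exception;
    }

    private final MetadataWriter writer;
    // Requests the leader has applied locally (stand-in for the in-memory metadata map).
    final List<String> inMemory = new ArrayList<>();

    PropagateWriteErrorSketch(MetadataWriter writer) {
        this.writer = writer;
    }

    /** Leader-side update: write to the topic first, apply locally only on success. */
    synchronized void updateOnLeader(String request) {
        try {
            writer.send(request.getBytes(StandardCharsets.UTF_8));
        } catch (Exception e) {
            // Surface the failure to the calling worker instead of only logging it,
            // so the caller fails fast and can decide whether to retry.
            throw new IllegalStateException("Internal error updating function at the leader", e);
        }
        inMemory.add(request);
    }

    /** Caller-side retry with a bounded number of attempts; returns true on success. */
    static boolean updateWithRetry(PropagateWriteErrorSketch leader, String request, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                leader.updateOnLeader(request);
                return true;
            } catch (IllegalStateException e) {
                System.out.println("attempt " + attempt + " failed: " + e.getCause().getMessage());
            }
        }
        return false;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails on the first send and succeeds afterwards.
        MetadataWriter flaky = payload -> {
            if (calls[0]++ == 0) {
                throw new Exception("broker unavailable");
            }
        };
        PropagateWriteErrorSketch leader = new PropagateWriteErrorSketch(flaky);
        System.out.println(updateWithRetry(leader, "update f", 3));
        System.out.println(leader.inMemory);
    }
}
```

With the flaky writer, `main` prints one failed attempt, then `true` and `[update f]`.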







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r445259166



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -174,89 +188,127 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
     }
 
     /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
+     * Called by the worker when we are in the leader mode.  In this state, we update our in-memory
+     * data structures and then write to the metadata topic.
+     * @param functionMetaData The function metadata in question
+     * @param delete Is this a delete operation
+     * @throws IllegalStateException if we are not the leader
+     * @throws IllegalArgumentException if the request is out of date.
      */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)
+            throws IllegalStateException, IllegalArgumentException {
+        if (exclusiveLeaderProducer == null) {
+            throw new IllegalStateException("Not the leader");
+        }
+        boolean needsScheduling;
+        if (delete) {
+            needsScheduling = proccessDeregister(functionMetaData);
+        } else {
+            needsScheduling = processUpdate(functionMetaData);
+        }
+        Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
+                .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
+                .setFunctionMetaData(functionMetaData)
+                .setWorkerId(workerConfig.getWorkerId())
+                .setRequestId(UUID.randomUUID().toString())
+                .build();
+        try {
+            lastMessageSeen = exclusiveLeaderProducer.send(serviceRequest.toByteArray());
+        } catch (Exception e) {
+            log.error("Could not write into Function Metadata topic", e);
+            errorNotifier.triggerError(e);
+            throw new IllegalStateException("Internal Error updating function at the leader", e);
+        }
+        if (needsScheduling) {
+            this.schedulerManager.schedule();
         }
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(existingFunctionMetadata, functionMetaData);
-
-        Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
-
-        return submit(updateRequest);
     }
 
-
     /**
-     * Sends a deregister request to the FMT (Function Metadata Topic) for a function
-     * @param tenant the tenant the function that needs to be deregistered belongs to
-     * @param namespace the namespace the function that needs to be deregistered belongs to
-     * @param functionName the name of the function
-     * @return a completable future of when the deregister has been applied
+     * Called by the leader service when this worker becomes the leader.
+     * We first get exclusive producer on the metadata topic. Next we drain the tailer
+     * to ensure that we have caught up to metadata topic. After which we close the tailer.
+     * Note that this method cannot be synchronized because the tailer might still be processing messages
      */
-    public synchronized CompletableFuture<RequestResult> deregisterFunction(String tenant, String namespace, String functionName) {
-        FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(functionMetaData, functionMetaData);
-
-        Request.ServiceRequest deregisterRequest = ServiceRequestUtils.getDeregisterRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
+    public void acquireLeadership() {
+        log.info("FunctionMetaDataManager becoming leader by creating exclusive producer");
+        FunctionMetaDataTopicTailer tailer = internalAcquireLeadership();
+        // Now that we have created the exclusive producer, wait for reader to get over
+        if (tailer != null) {
+            try {
+                tailer.stopWhenNoMoreMessages().get();
+            } catch (Exception e) {
+                log.error("Error while waiting for metadata tailer thread to finish", e);
+                errorNotifier.triggerError(e);
+            }
+            tailer.close();
+        }
+        log.info("FunctionMetaDataManager done becoming leader");
+    }
 
-        return submit(deregisterRequest);
+    private synchronized FunctionMetaDataTopicTailer internalAcquireLeadership() {
+        if (exclusiveLeaderProducer == null) {
+            try {
+                exclusiveLeaderProducer = pulsarClient.newProducer()

Review comment:
       Couldn't there be a race condition between here and "start()", where you are relying on whether exclusiveLeaderProducer is null or not? "acquireLeadership()" is called by "becomeActive", which runs on the client listener thread, while "start()" is executed by the worker's "main" thread.
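The race described above is a check-then-act problem: two threads can both observe `exclusiveLeaderProducer == null` and act on it. A minimal sketch of one remedy, assuming every read and write of the field goes through methods synchronized on the same monitor, so the null check and the creation cannot interleave. `LeaderProducerSketch`, `createProducerIfAbsent` and `isLeader` are hypothetical names, and a plain `Object` stands in for the real producer. Only the caller that actually created the producer gets `true` back, so only it would go on to drain and close the tailer, and it can do that after releasing the lock (consistent with the note in the diff that `acquireLeadership()` itself cannot be synchronized).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class LeaderProducerSketch {

    /** Counts how many (hypothetical) exclusive producers were created. */
    final AtomicInteger producersCreated = new AtomicInteger();

    // Guarded by "this": every read and write of the field goes through a synchronized method.
    private Object exclusiveLeaderProducer;

    /**
     * Check-then-act done atomically: returns true only for the single caller that
     * actually created the producer, so only that caller goes on to stop the tailer.
     */
    synchronized boolean createProducerIfAbsent() {
        if (exclusiveLeaderProducer != null) {
            return false;
        }
        producersCreated.incrementAndGet();
        exclusiveLeaderProducer = new Object(); // stand-in for building the real producer
        return true;
    }

    /** What a start() path would consult, under the same lock, to decide whether to tail. */
    synchronized boolean isLeader() {
        return exclusiveLeaderProducer != null;
    }

    public static void main(String[] args) throws InterruptedException {
        LeaderProducerSketch manager = new LeaderProducerSketch();
        CountDownLatch go = new CountDownLatch(1);
        List<Thread> threads = new ArrayList<>();
        // Simulate the listener thread and the main thread racing on the same check.
        for (String name : new String[] {"listener", "main"}) {
            Thread t = new Thread(() -> {
                try {
                    go.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                manager.createProducerIfAbsent();
            }, name);
            threads.add(t);
            t.start();
        }
        go.countDown();
        for (Thread t : threads) {
            t.join();
        }
        System.out.println(manager.producersCreated.get());
    }
}
```

However the two threads interleave, `main` prints `1`.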







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r444025141



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -168,68 +151,78 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
         return containsFunctionMetaData(tenant, namespace, functionName);
     }
 
-    /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
-     */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)

Review comment:
       How is that the case? The method "submit" is deleted. We were using the "submit()" method to put a completable future in a list and then complete that future when we merged the request into the local metadata cache. If there was a conflict, the future was completed exceptionally with an error message. It seems that this whole workflow has been removed.
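For reference, the workflow described here can be sketched as a table of pending futures: `submit()` registers a future per request, and the tailer completes it after merging the request, exceptionally on conflict. This is a minimal illustration, not the removed code. The comment says the futures were kept in a list, while this sketch keys them by request id (the `ServiceRequest` in the diff does carry a `requestId`); `PendingRequestsSketch` and `onMerged` are hypothetical names.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class PendingRequestsSketch {

    // Futures for requests this worker published and is still waiting on.
    private final Map<String, CompletableFuture<Void>> pending = new ConcurrentHashMap<>();

    /** Registers a future for the request; publishing to the topic is omitted here. */
    CompletableFuture<Void> submit(String requestId) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    /** Called by the tailer after it tries to merge a request into the local cache. */
    void onMerged(String requestId, boolean conflict) {
        CompletableFuture<Void> future = pending.remove(requestId);
        if (future == null) {
            return; // request came from another worker; nobody local is waiting on it
        }
        if (conflict) {
            future.completeExceptionally(
                    new IllegalArgumentException("Request " + requestId + " conflicts with a newer update"));
        } else {
            future.complete(null);
        }
    }

    public static void main(String[] args) {
        PendingRequestsSketch worker = new PendingRequestsSketch();
        String ok = UUID.randomUUID().toString();
        String stale = UUID.randomUUID().toString();
        CompletableFuture<Void> f1 = worker.submit(ok);
        CompletableFuture<Void> f2 = worker.submit(stale);
        worker.onMerged(ok, false);
        worker.onMerged(stale, true);
        System.out.println(f1.isDone() && !f1.isCompletedExceptionally());
        System.out.println(f2.isCompletedExceptionally());
    }
}
```

`main` prints `true` twice: the first request completes normally and the conflicting one completes exceptionally, which is how the caller used to learn about a rejected update.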







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r444028946



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -168,68 +151,78 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
         return containsFunctionMetaData(tenant, namespace, functionName);
     }
 
-    /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
-     */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)

Review comment:
       I see you are throwing an IllegalArgumentException for those







[GitHub] [pulsar] srkukarni commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

srkukarni commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r444496340



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -174,89 +181,124 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
     }
 
     /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
+     * Called by the worker when we are in the leader mode.  In this state, we update our in-memory
+     * data structures and then write to the metadata topic.
+     * @param functionMetaData The function metadata in question
+     * @param delete Is this a delete operation
+     * @throws IllegalStateException if we are not the leader
+     * @throws IllegalArgumentException if the request is out of date.
      */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)
+            throws IllegalStateException, IllegalArgumentException {
+        if (exclusiveLeaderProducer == null) {
+            throw new IllegalStateException("Not the leader");
+        }
+        boolean needsScheduling;
+        if (delete) {
+            needsScheduling = proccessDeregister(functionMetaData);
+        } else {
+            needsScheduling = processUpdate(functionMetaData);
+        }
+        Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
+                .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
+                .setFunctionMetaData(functionMetaData)
+                .setWorkerId(workerConfig.getWorkerId())
+                .setRequestId(UUID.randomUUID().toString())
+                .build();
+        try {
+            lastMessageSeen = exclusiveLeaderProducer.send(serviceRequest.toByteArray());
+        } catch (Exception e) {
+            log.error("Could not write into Function Metadata topic", e);
+            errorNotifier.triggerError(e);
+        }
+        if (needsScheduling) {
+            this.schedulerManager.schedule();
         }
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(existingFunctionMetadata, functionMetaData);
-
-        Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
-
-        return submit(updateRequest);
     }
 
-
     /**
-     * Sends a deregister request to the FMT (Function Metadata Topic) for a function
-     * @param tenant the tenant the function that needs to be deregistered belongs to
-     * @param namespace the namespace the function that needs to be deregistered belongs to
-     * @param functionName the name of the function
-     * @return a completable future of when the deregister has been applied
+     * Called by the leader service when this worker becomes the leader.
+     * We first get exclusive producer on the metadata topic. Next we drain the tailer
+     * to ensure that we have caught up to metadata topic. After which we close the tailer.
+     * Note that this method cannot be syncrhonized because the tailer might still be processing messages
      */
-    public synchronized CompletableFuture<RequestResult> deregisterFunction(String tenant, String namespace, String functionName) {
-        FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(functionMetaData, functionMetaData);
-
-        Request.ServiceRequest deregisterRequest = ServiceRequestUtils.getDeregisterRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
+    public void acquireLeadership() {
+        log.info("FunctionMetaDataManager becoming leader by creating exclusive producer");
+        FunctionMetaDataTopicTailer tailer = internalAcquireLeadership();
+        // Now that we have created the exclusive producer, wait for reader to get over
+        if (tailer != null) {
+            try {
+                tailer.stopWhenNoMoreMessages().get();
+            } catch (Exception e) {
+                log.error("Error while waiting for metadata tailer thread to finish", e);
+                errorNotifier.triggerError(e);
+            }
+            tailer.close();
+        }
+        this.schedulerManager.schedule();
+        log.info("FunctionMetaDataManager done becoming leader by doing its first schedule");
+    }
 
-        return submit(deregisterRequest);
+    private synchronized FunctionMetaDataTopicTailer internalAcquireLeadership() {

Review comment:
       Because there could be other updates to functions going on. 
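A minimal sketch of the locking split being discussed, assuming a simplified model. All names are hypothetical, not Pulsar APIs: a `BlockingQueue` stands in for the metadata topic, a plain `Object` stands in for `exclusiveLeaderProducer`, and a list stands in for `functionMetaDataMap`. Only the short state swap is `synchronized`, so concurrent writers see a consistent leader flag. The wait for the tailer to drain happens outside the monitor, matching the javadoc note that `acquireLeadership()` cannot be synchronized while the tailer is still processing messages.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the leadership handoff; none of these names are Pulsar APIs.
class LeaderHandoffSketch {
    private final BlockingQueue<String> topic = new LinkedBlockingQueue<>(); // metadata topic backlog
    final List<String> applied = new CopyOnWriteArrayList<>();              // in-memory metadata
    final List<String> leaderWrites = new CopyOnWriteArrayList<>();         // what the leader "sends"
    private final CompletableFuture<Void> drained = new CompletableFuture<>();
    private volatile boolean stopWhenDrained = false;
    private Object leaderProducer;                                          // non-null only on the leader

    void publish(String msg) {
        topic.add(msg);
    }

    // Tailer thread: applies each message through a synchronized method.
    void startTailer() {
        Thread tailer = new Thread(() -> {
            try {
                while (true) {
                    String msg = topic.poll(10, TimeUnit.MILLISECONDS);
                    if (msg != null) {
                        apply(msg);
                    } else if (stopWhenDrained) {
                        break;
                    }
                }
                drained.complete(null);
            } catch (Throwable t) {
                drained.completeExceptionally(t);
            }
        }, "metadata-tailer");
        tailer.setDaemon(true);
        tailer.start();
    }

    private synchronized void apply(String msg) {
        applied.add(msg);
    }

    // NOT synchronized: while we wait, the tailer still needs this object's monitor
    // in apply(). Waiting while holding the monitor could deadlock the handoff.
    void acquireLeadership() {
        CompletableFuture<Void> done = becomeLeaderUnderLock();
        done.join();
    }

    // Synchronized: a concurrent updateOnLeader() sees either "not leader" or a fully
    // installed producer, never a half-finished transition.
    private synchronized CompletableFuture<Void> becomeLeaderUnderLock() {
        if (leaderProducer != null) {
            throw new IllegalStateException("Already the leader");
        }
        leaderProducer = new Object();
        stopWhenDrained = true;
        return drained;
    }

    // Leader-only write path: update in-memory state, then record the outgoing write.
    synchronized void updateOnLeader(String msg) {
        if (leaderProducer == null) {
            throw new IllegalStateException("Not the leader");
        }
        applied.add(msg);
        leaderWrites.add(msg);
    }
}
```

In this model, making `acquireLeadership()` synchronized would leave the tailer blocked in `apply()` while the caller waits on `join()`. The split above avoids that and still serializes the producer check against writers.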




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r440370551



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
##########
@@ -77,12 +89,25 @@ public void run() {
                     if (!(th instanceof InterruptedException || th.getCause() instanceof InterruptedException)) {
                         log.warn("Encountered error when metadata tailer is not running", th);
                     }
-                    return;
                 }
             }
         }
     }
 
+    public void stopWhenNoMoreMessages() {
+        stopOnNoMessageAvailable = true;
+        readerThread.interrupt();
+        // We need to wait here till the thread exits to make sure that the reader is up to date
+        while (true) {
+            try {
+                readerThread.join();
+                return;
+            } catch (InterruptedException e) {

Review comment:
       we shouldn't need to catch any exceptions.  The thread needs to complete without any errors
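The later revision of `acquireLeadership()` quoted in this thread calls `tailer.stopWhenNoMoreMessages().get()`, which suggests a future-returning stop. Here is a minimal sketch of that shape under simplified assumptions. `DrainingTailer` is hypothetical, and a `BlockingQueue` stands in for the Pulsar reader. The reader loop polls with a timeout, so it notices the stop flag without being interrupted. It completes the future normally once drained, or exceptionally on failure, so the caller needs no interrupt/join loop and no `InterruptedException` handling.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical tailer; the queue stands in for a reader on the metadata topic.
class DrainingTailer implements Runnable {
    private final BlockingQueue<String> source;
    private final Consumer<String> processor;
    private final CompletableFuture<Void> exitFuture = new CompletableFuture<>();
    private volatile boolean stopOnNoMessageAvailable = false;
    private final Thread readerThread;

    DrainingTailer(BlockingQueue<String> source, Consumer<String> processor) {
        this.source = source;
        this.processor = processor;
        this.readerThread = new Thread(this, "metadata-tailer");
        this.readerThread.setDaemon(true);
    }

    void start() {
        readerThread.start();
    }

    @Override
    public void run() {
        try {
            while (true) {
                // Poll with a timeout so the stop flag is noticed without an interrupt.
                String msg = source.poll(10, TimeUnit.MILLISECONDS);
                if (msg != null) {
                    processor.accept(msg);
                } else if (stopOnNoMessageAvailable) {
                    break;
                }
            }
            exitFuture.complete(null);
        } catch (Throwable t) {
            // Any failure is reported to whoever waits on the future.
            exitFuture.completeExceptionally(t);
        }
    }

    // No interrupt and no join loop: the caller waits on the returned future.
    CompletableFuture<Void> stopWhenNoMoreMessages() {
        stopOnNoMessageAvailable = true;
        return exitFuture;
    }
}
```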







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r444481424



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -174,89 +181,124 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
     }
 
     /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
+     * Called by the worker when we are in the leader mode.  In this state, we update our in-memory
+     * data structures and then write to the metadata topic.
+     * @param functionMetaData The function metadata in question
+     * @param delete Is this a delete operation
+     * @throws IllegalStateException if we are not the leader
+     * @throws IllegalArgumentException if the request is out of date.
      */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)
+            throws IllegalStateException, IllegalArgumentException {
+        if (exclusiveLeaderProducer == null) {
+            throw new IllegalStateException("Not the leader");
+        }
+        boolean needsScheduling;
+        if (delete) {
+            needsScheduling = proccessDeregister(functionMetaData);
+        } else {
+            needsScheduling = processUpdate(functionMetaData);
+        }
+        Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
+                .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
+                .setFunctionMetaData(functionMetaData)
+                .setWorkerId(workerConfig.getWorkerId())
+                .setRequestId(UUID.randomUUID().toString())
+                .build();
+        try {
+            lastMessageSeen = exclusiveLeaderProducer.send(serviceRequest.toByteArray());
+        } catch (Exception e) {
+            log.error("Could not write into Function Metadata topic", e);
+            errorNotifier.triggerError(e);
+        }
+        if (needsScheduling) {
+            this.schedulerManager.schedule();
         }
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(existingFunctionMetadata, functionMetaData);
-
-        Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
-
-        return submit(updateRequest);
     }
 
-
     /**
-     * Sends a deregister request to the FMT (Function Metadata Topic) for a function
-     * @param tenant the tenant the function that needs to be deregistered belongs to
-     * @param namespace the namespace the function that needs to be deregistered belongs to
-     * @param functionName the name of the function
-     * @return a completable future of when the deregister has been applied
+     * Called by the leader service when this worker becomes the leader.
+     * We first get exclusive producer on the metadata topic. Next we drain the tailer
+     * to ensure that we have caught up to metadata topic. After which we close the tailer.
+     * Note that this method cannot be syncrhonized because the tailer might still be processing messages
      */
-    public synchronized CompletableFuture<RequestResult> deregisterFunction(String tenant, String namespace, String functionName) {
-        FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(functionMetaData, functionMetaData);
-
-        Request.ServiceRequest deregisterRequest = ServiceRequestUtils.getDeregisterRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
+    public void acquireLeadership() {
+        log.info("FunctionMetaDataManager becoming leader by creating exclusive producer");
+        FunctionMetaDataTopicTailer tailer = internalAcquireLeadership();
+        // Now that we have created the exclusive producer, wait for reader to get over
+        if (tailer != null) {
+            try {
+                tailer.stopWhenNoMoreMessages().get();
+            } catch (Exception e) {
+                log.error("Error while waiting for metadata tailer thread to finish", e);
+                errorNotifier.triggerError(e);
+            }
+            tailer.close();
+        }
+        this.schedulerManager.schedule();
+        log.info("FunctionMetaDataManager done becoming leader by doing its first schedule");
+    }
 
-        return submit(deregisterRequest);
+    private synchronized FunctionMetaDataTopicTailer internalAcquireLeadership() {

Review comment:
       why do we need to synchronize this method?







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r444438587



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -52,20 +61,15 @@
     @VisibleForTesting
     final Map<String, Map<String, Map<String, FunctionMetaData>>> functionMetaDataMap = new ConcurrentHashMap<>();
 
-    // A map in which the key is the service request id and value is the service request
-    private final Map<String, ServiceRequestInfo> pendingServiceRequests = new ConcurrentHashMap<>();
-
     private final ServiceRequestManager serviceRequestManager;
     private final SchedulerManager schedulerManager;
     private final WorkerConfig workerConfig;
     private final PulsarClient pulsarClient;
     private final ErrorNotifier errorNotifier;
 
     private FunctionMetaDataTopicTailer functionMetaDataTopicTailer;
-
-    @Setter
-    @Getter
-    boolean isInitializePhase = false;
+    private Producer exclusiveLeaderProducer;
+    private MessageId lastMessageSeen = MessageId.earliest;

Review comment:
       It seems more natural that the FunctionMetaDataTopicTailer keeps track of this
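A minimal sketch of the suggestion, assuming the tailer records the position of the last message it applied and a new tailer can resume from that position. `PositionTrackingTailer` is hypothetical. List indexes stand in for Pulsar `MessageId`s, and `-1` plays the role of `MessageId.earliest`. The call is synchronous to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical: list positions stand in for MessageIds; EARLIEST mirrors MessageId.earliest.
class PositionTrackingTailer {
    static final long EARLIEST = -1;

    private final List<String> topic;
    private final Consumer<String> processor;
    private long lastMessageSeen;

    PositionTrackingTailer(List<String> topic, Consumer<String> processor, long startAfter) {
        this.topic = topic;
        this.processor = processor;
        this.lastMessageSeen = startAfter;
    }

    // Applies every message currently available after lastMessageSeen,
    // advancing the position only after each message is processed.
    void readAvailable() {
        for (long pos = lastMessageSeen + 1; pos < topic.size(); pos++) {
            processor.accept(topic.get((int) pos));
            lastMessageSeen = pos;
        }
    }

    long getLastMessageSeen() {
        return lastMessageSeen;
    }
}
```

With the position owned by the tailer, the manager would only need to hand it to the next tailer it creates, rather than keeping a separate `lastMessageSeen` field of its own.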







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r444465208



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -174,89 +181,124 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
     }
 
     /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
+     * Called by the worker when we are in the leader mode.  In this state, we update our in-memory
+     * data structures and then write to the metadata topic.
+     * @param functionMetaData The function metadata in question
+     * @param delete Is this a delete operation
+     * @throws IllegalStateException if we are not the leader
+     * @throws IllegalArgumentException if the request is out of date.
      */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)
+            throws IllegalStateException, IllegalArgumentException {
+        if (exclusiveLeaderProducer == null) {
+            throw new IllegalStateException("Not the leader");
+        }
+        boolean needsScheduling;
+        if (delete) {
+            needsScheduling = proccessDeregister(functionMetaData);
+        } else {
+            needsScheduling = processUpdate(functionMetaData);
+        }
+        Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
+                .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
+                .setFunctionMetaData(functionMetaData)
+                .setWorkerId(workerConfig.getWorkerId())
+                .setRequestId(UUID.randomUUID().toString())
+                .build();
+        try {
+            lastMessageSeen = exclusiveLeaderProducer.send(serviceRequest.toByteArray());
+        } catch (Exception e) {
+            log.error("Could not write into Function Metadata topic", e);
+            errorNotifier.triggerError(e);
+        }
+        if (needsScheduling) {
+            this.schedulerManager.schedule();
         }
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(existingFunctionMetadata, functionMetaData);
-
-        Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
-
-        return submit(updateRequest);
     }
 
-
     /**
-     * Sends a deregister request to the FMT (Function Metadata Topic) for a function
-     * @param tenant the tenant the function that needs to be deregistered belongs to
-     * @param namespace the namespace the function that needs to be deregistered belongs to
-     * @param functionName the name of the function
-     * @return a completable future of when the deregister has been applied
+     * Called by the leader service when this worker becomes the leader.
+     * We first get exclusive producer on the metadata topic. Next we drain the tailer
+     * to ensure that we have caught up to metadata topic. After which we close the tailer.
+     * Note that this method cannot be syncrhonized because the tailer might still be processing messages
      */
-    public synchronized CompletableFuture<RequestResult> deregisterFunction(String tenant, String namespace, String functionName) {
-        FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(functionMetaData, functionMetaData);
-
-        Request.ServiceRequest deregisterRequest = ServiceRequestUtils.getDeregisterRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
+    public void acquireLeadership() {
+        log.info("FunctionMetaDataManager becoming leader by creating exclusive producer");
+        FunctionMetaDataTopicTailer tailer = internalAcquireLeadership();
+        // Now that we have created the exclusive producer, wait for reader to get over
+        if (tailer != null) {
+            try {
+                tailer.stopWhenNoMoreMessages().get();
+            } catch (Exception e) {
+                log.error("Error while waiting for metadata tailer thread to finish", e);
+                errorNotifier.triggerError(e);
+            }
+            tailer.close();
+        }
+        this.schedulerManager.schedule();
+        log.info("FunctionMetaDataManager done becoming leader by doing its first schedule");
+    }
 
-        return submit(deregisterRequest);
+    private synchronized FunctionMetaDataTopicTailer internalAcquireLeadership() {
+        if (exclusiveLeaderProducer == null) {
+            try {
+                exclusiveLeaderProducer = pulsarClient.newProducer()
+                        .topic(this.workerConfig.getFunctionMetadataTopic())
+                        .producerName(workerConfig.getWorkerId() + "-leader")
+                        // .type(EXCLUSIVE)
+                        .create();
+            } catch (PulsarClientException e) {
+                log.error("Error creating exclusive producer", e);
+                errorNotifier.triggerError(e);
+            }
+        } else {
+            log.error("Logic Error in FunctionMetaData Manager");
+            errorNotifier.triggerError(new IllegalStateException());
+        }
+        FunctionMetaDataTopicTailer tailer = this.functionMetaDataTopicTailer;
+        this.functionMetaDataTopicTailer = null;
+        return tailer;
     }
 
     /**
-     * Sends a start/stop function request to the FMT (Function Metadata Topic) for a function
-     * @param tenant the tenant the function that needs to be deregistered belongs to
-     * @param namespace the namespace the function that needs to be deregistered belongs to
-     * @param functionName the name of the function
-     * @param instanceId the instanceId of the function, -1 if for all instances
-     * @param start do we need to start or stop
-     * @return a completable future of when the start/stop has been applied
+     * called by the leader service when we lose leadership. We close the exclusive producer
+     * and start the tailer.
      */
-    public synchronized CompletableFuture<RequestResult> changeFunctionInstanceStatus(String tenant, String namespace, String functionName,
-                                                                                      Integer instanceId, boolean start) {
-        FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.changeFunctionInstanceStatus(functionMetaData, instanceId, start);
-
-        Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
-
-        return submit(updateRequest);
+    public synchronized void giveupLeadership() {
+        log.info("FunctionMetaDataManager giving up leadership by closing exclusive producer");
+        try {
+            exclusiveLeaderProducer.close();
+            exclusiveLeaderProducer = null;
+            initializeTailer();

Review comment:
       Shouldn't we also start the tailer here?
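The question only matters if `initializeTailer()` constructs the tailer without starting it. The diff does not show its body, so that is an assumption here. This minimal sketch shows the give-up path with the explicit start the review asks about. `StubProducer`, `StubTailer`, and `LeadershipSketch` are hypothetical stand-ins, not Pulsar's `Producer` or `FunctionMetaDataTopicTailer`.

```java
// Hypothetical stand-ins; not Pulsar's Producer or FunctionMetaDataTopicTailer.
class StubProducer implements AutoCloseable {
    boolean closed = false;

    @Override
    public void close() {
        closed = true;
    }
}

class StubTailer {
    boolean started = false;

    void start() {
        started = true;
    }
}

class LeadershipSketch {
    StubProducer exclusiveLeaderProducer = new StubProducer(); // begins as the leader
    StubTailer functionMetaDataTopicTailer;                    // null while leader

    // Assumed to only construct the tailer, not to begin reading.
    private void initializeTailer() {
        functionMetaDataTopicTailer = new StubTailer();
    }

    synchronized void giveupLeadership() {
        if (exclusiveLeaderProducer == null) {
            throw new IllegalStateException("Not the leader");
        }
        exclusiveLeaderProducer.close();
        exclusiveLeaderProducer = null;
        initializeTailer();
        // Without this call, a former leader would never apply writes made by the new leader.
        functionMetaDataTopicTailer.start();
    }
}
```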







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r445257304



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -174,89 +181,124 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
     }
 
     /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
+     * Called by the worker when we are in the leader mode.  In this state, we update our in-memory
+     * data structures and then write to the metadata topic.
+     * @param functionMetaData The function metadata in question
+     * @param delete Is this a delete operation
+     * @throws IllegalStateException if we are not the leader
+     * @throws IllegalArgumentException if the request is out of date.
      */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)
+            throws IllegalStateException, IllegalArgumentException {
+        if (exclusiveLeaderProducer == null) {
+            throw new IllegalStateException("Not the leader");
+        }
+        boolean needsScheduling;
+        if (delete) {
+            needsScheduling = proccessDeregister(functionMetaData);
+        } else {
+            needsScheduling = processUpdate(functionMetaData);
+        }
+        Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
+                .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
+                .setFunctionMetaData(functionMetaData)
+                .setWorkerId(workerConfig.getWorkerId())
+                .setRequestId(UUID.randomUUID().toString())
+                .build();
+        try {
+            lastMessageSeen = exclusiveLeaderProducer.send(serviceRequest.toByteArray());
+        } catch (Exception e) {
+            log.error("Could not write into Function Metadata topic", e);
+            errorNotifier.triggerError(e);
+        }
+        if (needsScheduling) {
+            this.schedulerManager.schedule();
         }
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(existingFunctionMetadata, functionMetaData);
-
-        Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
-
-        return submit(updateRequest);
     }
 
-
     /**
-     * Sends a deregister request to the FMT (Function Metadata Topic) for a function
-     * @param tenant the tenant the function that needs to be deregistered belongs to
-     * @param namespace the namespace the function that needs to be deregistered belongs to
-     * @param functionName the name of the function
-     * @return a completable future of when the deregister has been applied
+     * Called by the leader service when this worker becomes the leader.
+     * We first get exclusive producer on the metadata topic. Next we drain the tailer
+     * to ensure that we have caught up to metadata topic. After which we close the tailer.
+     * Note that this method cannot be syncrhonized because the tailer might still be processing messages
      */
-    public synchronized CompletableFuture<RequestResult> deregisterFunction(String tenant, String namespace, String functionName) {
-        FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(functionMetaData, functionMetaData);
-
-        Request.ServiceRequest deregisterRequest = ServiceRequestUtils.getDeregisterRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
+    public void acquireLeadership() {
+        log.info("FunctionMetaDataManager becoming leader by creating exclusive producer");
+        FunctionMetaDataTopicTailer tailer = internalAcquireLeadership();
+        // Now that we have created the exclusive producer, wait for reader to get over
+        if (tailer != null) {
+            try {
+                tailer.stopWhenNoMoreMessages().get();
+            } catch (Exception e) {
+                log.error("Error while waiting for metadata tailer thread to finish", e);
+                errorNotifier.triggerError(e);
+            }
+            tailer.close();
+        }
+        this.schedulerManager.schedule();
+        log.info("FunctionMetaDataManager done becoming leader by doing its first schedule");
+    }
 
-        return submit(deregisterRequest);
+    private synchronized FunctionMetaDataTopicTailer internalAcquireLeadership() {

Review comment:
       @srkukarni ^^^







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r440394613



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -168,68 +151,78 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
         return containsFunctionMetaData(tenant, namespace, functionName);
     }
 
-    /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
-     */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)
+            throws IllegalStateException, IllegalArgumentException {
+        if (exclusiveLeaderProducer == null) {
+            throw new IllegalStateException("Not the leader");
+        }
+        boolean needsScheduling;
+        if (delete) {
+            needsScheduling = proccessDeregister(functionMetaData);
+        } else {
+            needsScheduling = processUpdate(functionMetaData);
+        }
+        Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
+                .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
+                .setFunctionMetaData(functionMetaData)
+                .setWorkerId(workerConfig.getWorkerId())
+                .setRequestId(UUID.randomUUID().toString())
+                .build();
+        try {
+            exclusiveLeaderProducer.send(serviceRequest.toByteArray());
+        } catch (Exception e) {
+            log.error("Could not write into Function Metadata topic", e);
+            errorNotifier.triggerError(e);
+        }
+        if (needsScheduling) {
+            this.schedulerManager.schedule();
         }
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(existingFunctionMetadata, functionMetaData);
-
-        Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
-
-        return submit(updateRequest);
     }
 
-
-    /**
-     * Sends a deregister request to the FMT (Function Metadata Topic) for a function
-     * @param tenant the tenant the function that needs to be deregistered belongs to
-     * @param namespace the namespace the function that needs to be deregistered belongs to
-     * @param functionName the name of the function
-     * @return a completable future of when the deregister has been applied
-     */
-    public synchronized CompletableFuture<RequestResult> deregisterFunction(String tenant, String namespace, String functionName) {
-        FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(functionMetaData, functionMetaData);
-
-        Request.ServiceRequest deregisterRequest = ServiceRequestUtils.getDeregisterRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
-
-        return submit(deregisterRequest);
+    // Note that this method cannot be syncrhonized because the tailer might still be processing messages
+    public void acquireLeadership() {
+        log.info("FunctionMetaDataManager becoming leader by creating exclusive producer");
+        FunctionMetaDataTopicTailer tailer = internalAcquireLeadership();
+        // Now that we have created the exclusive producer, wait for reader to get over
+        if (tailer != null) {
+            tailer.stopWhenNoMoreMessages();
+            tailer.close();
+        }
+        this.schedulerManager.schedule();
+        log.info("FunctionMetaDataManager done becoming leader by doing its first schedule");
     }
 
-    /**
-     * Sends a start/stop function request to the FMT (Function Metadata Topic) for a function
-     * @param tenant the tenant the function that needs to be deregistered belongs to
-     * @param namespace the namespace the function that needs to be deregistered belongs to
-     * @param functionName the name of the function
-     * @param instanceId the instanceId of the function, -1 if for all instances
-     * @param start do we need to start or stop
-     * @return a completable future of when the start/stop has been applied
-     */
-    public synchronized CompletableFuture<RequestResult> changeFunctionInstanceStatus(String tenant, String namespace, String functionName,
-                                                                                      Integer instanceId, boolean start) {
-        FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.changeFunctionInstanceStatus(functionMetaData, instanceId, start);
-
-        Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
+    private synchronized FunctionMetaDataTopicTailer internalAcquireLeadership() {
+        if (exclusiveLeaderProducer == null) {
+            try {
+                exclusiveLeaderProducer = pulsarClient.newProducer()
+                        .topic(this.workerConfig.getFunctionMetadataTopic())
+                        .producerName(workerConfig.getWorkerId() + "-leader")
+                        // .type(EXCLUSIVE)
+                        .create();
+            } catch (PulsarClientException e) {
+                log.error("Error creating exclusive producer", e);
+                errorNotifier.triggerError(e);
+            }
+        } else {
+            log.error("Logic Error in FunctionMetaData Manager");
+            errorNotifier.triggerError(new IllegalStateException());
+        }
+        FunctionMetaDataTopicTailer tailer = this.functionMetaDataTopicTailer;
+        this.functionMetaDataTopicTailer = null;
+        return tailer;
+    }
 
-        return submit(updateRequest);
+    public synchronized void giveupLeadership() {
+        log.info("FunctionMetaDataManager giving up leadership by closing exclusive producer");
+        try {
+            exclusiveLeaderProducer.close();
+        } catch (PulsarClientException e) {
+            log.error("Error closing exclusive producer", e);
+            errorNotifier.triggerError(e);
+        }
+        exclusiveLeaderProducer = null;
+        initializeTailer();

Review comment:
       It is not a good idea to simply re-create the metadata tailer object. Doing so makes the tailer start reading from the beginning, so every tailer has to re-read the whole topic. The problem is not only that re-reading the whole topic takes time, but also that during that time the worker's in-memory metadata map will be inconsistent. If a user request is sent to the worker during this time, the behavior might be incorrect. I would suggest keeping track of the message ID that the current view of the metadata corresponds to. When the worker is the leader and it updates its in-memory metadata cache and produces messages to the metadata topic, we should update this message ID. When the worker loses leadership, the tailer should start reading from that message ID.

##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -168,68 +151,78 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
         return containsFunctionMetaData(tenant, namespace, functionName);
     }
 
-    /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
-     */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)
+            throws IllegalStateException, IllegalArgumentException {
+        if (exclusiveLeaderProducer == null) {
+            throw new IllegalStateException("Not the leader");
+        }
+        boolean needsScheduling;
+        if (delete) {
+            needsScheduling = proccessDeregister(functionMetaData);
+        } else {
+            needsScheduling = processUpdate(functionMetaData);
+        }
+        Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
+                .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
+                .setFunctionMetaData(functionMetaData)
+                .setWorkerId(workerConfig.getWorkerId())
+                .setRequestId(UUID.randomUUID().toString())
+                .build();
+        try {
+            exclusiveLeaderProducer.send(serviceRequest.toByteArray());
+        } catch (Exception e) {
+            log.error("Could not write into Function Metadata topic", e);
+            errorNotifier.triggerError(e);
+        }
+        if (needsScheduling) {
+            this.schedulerManager.schedule();
         }
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(existingFunctionMetadata, functionMetaData);
-
-        Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
-
-        return submit(updateRequest);
     }
 
-
-    /**
-     * Sends a deregister request to the FMT (Function Metadata Topic) for a function
-     * @param tenant the tenant the function that needs to be deregistered belongs to
-     * @param namespace the namespace the function that needs to be deregistered belongs to
-     * @param functionName the name of the function
-     * @return a completable future of when the deregister has been applied
-     */
-    public synchronized CompletableFuture<RequestResult> deregisterFunction(String tenant, String namespace, String functionName) {
-        FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(functionMetaData, functionMetaData);
-
-        Request.ServiceRequest deregisterRequest = ServiceRequestUtils.getDeregisterRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
-
-        return submit(deregisterRequest);
+    // Note that this method cannot be synchronized because the tailer might still be processing messages
+    public void acquireLeadership() {
+        log.info("FunctionMetaDataManager becoming leader by creating exclusive producer");
+        FunctionMetaDataTopicTailer tailer = internalAcquireLeadership();
+        // Now that we have created the exclusive producer, wait for reader to get over
+        if (tailer != null) {
+            tailer.stopWhenNoMoreMessages();
+            tailer.close();
+        }
+        this.schedulerManager.schedule();
+        log.info("FunctionMetaDataManager done becoming leader by doing its first schedule");
     }
 
-    /**
-     * Sends a start/stop function request to the FMT (Function Metadata Topic) for a function
-     * @param tenant the tenant the function that needs to be deregistered belongs to
-     * @param namespace the namespace the function that needs to be deregistered belongs to
-     * @param functionName the name of the function
-     * @param instanceId the instanceId of the function, -1 if for all instances
-     * @param start do we need to start or stop
-     * @return a completable future of when the start/stop has been applied
-     */
-    public synchronized CompletableFuture<RequestResult> changeFunctionInstanceStatus(String tenant, String namespace, String functionName,
-                                                                                      Integer instanceId, boolean start) {
-        FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.changeFunctionInstanceStatus(functionMetaData, instanceId, start);
-
-        Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
+    private synchronized FunctionMetaDataTopicTailer internalAcquireLeadership() {
+        if (exclusiveLeaderProducer == null) {
+            try {
+                exclusiveLeaderProducer = pulsarClient.newProducer()
+                        .topic(this.workerConfig.getFunctionMetadataTopic())
+                        .producerName(workerConfig.getWorkerId() + "-leader")
+                        // .type(EXCLUSIVE)
+                        .create();
+            } catch (PulsarClientException e) {
+                log.error("Error creating exclusive producer", e);
+                errorNotifier.triggerError(e);
+            }
+        } else {
+            log.error("Logic Error in FunctionMetaData Manager");
+            errorNotifier.triggerError(new IllegalStateException());
+        }
+        FunctionMetaDataTopicTailer tailer = this.functionMetaDataTopicTailer;
+        this.functionMetaDataTopicTailer = null;
+        return tailer;
+    }
 
-        return submit(updateRequest);
+    public synchronized void giveupLeadership() {
+        log.info("FunctionMetaDataManager giving up leadership by closing exclusive producer");
+        try {
+            exclusiveLeaderProducer.close();
+        } catch (PulsarClientException e) {
+            log.error("Error closing exclusive producer", e);
+            errorNotifier.triggerError(e);
+        }
+        exclusiveLeaderProducer = null;
+        initializeTailer();

Review comment:
       It is not a good idea to simply re-create the metadata tailer object. Doing so makes the tailer start reading from the beginning, so every tailer has to re-read the whole topic. The problem is not only that re-reading the whole topic takes time, but also that during that time the worker's in-memory metadata map will be inconsistent. If a user request is sent to the worker during this time, the behavior might be incorrect. I would suggest keeping track of the message ID that the current view of the metadata corresponds to. When the worker is the leader and it updates its in-memory metadata cache and produces messages to the metadata topic, we should update this message ID. When the worker loses leadership, the tailer should start reading from that message ID.
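The reviewer's suggestion can be sketched with a toy, in-memory stand-in for the metadata topic. Everything below is hypothetical: `MetadataLog`, `MetadataCache`, and the integer positions stand in for the Pulsar topic and its message IDs. The cache records the position of the last message it applied, whether it read that message as a tailer or wrote it as the leader, and a new tailer resumes right after that position instead of from the beginning. With the real client, resuming would presumably go through `ReaderBuilder.startMessageId(...)`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the metadata topic: an append-only list of
// (function -> version) updates, addressed by integer position.
class MetadataLog {
    static final class Entry {
        final String function;
        final long version;

        Entry(String function, long version) {
            this.function = function;
            this.version = version;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    // Appends an entry and returns its position (the analogue of a MessageId).
    synchronized int append(String function, long version) {
        entries.add(new Entry(function, version));
        return entries.size() - 1;
    }

    synchronized int size() {
        return entries.size();
    }

    synchronized Entry get(int position) {
        return entries.get(position);
    }
}

// Hypothetical in-memory view that remembers the last position it applied.
class MetadataCache {
    final Map<String, Long> versions = new HashMap<>();
    int lastApplied = -1;   // -1 means nothing has been applied yet
    int messagesRead = 0;   // how many log entries the tailer actually had to read

    // Leader path: update the in-memory view, write to the log, record the position.
    void writeAsLeader(MetadataLog log, String function, long version) {
        versions.put(function, version);
        lastApplied = log.append(function, version);
    }

    // Tailer path: resume right after lastApplied instead of from position 0.
    void tailFrom(MetadataLog log) {
        for (int pos = lastApplied + 1; pos < log.size(); pos++) {
            MetadataLog.Entry e = log.get(pos);
            versions.put(e.function, e.version);
            lastApplied = pos;
            messagesRead++;
        }
    }
}
```

Because `lastApplied` is advanced on both the leader path and the tailer path, handing control back to a tailer only reads the entries written after this worker's last write, rather than replaying the whole topic.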




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] srkukarni commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r444324454



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -241,20 +245,21 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
      * @param serviceRequest The request
      */
     public void processRequest(MessageId messageId, Request.ServiceRequest serviceRequest) {
-
-        // make sure that processing requests don't happen simultaneously
-        synchronized (this) {
+        try {
             switch (serviceRequest.getServiceRequestType()) {
                 case UPDATE:
-                    this.processUpdate(serviceRequest);
+                    this.processUpdate(serviceRequest.getFunctionMetaData());
                     break;
                 case DELETE:
-                    this.proccessDeregister(serviceRequest);
+                    this.proccessDeregister(serviceRequest.getFunctionMetaData());
                     break;
                 default:
                     log.warn("Received request with unrecognized type: {}", serviceRequest);
             }
+        } catch (IllegalArgumentException e) {
+            // It's ok. Nothing much we can do about it

Review comment:
       processRequest is called by the metadata tailer, so it cannot really do anything useful with an IllegalArgumentException.
   Users will get the IllegalArgumentException from processUpdate and proccessDeregister on the leader path.
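The split described in this reply can be sketched as follows. The class and method names are hypothetical, and the version check is only an assumed stand-in for whatever makes a request "out of date" (the PR's Javadoc says `IllegalArgumentException` is thrown "if the request is out of date"). The shared apply step throws; the leader path lets the exception reach the caller, while the tailer path logs and skips it because no user is waiting on that request.

```java
import java.util.HashMap;
import java.util.Map;

class StaleCheckingStore {
    private final Map<String, Long> versions = new HashMap<>();
    private int skippedOnTailer = 0;

    // Shared step: rejects a request whose version is not newer than the stored one.
    // This staleness rule is an assumption for illustration only.
    private synchronized void apply(String function, long version) {
        Long current = versions.get(function);
        if (current != null && version <= current) {
            throw new IllegalArgumentException(
                    "Stale update for " + function + ": " + version + " <= " + current);
        }
        versions.put(function, version);
    }

    // Leader path: the exception propagates so the REST layer can report it to the user.
    void updateOnLeader(String function, long version) {
        apply(function, version);
    }

    // Tailer path: nobody is waiting on this request, so the error is counted and skipped.
    synchronized void processFromTailer(String function, long version) {
        try {
            apply(function, version);
        } catch (IllegalArgumentException e) {
            skippedOnTailer++;
        }
    }

    synchronized int skippedOnTailer() {
        return skippedOnTailer;
    }

    synchronized Long versionOf(String function) {
        return versions.get(function);
    }
}
```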





[GitHub] [pulsar] srkukarni commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r444553172



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
##########
@@ -392,24 +391,13 @@ public void deregisterFunction(final String tenant,
             throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
         }
 
-        CompletableFuture<RequestResult> completableFuture = functionMetaDataManager.deregisterFunction(tenant,
-                namespace, componentName);
-
-        RequestResult requestResult = null;
-        try {
-            requestResult = completableFuture.get();
-            if (!requestResult.isSuccess()) {
-                throw new RestException(Status.BAD_REQUEST, requestResult.getMessage());
-            }
-        } catch (ExecutionException e) {
-            log.error("Execution Exception while deregistering {} @ /{}/{}/{}",
-                    ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
-            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getCause().getMessage());
-        } catch (InterruptedException e) {
-            log.error("Interrupted Exception while deregistering {} @ /{}/{}/{}",
-                    ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
-            throw new RestException(Status.REQUEST_TIMEOUT, e.getMessage());
-        }
+        FunctionMetaData newVersionedMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(functionMetaData, functionMetaData);

Review comment:
       Changed





[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r444479973



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -174,89 +181,124 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
     }
 
     /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
+     * Called by the worker when we are in the leader mode.  In this state, we update our in-memory
+     * data structures and then write to the metadata topic.
+     * @param functionMetaData The function metadata in question
+     * @param delete Is this a delete operation
+     * @throws IllegalStateException if we are not the leader
+     * @throws IllegalArgumentException if the request is out of date.
      */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)
+            throws IllegalStateException, IllegalArgumentException {
+        if (exclusiveLeaderProducer == null) {
+            throw new IllegalStateException("Not the leader");
+        }
+        boolean needsScheduling;
+        if (delete) {
+            needsScheduling = proccessDeregister(functionMetaData);
+        } else {
+            needsScheduling = processUpdate(functionMetaData);
+        }
+        Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
+                .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
+                .setFunctionMetaData(functionMetaData)
+                .setWorkerId(workerConfig.getWorkerId())
+                .setRequestId(UUID.randomUUID().toString())
+                .build();
+        try {
+            lastMessageSeen = exclusiveLeaderProducer.send(serviceRequest.toByteArray());
+        } catch (Exception e) {

Review comment:
       Shouldn't we return a 500 error to the end user? If we just call "errorNotifier.triggerError(e)", the worker will die and the end user will likely get either no response or a timeout error.
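One way to act on this suggestion, sketched with hypothetical names: let the leader-side update surface a failed write as an exception, and have the REST handler translate it into an HTTP status instead of calling `errorNotifier.triggerError(e)`. The `LeaderUpdate` interface, the `HttpResult` class, and the status mapping (400 for an out-of-date request, 500 for a failed topic write) are assumptions for illustration, not Pulsar's actual REST code.

```java
import java.io.IOException;

// Hypothetical: the leader-side update, which may reject a stale request
// (IllegalArgumentException) or fail to write to the metadata topic (IOException).
interface LeaderUpdate {
    void run() throws IOException;
}

// Hypothetical minimal HTTP result: status code plus message.
final class HttpResult {
    final int status;
    final String message;

    HttpResult(int status, String message) {
        this.status = status;
        this.message = message;
    }
}

final class UpdateEndpoint {
    // Turns failures into an HTTP status instead of taking the whole worker down.
    static HttpResult handle(LeaderUpdate update) {
        try {
            update.run();
            return new HttpResult(200, "OK");
        } catch (IllegalArgumentException e) {
            // Out-of-date request: report it to the caller as a client error.
            return new HttpResult(400, e.getMessage());
        } catch (IOException e) {
            // Metadata topic write failed: report a server error to the caller.
            return new HttpResult(500, "Could not write into Function Metadata topic: " + e.getMessage());
        }
    }
}
```

Note that in the PR's `updateFunctionOnLeader`, `processUpdate`/`proccessDeregister` have already modified the in-memory map before `send` is attempted, so returning 500 alone would leave that map ahead of the topic; a real fix would also need to account for that ordering.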





[GitHub] [pulsar] srkukarni commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r445267381



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -174,89 +181,124 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
     }
 
     /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
+     * Called by the worker when we are in the leader mode.  In this state, we update our in-memory
+     * data structures and then write to the metadata topic.
+     * @param functionMetaData The function metadata in question
+     * @param delete Is this a delete operation
+     * @throws IllegalStateException if we are not the leader
+     * @throws IllegalArgumentException if the request is out of date.
      */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)
+            throws IllegalStateException, IllegalArgumentException {
+        if (exclusiveLeaderProducer == null) {
+            throw new IllegalStateException("Not the leader");
+        }
+        boolean needsScheduling;
+        if (delete) {
+            needsScheduling = proccessDeregister(functionMetaData);
+        } else {
+            needsScheduling = processUpdate(functionMetaData);
+        }
+        Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
+                .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
+                .setFunctionMetaData(functionMetaData)
+                .setWorkerId(workerConfig.getWorkerId())
+                .setRequestId(UUID.randomUUID().toString())
+                .build();
+        try {
+            lastMessageSeen = exclusiveLeaderProducer.send(serviceRequest.toByteArray());
+        } catch (Exception e) {
+            log.error("Could not write into Function Metadata topic", e);
+            errorNotifier.triggerError(e);
+        }
+        if (needsScheduling) {
+            this.schedulerManager.schedule();
         }
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(existingFunctionMetadata, functionMetaData);
-
-        Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
-
-        return submit(updateRequest);
     }
 
-
     /**
-     * Sends a deregister request to the FMT (Function Metadata Topic) for a function
-     * @param tenant the tenant the function that needs to be deregistered belongs to
-     * @param namespace the namespace the function that needs to be deregistered belongs to
-     * @param functionName the name of the function
-     * @return a completable future of when the deregister has been applied
+     * Called by the leader service when this worker becomes the leader.
+     * We first get exclusive producer on the metadata topic. Next we drain the tailer
+     * to ensure that we have caught up to metadata topic. After which we close the tailer.
+     * Note that this method cannot be synchronized because the tailer might still be processing messages
      */
-    public synchronized CompletableFuture<RequestResult> deregisterFunction(String tenant, String namespace, String functionName) {
-        FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(functionMetaData, functionMetaData);
-
-        Request.ServiceRequest deregisterRequest = ServiceRequestUtils.getDeregisterRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
+    public void acquireLeadership() {
+        log.info("FunctionMetaDataManager becoming leader by creating exclusive producer");
+        FunctionMetaDataTopicTailer tailer = internalAcquireLeadership();
+        // Now that we have created the exclusive producer, wait for reader to get over
+        if (tailer != null) {
+            try {
+                tailer.stopWhenNoMoreMessages().get();
+            } catch (Exception e) {
+                log.error("Error while waiting for metadata tailer thread to finish", e);
+                errorNotifier.triggerError(e);
+            }
+            tailer.close();
+        }
+        this.schedulerManager.schedule();
+        log.info("FunctionMetaDataManager done becoming leader by doing its first schedule");
+    }
 
-        return submit(deregisterRequest);
+    private synchronized FunctionMetaDataTopicTailer internalAcquireLeadership() {

Review comment:
       becomeActive is synchronized on the LeaderService object, whereas this method is synchronized on the MetaDataManager object. Synchronizing here prevents other components, such as the metadata tailer, from interfering with the internal data structures.
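Why the public `acquireLeadership` is left unsynchronized while `internalAcquireLeadership` is synchronized can be illustrated with a toy model. All names are hypothetical: a thread reading from a queue stands in for the metadata tailer, and an `"EOF"` marker is an assumed stand-in for `stopWhenNoMoreMessages()`. The tailer applies each message under the manager's monitor; the handover swaps state under that same monitor, then releases it before waiting for the tailer to drain. Waiting while still holding the monitor could deadlock, because the tailer needs it to apply its remaining messages.

```java
import java.util.concurrent.BlockingQueue;

class ToyMetaDataManager {
    private int applied = 0;
    private String lastMessage = null;
    private boolean leader = false;
    private Thread tailer;

    // Tailer callback: mutates shared state, so it takes the manager's monitor.
    synchronized void processRequest(String msg) {
        applied++;
        lastMessage = msg;
    }

    synchronized int applied() { return applied; }
    synchronized String lastMessage() { return lastMessage; }
    synchronized boolean isLeader() { return leader; }

    // Starts a tailer thread that applies messages until it reads the "EOF" marker.
    synchronized void startTailer(BlockingQueue<String> topic) {
        tailer = new Thread(() -> {
            try {
                while (true) {
                    String msg = topic.take();
                    if (msg.equals("EOF")) {
                        return;
                    }
                    processRequest(msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        tailer.start();
    }

    // Short critical section under the manager's monitor: become leader, detach the tailer.
    private synchronized Thread internalAcquireLeadership() {
        leader = true;
        Thread t = tailer;
        tailer = null;
        return t;
    }

    // Deliberately NOT synchronized: joining the tailer while holding this monitor
    // could deadlock, since the tailer needs the same monitor in processRequest().
    void acquireLeadership() {
        Thread t = internalAcquireLeadership();
        if (t != null) {
            try {
                t.join(); // wait until the tailer has drained the topic
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```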





[GitHub] [pulsar] srkukarni commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r443780713



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -90,25 +79,25 @@ public FunctionMetaDataManager(WorkerConfig workerConfig,
      */
     public void initialize() {
         try {
+            initializeTailer();
             this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this,
                     pulsarClient.newReader(), this.workerConfig, this.errorNotifier);
             // read all existing messages
-            this.setInitializePhase(true);
             while (this.functionMetaDataTopicTailer.getReader().hasMessageAvailable()) {
                 this.functionMetaDataTopicTailer.processRequest(this.functionMetaDataTopicTailer.getReader().readNext());
             }
-            this.setInitializePhase(false);
-            
-
         } catch (Exception e) {
             log.error("Failed to initialize meta data store", e);
-            throw new RuntimeException(e);
+            errorNotifier.triggerError(e);
         }
     }
-    
+
+    private void initializeTailer() throws PulsarClientException {

Review comment:
       As it stands, PulsarMetaDataManager encapsulates all of this logic within itself. I can split it up in a follow-up change.





[GitHub] [pulsar] srkukarni commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r442373638



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
##########
@@ -171,6 +171,9 @@ public void start(URI dlogUri,
             this.functionMetaDataManager = new FunctionMetaDataManager(
                     this.workerConfig, this.schedulerManager, this.client, errorNotifier);
 
+            // initialize function metadata manager
+            this.functionMetaDataManager.initialize();

Review comment:
       On startup, we always want to catch up on the metadata topic first, before becoming the leader and starting to serve requests.
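The startup ordering described in this reply (read every existing message before taking part in leader election or serving requests) can be sketched with a toy topic. `ToyReader` and `ToyWorker` are hypothetical; `hasMessageAvailable` and `readNext` only mirror the shape of the loop in this PR's `initialize()`, and this is a standalone model rather than the Pulsar `Reader` API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical reader over a fixed backlog of metadata messages.
class ToyReader {
    private final Deque<String> backlog;

    ToyReader(List<String> messages) {
        backlog = new ArrayDeque<>(messages);
    }

    boolean hasMessageAvailable() {
        return !backlog.isEmpty();
    }

    String readNext() {
        return backlog.poll();
    }
}

class ToyWorker {
    final List<String> applied = new ArrayList<>();
    final List<String> events = new ArrayList<>();

    // Step 1: catch up on everything already in the topic.
    void initialize(ToyReader reader) {
        while (reader.hasMessageAvailable()) {
            applied.add(reader.readNext());
        }
        events.add("caught-up:" + applied.size());
    }

    // Step 2: only after catching up, join leader election and start serving requests.
    void start(ToyReader reader) {
        initialize(reader);
        events.add("leader-election");
        events.add("serving");
    }
}
```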





[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r444016803



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -171,68 +160,83 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
         return containsFunctionMetaData(tenant, namespace, functionName);
     }
 
-    /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
-     */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)

Review comment:
       Can you add some comments to this method, as well as to the FunctionMetadataManager class, explaining the workflow of processing a metadata request?
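The workflow that the later Javadoc on `updateFunctionOnLeader` describes (reject if not the leader, update the in-memory structures, write to the metadata topic, then schedule if needed) can be sketched in a self-contained form. Every name here is hypothetical: a list stands in for the topic, a boolean for `exclusiveLeaderProducer != null`, and a counter for `schedulerManager.schedule()`. The staleness rule and the "every accepted change needs scheduling" return value are assumptions, not the PR's actual logic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class LeaderWriteFlow {
    private final Map<String, Long> metadata = new HashMap<>();
    final List<String> topic = new ArrayList<>(); // stand-in for the metadata topic
    boolean isLeader = false;                     // stand-in for exclusiveLeaderProducer != null
    int schedules = 0;                            // stand-in for schedulerManager.schedule()

    synchronized void updateFunctionOnLeader(String name, long version, boolean delete) {
        // 1. Only the leader may write.
        if (!isLeader) {
            throw new IllegalStateException("Not the leader");
        }
        // 2. Apply to the in-memory view first; out-of-date requests are rejected here.
        boolean needsScheduling = delete ? applyDelete(name) : applyUpdate(name, version);
        // 3. Persist the request so that tailers on other workers see it.
        topic.add((delete ? "DELETE " : "UPDATE ") + name + "@" + version);
        // 4. Trigger scheduling if assignments may have changed.
        if (needsScheduling) {
            schedules++;
        }
    }

    // Hypothetical counterpart of processUpdate: throws if stale, returns "needs scheduling".
    private boolean applyUpdate(String name, long version) {
        Long current = metadata.get(name);
        if (current != null && version <= current) {
            throw new IllegalArgumentException("Out-of-date version " + version + " for " + name);
        }
        metadata.put(name, version);
        return true; // assumption: every accepted change may alter assignments
    }

    // Hypothetical counterpart of proccessDeregister.
    private boolean applyDelete(String name) {
        if (metadata.remove(name) == null) {
            throw new IllegalArgumentException(name + " does not exist");
        }
        return true;
    }
}
```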





[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r445260694



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -174,89 +188,127 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
     }
 
     /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
+     * Called by the worker when we are in the leader mode.  In this state, we update our in-memory
+     * data structures and then write to the metadata topic.
+     * @param functionMetaData The function metadata in question
+     * @param delete Is this a delete operation
+     * @throws IllegalStateException if we are not the leader
+     * @throws IllegalArgumentException if the request is out of date.
      */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)
+            throws IllegalStateException, IllegalArgumentException {
+        if (exclusiveLeaderProducer == null) {
+            throw new IllegalStateException("Not the leader");
+        }
+        boolean needsScheduling;
+        if (delete) {
+            needsScheduling = proccessDeregister(functionMetaData);
+        } else {
+            needsScheduling = processUpdate(functionMetaData);
+        }
+        Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
+                .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
+                .setFunctionMetaData(functionMetaData)
+                .setWorkerId(workerConfig.getWorkerId())
+                .setRequestId(UUID.randomUUID().toString())
+                .build();
+        try {
+            lastMessageSeen = exclusiveLeaderProducer.send(serviceRequest.toByteArray());
+        } catch (Exception e) {
+            log.error("Could not write into Function Metadata topic", e);
+            errorNotifier.triggerError(e);
+            throw new IllegalStateException("Internal Error updating function at the leader", e);
+        }
+        if (needsScheduling) {
+            this.schedulerManager.schedule();
         }
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(existingFunctionMetadata, functionMetaData);
-
-        Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
-
-        return submit(updateRequest);
     }
 
-
     /**
-     * Sends a deregister request to the FMT (Function Metadata Topic) for a function
-     * @param tenant the tenant the function that needs to be deregistered belongs to
-     * @param namespace the namespace the function that needs to be deregistered belongs to
-     * @param functionName the name of the function
-     * @return a completable future of when the deregister has been applied
+     * Called by the leader service when this worker becomes the leader.
+     * We first get exclusive producer on the metadata topic. Next we drain the tailer
+     * to ensure that we have caught up to metadata topic. After which we close the tailer.
+     * Note that this method cannot be synchronized because the tailer might still be processing messages
      */
-    public synchronized CompletableFuture<RequestResult> deregisterFunction(String tenant, String namespace, String functionName) {
-        FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(functionMetaData, functionMetaData);
-
-        Request.ServiceRequest deregisterRequest = ServiceRequestUtils.getDeregisterRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
+    public void acquireLeadership() {
+        log.info("FunctionMetaDataManager becoming leader by creating exclusive producer");
+        FunctionMetaDataTopicTailer tailer = internalAcquireLeadership();
+        // Now that we have created the exclusive producer, wait for reader to get over
+        if (tailer != null) {
+            try {
+                tailer.stopWhenNoMoreMessages().get();
+            } catch (Exception e) {
+                log.error("Error while waiting for metadata tailer thread to finish", e);
+                errorNotifier.triggerError(e);
+            }
+            tailer.close();
+        }
+        log.info("FunctionMetaDataManager done becoming leader");
+    }
 
-        return submit(deregisterRequest);
+    private synchronized FunctionMetaDataTopicTailer internalAcquireLeadership() {
+        if (exclusiveLeaderProducer == null) {
+            try {
+                exclusiveLeaderProducer = pulsarClient.newProducer()

Review comment:
       nvm
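The `acquireLeadership()` Javadoc in this hunk says the method cannot be `synchronized` because the tailer may still be processing messages. The hypothetical sketch below shows why. The simulated tailer thread takes the manager's monitor for every message, so the method holds the lock only while it creates the producer, and it waits for the drain without holding the lock. `LeadershipSketch`, the in-memory backlog, and the drain future are illustrative assumptions. They are not Pulsar classes.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the acquireLeadership() lock ordering; not Pulsar's classes.
class LeadershipSketch {
    private final BlockingQueue<String> backlog = new LinkedBlockingQueue<>();
    private final CompletableFuture<Void> drained = new CompletableFuture<>();
    private volatile boolean stopWhenEmpty = false;
    private int applied = 0;              // guarded by "this"
    private boolean haveProducer = false; // guarded by "this"

    LeadershipSketch(int pendingMessages) {
        for (int i = 0; i < pendingMessages; i++) {
            backlog.add("msg-" + i);
        }
    }

    // The tailer applies each message while holding the manager's monitor
    private synchronized void processRequest(String msg) {
        applied++;
    }

    synchronized int applied() { return applied; }
    synchronized boolean haveProducer() { return haveProducer; }

    void startTailer() {
        Thread tailer = new Thread(() -> {
            try {
                while (true) {
                    String msg = backlog.poll(5, TimeUnit.MILLISECONDS);
                    if (msg != null) {
                        processRequest(msg);
                    } else if (stopWhenEmpty) {
                        drained.complete(null); // caught up: nothing left to read
                        return;
                    }
                }
            } catch (InterruptedException e) {
                drained.completeExceptionally(e);
            }
        });
        tailer.setDaemon(true);
        tailer.start();
    }

    // Short critical section: "create" the exclusive producer and ask the tailer to stop
    private synchronized void internalAcquireLeadership() {
        haveProducer = true;
        stopWhenEmpty = true;
    }

    // Deliberately not synchronized: blocking here while holding the monitor
    // would deadlock with processRequest() on the tailer thread
    void acquireLeadership() {
        internalAcquireLeadership();
        drained.join();
    }
}
```

In this model, `acquireLeadership()` returns only after every message in the backlog has been applied.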







[GitHub] [pulsar] srkukarni commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r444497185



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
##########
@@ -392,24 +392,13 @@ public void deregisterFunction(final String tenant,
             throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
         }
 
-        CompletableFuture<RequestResult> completableFuture = functionMetaDataManager.deregisterFunction(tenant,
-                namespace, componentName);
-
-        RequestResult requestResult = null;
-        try {
-            requestResult = completableFuture.get();
-            if (!requestResult.isSuccess()) {
-                throw new RestException(Status.BAD_REQUEST, requestResult.getMessage());
-            }
-        } catch (ExecutionException e) {
-            log.error("Execution Exception while deregistering {} @ /{}/{}/{}",
-                    ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
-            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getCause().getMessage());
-        } catch (InterruptedException e) {
-            log.error("Interrupted Exception while deregistering {} @ /{}/{}/{}",
-                    ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
-            throw new RestException(Status.REQUEST_TIMEOUT, e.getMessage());
-        }
+        FunctionMetaData newVersionedMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(functionMetaData, functionMetaData);
+        processFunctionUpdate(newVersionedMetaData.getFunctionDetails().getTenant(),

Review comment:
       changed to internalProcessFunctionRequest







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r444443636



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -52,20 +61,15 @@
     @VisibleForTesting
     final Map<String, Map<String, Map<String, FunctionMetaData>>> functionMetaDataMap = new ConcurrentHashMap<>();
 
-    // A map in which the key is the service request id and value is the service request
-    private final Map<String, ServiceRequestInfo> pendingServiceRequests = new ConcurrentHashMap<>();
-
     private final ServiceRequestManager serviceRequestManager;
     private final SchedulerManager schedulerManager;
     private final WorkerConfig workerConfig;
     private final PulsarClient pulsarClient;
     private final ErrorNotifier errorNotifier;
 
     private FunctionMetaDataTopicTailer functionMetaDataTopicTailer;
-
-    @Setter
-    @Getter
-    boolean isInitializePhase = false;

Review comment:
       I see that "processRequest()" never calls schedule(); only "updateFunctionOnLeader" does.







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r444549960



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -174,89 +181,124 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
     }
 
     /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
+     * Called by the worker when we are in the leader mode.  In this state, we update our in-memory
+     * data structures and then write to the metadata topic.
+     * @param functionMetaData The function metadata in question
+     * @param delete Is this a delete operation
+     * @throws IllegalStateException if we are not the leader
+     * @throws IllegalArgumentException if the request is out of date.
      */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)
+            throws IllegalStateException, IllegalArgumentException {
+        if (exclusiveLeaderProducer == null) {
+            throw new IllegalStateException("Not the leader");
+        }
+        boolean needsScheduling;
+        if (delete) {
+            needsScheduling = proccessDeregister(functionMetaData);
+        } else {
+            needsScheduling = processUpdate(functionMetaData);
+        }
+        Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
+                .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
+                .setFunctionMetaData(functionMetaData)
+                .setWorkerId(workerConfig.getWorkerId())
+                .setRequestId(UUID.randomUUID().toString())
+                .build();
+        try {
+            lastMessageSeen = exclusiveLeaderProducer.send(serviceRequest.toByteArray());
+        } catch (Exception e) {
+            log.error("Could not write into Function Metadata topic", e);
+            errorNotifier.triggerError(e);
+        }
+        if (needsScheduling) {
+            this.schedulerManager.schedule();
         }
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(existingFunctionMetadata, functionMetaData);
-
-        Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
-
-        return submit(updateRequest);
     }
 
-
     /**
-     * Sends a deregister request to the FMT (Function Metadata Topic) for a function
-     * @param tenant the tenant the function that needs to be deregistered belongs to
-     * @param namespace the namespace the function that needs to be deregistered belongs to
-     * @param functionName the name of the function
-     * @return a completable future of when the deregister has been applied
+     * Called by the leader service when this worker becomes the leader.
+     * We first get exclusive producer on the metadata topic. Next we drain the tailer
+     * to ensure that we have caught up to metadata topic. After which we close the tailer.
+     * Note that this method cannot be synchronized because the tailer might still be processing messages
      */
-    public synchronized CompletableFuture<RequestResult> deregisterFunction(String tenant, String namespace, String functionName) {
-        FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(functionMetaData, functionMetaData);
-
-        Request.ServiceRequest deregisterRequest = ServiceRequestUtils.getDeregisterRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
+    public void acquireLeadership() {
+        log.info("FunctionMetaDataManager becoming leader by creating exclusive producer");
+        FunctionMetaDataTopicTailer tailer = internalAcquireLeadership();
+        // Now that we have created the exclusive producer, wait for reader to get over
+        if (tailer != null) {
+            try {
+                tailer.stopWhenNoMoreMessages().get();
+            } catch (Exception e) {
+                log.error("Error while waiting for metadata tailer thread to finish", e);
+                errorNotifier.triggerError(e);
+            }
+            tailer.close();
+        }
+        this.schedulerManager.schedule();
+        log.info("FunctionMetaDataManager done becoming leader by doing its first schedule");
+    }
 
-        return submit(deregisterRequest);
+    private synchronized FunctionMetaDataTopicTailer internalAcquireLeadership() {
+        if (exclusiveLeaderProducer == null) {
+            try {
+                exclusiveLeaderProducer = pulsarClient.newProducer()
+                        .topic(this.workerConfig.getFunctionMetadataTopic())
+                        .producerName(workerConfig.getWorkerId() + "-leader")
+                        // .type(EXCLUSIVE)
+                        .create();
+            } catch (PulsarClientException e) {
+                log.error("Error creating exclusive producer", e);
+                errorNotifier.triggerError(e);
+            }
+        } else {
+            log.error("Logic Error in FunctionMetaData Manager");
+            errorNotifier.triggerError(new IllegalStateException());
+        }
+        FunctionMetaDataTopicTailer tailer = this.functionMetaDataTopicTailer;
+        this.functionMetaDataTopicTailer = null;
+        return tailer;
     }
 
     /**
-     * Sends a start/stop function request to the FMT (Function Metadata Topic) for a function
-     * @param tenant the tenant the function that needs to be deregistered belongs to
-     * @param namespace the namespace the function that needs to be deregistered belongs to
-     * @param functionName the name of the function
-     * @param instanceId the instanceId of the function, -1 if for all instances
-     * @param start do we need to start or stop
-     * @return a completable future of when the start/stop has been applied
+     * called by the leader service when we lose leadership. We close the exclusive producer
+     * and start the tailer.
      */
-    public synchronized CompletableFuture<RequestResult> changeFunctionInstanceStatus(String tenant, String namespace, String functionName,
-                                                                                      Integer instanceId, boolean start) {
-        FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.changeFunctionInstanceStatus(functionMetaData, instanceId, start);
-
-        Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
-
-        return submit(updateRequest);
+    public synchronized void giveupLeadership() {
+        log.info("FunctionMetaDataManager giving up leadership by closing exclusive producer");
+        try {
+            exclusiveLeaderProducer.close();
+            exclusiveLeaderProducer = null;
+            initializeTailer();

Review comment:
       I don't quite follow.  In "internalAcquireLeadership" the tailer is closed.  When the worker loses leadership, where is the tailer getting started up again?
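The hunk above answers part of this question. `giveupLeadership()` closes the exclusive producer, sets it to null, and calls `initializeTailer()`. Below is a hypothetical model of the hand-off in both directions. `RoleSketch` and its `Tailer` are illustrative only. One point is an assumption inferred from the diff rather than confirmed by it: a fresh tailer resumes after `lastMessageSeen`, the id the leader records from `send()`.

```java
// Hypothetical model of the leader/follower hand-off; not Pulsar's classes.
class RoleSketch {
    static final class Tailer {
        final long startAfter; // last message id already reflected in memory
        boolean closed = false;

        Tailer(long startAfter) {
            this.startAfter = startAfter;
        }
    }

    long lastMessageSeen = -1;                   // -1: nothing read or written yet
    Tailer tailer = new Tailer(lastMessageSeen); // followers tail the topic
    boolean hasExclusiveProducer = false;

    // Follower -> leader: take the producer, then close the (already drained) tailer
    synchronized void acquireLeadership() {
        if (hasExclusiveProducer) {
            throw new IllegalStateException("Already the leader");
        }
        hasExclusiveProducer = true;
        tailer.closed = true;
        tailer = null;
    }

    // Only the leader writes; the message id is remembered as lastMessageSeen
    synchronized void write() {
        if (!hasExclusiveProducer) {
            throw new IllegalStateException("Not the leader");
        }
        lastMessageSeen++;
    }

    // Leader -> follower: close the producer and start a fresh tailer after our last write
    synchronized void giveupLeadership() {
        hasExclusiveProducer = false;
        tailer = new Tailer(lastMessageSeen);
    }
}
```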







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r444435836



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -52,20 +61,15 @@
     @VisibleForTesting
     final Map<String, Map<String, Map<String, FunctionMetaData>>> functionMetaDataMap = new ConcurrentHashMap<>();
 
-    // A map in which the key is the service request id and value is the service request
-    private final Map<String, ServiceRequestInfo> pendingServiceRequests = new ConcurrentHashMap<>();
-
     private final ServiceRequestManager serviceRequestManager;
     private final SchedulerManager schedulerManager;
     private final WorkerConfig workerConfig;
     private final PulsarClient pulsarClient;
     private final ErrorNotifier errorNotifier;
 
     private FunctionMetaDataTopicTailer functionMetaDataTopicTailer;
-
-    @Setter
-    @Getter
-    boolean isInitializePhase = false;

Review comment:
       Why are we getting rid of the "isInitializePhase" flag? The flag is there so that we don't call schedule() for every message during the initialization phase, and instead call schedule() once at the end of the init phase.
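The scheduling concern can be modeled as follows: replaying N metadata messages should cost one `schedule()` call, not N. In this hypothetical sketch (`ReplaySketch` is illustrative, not Pulsar code), the tailer path only updates the in-memory map. The leader path schedules once, after it has caught up. This matches one revision of `acquireLeadership()` quoted in this thread, which calls `schedule()` after it drains the tailer.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: replay the metadata topic without per-message scheduling.
class ReplaySketch {
    final Map<String, String> functions = new HashMap<>();
    int scheduleCalls = 0;

    // Tailer path: apply the message to the in-memory map only; never schedules
    void processRequest(String fqfn, String metadata) {
        functions.put(fqfn, metadata);
    }

    // Leader path: catch up on the whole backlog, then schedule exactly once
    void becomeLeader(List<String[]> backlog) {
        for (String[] msg : backlog) {
            processRequest(msg[0], msg[1]);
        }
        schedule();
    }

    void schedule() {
        scheduleCalls++;
    }
}
```

With this structure, an explicit init-phase flag becomes unnecessary, because no code path on the follower side calls `schedule()`.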







[GitHub] [pulsar] srkukarni commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r444496523



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -174,89 +181,124 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
     }
 
     /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
+     * Called by the worker when we are in the leader mode.  In this state, we update our in-memory
+     * data structures and then write to the metadata topic.
+     * @param functionMetaData The function metadata in question
+     * @param delete Is this a delete operation
+     * @throws IllegalStateException if we are not the leader
+     * @throws IllegalArgumentException if the request is out of date.
      */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)
+            throws IllegalStateException, IllegalArgumentException {
+        if (exclusiveLeaderProducer == null) {
+            throw new IllegalStateException("Not the leader");
+        }
+        boolean needsScheduling;
+        if (delete) {
+            needsScheduling = proccessDeregister(functionMetaData);
+        } else {
+            needsScheduling = processUpdate(functionMetaData);
+        }
+        Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
+                .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
+                .setFunctionMetaData(functionMetaData)
+                .setWorkerId(workerConfig.getWorkerId())
+                .setRequestId(UUID.randomUUID().toString())
+                .build();
+        try {
+            lastMessageSeen = exclusiveLeaderProducer.send(serviceRequest.toByteArray());
+        } catch (Exception e) {

Review comment:
       good point. Changed
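The change this reply refers to appears in another hunk of this method quoted in this thread. When the send to the metadata topic fails, the leader now throws `IllegalStateException` instead of only logging the error. Below is a minimal sketch of that pattern. `MetadataWriter` is a hypothetical interface standing in for the exclusive producer, and a `Consumer<Throwable>` stands in for `errorNotifier`.

```java
import java.util.function.Consumer;

// Hypothetical sketch: surface a failed metadata write to the caller instead of swallowing it.
class WriteFailureSketch {
    interface MetadataWriter {
        void send(byte[] payload) throws Exception;
    }

    private final MetadataWriter writer;
    private final Consumer<Throwable> errorNotifier;

    WriteFailureSketch(MetadataWriter writer, Consumer<Throwable> errorNotifier) {
        this.writer = writer;
        this.errorNotifier = errorNotifier;
    }

    void persist(byte[] payload) {
        try {
            writer.send(payload);
        } catch (Exception e) {
            // Let the worker react to the failure...
            errorNotifier.accept(e);
            // ...and make sure the REST caller does not see a success
            throw new IllegalStateException("Internal Error updating function at the leader", e);
        }
    }
}
```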







[GitHub] [pulsar] wolfstudy removed a comment on pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
wolfstudy removed a comment on pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#issuecomment-717778890


   Move this change to 2.6.2 because #8143 depends on it.





[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r440380986



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
##########
@@ -63,12 +65,22 @@ public void start() {
 
     @Override
     public void run() {
-        while(running) {
+        while (running) {
+            if (stopOnNoMessageAvailable) {
+                try {
+                    if (!reader.hasMessageAvailable()) {
+                        break;
+                    }
+                } catch (PulsarClientException e) {
+                    log.error("Received exception while testing hasMessageAvailable", e);
+                    errorNotifier.triggerError(e);
+                }
+            }
             try {
                 Message<byte[]> msg = reader.readNext();
                 processRequest(msg);
             } catch (Throwable th) {
-                if (running) {
+                if (running && !stopOnNoMessageAvailable) {

Review comment:
       I don't think we should check "!stopOnNoMessageAvailable". If errors occur while stopOnNoMessageAvailable=true, we still need to handle the exception correctly rather than ignore it, because an error means we did not process all messages successfully.
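The reviewer's point can be illustrated with a drain loop that reports failure instead of swallowing it. In this hypothetical sketch, `Source` only mimics the `hasMessageAvailable()`/`readNext()` pair used in the diff and is not Pulsar's `Reader`. Any exception fails the returned future, so a caller that waits on the drain learns that this worker has not caught up.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical drain loop; Source mimics hasMessageAvailable()/readNext() but is not Pulsar's Reader.
class DrainSketch {
    interface Source {
        boolean hasMessageAvailable() throws Exception;
        String readNext() throws Exception;
    }

    interface Handler {
        void process(String msg) throws Exception;
    }

    // Reads until the source reports no more messages; completes with the number processed
    static CompletableFuture<Integer> drain(Source source, Handler handler) {
        CompletableFuture<Integer> done = new CompletableFuture<>();
        int processed = 0;
        try {
            while (source.hasMessageAvailable()) {
                handler.process(source.readNext());
                processed++;
            }
            done.complete(processed);
        } catch (Exception e) {
            // Not ignored: a failure here means this worker has not caught up
            done.completeExceptionally(e);
        }
        return done;
    }
}
```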







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r444483683



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
##########
@@ -392,24 +392,13 @@ public void deregisterFunction(final String tenant,
             throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
         }
 
-        CompletableFuture<RequestResult> completableFuture = functionMetaDataManager.deregisterFunction(tenant,
-                namespace, componentName);
-
-        RequestResult requestResult = null;
-        try {
-            requestResult = completableFuture.get();
-            if (!requestResult.isSuccess()) {
-                throw new RestException(Status.BAD_REQUEST, requestResult.getMessage());
-            }
-        } catch (ExecutionException e) {
-            log.error("Execution Exception while deregistering {} @ /{}/{}/{}",
-                    ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
-            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getCause().getMessage());
-        } catch (InterruptedException e) {
-            log.error("Interrupted Exception while deregistering {} @ /{}/{}/{}",
-                    ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
-            throw new RestException(Status.REQUEST_TIMEOUT, e.getMessage());
-        }
+        FunctionMetaData newVersionedMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(functionMetaData, functionMetaData);
+        processFunctionUpdate(newVersionedMetaData.getFunctionDetails().getTenant(),

Review comment:
       It is somewhat confusing to call "processFunctionUpdate" in the deregister routine, since we also have an update function routine. Maybe rename the method to something like "processFunctionRequest"?







[GitHub] [pulsar] srkukarni commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r442350721



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
##########
@@ -77,12 +89,25 @@ public void run() {
                     if (!(th instanceof InterruptedException || th.getCause() instanceof InterruptedException)) {
                         log.warn("Encountered error when metadata tailer is not running", th);
                     }
-                    return;
                 }
             }
         }
     }
 
+    public void stopWhenNoMoreMessages() {
+        stopOnNoMessageAvailable = true;
+        readerThread.interrupt();

Review comment:
       changed

##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
##########
@@ -77,12 +89,25 @@ public void run() {
                     if (!(th instanceof InterruptedException || th.getCause() instanceof InterruptedException)) {
                         log.warn("Encountered error when metadata tailer is not running", th);
                     }
-                    return;
                 }
             }
         }
     }
 
+    public void stopWhenNoMoreMessages() {
+        stopOnNoMessageAvailable = true;
+        readerThread.interrupt();
+        // We need to wait here till the thread exits to make sure that the reader is up to date
+        while (true) {
+            try {
+                readerThread.join();
+                return;
+            } catch (InterruptedException e) {

Review comment:
       removed

##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
##########
@@ -63,12 +65,22 @@ public void start() {
 
     @Override
     public void run() {
-        while(running) {
+        while (running) {
+            if (stopOnNoMessageAvailable) {
+                try {
+                    if (!reader.hasMessageAvailable()) {
+                        break;
+                    }
+                } catch (PulsarClientException e) {
+                    log.error("Received exception while testing hasMessageAvailable", e);
+                    errorNotifier.triggerError(e);
+                }
+            }
             try {
                 Message<byte[]> msg = reader.readNext();
                 processRequest(msg);
             } catch (Throwable th) {
-                if (running) {
+                if (running && !stopOnNoMessageAvailable) {

Review comment:
       changed

##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -168,68 +151,78 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
         return containsFunctionMetaData(tenant, namespace, functionName);
     }
 
-    /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
-     */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)
+            throws IllegalStateException, IllegalArgumentException {
+        if (exclusiveLeaderProducer == null) {
+            throw new IllegalStateException("Not the leader");
+        }
+        boolean needsScheduling;
+        if (delete) {
+            needsScheduling = proccessDeregister(functionMetaData);
+        } else {
+            needsScheduling = processUpdate(functionMetaData);
+        }
+        Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
+                .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
+                .setFunctionMetaData(functionMetaData)
+                .setWorkerId(workerConfig.getWorkerId())
+                .setRequestId(UUID.randomUUID().toString())
+                .build();
+        try {
+            exclusiveLeaderProducer.send(serviceRequest.toByteArray());
+        } catch (Exception e) {
+            log.error("Could not write into Function Metadata topic", e);
+            errorNotifier.triggerError(e);
+        }
+        if (needsScheduling) {
+            this.schedulerManager.schedule();
         }
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(existingFunctionMetadata, functionMetaData);
-
-        Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
-
-        return submit(updateRequest);
     }
 
-
-    /**
-     * Sends a deregister request to the FMT (Function Metadata Topic) for a function
-     * @param tenant the tenant the function that needs to be deregistered belongs to
-     * @param namespace the namespace the function that needs to be deregistered belongs to
-     * @param functionName the name of the function
-     * @return a completable future of when the deregister has been applied
-     */
-    public synchronized CompletableFuture<RequestResult> deregisterFunction(String tenant, String namespace, String functionName) {
-        FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(functionMetaData, functionMetaData);
-
-        Request.ServiceRequest deregisterRequest = ServiceRequestUtils.getDeregisterRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
-
-        return submit(deregisterRequest);
+    // Note that this method cannot be syncrhonized because the tailer might still be processing messages
+    public void acquireLeadership() {
+        log.info("FunctionMetaDataManager becoming leader by creating exclusive producer");
+        FunctionMetaDataTopicTailer tailer = internalAcquireLeadership();
+        // Now that we have created the exclusive producer, wait for reader to get over
+        if (tailer != null) {
+            tailer.stopWhenNoMoreMessages();
+            tailer.close();
+        }
+        this.schedulerManager.schedule();
+        log.info("FunctionMetaDataManager done becoming leader by doing its first schedule");
     }
 
-    /**
-     * Sends a start/stop function request to the FMT (Function Metadata Topic) for a function
-     * @param tenant the tenant the function that needs to be deregistered belongs to
-     * @param namespace the namespace the function that needs to be deregistered belongs to
-     * @param functionName the name of the function
-     * @param instanceId the instanceId of the function, -1 if for all instances
-     * @param start do we need to start or stop
-     * @return a completable future of when the start/stop has been applied
-     */
-    public synchronized CompletableFuture<RequestResult> changeFunctionInstanceStatus(String tenant, String namespace, String functionName,
-                                                                                      Integer instanceId, boolean start) {
-        FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.changeFunctionInstanceStatus(functionMetaData, instanceId, start);
-
-        Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
+    private synchronized FunctionMetaDataTopicTailer internalAcquireLeadership() {
+        if (exclusiveLeaderProducer == null) {
+            try {
+                exclusiveLeaderProducer = pulsarClient.newProducer()
+                        .topic(this.workerConfig.getFunctionMetadataTopic())
+                        .producerName(workerConfig.getWorkerId() + "-leader")
+                        // .type(EXCLUSIVE)
+                        .create();
+            } catch (PulsarClientException e) {
+                log.error("Error creating exclusive producer", e);
+                errorNotifier.triggerError(e);
+            }
+        } else {
+            log.error("Logic Error in FunctionMetaData Manager");
+            errorNotifier.triggerError(new IllegalStateException());
+        }
+        FunctionMetaDataTopicTailer tailer = this.functionMetaDataTopicTailer;
+        this.functionMetaDataTopicTailer = null;
+        return tailer;
+    }
 
-        return submit(updateRequest);
+    public synchronized void giveupLeadership() {
+        log.info("FunctionMetaDataManager giving up leadership by closing exclusive producer");
+        try {
+            exclusiveLeaderProducer.close();
+        } catch (PulsarClientException e) {
+            log.error("Error closing exclusive producer", e);
+            errorNotifier.triggerError(e);
+        }
+        exclusiveLeaderProducer = null;
+        initializeTailer();

Review comment:
       changed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r444025141



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -168,68 +151,78 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
         return containsFunctionMetaData(tenant, namespace, functionName);
     }
 
-    /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
-     */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)

Review comment:
       How is that the case? The method "submit" has been deleted. We were using the "submit()" method to put a completable future in the "pendingServiceRequests" map and then completing the future when we merged the request into the local metadata cache. If there was a conflict, the future was completed exceptionally with an error message. It seems that the whole workflow has been removed.
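For reference, the workflow this comment describes (park a future per request, complete it when the request is read back and merged, fail it on conflict) can be sketched as below. This is a simplified, hypothetical model: the class `PendingRequestTracker`, its `RequestResult` holder and the `onRequestProcessed` callback are illustrative names, not the removed Pulsar code, and the actual write to the metadata topic is omitted.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the pre-#7255 "submit()" pattern: every worker writes a request to the
// metadata topic and waits on a future that the tailer completes when it reads the request back.
class PendingRequestTracker {
    static final class RequestResult {
        final boolean success;
        final String message;

        RequestResult(boolean success, String message) {
            this.success = success;
            this.message = message;
        }
    }

    private final Map<String, CompletableFuture<RequestResult>> pendingServiceRequests =
            new ConcurrentHashMap<>();

    // Register the future before writing, so the tailer can never read the request back first.
    CompletableFuture<RequestResult> submit(String requestId) {
        CompletableFuture<RequestResult> future = new CompletableFuture<>();
        pendingServiceRequests.put(requestId, future);
        // (a real worker would now write the request to the metadata topic)
        return future;
    }

    // Called by the tailer after it merges, or rejects, a request read from the topic.
    void onRequestProcessed(String requestId, boolean conflict, String detail) {
        CompletableFuture<RequestResult> future = pendingServiceRequests.remove(requestId);
        if (future == null) {
            return; // written by another worker; nobody on this worker is waiting for it
        }
        if (conflict) {
            future.completeExceptionally(new IllegalArgumentException(detail));
        } else {
            future.complete(new RequestResult(true, detail));
        }
    }

    int pendingCount() {
        return pendingServiceRequests.size();
    }
}
```

Under the leader-only write model in this PR, the equivalent signal would have to come from the leader's response to the forwarding worker rather than from a future completed by the tailer.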







[GitHub] [pulsar] srkukarni commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r444493849



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -174,89 +181,124 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
     }
 
     /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
+     * Called by the worker when we are in the leader mode.  In this state, we update our in-memory
+     * data structures and then write to the metadata topic.
+     * @param functionMetaData The function metadata in question
+     * @param delete Is this a delete operation
+     * @throws IllegalStateException if we are not the leader
+     * @throws IllegalArgumentException if the request is out of date.
      */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)
+            throws IllegalStateException, IllegalArgumentException {
+        if (exclusiveLeaderProducer == null) {
+            throw new IllegalStateException("Not the leader");
+        }
+        boolean needsScheduling;
+        if (delete) {
+            needsScheduling = proccessDeregister(functionMetaData);
+        } else {
+            needsScheduling = processUpdate(functionMetaData);
+        }
+        Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
+                .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
+                .setFunctionMetaData(functionMetaData)
+                .setWorkerId(workerConfig.getWorkerId())
+                .setRequestId(UUID.randomUUID().toString())
+                .build();
+        try {
+            lastMessageSeen = exclusiveLeaderProducer.send(serviceRequest.toByteArray());
+        } catch (Exception e) {
+            log.error("Could not write into Function Metadata topic", e);
+            errorNotifier.triggerError(e);
+        }
+        if (needsScheduling) {
+            this.schedulerManager.schedule();
         }
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(existingFunctionMetadata, functionMetaData);
-
-        Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
-
-        return submit(updateRequest);
     }
 
-
     /**
-     * Sends a deregister request to the FMT (Function Metadata Topic) for a function
-     * @param tenant the tenant the function that needs to be deregistered belongs to
-     * @param namespace the namespace the function that needs to be deregistered belongs to
-     * @param functionName the name of the function
-     * @return a completable future of when the deregister has been applied
+     * Called by the leader service when this worker becomes the leader.
+     * We first get exclusive producer on the metadata topic. Next we drain the tailer
+     * to ensure that we have caught up to metadata topic. After which we close the tailer.
+     * Note that this method cannot be syncrhonized because the tailer might still be processing messages
      */
-    public synchronized CompletableFuture<RequestResult> deregisterFunction(String tenant, String namespace, String functionName) {
-        FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(functionMetaData, functionMetaData);
-
-        Request.ServiceRequest deregisterRequest = ServiceRequestUtils.getDeregisterRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
+    public void acquireLeadership() {
+        log.info("FunctionMetaDataManager becoming leader by creating exclusive producer");
+        FunctionMetaDataTopicTailer tailer = internalAcquireLeadership();
+        // Now that we have created the exclusive producer, wait for reader to get over
+        if (tailer != null) {
+            try {
+                tailer.stopWhenNoMoreMessages().get();
+            } catch (Exception e) {
+                log.error("Error while waiting for metadata tailer thread to finish", e);
+                errorNotifier.triggerError(e);
+            }
+            tailer.close();
+        }
+        this.schedulerManager.schedule();
+        log.info("FunctionMetaDataManager done becoming leader by doing its first schedule");
+    }
 
-        return submit(deregisterRequest);
+    private synchronized FunctionMetaDataTopicTailer internalAcquireLeadership() {
+        if (exclusiveLeaderProducer == null) {
+            try {
+                exclusiveLeaderProducer = pulsarClient.newProducer()
+                        .topic(this.workerConfig.getFunctionMetadataTopic())
+                        .producerName(workerConfig.getWorkerId() + "-leader")
+                        // .type(EXCLUSIVE)
+                        .create();
+            } catch (PulsarClientException e) {
+                log.error("Error creating exclusive producer", e);
+                errorNotifier.triggerError(e);
+            }
+        } else {
+            log.error("Logic Error in FunctionMetaData Manager");
+            errorNotifier.triggerError(new IllegalStateException());
+        }
+        FunctionMetaDataTopicTailer tailer = this.functionMetaDataTopicTailer;
+        this.functionMetaDataTopicTailer = null;
+        return tailer;
     }
 
     /**
-     * Sends a start/stop function request to the FMT (Function Metadata Topic) for a function
-     * @param tenant the tenant the function that needs to be deregistered belongs to
-     * @param namespace the namespace the function that needs to be deregistered belongs to
-     * @param functionName the name of the function
-     * @param instanceId the instanceId of the function, -1 if for all instances
-     * @param start do we need to start or stop
-     * @return a completable future of when the start/stop has been applied
+     * called by the leader service when we lose leadership. We close the exclusive producer
+     * and start the tailer.
      */
-    public synchronized CompletableFuture<RequestResult> changeFunctionInstanceStatus(String tenant, String namespace, String functionName,
-                                                                                      Integer instanceId, boolean start) {
-        FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.changeFunctionInstanceStatus(functionMetaData, instanceId, start);
-
-        Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
-
-        return submit(updateRequest);
+    public synchronized void giveupLeadership() {
+        log.info("FunctionMetaDataManager giving up leadership by closing exclusive producer");
+        try {
+            exclusiveLeaderProducer.close();
+            exclusiveLeaderProducer = null;
+            initializeTailer();

Review comment:
       That happens at start
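The javadoc in the hunk above notes that `acquireLeadership()` cannot be `synchronized` because the tailer might still be processing messages. The sketch below illustrates why, assuming the tailer's processing path takes the manager's monitor (for example through a synchronized `processUpdate`): only the state swap happens under the lock, and waiting for the tailer happens outside it. `LeadershipSketch` is a hypothetical class, not the PR's code; a boolean flag stands in for the exclusive producer and a thread replaying a fixed list stands in for `FunctionMetaDataTopicTailer`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the leadership hand-off. If acquireLeadership() held the monitor while
// joining the tailer, the tailer would block in applyFromTopic() and the two would deadlock.
class LeadershipSketch {
    private final List<String> applied = new ArrayList<>();
    private boolean leader = false; // stands in for "exclusiveLeaderProducer != null"
    private Thread tailer;          // null while this worker is the leader

    // Stand-in for starting FunctionMetaDataTopicTailer: replays a backlog, then exits.
    synchronized void startTailer(List<String> backlog) {
        tailer = new Thread(() -> backlog.forEach(this::applyFromTopic));
        tailer.start();
    }

    // Called from the tailer thread; needs the manager's monitor.
    synchronized void applyFromTopic(String msg) {
        applied.add(msg);
    }

    // Deliberately not synchronized: swap state under the lock, wait outside it.
    void acquireLeadership() {
        Thread draining;
        synchronized (this) {
            leader = true; // in the PR, the exclusive producer is created at this point
            draining = tailer;
            tailer = null;
        }
        if (draining != null) {
            try {
                draining.join(); // safe: we do not hold the monitor the tailer needs
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        // in the PR, schedulerManager.schedule() runs here, once the manager has caught up
    }

    synchronized void giveupLeadership(List<String> backlog) {
        leader = false;       // in the PR, the exclusive producer is closed here
        startTailer(backlog); // resume following the metadata topic as a non-leader
    }

    synchronized boolean isLeader() {
        return leader;
    }

    synchronized int appliedCount() {
        return applied.size();
    }
}
```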







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r445346075



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -174,89 +181,124 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
     }
 
     /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
+     * Called by the worker when we are in the leader mode.  In this state, we update our in-memory
+     * data structures and then write to the metadata topic.
+     * @param functionMetaData The function metadata in question
+     * @param delete Is this a delete operation
+     * @throws IllegalStateException if we are not the leader
+     * @throws IllegalArgumentException if the request is out of date.
      */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)
+            throws IllegalStateException, IllegalArgumentException {
+        if (exclusiveLeaderProducer == null) {
+            throw new IllegalStateException("Not the leader");
+        }
+        boolean needsScheduling;
+        if (delete) {
+            needsScheduling = proccessDeregister(functionMetaData);
+        } else {
+            needsScheduling = processUpdate(functionMetaData);
+        }
+        Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
+                .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
+                .setFunctionMetaData(functionMetaData)
+                .setWorkerId(workerConfig.getWorkerId())
+                .setRequestId(UUID.randomUUID().toString())
+                .build();
+        try {
+            lastMessageSeen = exclusiveLeaderProducer.send(serviceRequest.toByteArray());
+        } catch (Exception e) {

Review comment:
       We should return the error to the worker making the call to the leader; otherwise that worker might have to wait for a timeout. I think we should just return an error and let the user retry. There is no guarantee that restarting the worker or electing another leader will solve the issue, since all the workers have the same configuration.
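A minimal sketch of the suggestion above: the leader rethrows a failed metadata write to its caller instead of calling `errorNotifier.triggerError(e)`. `MetadataWriter`, `MetadataWriteException` and `LeaderWriteHandler` are hypothetical stand-ins, not Pulsar APIs. One design choice differs from the diff: here the local cache is updated only after the write succeeds, so a failed write leaves no in-memory change behind. The diff applies `processUpdate` before sending.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the leader's exclusive producer: send() blocks and throws on failure.
interface MetadataWriter {
    void send(byte[] payload) throws Exception;
}

// Returned to the worker that forwarded the request, which can relay it to the user.
class MetadataWriteException extends RuntimeException {
    MetadataWriteException(String msg, Throwable cause) {
        super(msg, cause);
    }
}

class LeaderWriteHandler {
    private final MetadataWriter producer;
    private final Map<String, byte[]> localCache = new HashMap<>();

    LeaderWriteHandler(MetadataWriter producer) {
        this.producer = producer;
    }

    synchronized void updateOnLeader(String functionKey, byte[] serviceRequest) {
        try {
            producer.send(serviceRequest);
        } catch (Exception e) {
            // Surface the failure instead of restarting: every worker shares the same
            // configuration, so another leader could fail in exactly the same way.
            throw new MetadataWriteException("Could not write into Function Metadata topic", e);
        }
        // Apply the change locally only once it has been written to the topic.
        localCache.put(functionKey, serviceRequest);
    }

    synchronized boolean contains(String functionKey) {
        return localCache.containsKey(functionKey);
    }
}
```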







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r442530821



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
##########
@@ -171,6 +171,9 @@ public void start(URI dlogUri,
             this.functionMetaDataManager = new FunctionMetaDataManager(
                     this.workerConfig, this.schedulerManager, this.client, errorNotifier);
 
+            // initialize function metadata manager
+            this.functionMetaDataManager.initialize();

Review comment:
       This should happen after the membership manager is created/started. We want to do this before the worker "joins" the cluster.







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r440386456



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
##########
@@ -171,6 +171,9 @@ public void start(URI dlogUri,
             this.functionMetaDataManager = new FunctionMetaDataManager(
                     this.workerConfig, this.schedulerManager, this.client, errorNotifier);
 
+            // initialize function metadata manager
+            this.functionMetaDataManager.initialize();

Review comment:
       Why do you need to move this here?







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r444028418



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -241,20 +245,21 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
      * @param serviceRequest The request
      */
     public void processRequest(MessageId messageId, Request.ServiceRequest serviceRequest) {
-
-        // make sure that processing requests don't happen simultaneously
-        synchronized (this) {
+        try {
             switch (serviceRequest.getServiceRequestType()) {
                 case UPDATE:
-                    this.processUpdate(serviceRequest);
+                    this.processUpdate(serviceRequest.getFunctionMetaData());
                     break;
                 case DELETE:
-                    this.proccessDeregister(serviceRequest);
+                    this.proccessDeregister(serviceRequest.getFunctionMetaData());
                     break;
                 default:
                     log.warn("Received request with unrecognized type: {}", serviceRequest);
             }
+        } catch (IllegalArgumentException e) {
+            // Its ok. Nothing much we can do about it

Review comment:
       Why are we catching and ignoring these exceptions? Aren't the "out of date request" errors thrown as IllegalArgumentExceptions? If we catch and ignore them, how are users going to receive the error?
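One way to reconcile this question with the javadoc in the earlier hunk (`@throws IllegalArgumentException if the request is out of date`) is to treat the same validation differently on two paths. On the leader path the exception propagates to the caller. On the tailer path nobody is waiting, and the leader has already validated the write, so it is skipped. The sketch below uses a hypothetical `VersionedMetadataStore` with a simple per-function version counter. It is an illustration under those assumptions, not the PR's actual `processUpdate` logic.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical store keyed by function name, holding the latest accepted version.
class VersionedMetadataStore {
    private final Map<String, Long> versions = new HashMap<>();

    // Rejects an update whose version is not newer than the one already held.
    synchronized void processUpdate(String function, long version) {
        Long current = versions.get(function);
        if (current != null && version <= current) {
            throw new IllegalArgumentException("Update request for " + function
                    + " is out of date (have " + current + ", got " + version + ")");
        }
        versions.put(function, version);
    }

    // Leader path: let the exception reach the caller, and through it the user.
    void updateOnLeader(String function, long version) {
        processUpdate(function, version);
    }

    // Tailer path: no user is waiting on this message, so a stale entry is just skipped.
    void processRequestFromTopic(String function, long version) {
        try {
            processUpdate(function, version);
        } catch (IllegalArgumentException e) {
            // nothing to report to; ignoring the stale message is the only option here
        }
    }

    synchronized Long version(String function) {
        return versions.get(function);
    }
}
```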







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r440368512



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
##########
@@ -77,12 +89,25 @@ public void run() {
                     if (!(th instanceof InterruptedException || th.getCause() instanceof InterruptedException)) {
                         log.warn("Encountered error when metadata tailer is not running", th);
                     }
-                    return;
                 }
             }
         }
     }
 
+    public void stopWhenNoMoreMessages() {
+        stopOnNoMessageAvailable = true;
+        readerThread.interrupt();

Review comment:
       I don't think this is the right approach. You might be interrupting a message that is being processed. The goal is to reach the end and process all the messages. I would just do something similar to what I have done here:
   https://github.com/apache/pulsar/pull/7237/files#diff-fb140c4ab9a86232f8d85b90cf0d3705R168
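A minimal sketch of draining without `Thread.interrupt()`: the tailer checks for the end of the topic only between messages, and it uses a timed read so it can re-check the stop flag instead of blocking forever. It returns a future that completes when the loop exits, similar to the later revision in this thread, which calls `tailer.stopWhenNoMoreMessages().get()`. To keep the example self-contained, a `BlockingQueue` stands in for the Pulsar reader: `isEmpty()` plays the role of `hasMessageAvailable()` and `poll(timeout)` that of a timed `readNext`. `DrainableTailer` is a hypothetical class, not the code in #7237. This approach assumes nothing else keeps writing once the leader holds the exclusive producer, so "no more messages" is a stable condition.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class DrainableTailer implements Runnable {
    private final BlockingQueue<String> topic;    // stand-in for the metadata topic reader
    private final Consumer<String> processor;     // stand-in for processRequest(msg)
    private final CompletableFuture<Void> drained = new CompletableFuture<>();
    private volatile boolean stopOnNoMessageAvailable = false;
    private volatile boolean running = true;

    DrainableTailer(BlockingQueue<String> topic, Consumer<String> processor) {
        this.topic = topic;
        this.processor = processor;
    }

    @Override
    public void run() {
        try {
            while (running) {
                // Only test for the end of the topic between messages, never mid-message.
                if (stopOnNoMessageAvailable && topic.isEmpty()) {
                    break;
                }
                // Timed read, so the loop notices the stop flag without being interrupted.
                String msg = topic.poll(100, TimeUnit.MILLISECONDS);
                if (msg != null) {
                    processor.accept(msg);
                }
            }
            drained.complete(null);
        } catch (Throwable t) {
            drained.completeExceptionally(t);
        }
    }

    // Ask the tailer to exit once caught up; the future completes when the loop has exited.
    CompletableFuture<Void> stopWhenNoMoreMessages() {
        stopOnNoMessageAvailable = true;
        return drained;
    }

    void close() {
        running = false;
    }
}
```

Usage: start the tailer on its own thread, then call `stopWhenNoMoreMessages().join()` from the thread that is becoming leader.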







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r443731162



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -90,25 +79,25 @@ public FunctionMetaDataManager(WorkerConfig workerConfig,
      */
     public void initialize() {
         try {
+            initializeTailer();
             this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this,
                     pulsarClient.newReader(), this.workerConfig, this.errorNotifier);
             // read all existing messages
-            this.setInitializePhase(true);
             while (this.functionMetaDataTopicTailer.getReader().hasMessageAvailable()) {
                 this.functionMetaDataTopicTailer.processRequest(this.functionMetaDataTopicTailer.getReader().readNext());
             }
-            this.setInitializePhase(false);
-            
-
         } catch (Exception e) {
             log.error("Failed to initialize meta data store", e);
-            throw new RuntimeException(e);
+            errorNotifier.triggerError(e);
         }
     }
-    
+
+    private void initializeTailer() throws PulsarClientException {

Review comment:
       Should we separate the metadata tailer from the metadata manager, as we did for the assignment tailer and runtime manager?







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r440346207



##########
File path: pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
##########
@@ -893,4 +893,42 @@ void putFunctionState(String tenant, String namespace, String function, Function
      */
     CompletableFuture<Void> putFunctionStateAsync(
             String tenant, String namespace, String function, FunctionState state);
+
+    /**
+     * Sends update function request to worker leader. This is an internal only api
+     * <p/>
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param function
+     *            Function name
+     * @param functionMetaData
+     *            byte repr of FunctionMetaData
+     **
+     * @throws NotAuthorizedException
+     *             You don't have admin permission to get the configuration of the cluster
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void updateOnWorkerLeader(String tenant, String namespace, String function, byte[] functionMetaData,

Review comment:
       I don't think it is a good idea to expose this in the admin API. This is an internal thing, and users should not have the option to call it directly.




[GitHub] [pulsar] srkukarni merged pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
srkukarni merged pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255


   


[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r440366444



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -168,68 +151,78 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
         return containsFunctionMetaData(tenant, namespace, functionName);
     }
 
-    /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
-     */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)
+            throws IllegalStateException, IllegalArgumentException {
+        if (exclusiveLeaderProducer == null) {
+            throw new IllegalStateException("Not the leader");
+        }
+        boolean needsScheduling;
+        if (delete) {
+            needsScheduling = proccessDeregister(functionMetaData);
+        } else {
+            needsScheduling = processUpdate(functionMetaData);
+        }
+        Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
+                .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
+                .setFunctionMetaData(functionMetaData)
+                .setWorkerId(workerConfig.getWorkerId())
+                .setRequestId(UUID.randomUUID().toString())
+                .build();
+        try {
+            exclusiveLeaderProducer.send(serviceRequest.toByteArray());

Review comment:
       Shouldn't we also set a key so that we can use topic compaction?
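       Pulsar topic compaction keeps only the most recent message for each key, so messages published without a key are never compacted away. With the Java client a key is attached through the message builder, e.g. producer.newMessage().key(k).value(bytes).send() on a Producer<byte[]>. The broker-free sketch below only illustrates the idea: the tenant/namespace/function key format, CompactionKeySketch and its in-memory compaction are assumptions for illustration, not the PR's code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class CompactionKeySketch {
    // Simplified stand-in for one message on the metadata topic.
    static final class Msg {
        final String key;
        final String payload;

        Msg(String key, String payload) {
            this.key = key;
            this.payload = payload;
        }
    }

    private CompactionKeySketch() {}

    // Hypothetical key scheme: one key per function, so each new message about a
    // function supersedes the previous message for that same function.
    static String compactionKey(String tenant, String namespace, String function) {
        return tenant + "/" + namespace + "/" + function;
    }

    // Simulates key-based compaction: only the latest message for each key survives,
    // ordered by the position of that latest message in the original log.
    static List<Msg> compact(List<Msg> log) {
        Map<String, Msg> latest = new LinkedHashMap<>();
        for (Msg m : log) {
            latest.remove(m.key); // re-inserting moves the key to its newest position
            latest.put(m.key, m);
        }
        return new ArrayList<>(latest.values());
    }
}
```

       Readers only see the compacted view if they ask for it (ReaderBuilder.readCompacted(true)). Note that in the diff a DELETE is still a full ServiceRequest payload, so under this scheme a deleted function would keep one message on the compacted topic.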




[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r444440523



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -80,37 +84,40 @@ public FunctionMetaDataManager(WorkerConfig workerConfig,
                 this.pulsarClient, this.workerConfig.getFunctionMetadataTopic());
         this.schedulerManager = schedulerManager;
         this.errorNotifier = errorNotifier;
+        exclusiveLeaderProducer = null;
     }
 
     /**
      * Public methods. Please use these methods if references FunctionMetaManager from an external class
      */
 
     /**
-     * Initializes the FunctionMetaDataManager.  Does the following:
-     * 1. Consume all existing function meta data upon start to establish existing state
+     * Initializes the FunctionMetaDataManager. By default we start in the worker mode.
+     * We consume all existing function meta data to establish existing state
      */
     public void initialize() {
         try {
+            initializeTailer();
             this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this,
                     pulsarClient.newReader(), this.workerConfig, this.errorNotifier);
             // read all existing messages
-            this.setInitializePhase(true);
             while (this.functionMetaDataTopicTailer.getReader().hasMessageAvailable()) {
                 this.functionMetaDataTopicTailer.processRequest(this.functionMetaDataTopicTailer.getReader().readNext());

Review comment:
       It is a bit odd that functionMetaDataTopicTailer.processRequest() calls back into FunctionMetaDataManager. That seems like an awkward interaction between the two classes. Perhaps we can refactor it in a subsequent PR.
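       One possible shape for that refactor, sketched under assumptions: give the tailer a small callback interface instead of a FunctionMetaDataManager reference, so the tailer only knows that it hands requests to some listener. MetadataRequestListener, DecoupledTailer and the Iterable standing in for the topic reader are hypothetical names for illustration, not Pulsar classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback interface: the tailer only knows that it hands each
// request to some listener, not that the receiver is a FunctionMetaDataManager.
interface MetadataRequestListener<T> {
    void onRequest(T request);
}

// Hypothetical, simplified tailer that depends only on the listener interface.
final class DecoupledTailer<T> {
    private final Iterable<T> source; // stand-in for the topic reader
    private final MetadataRequestListener<T> listener;

    DecoupledTailer(Iterable<T> source, MetadataRequestListener<T> listener) {
        this.source = source;
        this.listener = listener;
    }

    // Delivers every available request to the listener and returns how many were delivered.
    int drain() {
        int delivered = 0;
        for (T request : source) {
            listener.onRequest(request);
            delivered++;
        }
        return delivered;
    }
}

final class DecoupledTailerDemo {
    public static void main(String[] args) {
        List<String> applied = new ArrayList<>();
        // The manager (here just a list) plugs itself in through a method reference.
        DecoupledTailer<String> tailer =
                new DecoupledTailer<>(List.of("update fn1", "delete fn2"), applied::add);
        System.out.println(tailer.drain() + " " + applied);
    }
}
```

       With this shape the tailer can be tested against any listener, and the dependency between the two classes points only one way.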




[GitHub] [pulsar] srkukarni commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r445265397



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -174,89 +181,124 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
     }
 
     /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
+     * Called by the worker when we are in the leader mode.  In this state, we update our in-memory
+     * data structures and then write to the metadata topic.
+     * @param functionMetaData The function metadata in question
+     * @param delete Is this a delete operation
+     * @throws IllegalStateException if we are not the leader
+     * @throws IllegalArgumentException if the request is out of date.
      */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)
+            throws IllegalStateException, IllegalArgumentException {
+        if (exclusiveLeaderProducer == null) {
+            throw new IllegalStateException("Not the leader");
+        }
+        boolean needsScheduling;
+        if (delete) {
+            needsScheduling = proccessDeregister(functionMetaData);
+        } else {
+            needsScheduling = processUpdate(functionMetaData);
+        }
+        Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
+                .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
+                .setFunctionMetaData(functionMetaData)
+                .setWorkerId(workerConfig.getWorkerId())
+                .setRequestId(UUID.randomUUID().toString())
+                .build();
+        try {
+            lastMessageSeen = exclusiveLeaderProducer.send(serviceRequest.toByteArray());
+        } catch (Exception e) {

Review comment:
       So the question here is what the right thing to do is. If we have trouble writing to the producer, should the leader just reject the request with an Internal Server Error and hope things will be better next time? Or is the right approach to trigger worker death?
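       A minimal sketch of the first option (reject the request, keep the worker alive), under assumptions: LeaderMetadataWriter stands in for the exclusive producer, and MetadataWriteException and the 500 mapping are hypothetical. One detail from the diff matters here: the javadoc says the leader updates its in-memory data structures and then writes to the metadata topic, so if the send fails and the request is merely rejected, the leader's state already holds a change that never reached the topic. The sketch therefore writes first and updates its cache only after a successful send.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the exclusive leader producer.
interface LeaderMetadataWriter {
    void send(byte[] payload) throws Exception;
}

// Hypothetical exception that the REST layer could translate into HTTP 500.
class MetadataWriteException extends RuntimeException {
    MetadataWriteException(String message, Throwable cause) {
        super(message, cause);
    }
}

final class RejectOnWriteFailure {
    private final LeaderMetadataWriter writer;
    private final Map<String, String> cache = new HashMap<>();

    RejectOnWriteFailure(LeaderMetadataWriter writer) {
        this.writer = writer;
    }

    // Writes to the topic first and only then updates the in-memory cache, so a failed
    // write leaves the cache consistent with the topic and the request can just be rejected.
    synchronized void update(String key, String value) {
        try {
            writer.send(value.getBytes(StandardCharsets.UTF_8));
        } catch (Exception e) {
            // Reject this one request instead of calling errorNotifier.triggerError(e).
            throw new MetadataWriteException("Could not write into Function Metadata topic", e);
        }
        cache.put(key, value);
    }

    synchronized String get(String key) {
        return cache.get(key);
    }

    // Hypothetical status mapping a REST handler might apply.
    static int httpStatusFor(RuntimeException e) {
        return (e instanceof MetadataWriteException) ? 500 : 400;
    }
}
```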




[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r444561903



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -174,89 +181,124 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
     }
 
     /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
+     * Called by the worker when we are in the leader mode.  In this state, we update our in-memory
+     * data structures and then write to the metadata topic.
+     * @param functionMetaData The function metadata in question
+     * @param delete Is this a delete operation
+     * @throws IllegalStateException if we are not the leader
+     * @throws IllegalArgumentException if the request is out of date.
      */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)
+            throws IllegalStateException, IllegalArgumentException {
+        if (exclusiveLeaderProducer == null) {
+            throw new IllegalStateException("Not the leader");
+        }
+        boolean needsScheduling;
+        if (delete) {
+            needsScheduling = proccessDeregister(functionMetaData);
+        } else {
+            needsScheduling = processUpdate(functionMetaData);
+        }
+        Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
+                .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
+                .setFunctionMetaData(functionMetaData)
+                .setWorkerId(workerConfig.getWorkerId())
+                .setRequestId(UUID.randomUUID().toString())
+                .build();
+        try {
+            lastMessageSeen = exclusiveLeaderProducer.send(serviceRequest.toByteArray());
+        } catch (Exception e) {

Review comment:
       "errorNotifier.triggerError(e);" is still being called. The worker might exit before the exception is bubbled up and a response is sent back.

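       If the decision is to keep killing the worker on a failed write, one way to address this is to send the error response first and trigger the fatal error afterwards. A minimal sketch, assuming a REST layer that can run a hook once the response has been written; RespondThenFail, Outcome and the Consumer standing in for errorNotifier.triggerError are hypothetical.

```java
import java.util.function.Consumer;

final class RespondThenFail {
    // What the (hypothetical) REST layer needs: a status to send, plus an action
    // to run only after the response has actually been written.
    static final class Outcome {
        final int status;
        final Runnable afterResponseSent;

        Outcome(int status, Runnable afterResponseSent) {
            this.status = status;
            this.afterResponseSent = afterResponseSent;
        }
    }

    private final Consumer<Throwable> triggerError; // stand-in for errorNotifier::triggerError

    RespondThenFail(Consumer<Throwable> triggerError) {
        this.triggerError = triggerError;
    }

    // Runs the write; on failure returns 500 and defers the fatal notification
    // so the caller sees the error before the worker starts shutting down.
    Outcome handle(Runnable write) {
        try {
            write.run();
            return new Outcome(200, () -> { });
        } catch (RuntimeException e) {
            return new Outcome(500, () -> triggerError.accept(e));
        }
    }
}
```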



[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r445257209



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -174,89 +181,124 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
     }
 
     /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
+     * Called by the worker when we are in the leader mode.  In this state, we update our in-memory
+     * data structures and then write to the metadata topic.
+     * @param functionMetaData The function metadata in question
+     * @param delete Is this a delete operation
+     * @throws IllegalStateException if we are not the leader
+     * @throws IllegalArgumentException if the request is out of date.
      */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)
+            throws IllegalStateException, IllegalArgumentException {
+        if (exclusiveLeaderProducer == null) {
+            throw new IllegalStateException("Not the leader");
+        }
+        boolean needsScheduling;
+        if (delete) {
+            needsScheduling = proccessDeregister(functionMetaData);
+        } else {
+            needsScheduling = processUpdate(functionMetaData);
+        }
+        Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
+                .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
+                .setFunctionMetaData(functionMetaData)
+                .setWorkerId(workerConfig.getWorkerId())
+                .setRequestId(UUID.randomUUID().toString())
+                .build();
+        try {
+            lastMessageSeen = exclusiveLeaderProducer.send(serviceRequest.toByteArray());
+        } catch (Exception e) {

Review comment:
       @srkukarni ^^^




[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r444470591



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
##########
@@ -54,19 +58,33 @@ public FunctionMetaDataTopicTailer(FunctionMetaDataManager functionMetaDataManag
         readerThread = new Thread(this);
         readerThread.setName("function-metadata-tailer-thread");
         this.errorNotifier = errorNotifier;
+        stopOnNoMessageAvailable = false;
     }
 
     public void start() {
         running = true;
+        exitFuture = new CompletableFuture<>();
         readerThread.start();
     }
 
     @Override
     public void run() {
-        while(running) {
+        while (running) {

Review comment:
       To check whether we have really reached the end of the topic, I think it's safer to check that reader.hasMessageAvailable() returns false and that reader.readNext(5, TimeUnit.SECONDS) returns null.
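       The suggested end-of-topic check can be sketched as a loop that stops only when both signals agree. Pulsar's Reader does provide hasMessageAvailable() and readNext(int, TimeUnit), the latter returning null on timeout; here they are mirrored by a hypothetical TailReader interface with an in-memory fake so the loop runs without a broker. EndOfTopic.drainUntilEnd and the 5-second timeout are illustrative choices.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical interface mirroring the two Pulsar Reader methods the check relies on.
interface TailReader<T> {
    boolean hasMessageAvailable() throws Exception;

    T readNext(int timeout, TimeUnit unit) throws Exception; // null if nothing arrives in time
}

// In-memory fake for trying the loop without a broker (it never actually waits).
final class InMemoryTailReader<T> implements TailReader<T> {
    private final Deque<T> pending;

    InMemoryTailReader(Collection<T> messages) {
        this.pending = new ArrayDeque<>(messages);
    }

    @Override
    public boolean hasMessageAvailable() {
        return !pending.isEmpty();
    }

    @Override
    public T readNext(int timeout, TimeUnit unit) {
        return pending.poll();
    }
}

final class EndOfTopic {
    // Processes messages and declares the end of the topic only when the reader reports
    // no backlog AND a bounded readNext also comes back empty.
    static <T> int drainUntilEnd(TailReader<T> reader, Consumer<T> processor) throws Exception {
        int processed = 0;
        while (true) {
            boolean available = reader.hasMessageAvailable();
            T msg = reader.readNext(5, TimeUnit.SECONDS);
            if (msg != null) {
                processor.accept(msg);
                processed++;
            } else if (!available) {
                return processed;
            }
        }
    }
}
```

       A stale false from hasMessageAvailable() alone would end the loop early; the extra bounded read still picks up a message that is already on its way. It cannot rule out messages published after the timeout elapses.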




[GitHub] [pulsar] srkukarni commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r440411984



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -168,68 +151,78 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
         return containsFunctionMetaData(tenant, namespace, functionName);
     }
 
-    /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
-     */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)
+            throws IllegalStateException, IllegalArgumentException {
+        if (exclusiveLeaderProducer == null) {
+            throw new IllegalStateException("Not the leader");
+        }
+        boolean needsScheduling;
+        if (delete) {
+            needsScheduling = proccessDeregister(functionMetaData);
+        } else {
+            needsScheduling = processUpdate(functionMetaData);
+        }
+        Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
+                .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
+                .setFunctionMetaData(functionMetaData)
+                .setWorkerId(workerConfig.getWorkerId())
+                .setRequestId(UUID.randomUUID().toString())
+                .build();
+        try {
+            exclusiveLeaderProducer.send(serviceRequest.toByteArray());
+        } catch (Exception e) {
+            log.error("Could not write into Function Metadata topic", e);
+            errorNotifier.triggerError(e);
+        }
+        if (needsScheduling) {
+            this.schedulerManager.schedule();
         }
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(existingFunctionMetadata, functionMetaData);
-
-        Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
-
-        return submit(updateRequest);
     }
 
-
-    /**
-     * Sends a deregister request to the FMT (Function Metadata Topic) for a function
-     * @param tenant the tenant the function that needs to be deregistered belongs to
-     * @param namespace the namespace the function that needs to be deregistered belongs to
-     * @param functionName the name of the function
-     * @return a completable future of when the deregister has been applied
-     */
-    public synchronized CompletableFuture<RequestResult> deregisterFunction(String tenant, String namespace, String functionName) {
-        FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(functionMetaData, functionMetaData);
-
-        Request.ServiceRequest deregisterRequest = ServiceRequestUtils.getDeregisterRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
-
-        return submit(deregisterRequest);
+    // Note that this method cannot be synchronized because the tailer might still be processing messages
+    public void acquireLeadership() {
+        log.info("FunctionMetaDataManager becoming leader by creating exclusive producer");
+        FunctionMetaDataTopicTailer tailer = internalAcquireLeadership();
+        // Now that we have created the exclusive producer, wait for reader to get over
+        if (tailer != null) {
+            tailer.stopWhenNoMoreMessages();
+            tailer.close();
+        }
+        this.schedulerManager.schedule();
+        log.info("FunctionMetaDataManager done becoming leader by doing its first schedule");
     }
 
-    /**
-     * Sends a start/stop function request to the FMT (Function Metadata Topic) for a function
-     * @param tenant the tenant the function that needs to be deregistered belongs to
-     * @param namespace the namespace the function that needs to be deregistered belongs to
-     * @param functionName the name of the function
-     * @param instanceId the instanceId of the function, -1 if for all instances
-     * @param start do we need to start or stop
-     * @return a completable future of when the start/stop has been applied
-     */
-    public synchronized CompletableFuture<RequestResult> changeFunctionInstanceStatus(String tenant, String namespace, String functionName,
-                                                                                      Integer instanceId, boolean start) {
-        FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.changeFunctionInstanceStatus(functionMetaData, instanceId, start);
-
-        Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
+    private synchronized FunctionMetaDataTopicTailer internalAcquireLeadership() {
+        if (exclusiveLeaderProducer == null) {
+            try {
+                exclusiveLeaderProducer = pulsarClient.newProducer()
+                        .topic(this.workerConfig.getFunctionMetadataTopic())
+                        .producerName(workerConfig.getWorkerId() + "-leader")
+                        // .type(EXCLUSIVE)
+                        .create();
+            } catch (PulsarClientException e) {
+                log.error("Error creating exclusive producer", e);
+                errorNotifier.triggerError(e);
+            }
+        } else {
+            log.error("Logic Error in FunctionMetaData Manager");
+            errorNotifier.triggerError(new IllegalStateException());
+        }
+        FunctionMetaDataTopicTailer tailer = this.functionMetaDataTopicTailer;
+        this.functionMetaDataTopicTailer = null;
+        return tailer;
+    }
 
-        return submit(updateRequest);
+    public synchronized void giveupLeadership() {
+        log.info("FunctionMetaDataManager giving up leadership by closing exclusive producer");
+        try {
+            exclusiveLeaderProducer.close();
+        } catch (PulsarClientException e) {
+            log.error("Error closing exclusive producer", e);
+            errorNotifier.triggerError(e);
+        }
+        exclusiveLeaderProducer = null;
+        initializeTailer();

Review comment:
       will do
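       The leader transition in the diff (acquireLeadership, internalAcquireLeadership, giveupLeadership) can be summarized as a small state machine: become leader by creating the producer and detaching the tailer under the lock, drain and close the tailer outside the lock, then schedule; step down by closing the producer and starting a fresh tailer. The sketch below assumes simplified LeaderProducer and Tailer interfaces and throws IllegalStateException where the diff calls errorNotifier.triggerError; LeadershipTransition is a hypothetical name.

```java
import java.util.function.Supplier;

final class LeadershipTransition {
    // Hypothetical, simplified views of the collaborators in the diff.
    interface LeaderProducer {
        void close();
    }

    interface Tailer {
        void stopWhenNoMoreMessages(); // blocks until the backlog is processed

        void close();
    }

    private final Supplier<LeaderProducer> producerFactory;
    private final Supplier<Tailer> tailerFactory;
    private final Runnable schedule;
    private LeaderProducer leaderProducer;
    private Tailer tailer;

    LeadershipTransition(Supplier<LeaderProducer> producerFactory, Supplier<Tailer> tailerFactory,
                         Runnable schedule) {
        this.producerFactory = producerFactory;
        this.tailerFactory = tailerFactory;
        this.schedule = schedule;
        this.tailer = tailerFactory.get(); // start in worker mode
    }

    // Deliberately not synchronized: the tailer may still call back into synchronized
    // methods while draining, so waiting for it while holding the lock could deadlock.
    void acquireLeadership() {
        Tailer detached = createProducerAndDetachTailer();
        if (detached != null) {
            detached.stopWhenNoMoreMessages();
            detached.close();
        }
        schedule.run();
    }

    private synchronized Tailer createProducerAndDetachTailer() {
        if (leaderProducer != null) {
            throw new IllegalStateException("Already the leader");
        }
        leaderProducer = producerFactory.get();
        Tailer detached = tailer;
        tailer = null;
        return detached;
    }

    synchronized void giveupLeadership() {
        if (leaderProducer == null) {
            throw new IllegalStateException("Not the leader");
        }
        leaderProducer.close();
        leaderProducer = null;
        tailer = tailerFactory.get(); // back to worker mode
    }

    synchronized boolean isLeader() {
        return leaderProducer != null;
    }
}
```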




[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r444551384



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
##########
@@ -392,24 +391,13 @@ public void deregisterFunction(final String tenant,
             throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
         }
 
-        CompletableFuture<RequestResult> completableFuture = functionMetaDataManager.deregisterFunction(tenant,
-                namespace, componentName);
-
-        RequestResult requestResult = null;
-        try {
-            requestResult = completableFuture.get();
-            if (!requestResult.isSuccess()) {
-                throw new RestException(Status.BAD_REQUEST, requestResult.getMessage());
-            }
-        } catch (ExecutionException e) {
-            log.error("Execution Exception while deregistering {} @ /{}/{}/{}",
-                    ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
-            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getCause().getMessage());
-        } catch (InterruptedException e) {
-            log.error("Interrupted Exception while deregistering {} @ /{}/{}/{}",
-                    ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
-            throw new RestException(Status.REQUEST_TIMEOUT, e.getMessage());
-        }
+        FunctionMetaData newVersionedMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(functionMetaData, functionMetaData);

Review comment:
       Can we rename "FunctionMetaDataUtils.generateUpdatedMetadata" to something like "FunctionMetaDataUtils.incrMetadataVersion"?
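       Assuming, as the suggested name implies, that the helper's job is to return the new metadata stamped with the next version, a simplified sketch looks like this. Meta is a hypothetical stand-in for FunctionMetaData, and starting new functions at version 0 is a choice of the sketch, not something the diff shows.

```java
final class MetadataVersioning {
    // Hypothetical, heavily simplified stand-in for FunctionMetaData.
    static final class Meta {
        final String name;
        final String config;
        final long version;

        Meta(String name, String config, long version) {
            this.name = name;
            this.config = config;
            this.version = version;
        }
    }

    private MetadataVersioning() {}

    // Returns a copy of `updated` stamped with the version after `existing`
    // (0 for a function that does not exist yet). Neither argument is modified.
    static Meta incrMetadataVersion(Meta existing, Meta updated) {
        long next = (existing == null) ? 0L : existing.version + 1;
        return new Meta(updated.name, updated.config, next);
    }
}
```

       Under that reading, the deregister call generateUpdatedMetadata(functionMetaData, functionMetaData) becomes incrMetadataVersion(functionMetaData, functionMetaData): same content, next version, which the new name would make obvious at the call site.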




[GitHub] [pulsar] srkukarni commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r444461439



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -52,20 +61,15 @@
     @VisibleForTesting
     final Map<String, Map<String, Map<String, FunctionMetaData>>> functionMetaDataMap = new ConcurrentHashMap<>();
 
-    // A map in which the key is the service request id and value is the service request
-    private final Map<String, ServiceRequestInfo> pendingServiceRequests = new ConcurrentHashMap<>();
-
     private final ServiceRequestManager serviceRequestManager;
     private final SchedulerManager schedulerManager;
     private final WorkerConfig workerConfig;
     private final PulsarClient pulsarClient;
     private final ErrorNotifier errorNotifier;
 
     private FunctionMetaDataTopicTailer functionMetaDataTopicTailer;
-
-    @Setter
-    @Getter
-    boolean isInitializePhase = false;
+    private Producer exclusiveLeaderProducer;
+    private MessageId lastMessageSeen = MessageId.earliest;

Review comment:
       Actually, no. The manager should keep track of it, since in leader mode there is no tailer.

##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -80,37 +84,40 @@ public FunctionMetaDataManager(WorkerConfig workerConfig,
                 this.pulsarClient, this.workerConfig.getFunctionMetadataTopic());
         this.schedulerManager = schedulerManager;
         this.errorNotifier = errorNotifier;
+        exclusiveLeaderProducer = null;
     }
 
     /**
      * Public methods. Please use these methods if references FunctionMetaManager from an external class
      */
 
     /**
-     * Initializes the FunctionMetaDataManager.  Does the following:
-     * 1. Consume all existing function meta data upon start to establish existing state
+     * Initializes the FunctionMetaDataManager. By default we start in the worker mode.
+     * We consume all existing function meta data to establish existing state
      */
     public void initialize() {
         try {
+            initializeTailer();
             this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this,
                     pulsarClient.newReader(), this.workerConfig, this.errorNotifier);
             // read all existing messages
-            this.setInitializePhase(true);
             while (this.functionMetaDataTopicTailer.getReader().hasMessageAvailable()) {
                 this.functionMetaDataTopicTailer.processRequest(this.functionMetaDataTopicTailer.getReader().readNext());

Review comment:
       Agreed




[GitHub] [pulsar] srkukarni commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r444459451



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -168,68 +151,78 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
         return containsFunctionMetaData(tenant, namespace, functionName);
     }
 
-    /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
-     */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)

Review comment:
       Yes. All of that workflow is gone because we no longer use the submit workflow. The leader either updates the cache directly or, in worker mode, the tailer does the update.
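For readers unfamiliar with the removed path, here is a simplified, hypothetical model of the "submit" workflow as the deleted code suggests it worked (the removed `pendingServiceRequests` map keyed by request id, and `updateFunction` returning a future "of when the update has been applied"): a request is written to the topic first and is only applied, and acknowledged, when the tailer reads it back. A plain `List` stands in for the metadata topic, and everything runs on one thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

/** Hypothetical single-threaded model of the removed submit-then-tail workflow. */
class SubmitWorkflowSketch {
    /** A request as it would appear on the (simulated) metadata topic. */
    static final class Request {
        final String requestId;
        final String functionName;
        final String value;

        Request(String requestId, String functionName, String value) {
            this.requestId = requestId;
            this.functionName = functionName;
            this.value = value;
        }
    }

    final List<Request> topic = new ArrayList<>(); // simulated metadata topic
    final Map<String, String> cache = new HashMap<>();
    // Futures waiting for their own request to be read back, keyed by request id.
    private final Map<String, CompletableFuture<Boolean>> pending = new HashMap<>();
    private int readPosition = 0;

    /** Writes the request; the returned future completes only once the tailer applies it. */
    CompletableFuture<Boolean> submit(String functionName, String value) {
        Request request = new Request(UUID.randomUUID().toString(), functionName, value);
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        pending.put(request.requestId, future);
        topic.add(request);
        return future;
    }

    /** The tailer applies requests in topic order and completes any matching future. */
    void tail() {
        while (readPosition < topic.size()) {
            Request request = topic.get(readPosition++);
            cache.put(request.functionName, request.value);
            CompletableFuture<Boolean> future = pending.remove(request.requestId);
            if (future != null) {
                future.complete(true);
            }
        }
    }
}
```

With only the leader writing, `updateFunctionOnLeader` can update the cache directly and return synchronously, so neither the pending map nor the read-back step is needed on the leader.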





[GitHub] [pulsar] wolfstudy commented on pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#issuecomment-717778890


   Moving this change to 2.6.2, because #8143 depends on it.



[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r444444502



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -80,37 +84,40 @@ public FunctionMetaDataManager(WorkerConfig workerConfig,
                 this.pulsarClient, this.workerConfig.getFunctionMetadataTopic());
         this.schedulerManager = schedulerManager;
         this.errorNotifier = errorNotifier;
+        exclusiveLeaderProducer = null;
     }
 
     /**
      * Public methods. Please use these methods if references FunctionMetaManager from an external class
      */
 
     /**
-     * Initializes the FunctionMetaDataManager.  Does the following:
-     * 1. Consume all existing function meta data upon start to establish existing state
+     * Initializes the FunctionMetaDataManager. By default we start in the worker mode.
+     * We consume all existing function meta data to establish existing state
      */
     public void initialize() {
         try {
+            initializeTailer();
             this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this,
                     pulsarClient.newReader(), this.workerConfig, this.errorNotifier);
             // read all existing messages
-            this.setInitializePhase(true);
             while (this.functionMetaDataTopicTailer.getReader().hasMessageAvailable()) {
                 this.functionMetaDataTopicTailer.processRequest(this.functionMetaDataTopicTailer.getReader().readNext());
             }
-            this.setInitializePhase(false);
             
             this.isInitialized.complete(null);
         } catch (Exception e) {
             log.error("Failed to initialize meta data store", e);
-            throw new RuntimeException(e);
+            errorNotifier.triggerError(e);
         }
     }
-    
+
+    private void initializeTailer() throws PulsarClientException {
+        this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this,
+                pulsarClient.newReader().startMessageId(lastMessageSeen), this.workerConfig, this.errorNotifier);
+    }
+
     public void start() {
-        // schedule functions if necessary
-        this.schedulerManager.schedule();

Review comment:
       nvm





[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r440345721



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
##########
@@ -692,4 +693,25 @@ public StreamingOutput downloadFunction(
     public List<ConnectorDefinition> getConnectorsList() throws IOException {
         return functions.getListOfConnectors();
     }
+
+    @PUT
+    @ApiOperation(value = "Updates a Pulsar Function on the worker leader")
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have super-user permissions"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 307, message = "Redirecting to the worker leader"),
+            @ApiResponse(code = 200, message = "Pulsar Function successfully updated")
+    })
+    @Path("/leader/{tenant}/{namespace}/{functionName}")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public void updateFunctionOnWorkerLeader(final @PathParam("tenant") String tenant,

Review comment:
       I don't think it is a good idea to expose this in the admin API. This is an internal thing, and users should not have the option to call it directly.





[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r444561903



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -174,89 +181,124 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
     }
 
     /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
+     * Called by the worker when we are in the leader mode.  In this state, we update our in-memory
+     * data structures and then write to the metadata topic.
+     * @param functionMetaData The function metadata in question
+     * @param delete Is this a delete operation
+     * @throws IllegalStateException if we are not the leader
+     * @throws IllegalArgumentException if the request is out of date.
      */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)
+            throws IllegalStateException, IllegalArgumentException {
+        if (exclusiveLeaderProducer == null) {
+            throw new IllegalStateException("Not the leader");
+        }
+        boolean needsScheduling;
+        if (delete) {
+            needsScheduling = proccessDeregister(functionMetaData);
+        } else {
+            needsScheduling = processUpdate(functionMetaData);
+        }
+        Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
+                .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
+                .setFunctionMetaData(functionMetaData)
+                .setWorkerId(workerConfig.getWorkerId())
+                .setRequestId(UUID.randomUUID().toString())
+                .build();
+        try {
+            lastMessageSeen = exclusiveLeaderProducer.send(serviceRequest.toByteArray());
+        } catch (Exception e) {

Review comment:
       "errorNotifier.triggerError(e);" is still being called. The worker might exit before the exception is bubbled up and a response is sent back.
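One possible way to address this, sketched under assumptions: rather than calling `triggerError` inside the catch block, rethrow so the request handler can send an error response first, and escalate afterwards. `MetadataWriter`, `handleUpdate`, and the `Consumer` used in place of `ErrorNotifier.triggerError` are all hypothetical, and whether a failed write should still be fatal is left as it is in the PR.

```java
import java.util.function.Consumer;
import java.util.function.IntConsumer;

/** Hypothetical sketch: report a failed metadata write to the client before escalating. */
class LeaderWriteSketch {
    /** Stand-in for exclusiveLeaderProducer.send(...). */
    interface MetadataWriter {
        void send(byte[] payload) throws Exception;
    }

    private final MetadataWriter writer;
    private final Consumer<Throwable> errorNotifier; // stand-in for ErrorNotifier.triggerError

    LeaderWriteSketch(MetadataWriter writer, Consumer<Throwable> errorNotifier) {
        this.writer = writer;
        this.errorNotifier = errorNotifier;
    }

    /** Rethrows instead of triggering the fatal error in-line. */
    void updateFunctionOnLeader(byte[] payload) {
        try {
            writer.send(payload);
        } catch (Exception e) {
            throw new IllegalStateException("Could not write into Function Metadata topic", e);
        }
    }

    /** Hypothetical request handler: respond first, then escalate. Returns the status sent. */
    int handleUpdate(byte[] payload, IntConsumer respond) {
        try {
            updateFunctionOnLeader(payload);
        } catch (IllegalStateException e) {
            respond.accept(500);                // the client learns about the failure...
            errorNotifier.accept(e.getCause()); // ...before the worker is asked to shut down
            return 500;
        }
        respond.accept(200);
        return 200;
    }
}
```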





[GitHub] [pulsar] srkukarni commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r442571198



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
##########
@@ -171,6 +171,9 @@ public void start(URI dlogUri,
             this.functionMetaDataManager = new FunctionMetaDataManager(
                     this.workerConfig, this.schedulerManager, this.client, errorNotifier);
 
+            // initialize function metadata manager
+            this.functionMetaDataManager.initialize();

Review comment:
       What's the harm in doing it before that? All that happens is that it reads the topic and updates its internal state.
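To illustrate the point that `initialize()` only reads, here is a minimal sketch of the replay loop using a hypothetical `ListReader` that exposes the same `hasMessageAvailable()`/`readNext()` pair as the loop in the diff, backed by an immutable list. The `Req` class with an UPDATE/DELETE flag mirrors `ServiceRequestType` only loosely and is an assumption.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the initial replay: it only reads the topic and updates local state. */
class InitialReplaySketch {
    /** Hypothetical request loosely mirroring ServiceRequest's UPDATE/DELETE types. */
    static final class Req {
        final boolean delete;
        final String name;
        final String value;

        Req(boolean delete, String name, String value) {
            this.delete = delete;
            this.name = name;
            this.value = value;
        }
    }

    /** Hypothetical reader offering the two calls used by the initialize() loop. */
    static final class ListReader {
        private final Iterator<Req> it;

        ListReader(List<Req> topic) {
            this.it = topic.iterator();
        }

        boolean hasMessageAvailable() {
            return it.hasNext();
        }

        Req readNext() {
            return it.next();
        }
    }

    final Map<String, String> cache = new HashMap<>();

    /** Folds every existing message into the cache; nothing is written back to the topic. */
    void initialize(ListReader reader) {
        while (reader.hasMessageAvailable()) {
            Req request = reader.readNext();
            if (request.delete) {
                cache.remove(request.name);
            } else {
                cache.put(request.name, request.value);
            }
        }
    }
}
```

Because the backing list in the usage below is immutable, any attempt to write during replay would throw, which is the property the comment relies on.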





[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r444541880



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -174,89 +181,124 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
     }
 
     /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
+     * Called by the worker when we are in the leader mode.  In this state, we update our in-memory
+     * data structures and then write to the metadata topic.
+     * @param functionMetaData The function metadata in question
+     * @param delete Is this a delete operation
+     * @throws IllegalStateException if we are not the leader
+     * @throws IllegalArgumentException if the request is out of date.
      */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)
+            throws IllegalStateException, IllegalArgumentException {
+        if (exclusiveLeaderProducer == null) {
+            throw new IllegalStateException("Not the leader");
+        }
+        boolean needsScheduling;
+        if (delete) {
+            needsScheduling = proccessDeregister(functionMetaData);
+        } else {
+            needsScheduling = processUpdate(functionMetaData);
+        }
+        Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
+                .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
+                .setFunctionMetaData(functionMetaData)
+                .setWorkerId(workerConfig.getWorkerId())
+                .setRequestId(UUID.randomUUID().toString())
+                .build();
+        try {
+            lastMessageSeen = exclusiveLeaderProducer.send(serviceRequest.toByteArray());
+        } catch (Exception e) {
+            log.error("Could not write into Function Metadata topic", e);
+            errorNotifier.triggerError(e);
+        }
+        if (needsScheduling) {
+            this.schedulerManager.schedule();
         }
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(existingFunctionMetadata, functionMetaData);
-
-        Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
-
-        return submit(updateRequest);
     }
 
-
     /**
-     * Sends a deregister request to the FMT (Function Metadata Topic) for a function
-     * @param tenant the tenant the function that needs to be deregistered belongs to
-     * @param namespace the namespace the function that needs to be deregistered belongs to
-     * @param functionName the name of the function
-     * @return a completable future of when the deregister has been applied
+     * Called by the leader service when this worker becomes the leader.
+     * We first get exclusive producer on the metadata topic. Next we drain the tailer
+     * to ensure that we have caught up to metadata topic. After which we close the tailer.
+     * Note that this method cannot be syncrhonized because the tailer might still be processing messages
      */
-    public synchronized CompletableFuture<RequestResult> deregisterFunction(String tenant, String namespace, String functionName) {
-        FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(functionMetaData, functionMetaData);
-
-        Request.ServiceRequest deregisterRequest = ServiceRequestUtils.getDeregisterRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
+    public void acquireLeadership() {
+        log.info("FunctionMetaDataManager becoming leader by creating exclusive producer");
+        FunctionMetaDataTopicTailer tailer = internalAcquireLeadership();
+        // Now that we have created the exclusive producer, wait for reader to get over
+        if (tailer != null) {
+            try {
+                tailer.stopWhenNoMoreMessages().get();
+            } catch (Exception e) {
+                log.error("Error while waiting for metadata tailer thread to finish", e);
+                errorNotifier.triggerError(e);
+            }
+            tailer.close();
+        }
+        this.schedulerManager.schedule();
+        log.info("FunctionMetaDataManager done becoming leader by doing its first schedule");
+    }
 
-        return submit(deregisterRequest);
+    private synchronized FunctionMetaDataTopicTailer internalAcquireLeadership() {

Review comment:
       How could that happen? internalAcquireLeadership() is only called in acquireLeadership(), which is only called in becomeActive(), which is synchronized.
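A small sketch of the locking structure being discussed, with hypothetical `Manager` and `LeaderService` classes: the leader service's own monitor serializes `becomeActive()`, `internalAcquireLeadership()` is a short synchronized section on the manager, and `acquireLeadership()` stays unsynchronized so the tailer thread can keep entering the manager's synchronized methods while leadership acquisition waits for it to drain (the reason given in the diff's javadoc). The 5-second timeout is an arbitrary choice for the sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of the monitors involved in acquiring leadership. */
class LeadershipLockSketch {
    static class Manager {
        private int applied = 0;
        private boolean leader = false;

        /** Called by the tailer thread for each message; needs the manager's monitor. */
        synchronized void processRequest() {
            applied++;
        }

        /** Short critical section, analogous to internalAcquireLeadership(). */
        private synchronized void internalAcquireLeadership() {
            leader = true;
        }

        /**
         * Deliberately not synchronized: if this held the manager's monitor while waiting,
         * a tailer thread blocked in processRequest() could never finish draining.
         */
        void acquireLeadership(CompletableFuture<Void> tailerDrained) {
            internalAcquireLeadership();
            try {
                tailerDrained.get(5, TimeUnit.SECONDS);
            } catch (Exception e) {
                throw new IllegalStateException("Tailer did not drain", e);
            }
        }

        synchronized int applied() {
            return applied;
        }

        synchronized boolean isLeader() {
            return leader;
        }
    }

    /** Stand-in for the leader service: its own monitor serializes becomeActive() calls. */
    static class LeaderService {
        private final Manager manager;

        LeaderService(Manager manager) {
            this.manager = manager;
        }

        synchronized void becomeActive(CompletableFuture<Void> tailerDrained) {
            manager.acquireLeadership(tailerDrained);
        }
    }
}
```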





[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r444431425



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -80,37 +84,40 @@ public FunctionMetaDataManager(WorkerConfig workerConfig,
                 this.pulsarClient, this.workerConfig.getFunctionMetadataTopic());
         this.schedulerManager = schedulerManager;
         this.errorNotifier = errorNotifier;
+        exclusiveLeaderProducer = null;
     }
 
     /**
      * Public methods. Please use these methods if references FunctionMetaManager from an external class
      */
 
     /**
-     * Initializes the FunctionMetaDataManager.  Does the following:
-     * 1. Consume all existing function meta data upon start to establish existing state
+     * Initializes the FunctionMetaDataManager. By default we start in the worker mode.
+     * We consume all existing function meta data to establish existing state
      */
     public void initialize() {
         try {
+            initializeTailer();
             this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this,
                     pulsarClient.newReader(), this.workerConfig, this.errorNotifier);
             // read all existing messages
-            this.setInitializePhase(true);
             while (this.functionMetaDataTopicTailer.getReader().hasMessageAvailable()) {
                 this.functionMetaDataTopicTailer.processRequest(this.functionMetaDataTopicTailer.getReader().readNext());
             }
-            this.setInitializePhase(false);
             
             this.isInitialized.complete(null);
         } catch (Exception e) {
             log.error("Failed to initialize meta data store", e);
-            throw new RuntimeException(e);
+            errorNotifier.triggerError(e);
         }
     }
-    
+
+    private void initializeTailer() throws PulsarClientException {
+        this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this,
+                pulsarClient.newReader().startMessageId(lastMessageSeen), this.workerConfig, this.errorNotifier);
+    }
+
     public void start() {
-        // schedule functions if necessary
-        this.schedulerManager.schedule();

Review comment:
       Why are we removing this?





[GitHub] [pulsar] srkukarni commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r442373165



##########
File path: pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
##########
@@ -893,4 +893,42 @@ void putFunctionState(String tenant, String namespace, String function, Function
      */
     CompletableFuture<Void> putFunctionStateAsync(
             String tenant, String namespace, String function, FunctionState state);
+
+    /**
+     * Sends update function request to worker leader. This is an internal only api
+     * <p/>
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param function
+     *            Function name
+     * @param functionMetaData
+     *            byte repr of FunctionMetaData
+     **
+     * @throws NotAuthorizedException
+     *             You don't have admin permission to get the configuration of the cluster
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void updateOnWorkerLeader(String tenant, String namespace, String function, byte[] functionMetaData,

Review comment:
       removed 

##########
File path: pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
##########
@@ -984,4 +985,53 @@ public void putFunctionState(String tenant, String namespace, String function, F
         }
         return future;
     }
+
+    @Override
+    public void updateOnWorkerLeader(String tenant, String namespace,
+                                     String function, byte[] functionMetaData,
+                                     boolean delete) throws PulsarAdminException {
+        try {
+            updateOnWorkerLeaderAsync(tenant, namespace, function,
+                    functionMetaData, delete).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> updateOnWorkerLeaderAsync(String tenant, String namespace,

Review comment:
       removed





[GitHub] [pulsar] srkukarni commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r440411334



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -168,68 +151,78 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
         return containsFunctionMetaData(tenant, namespace, functionName);
     }
 
-    /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
-     */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)

Review comment:
       That behaviour hasn't changed. Users will still see 'This request is outdated, please try again' messages upon concurrent updates. That check is now performed by the leader, which returns that error in response to the /leader request.
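A minimal sketch of the check the leader now performs, assuming (our assumption, not shown in the diff) that each metadata record carries a version and that a request is outdated when its version is not newer than the stored one. `Meta` and `versionOf` are hypothetical; the exception type and message follow the `@throws IllegalArgumentException` javadoc and the error text quoted above.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical leader-side staleness check for concurrent updates. */
class OutdatedCheckSketch {
    /** Hypothetical metadata record: a function name plus an assumed version counter. */
    static final class Meta {
        final String name;
        final long version;

        Meta(String name, long version) {
            this.name = name;
            this.version = version;
        }
    }

    private final Map<String, Meta> cache = new HashMap<>();

    /** Rejects a request whose version is not newer than the stored one. */
    synchronized void updateFunctionOnLeader(Meta incoming) {
        Meta existing = cache.get(incoming.name);
        if (existing != null && existing.version >= incoming.version) {
            throw new IllegalArgumentException("This request is outdated, please try again");
        }
        cache.put(incoming.name, incoming);
    }

    /** Returns the stored version, or -1 if the function is unknown. */
    synchronized long versionOf(String name) {
        Meta existing = cache.get(name);
        return existing == null ? -1 : existing.version;
    }
}
```

Two clients that both read version 1 and submit version 2 illustrate the concurrent case: whichever reaches the leader first wins, and the other gets the error instead of being silently dropped.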





[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r440403279



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -168,68 +151,78 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
         return containsFunctionMetaData(tenant, namespace, functionName);
     }
 
-    /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
-     */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)

Review comment:
       Currently, when there are concurrent modifications, the worker fails requests. That no longer seems to be done: all requests are accepted and 200s are returned to the user, but internally some requests will be silently ignored.
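A sketch of how a leader-side rejection could be surfaced rather than swallowed: map the exceptions declared on `updateFunctionOnLeader` to HTTP status codes. The 400 and 307 codes come from the `@ApiResponses` list in the `FunctionsBase` diff; pairing 307 with `IllegalStateException("Not the leader")` is our assumption, and `toStatus` is hypothetical.

```java
/** Hypothetical mapping from leader-side exceptions to HTTP status codes. */
class StatusMappingSketch {
    static int toStatus(Runnable leaderCall) {
        try {
            leaderCall.run();
            return 200; // update accepted and written by the leader
        } catch (IllegalArgumentException e) {
            return 400; // e.g. "This request is outdated, please try again"
        } catch (IllegalStateException e) {
            return 307; // assumed: this worker is not the leader, so redirect
        }
    }
}
```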





[GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r444438587



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -52,20 +61,15 @@
     @VisibleForTesting
     final Map<String, Map<String, Map<String, FunctionMetaData>>> functionMetaDataMap = new ConcurrentHashMap<>();
 
-    // A map in which the key is the service request id and value is the service request
-    private final Map<String, ServiceRequestInfo> pendingServiceRequests = new ConcurrentHashMap<>();
-
     private final ServiceRequestManager serviceRequestManager;
     private final SchedulerManager schedulerManager;
     private final WorkerConfig workerConfig;
     private final PulsarClient pulsarClient;
     private final ErrorNotifier errorNotifier;
 
     private FunctionMetaDataTopicTailer functionMetaDataTopicTailer;
-
-    @Setter
-    @Getter
-    boolean isInitializePhase = false;
+    private Producer exclusiveLeaderProducer;
+    private MessageId lastMessageSeen = MessageId.earliest;

Review comment:
       It seems more natural for the FunctionMetaDataTopicTailer to keep track of this, since the FunctionMetaDataManager isn't in charge of reading the topic.



