Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/06/10 20:17:28 UTC

[GitHub] [pulsar] jerrypeng opened a new pull request #7237: Fix leader/scheduler assignment processing lag problem

jerrypeng opened a new pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237


   
   
   ### Motivation
   
   When the leader worker isn't processing assignment messages fast enough, the background routine that checks for unassigned function instances triggers the scheduler to schedule and write more assignments to the assignment topic. This is essentially a feedback loop that can cause many unnecessary assignment updates to be published to the assignment topic.
   
   
   ### Modifications
   
   Allow the leader to update its local in-memory assignments map directly.
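
A minimal sketch of the idea, not the PR's code. It assumes a hypothetical `LeaderAssignmentSketch` class in which a plain list stands in for the assignment topic and a string-keyed map stands in for the in-memory assignments map. Because the leader applies each assignment locally as soon as it publishes it, the next unassigned check sees nothing left to schedule, and the feedback loop does not form.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the leader's scheduling path; not Pulsar code.
public class LeaderAssignmentSketch {
    // In-memory view of instanceId -> workerId.
    private final Map<String, String> assignments = new ConcurrentHashMap<>();
    // Stand-in for the assignment topic: records every published update.
    private final List<String> published = new ArrayList<>();

    // Background check: which instances currently have no assignment?
    public List<String> findUnassigned(List<String> instances) {
        List<String> unassigned = new ArrayList<>();
        for (String instance : instances) {
            if (!assignments.containsKey(instance)) {
                unassigned.add(instance);
            }
        }
        return unassigned;
    }

    // Publish each new assignment and apply it to the local map right away,
    // instead of waiting to read it back from the topic.
    public void schedule(List<String> instances, String workerId) {
        for (String instance : findUnassigned(instances)) {
            published.add(instance + "->" + workerId);
            assignments.put(instance, workerId);
        }
    }

    public List<String> published() {
        return published;
    }

    public static void main(String[] args) {
        LeaderAssignmentSketch leader = new LeaderAssignmentSketch();
        List<String> instances = List.of("fn-0", "fn-1");
        leader.schedule(instances, "worker-a");
        leader.schedule(instances, "worker-a"); // second pass finds nothing unassigned
        System.out.println(leader.published().size()); // prints 2
    }
}
```

If the map were only updated by reading the topic back, a slow reader would leave both instances looking unassigned on the second pass, and the same two updates would be published again.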


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jerrypeng commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r442673058



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -99,16 +98,20 @@ public void initialize() {
                 this.functionMetaDataTopicTailer.processRequest(this.functionMetaDataTopicTailer.getReader().readNext());
             }
             this.setInitializePhase(false);
-            // schedule functions if necessary
-            this.schedulerManager.schedule();
-            // start function metadata tailer
-            this.functionMetaDataTopicTailer.start();
+            
 
         } catch (Exception e) {
             log.error("Failed to initialize meta data store", e);
             throw new RuntimeException(e);
         }
     }
+    
+    public void start() {

Review comment:
       Because we cannot start before the SchedulerManager is set up, since the function metadata manager can invoke the scheduler. We can initialize before setting up the SchedulerManager, but we cannot start.
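
The ordering constraint in this comment can be sketched as a two-phase lifecycle. This is an illustration under assumptions, not the PR's code: `Scheduler`, `MetaDataManager`, and `setScheduler` are hypothetical stand-ins for SchedulerManager, FunctionMetaDataManager, and however the real classes are wired together.

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleOrderSketch {
    // Hypothetical scheduler; stands in for SchedulerManager.
    public static class Scheduler {
        public final List<String> calls = new ArrayList<>();

        public void schedule() {
            calls.add("schedule");
        }
    }

    // Hypothetical manager; stands in for FunctionMetaDataManager.
    public static class MetaDataManager {
        private Scheduler scheduler; // wired in after initialize()
        private boolean initialized;

        // Phase 1: load state only. Never touches the scheduler, so it is
        // safe to call before the scheduler exists.
        public void initialize() {
            initialized = true;
        }

        public void setScheduler(Scheduler scheduler) {
            this.scheduler = scheduler;
        }

        // Phase 2: may invoke the scheduler, so it requires one to be set.
        public void start() {
            if (!initialized || scheduler == null) {
                throw new IllegalStateException("start() requires initialize() and a scheduler");
            }
            scheduler.schedule();
        }
    }

    public static void main(String[] args) {
        MetaDataManager manager = new MetaDataManager();
        manager.initialize();            // fine without a scheduler
        Scheduler scheduler = new Scheduler();
        manager.setScheduler(scheduler); // scheduler is set up
        manager.start();                 // now safe to start
        System.out.println(scheduler.calls);
    }
}
```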







[GitHub] [pulsar] srkukarni commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r441875616



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
##########
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@Slf4j
+public class LeaderService implements AutoCloseable, ConsumerEventListener {
+
+    private final String consumerName;
+    private final FunctionRuntimeManager functionRuntimeManager;
+    private final ErrorNotifier errorNotifier;
+    private final SchedulerManager schedulerManager;
+    private ConsumerImpl<byte[]> consumer;
+    private final WorkerConfig workerConfig;
+    private final PulsarClient pulsarClient;
+    private final AtomicBoolean isLeader = new AtomicBoolean(false);
+
+    static final String COORDINATION_TOPIC_SUBSCRIPTION = "participants";
+
+    private static String WORKER_IDENTIFIER = "id";
+    
+    public LeaderService(WorkerService workerService,
+                         PulsarClient pulsarClient,
+                         FunctionRuntimeManager functionRuntimeManager,
+                         SchedulerManager schedulerManager,
+                         ErrorNotifier errorNotifier) {
+        this.workerConfig = workerService.getWorkerConfig();
+        this.pulsarClient = pulsarClient;
+        this.functionRuntimeManager = functionRuntimeManager;
+        this.schedulerManager = schedulerManager;
+        this.errorNotifier = errorNotifier;
+        consumerName = String.format(
+                "%s:%s:%d",
+                workerConfig.getWorkerId(),
+                workerConfig.getWorkerHostname(),
+                workerConfig.getWorkerPort()
+        );
+
+    }
+
+    public void start() throws PulsarClientException {
+        // the leaders service is using a `coordination` topic for leader election.
+        // we don't produce any messages into this topic, we only use the `failover` subscription
+        // to elect an active consumer as the leader worker. The leader worker will be responsible
+        // for scheduling snapshots for FMT and doing task assignment.
+        consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
+                .topic(workerConfig.getClusterCoordinationTopic())
+                .subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION)
+                .subscriptionType(SubscriptionType.Failover)
+                .consumerEventListener(this)
+                .property(WORKER_IDENTIFIER, consumerName)
+                .consumerName(consumerName)
+                .subscribe();
+
+    }
+
+    @Override
+    public void becameActive(Consumer<?> consumer, int partitionId) {
+        if (isLeader.compareAndSet(false, true)) {
+            log.info("Worker {} became the leader.", consumerName);
+            try {
+                // trigger read to the end of the topic and exit
+                // Since the leader can just update its in memory assignments cache directly
+                functionRuntimeManager.stopReadingAssignments();

Review comment:
       Can you instead call functionRuntimeManager.acquireLeadership() and functionRuntimeManager.giveupLeadership()?
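
One way to read this suggestion, as a hedged sketch: the election callback only reports leadership transitions, and the manager decides what each transition means (for example, stopping or resuming its assignment reader). The `LeadershipAware` interface, the `Election` class, and `RecordingManager` are hypothetical names for illustration; only `acquireLeadership()` and `giveupLeadership()` come from the comment above.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class LeadershipHooksSketch {
    // Hypothetical interface a manager could expose to the leader service.
    public interface LeadershipAware {
        void acquireLeadership();
        void giveupLeadership();
    }

    // Stand-in for the class that receives election callbacks.
    public static class Election {
        private final AtomicBoolean isLeader = new AtomicBoolean(false);
        private final LeadershipAware manager;

        public Election(LeadershipAware manager) {
            this.manager = manager;
        }

        // compareAndSet makes each hook fire once per real transition,
        // even if the callback is delivered more than once.
        public void becameActive() {
            if (isLeader.compareAndSet(false, true)) {
                manager.acquireLeadership();
            }
        }

        public void becameInactive() {
            if (isLeader.compareAndSet(true, false)) {
                manager.giveupLeadership();
            }
        }

        public boolean isLeader() {
            return isLeader.get();
        }
    }

    // Records transitions so the behavior is observable.
    public static class RecordingManager implements LeadershipAware {
        public final StringBuilder log = new StringBuilder();

        @Override
        public void acquireLeadership() {
            log.append("acquire;");
        }

        @Override
        public void giveupLeadership() {
            log.append("giveup;");
        }
    }

    public static void main(String[] args) {
        RecordingManager manager = new RecordingManager();
        Election election = new Election(manager);
        election.becameActive();
        election.becameActive();   // duplicate callback is ignored
        election.becameInactive();
        System.out.println(manager.log); // prints acquire;giveup;
    }
}
```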







[GitHub] [pulsar] srkukarni commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r439175022



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
##########
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@Slf4j
+public class LeaderService implements AutoCloseable, ConsumerEventListener {
+
+    private final String consumerName;
+    private final FunctionAssignmentTailer functionAssignmentTailer;
+    private final ErrorNotifier errorNotifier;
+    private ConsumerImpl<byte[]> consumer;
+    private final WorkerConfig workerConfig;
+    private final PulsarClient pulsarClient;
+    private final AtomicBoolean isLeader = new AtomicBoolean(false);
+
+    static final String COORDINATION_TOPIC_SUBSCRIPTION = "participants";
+
+    private static String WORKER_IDENTIFIER = "id";
+    
+    public LeaderService(WorkerService workerService,
+                         PulsarClient pulsarClient,
+                         FunctionAssignmentTailer functionAssignmentTailer,
+                         ErrorNotifier errorNotifier) {
+        this.workerConfig = workerService.getWorkerConfig();
+        this.pulsarClient = pulsarClient;
+        this.functionAssignmentTailer = functionAssignmentTailer;
+        this.errorNotifier = errorNotifier;
+        consumerName = String.format(
+                "%s:%s:%d",
+                workerConfig.getWorkerId(),
+                workerConfig.getWorkerHostname(),
+                workerConfig.getWorkerPort()
+        );
+
+    }
+
+    public void start() throws PulsarClientException {
+        // the leaders service is using a `coordination` topic for leader election.
+        // we don't produce any messages into this topic, we only use the `failover` subscription
+        // to elect an active consumer as the leader worker. The leader worker will be responsible
+        // for scheduling snapshots for FMT and doing task assignment.
+        consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
+                .topic(workerConfig.getClusterCoordinationTopic())
+                .subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION)
+                .subscriptionType(SubscriptionType.Failover)
+                .consumerEventListener(this)
+                .property(WORKER_IDENTIFIER, consumerName)
+                .subscribe();
+
+    }
+
+    @Override
+    public void becameActive(Consumer<?> consumer, int partitionId) {
+        if (isLeader.compareAndSet(false, true)) {
+            log.info("Worker {} became the leader.", consumerName);
+            try {
+                // trigger read to the end of the topic and exit
+                // Since the leader can just update its in memory assignments cache directly
+                functionAssignmentTailer.triggerReadToTheEndAndExit().get();

Review comment:
       This should be abstracted out of LeaderService into the respective class (in this case FunctionRuntimeManager).







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r439266467



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -70,33 +78,64 @@ public FunctionAssignmentTailer(
                             log.warn("Encountered error when assignment tailer is not running", th);
                         }
                     }
-
                 }
             }
+            log.info("tailer thread exiting...");
+            hasExited.complete(null);
         });
         this.tailerThread.setName("assignment-tailer-thread");
     }
 
-    public void start() {
-        isRunning = true;
-        tailerThread.start();
+    public CompletableFuture<Void> triggerReadToTheEndAndExit() {
+        exitOnEndOfTopic = true;
+        return this.hasExited;
+    }
+
+    public synchronized void start() throws PulsarClientException {
+        if (!isRunning) {
+            isRunning = true;
+            if (reader == null) {
+                reader = createReader();
+            }
+            tailerThread.start();
+        }
     }
+    
 
     @Override
-    public void close() {
-        log.info("Stopping function assignment tailer");
+    public synchronized void close() {
+        log.info("Closing function assignment tailer");
         try {
             isRunning = false;
-            if (tailerThread != null && tailerThread.isAlive()) {
-                tailerThread.interrupt();
-            }
+
+            if (tailerThread != null) {
+                while (true) {
+                    tailerThread.interrupt();
+
+                    try {
+                        tailerThread.join(5000, 0);
+                    } catch (InterruptedException e) {
+                        log.warn("Waiting for assignment tailer thread to stop is interrupted", e);
+                    }
+
+                    if (tailerThread.isAlive()) {
+                        log.warn("Assignment tailer thread is still alive.  Will attempt to interrupt again.");
+                    } else {
+                        break;
+                    }
+                }
+            }            
             if (reader != null) {
                 reader.close();
+                reader = null;
             }
+
+            hasExited = new CompletableFuture<>();

Review comment:
       It re-initializes the variable so that, if we "start" again, the CompletableFuture is not already completed.







[GitHub] [pulsar] srkukarni commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r439175663



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
##########
@@ -137,25 +165,35 @@ public SchedulerManager(WorkerConfig workerConfig, PulsarClient pulsarClient, Pu
     }
 
     public Future<?> schedule() {
-        return executorService.submit(() -> {
-            synchronized (SchedulerManager.this) {
-                boolean isLeader = membershipManager.isLeader();
-                if (isLeader) {
-                    try {
-                        invokeScheduler();
-                    } catch (Exception e) {
-                        log.warn("Failed to invoke scheduler", e);
-                        throw e;
+        try {

Review comment:
       I think we need to simplify this massively.
   I think part of the PR that I'm working on for metadata simplification will impact this as well.
   







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r439267776



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
##########
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@Slf4j
+public class LeaderService implements AutoCloseable, ConsumerEventListener {
+
+    private final String consumerName;
+    private final FunctionAssignmentTailer functionAssignmentTailer;
+    private final ErrorNotifier errorNotifier;
+    private ConsumerImpl<byte[]> consumer;
+    private final WorkerConfig workerConfig;
+    private final PulsarClient pulsarClient;
+    private final AtomicBoolean isLeader = new AtomicBoolean(false);
+
+    static final String COORDINATION_TOPIC_SUBSCRIPTION = "participants";
+
+    private static String WORKER_IDENTIFIER = "id";
+    
+    public LeaderService(WorkerService workerService,
+                         PulsarClient pulsarClient,
+                         FunctionAssignmentTailer functionAssignmentTailer,
+                         ErrorNotifier errorNotifier) {
+        this.workerConfig = workerService.getWorkerConfig();
+        this.pulsarClient = pulsarClient;
+        this.functionAssignmentTailer = functionAssignmentTailer;
+        this.errorNotifier = errorNotifier;
+        consumerName = String.format(
+                "%s:%s:%d",
+                workerConfig.getWorkerId(),
+                workerConfig.getWorkerHostname(),
+                workerConfig.getWorkerPort()
+        );
+
+    }
+
+    public void start() throws PulsarClientException {
+        // the leaders service is using a `coordination` topic for leader election.
+        // we don't produce any messages into this topic, we only use the `failover` subscription
+        // to elect an active consumer as the leader worker. The leader worker will be responsible
+        // for scheduling snapshots for FMT and doing task assignment.
+        consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
+                .topic(workerConfig.getClusterCoordinationTopic())
+                .subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION)
+                .subscriptionType(SubscriptionType.Failover)
+                .consumerEventListener(this)
+                .property(WORKER_IDENTIFIER, consumerName)
+                .subscribe();
+
+    }
+
+    @Override
+    public void becameActive(Consumer<?> consumer, int partitionId) {
+        if (isLeader.compareAndSet(false, true)) {
+            log.info("Worker {} became the leader.", consumerName);
+            try {
+                // trigger read to the end of the topic and exit
+                // Since the leader can just update its in memory assignments cache directly
+                functionAssignmentTailer.triggerReadToTheEndAndExit().get();

Review comment:
       > This should be abstracted out of LeaderService into the respective class (in this case FunctionRuntimeManager).
   
   yup done
   
   > Also we need to create the producer here right?
   
   Why do we need to create a producer?







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r442673058



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -99,16 +98,20 @@ public void initialize() {
                 this.functionMetaDataTopicTailer.processRequest(this.functionMetaDataTopicTailer.getReader().readNext());
             }
             this.setInitializePhase(false);
-            // schedule functions if necessary
-            this.schedulerManager.schedule();
-            // start function metadata tailer
-            this.functionMetaDataTopicTailer.start();
+            
 
         } catch (Exception e) {
             log.error("Failed to initialize meta data store", e);
             throw new RuntimeException(e);
         }
     }
+    
+    public void start() {

Review comment:
       Because we cannot start before the SchedulerManager is set up, since the function metadata manager can invoke the scheduler.







[GitHub] [pulsar] srkukarni merged pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
srkukarni merged pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237


   





[GitHub] [pulsar] srkukarni commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r442644009



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
##########
@@ -210,21 +210,25 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerSer
      * 2. After current assignments are read, assignments belonging to this worker will be processed
      */
     public void initialize() {
-        log.info("/** Initializing Runtime Manager **/");
         try {
-            this.functionAssignmentTailer = new FunctionAssignmentTailer(
-                    this,
-                    this.getWorkerService().getClient().newReader(),
-                    this.workerConfig,
-                    this.errorNotifier);
+

Review comment:
       I like that we are no longer using FunctionAssignmentTailer here. However, maybe add a static method that consolidates this reader creation and the one in the assignment tailer?







[GitHub] [pulsar] srkukarni commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r441874257



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -70,33 +78,64 @@ public FunctionAssignmentTailer(
                             log.warn("Encountered error when assignment tailer is not running", th);
                         }
                     }
-
                 }
             }
+            log.info("tailer thread exiting...");
+            hasExited.complete(null);
         });
         this.tailerThread.setName("assignment-tailer-thread");
     }
 
-    public void start() {
-        isRunning = true;
-        tailerThread.start();
+    public CompletableFuture<Void> triggerReadToTheEndAndExit() {
+        exitOnEndOfTopic = true;
+        return this.hasExited;
+    }
+
+    public synchronized void start() throws PulsarClientException {
+        if (!isRunning) {
+            isRunning = true;
+            if (reader == null) {
+                reader = createReader();
+            }
+            tailerThread.start();
+        }
     }
+    
 
     @Override
-    public void close() {
-        log.info("Stopping function assignment tailer");
+    public synchronized void close() {
+        log.info("Closing function assignment tailer");
         try {
             isRunning = false;
-            if (tailerThread != null && tailerThread.isAlive()) {
-                tailerThread.interrupt();
-            }
+
+            if (tailerThread != null) {
+                while (true) {
+                    tailerThread.interrupt();
+
+                    try {
+                        tailerThread.join(5000, 0);
+                    } catch (InterruptedException e) {
+                        log.warn("Waiting for assignment tailer thread to stop is interrupted", e);
+                    }
+
+                    if (tailerThread.isAlive()) {
+                        log.warn("Assignment tailer thread is still alive.  Will attempt to interrupt again.");
+                    } else {
+                        break;
+                    }
+                }
+            }            
             if (reader != null) {
                 reader.close();
+                reader = null;
             }
+
+            hasExited = new CompletableFuture<>();

Review comment:
       I just dislike creating new objects in something like close(). It doesn't seem like the usual pattern.







[GitHub] [pulsar] srkukarni commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r442641790



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -25,95 +25,157 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
-import org.apache.pulsar.functions.proto.Function.Assignment;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
+/**
+ * This class is responsible for reading assignments from the 'assignments' functions internal topic.
+ * Only functions worker leader writes to the topic while other workers read from the topic.
+ * When a worker become a leader, the worker will read to the end of the assignments topic and close its reader to the topic.
+ * Then the worker and new leader will be in charge of computing new assignments when necessary.
+ * The leader does not need to listen to the assignments topic because it can just update its in memory assignments map directly
+ * after it computes a new scheduling.  When a worker loses leadership, the worker is start reading from the assignments topic again.
+ */
 @Slf4j
 public class FunctionAssignmentTailer implements AutoCloseable {
 
     private final FunctionRuntimeManager functionRuntimeManager;
-    @Getter
-    private final Reader<byte[]> reader;
+    private final ReaderBuilder readerBuilder;
+    private final WorkerConfig workerConfig;
+    private final ErrorNotifier errorNotifier;
+    private Reader<byte[]> reader;
     private volatile boolean isRunning = false;
+    private volatile boolean exitOnEndOfTopic = false;
+    private CompletableFuture<Void> hasExited;
+    private Thread tailerThread;
 
-    private final Thread tailerThread;
+    @Getter
+    private MessageId lastMessageId = null;

Review comment:
       Shouldn't we initialize this to MessageId.earliest?







[GitHub] [pulsar] srkukarni commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r441874133



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -70,33 +78,64 @@ public FunctionAssignmentTailer(
                             log.warn("Encountered error when assignment tailer is not running", th);
                         }
                     }
-
                 }
             }
+            log.info("tailer thread exiting...");
+            hasExited.complete(null);
         });
         this.tailerThread.setName("assignment-tailer-thread");
     }
 
-    public void start() {
-        isRunning = true;
-        tailerThread.start();
+    public CompletableFuture<Void> triggerReadToTheEndAndExit() {
+        exitOnEndOfTopic = true;
+        return this.hasExited;
+    }
+
+    public synchronized void start() throws PulsarClientException {
+        if (!isRunning) {
+            isRunning = true;
+            if (reader == null) {
+                reader = createReader();
+            }
+            tailerThread.start();
+        }
     }
+    
 
     @Override
-    public void close() {
-        log.info("Stopping function assignment tailer");
+    public synchronized void close() {
+        log.info("Closing function assignment tailer");
         try {
             isRunning = false;
-            if (tailerThread != null && tailerThread.isAlive()) {
-                tailerThread.interrupt();
-            }
+
+            if (tailerThread != null) {
+                while (true) {
+                    tailerThread.interrupt();
+
+                    try {
+                        tailerThread.join(5000, 0);
+                    } catch (InterruptedException e) {
+                        log.warn("Waiting for assignment tailer thread to stop is interrupted", e);
+                    }
+
+                    if (tailerThread.isAlive()) {
+                        log.warn("Assignment tailer thread is still alive.  Will attempt to interrupt again.");
+                    } else {
+                        break;
+                    }
+                }
+            }            
             if (reader != null) {
                 reader.close();
+                reader = null;
             }
+
+            hasExited = new CompletableFuture<>();

Review comment:
       Then maybe we can create this at start instead?
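
A minimal sketch of what this suggestion could look like, assuming a simplified stand-in for the tailer. `RestartableTailer` and `exitFuture()` are hypothetical names, and the sleep loop is only a placeholder for reading from the assignment topic. The per-run state (the exit future and the thread) is created in `start()`, so `close()` does not have to reset anything:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the tailer; not the class from the PR.
class RestartableTailer implements AutoCloseable {
    private volatile boolean isRunning = false;
    private CompletableFuture<Void> hasExited;
    private Thread tailerThread;

    public synchronized void start() {
        if (isRunning) {
            return;
        }
        isRunning = true;
        // Fresh state for every run: a completed future cannot be reset
        // and a java.lang.Thread cannot be started a second time.
        final CompletableFuture<Void> exited = new CompletableFuture<>();
        hasExited = exited;
        tailerThread = new Thread(() -> {
            try {
                while (isRunning) {
                    Thread.sleep(10); // placeholder for reader.readNext()
                }
            } catch (InterruptedException e) {
                // interrupted by close(); fall through and exit
            } finally {
                exited.complete(null);
            }
        });
        tailerThread.setName("assignment-tailer-thread");
        tailerThread.start();
    }

    // Future that completes when the current run's thread has exited.
    public synchronized CompletableFuture<Void> exitFuture() {
        return hasExited;
    }

    @Override
    public synchronized void close() {
        isRunning = false;
        if (tailerThread != null) {
            tailerThread.interrupt();
            try {
                tailerThread.join(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            tailerThread = null;
        }
    }
}
```

With this layout, a caller that kept the future from an earlier run still sees it as completed, while the next `start()` hands out a new, incomplete one.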







[GitHub] [pulsar] jerrypeng commented on pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#issuecomment-642300094


   /pulsarbot run-failure-checks





[GitHub] [pulsar] srkukarni commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r441886883



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
##########
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@Slf4j
+public class LeaderService implements AutoCloseable, ConsumerEventListener {
+
+    private final String consumerName;
+    private final FunctionRuntimeManager functionRuntimeManager;
+    private final ErrorNotifier errorNotifier;
+    private final SchedulerManager schedulerManager;
+    private ConsumerImpl<byte[]> consumer;
+    private final WorkerConfig workerConfig;
+    private final PulsarClient pulsarClient;
+    private final AtomicBoolean isLeader = new AtomicBoolean(false);
+
+    static final String COORDINATION_TOPIC_SUBSCRIPTION = "participants";
+
+    private static String WORKER_IDENTIFIER = "id";
+    
+    public LeaderService(WorkerService workerService,
+                         PulsarClient pulsarClient,
+                         FunctionRuntimeManager functionRuntimeManager,
+                         SchedulerManager schedulerManager,
+                         ErrorNotifier errorNotifier) {
+        this.workerConfig = workerService.getWorkerConfig();
+        this.pulsarClient = pulsarClient;
+        this.functionRuntimeManager = functionRuntimeManager;
+        this.schedulerManager = schedulerManager;
+        this.errorNotifier = errorNotifier;
+        consumerName = String.format(
+                "%s:%s:%d",
+                workerConfig.getWorkerId(),
+                workerConfig.getWorkerHostname(),
+                workerConfig.getWorkerPort()
+        );
+
+    }
+
+    public void start() throws PulsarClientException {
+        // the leaders service is using a `coordination` topic for leader election.
+        // we don't produce any messages into this topic, we only use the `failover` subscription
+        // to elect an active consumer as the leader worker. The leader worker will be responsible
+        // for scheduling snapshots for FMT and doing task assignment.
+        consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
+                .topic(workerConfig.getClusterCoordinationTopic())
+                .subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION)
+                .subscriptionType(SubscriptionType.Failover)
+                .consumerEventListener(this)
+                .property(WORKER_IDENTIFIER, consumerName)
+                .consumerName(consumerName)
+                .subscribe();
+
+    }
+
+    @Override
+    public void becameActive(Consumer<?> consumer, int partitionId) {
+        if (isLeader.compareAndSet(false, true)) {
+            log.info("Worker {} became the leader.", consumerName);
+            try {
+                // trigger read to the end of the topic and exit
+                // Since the leader can just update its in memory assignments cache directly
+                functionRuntimeManager.stopReadingAssignments();
+
+                // make sure scheduler is initialized because this worker
+                // is the leader and may need to start computing and writing assignments
+                schedulerManager.initialize();
+            } catch (Throwable th) {
+                log.error("Encountered error when initializing to become leader", th);
+                errorNotifier.triggerError(th);
+            }
+        }
+    }
+
+    @Override
+    public void becameInactive(Consumer<?> consumer, int partitionId) {
+        if (isLeader.compareAndSet(true, false)) {
+            log.info("Worker {} lost the leadership.", consumerName);
+            // when a worker has lost leadership it needs to start reading from the assignment topic again
+            try {
+                // acquire scheduler lock to make sure a scheduling is not in process
+                schedulerManager.getSchedulerLock().lock();

Review comment:
       I think the better way is to make the scheduler aware of leadership changes (just like the runtime manager) and call acquireLeadership and giveupLeadership
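
A rough sketch of that alternative, under stated assumptions: `LeadershipAwareScheduler` is a hypothetical class, and only the method names `acquireLeadership` and `giveupLeadership` come from the comment above. The scheduler keeps its lock private, and giving up leadership simply waits for any in-flight scheduling to finish:

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch; this is not the SchedulerManager from the PR.
class LeadershipAwareScheduler {
    private final ReentrantLock schedulerLock = new ReentrantLock();
    private boolean isLeader = false;
    private int schedulingRuns = 0;

    // Called when this worker becomes the leader.
    void acquireLeadership() {
        schedulerLock.lock();
        try {
            isLeader = true;
        } finally {
            schedulerLock.unlock();
        }
    }

    // Called when leadership is lost; blocks until a running schedule() returns.
    void giveupLeadership() {
        schedulerLock.lock();
        try {
            isLeader = false;
        } finally {
            schedulerLock.unlock();
        }
    }

    // Returns true only if a scheduling round actually ran.
    boolean schedule() {
        schedulerLock.lock();
        try {
            if (!isLeader) {
                return false; // non-leaders never compute assignments
            }
            schedulingRuns++; // placeholder for computing and writing assignments
            return true;
        } finally {
            schedulerLock.unlock();
        }
    }

    int getSchedulingRuns() {
        schedulerLock.lock();
        try {
            return schedulingRuns;
        } finally {
            schedulerLock.unlock();
        }
    }
}
```

The waiting still happens, but inside the scheduler rather than in the leader service, so callers never need access to the lock itself.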







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r441889584



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -28,75 +29,105 @@
 import org.apache.pulsar.functions.proto.Function.Assignment;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
+/**
+ * This class is responsible for reading assignments from the 'assignments' functions internal topic.
+ * Only functions worker leader writes to the topic while other workers read from the topic.
+ * When a worker become a leader, the worker will read to the end of the assignments topic and close its reader to the topic.
+ * Then the worker and new leader will be in charge of computing new assignments when necessary.
+ * The leader does not need to listen to the assignments topic because it can just update its in memory assignments map directly
+ * after it computes a new scheduling.  When a worker loses leadership, the worker is start reading from the assignments topic again.
+ */
 @Slf4j
 public class FunctionAssignmentTailer implements AutoCloseable {
 
     private final FunctionRuntimeManager functionRuntimeManager;
+    private final ReaderBuilder readerBuilder;
+    private final WorkerConfig workerConfig;
+    private final ErrorNotifier errorNotifier;
     @Getter
-    private final Reader<byte[]> reader;
+    private Reader<byte[]> reader;
     private volatile boolean isRunning = false;
+    private volatile boolean exitOnEndOfTopic = false;
+    private CompletableFuture<Void> hasExited;
 
-    private final Thread tailerThread;
+    private Thread tailerThread;
+
+    @Getter
+    @Setter
+    private MessageId lastMessageId = null;
     
     public FunctionAssignmentTailer(
             FunctionRuntimeManager functionRuntimeManager,
             ReaderBuilder readerBuilder,
             WorkerConfig workerConfig,
             ErrorNotifier errorNotifier) throws PulsarClientException {
         this.functionRuntimeManager = functionRuntimeManager;
+        this.hasExited = new CompletableFuture<>();
+        this.readerBuilder = readerBuilder;
+        this.workerConfig = workerConfig;
+        this.errorNotifier = errorNotifier;
+        this.reader = createReader();
         
-        this.reader = readerBuilder
-          .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-runtime-manager")
-          .readerName(workerConfig.getWorkerId() + "-function-runtime-manager")
-          .topic(workerConfig.getFunctionAssignmentTopic())
-          .readCompacted(true)
-          .startMessageId(MessageId.earliest)
-          .create();
-        
-        this.tailerThread = new Thread(() -> {
-            while(isRunning) {
-                try {
-                    Message<byte[]> msg = reader.readNext();
-                    processAssignment(msg);
-                } catch (Throwable th) {
-                    if (isRunning) {
-                        log.error("Encountered error in assignment tailer", th);
-                        // trigger fatal error
-                        isRunning = false;
-                        errorNotifier.triggerError(th);
-                    } else {
-                        if (!(th instanceof InterruptedException || th.getCause() instanceof InterruptedException)) {
-                            log.warn("Encountered error when assignment tailer is not running", th);
-                        }
-                    }
+        this.tailerThread = getTailerThread();

Review comment:
       sure







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r441913734



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
##########
@@ -130,6 +132,8 @@ public int size() {
     private final FunctionMetaDataManager functionMetaDataManager;
 
     private final ErrorNotifier errorNotifier;
+

Review comment:
       It's not being used; I will remove it







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r441887595



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
##########
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@Slf4j
+public class LeaderService implements AutoCloseable, ConsumerEventListener {
+
+    private final String consumerName;
+    private final FunctionRuntimeManager functionRuntimeManager;
+    private final ErrorNotifier errorNotifier;
+    private final SchedulerManager schedulerManager;
+    private ConsumerImpl<byte[]> consumer;
+    private final WorkerConfig workerConfig;
+    private final PulsarClient pulsarClient;
+    private final AtomicBoolean isLeader = new AtomicBoolean(false);
+
+    static final String COORDINATION_TOPIC_SUBSCRIPTION = "participants";
+
+    private static String WORKER_IDENTIFIER = "id";
+    
+    public LeaderService(WorkerService workerService,
+                         PulsarClient pulsarClient,
+                         FunctionRuntimeManager functionRuntimeManager,
+                         SchedulerManager schedulerManager,
+                         ErrorNotifier errorNotifier) {
+        this.workerConfig = workerService.getWorkerConfig();
+        this.pulsarClient = pulsarClient;
+        this.functionRuntimeManager = functionRuntimeManager;
+        this.schedulerManager = schedulerManager;
+        this.errorNotifier = errorNotifier;
+        consumerName = String.format(
+                "%s:%s:%d",
+                workerConfig.getWorkerId(),
+                workerConfig.getWorkerHostname(),
+                workerConfig.getWorkerPort()
+        );
+
+    }
+
+    public void start() throws PulsarClientException {
+        // the leaders service is using a `coordination` topic for leader election.
+        // we don't produce any messages into this topic, we only use the `failover` subscription
+        // to elect an active consumer as the leader worker. The leader worker will be responsible
+        // for scheduling snapshots for FMT and doing task assignment.
+        consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
+                .topic(workerConfig.getClusterCoordinationTopic())
+                .subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION)
+                .subscriptionType(SubscriptionType.Failover)
+                .consumerEventListener(this)
+                .property(WORKER_IDENTIFIER, consumerName)
+                .consumerName(consumerName)
+                .subscribe();
+
+    }
+
+    @Override
+    public void becameActive(Consumer<?> consumer, int partitionId) {
+        if (isLeader.compareAndSet(false, true)) {
+            log.info("Worker {} became the leader.", consumerName);
+            try {
+                // trigger read to the end of the topic and exit
+                // Since the leader can just update its in memory assignments cache directly
+                functionRuntimeManager.stopReadingAssignments();
+
+                // make sure scheduler is initialized because this worker
+                // is the leader and may need to start computing and writing assignments
+                schedulerManager.initialize();
+            } catch (Throwable th) {
+                log.error("Encountered error when initializing to become leader", th);
+                errorNotifier.triggerError(th);
+            }
+        }
+    }
+
+    @Override
+    public void becameInactive(Consumer<?> consumer, int partitionId) {
+        if (isLeader.compareAndSet(true, false)) {
+            log.info("Worker {} lost the leadership.", consumerName);
+            // when a worker has lost leadership it needs to start reading from the assignment topic again
+            try {
+                // acquire scheduler lock to make sure a scheduling is not in process
+                schedulerManager.getSchedulerLock().lock();

Review comment:
       This is one way to do that. You will need synchronization somewhere, and someone will have to wait







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r442666391



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
##########
@@ -210,21 +210,25 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerSer
      * 2. After current assignments are read, assignments belonging to this worker will be processed
      */
     public void initialize() {
-        log.info("/** Initializing Runtime Manager **/");
         try {
-            this.functionAssignmentTailer = new FunctionAssignmentTailer(
-                    this,
-                    this.getWorkerService().getClient().newReader(),
-                    this.workerConfig,
-                    this.errorNotifier);
+

Review comment:
       sure







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r442662116



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -25,95 +25,157 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
-import org.apache.pulsar.functions.proto.Function.Assignment;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
+/**
+ * This class is responsible for reading assignments from the 'assignments' functions internal topic.
+ * Only functions worker leader writes to the topic while other workers read from the topic.
+ * When a worker become a leader, the worker will read to the end of the assignments topic and close its reader to the topic.
+ * Then the worker and new leader will be in charge of computing new assignments when necessary.
+ * The leader does not need to listen to the assignments topic because it can just update its in memory assignments map directly
+ * after it computes a new scheduling.  When a worker loses leadership, the worker is start reading from the assignments topic again.
+ */
 @Slf4j
 public class FunctionAssignmentTailer implements AutoCloseable {
 
     private final FunctionRuntimeManager functionRuntimeManager;
-    @Getter
-    private final Reader<byte[]> reader;
+    private final ReaderBuilder readerBuilder;
+    private final WorkerConfig workerConfig;
+    private final ErrorNotifier errorNotifier;
+    private Reader<byte[]> reader;
     private volatile boolean isRunning = false;
+    private volatile boolean exitOnEndOfTopic = false;
+    private CompletableFuture<Void> hasExited;

Review comment:
       sure







[GitHub] [pulsar] srkukarni commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r439175336



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
##########
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@Slf4j
+public class LeaderService implements AutoCloseable, ConsumerEventListener {
+
+    private final String consumerName;
+    private final FunctionAssignmentTailer functionAssignmentTailer;
+    private final ErrorNotifier errorNotifier;
+    private ConsumerImpl<byte[]> consumer;
+    private final WorkerConfig workerConfig;
+    private final PulsarClient pulsarClient;
+    private final AtomicBoolean isLeader = new AtomicBoolean(false);
+
+    static final String COORDINATION_TOPIC_SUBSCRIPTION = "participants";
+
+    private static String WORKER_IDENTIFIER = "id";
+    
+    public LeaderService(WorkerService workerService,
+                         PulsarClient pulsarClient,
+                         FunctionAssignmentTailer functionAssignmentTailer,
+                         ErrorNotifier errorNotifier) {
+        this.workerConfig = workerService.getWorkerConfig();
+        this.pulsarClient = pulsarClient;
+        this.functionAssignmentTailer = functionAssignmentTailer;
+        this.errorNotifier = errorNotifier;
+        consumerName = String.format(
+                "%s:%s:%d",
+                workerConfig.getWorkerId(),
+                workerConfig.getWorkerHostname(),
+                workerConfig.getWorkerPort()
+        );
+
+    }
+
+    public void start() throws PulsarClientException {
+        // the leaders service is using a `coordination` topic for leader election.
+        // we don't produce any messages into this topic, we only use the `failover` subscription
+        // to elect an active consumer as the leader worker. The leader worker will be responsible
+        // for scheduling snapshots for FMT and doing task assignment.
+        consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
+                .topic(workerConfig.getClusterCoordinationTopic())
+                .subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION)
+                .subscriptionType(SubscriptionType.Failover)
+                .consumerEventListener(this)
+                .property(WORKER_IDENTIFIER, consumerName)
+                .subscribe();
+
+    }
+
+    @Override
+    public void becameActive(Consumer<?> consumer, int partitionId) {
+        if (isLeader.compareAndSet(false, true)) {
+            log.info("Worker {} became the leader.", consumerName);
+            try {
+                // trigger read to the end of the topic and exit
+                // Since the leader can just update its in memory assignments cache directly
+                functionAssignmentTailer.triggerReadToTheEndAndExit().get();

Review comment:
       Also, we need to create the producer here, right?







[GitHub] [pulsar] srkukarni commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r439866513



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -70,33 +78,64 @@ public FunctionAssignmentTailer(
                             log.warn("Encountered error when assignment tailer is not running", th);
                         }
                     }
-
                 }
             }
+            log.info("tailer thread exiting...");
+            hasExited.complete(null);
         });
         this.tailerThread.setName("assignment-tailer-thread");
     }
 
-    public void start() {
-        isRunning = true;
-        tailerThread.start();
+    public CompletableFuture<Void> triggerReadToTheEndAndExit() {
+        exitOnEndOfTopic = true;
+        return this.hasExited;
+    }
+
+    public synchronized void start() throws PulsarClientException {
+        if (!isRunning) {
+            isRunning = true;
+            if (reader == null) {
+                reader = createReader();
+            }
+            tailerThread.start();
+        }
     }
+    
 
     @Override
-    public void close() {
-        log.info("Stopping function assignment tailer");
+    public synchronized void close() {
+        log.info("Closing function assignment tailer");
         try {
             isRunning = false;
-            if (tailerThread != null && tailerThread.isAlive()) {
-                tailerThread.interrupt();
-            }
+
+            if (tailerThread != null) {
+                while (true) {
+                    tailerThread.interrupt();
+
+                    try {
+                        tailerThread.join(5000, 0);
+                    } catch (InterruptedException e) {
+                        log.warn("Waiting for assignment tailer thread to stop is interrupted", e);
+                    }
+
+                    if (tailerThread.isAlive()) {
+                        log.warn("Assignment tailer thread is still alive.  Will attempt to interrupt again.");
+                    } else {
+                        break;
+                    }
+                }
+            }            
             if (reader != null) {
                 reader.close();
+                reader = null;
             }
+
+            hasExited = new CompletableFuture<>();

Review comment:
       Do you think it's better to recreate the object? That way the re-create logic becomes simpler







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r439699813



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
##########
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@Slf4j
+public class LeaderService implements AutoCloseable, ConsumerEventListener {
+
+    private final String consumerName;
+    private final FunctionAssignmentTailer functionAssignmentTailer;
+    private final ErrorNotifier errorNotifier;
+    private ConsumerImpl<byte[]> consumer;
+    private final WorkerConfig workerConfig;
+    private final PulsarClient pulsarClient;
+    private final AtomicBoolean isLeader = new AtomicBoolean(false);
+
+    static final String COORDINATION_TOPIC_SUBSCRIPTION = "participants";
+
+    private static String WORKER_IDENTIFIER = "id";
+    
+    public LeaderService(WorkerService workerService,
+                         PulsarClient pulsarClient,
+                         FunctionAssignmentTailer functionAssignmentTailer,
+                         ErrorNotifier errorNotifier) {
+        this.workerConfig = workerService.getWorkerConfig();
+        this.pulsarClient = pulsarClient;
+        this.functionAssignmentTailer = functionAssignmentTailer;
+        this.errorNotifier = errorNotifier;
+        consumerName = String.format(
+                "%s:%s:%d",
+                workerConfig.getWorkerId(),
+                workerConfig.getWorkerHostname(),
+                workerConfig.getWorkerPort()
+        );
+
+    }
+
+    public void start() throws PulsarClientException {
+        // the leaders service is using a `coordination` topic for leader election.
+        // we don't produce any messages into this topic, we only use the `failover` subscription
+        // to elect an active consumer as the leader worker. The leader worker will be responsible
+        // for scheduling snapshots for FMT and doing task assignment.
+        consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
+                .topic(workerConfig.getClusterCoordinationTopic())
+                .subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION)
+                .subscriptionType(SubscriptionType.Failover)
+                .consumerEventListener(this)
+                .property(WORKER_IDENTIFIER, consumerName)
+                .subscribe();
+
+    }
+
+    @Override
+    public void becameActive(Consumer<?> consumer, int partitionId) {
+        if (isLeader.compareAndSet(false, true)) {
+            log.info("Worker {} became the leader.", consumerName);
+            try {
+                // trigger read to the end of the topic and exit
+                // Since the leader can just update its in memory assignments cache directly
+                functionAssignmentTailer.triggerReadToTheEndAndExit().get();

Review comment:
       done







[GitHub] [pulsar] srkukarni commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r439866577



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -116,4 +147,51 @@ public void processAssignment(Message<byte[]> msg) {
             this.functionRuntimeManager.processAssignment(assignment);
         }
     }
+    
+    private Reader<byte[]> createReader() throws PulsarClientException {
+        MessageId startMessageId = lastMessageId == null ? MessageId.earliest : lastMessageId;
+        log.info("Assignment tailer will start reading from message id {}", startMessageId);
+
+        return readerBuilder
+                .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-assignment-tailer")
+                .readerName(workerConfig.getWorkerId() + "-function-assignment-tailer")
+                .topic(workerConfig.getFunctionAssignmentTopic())
+                .readCompacted(true)
+                .startMessageId(startMessageId)
+                .create();
+    }
+
+    private Thread getTailerThread() {
+        Thread t = new Thread(() -> {
+            while (isRunning) {
+                try {
+                    Message<byte[]> msg = reader.readNext(5, TimeUnit.SECONDS);
+                    if (msg == null) {
+                        if (exitOnEndOfTopic && !reader.hasMessageAvailable()) {
+                            break;
+                        }
+                    } else {
+                        processAssignment(msg);
+                        // keep track of the last message read
+                        lastMessageId = msg.getMessageId();
+                    }
+                } catch (Throwable th) {
+                    if (isRunning) {

Review comment:
       Should we check for exitOnEndOfTopic as well?







[GitHub] [pulsar] srkukarni commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r441873583



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -28,75 +29,105 @@
 import org.apache.pulsar.functions.proto.Function.Assignment;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
+/**
+ * This class is responsible for reading assignments from the 'assignments' functions internal topic.
+ * Only functions worker leader writes to the topic while other workers read from the topic.
+ * When a worker become a leader, the worker will read to the end of the assignments topic and close its reader to the topic.
+ * Then the worker and new leader will be in charge of computing new assignments when necessary.
+ * The leader does not need to listen to the assignments topic because it can just update its in memory assignments map directly
+ * after it computes a new scheduling.  When a worker loses leadership, the worker is start reading from the assignments topic again.
+ */
 @Slf4j
 public class FunctionAssignmentTailer implements AutoCloseable {
 
     private final FunctionRuntimeManager functionRuntimeManager;
+    private final ReaderBuilder readerBuilder;
+    private final WorkerConfig workerConfig;
+    private final ErrorNotifier errorNotifier;
     @Getter
-    private final Reader<byte[]> reader;
+    private Reader<byte[]> reader;
     private volatile boolean isRunning = false;
+    private volatile boolean exitOnEndOfTopic = false;
+    private CompletableFuture<Void> hasExited;
 
-    private final Thread tailerThread;
+    private Thread tailerThread;
+
+    @Getter
+    @Setter
+    private MessageId lastMessageId = null;
     
     public FunctionAssignmentTailer(
             FunctionRuntimeManager functionRuntimeManager,
             ReaderBuilder readerBuilder,
             WorkerConfig workerConfig,
             ErrorNotifier errorNotifier) throws PulsarClientException {
         this.functionRuntimeManager = functionRuntimeManager;
+        this.hasExited = new CompletableFuture<>();
+        this.readerBuilder = readerBuilder;
+        this.workerConfig = workerConfig;
+        this.errorNotifier = errorNotifier;
+        this.reader = createReader();
         
-        this.reader = readerBuilder
-          .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-runtime-manager")
-          .readerName(workerConfig.getWorkerId() + "-function-runtime-manager")
-          .topic(workerConfig.getFunctionAssignmentTopic())
-          .readCompacted(true)
-          .startMessageId(MessageId.earliest)
-          .create();
-        
-        this.tailerThread = new Thread(() -> {
-            while(isRunning) {
-                try {
-                    Message<byte[]> msg = reader.readNext();
-                    processAssignment(msg);
-                } catch (Throwable th) {
-                    if (isRunning) {
-                        log.error("Encountered error in assignment tailer", th);
-                        // trigger fatal error
-                        isRunning = false;
-                        errorNotifier.triggerError(th);
-                    } else {
-                        if (!(th instanceof InterruptedException || th.getCause() instanceof InterruptedException)) {
-                            log.warn("Encountered error when assignment tailer is not running", th);
-                        }
-                    }
+        this.tailerThread = getTailerThread();

Review comment:
       maybe defer this till start?







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r439266943



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
##########
@@ -137,25 +165,35 @@ public SchedulerManager(WorkerConfig workerConfig, PulsarClient pulsarClient, Pu
     }
 
     public Future<?> schedule() {
-        return executorService.submit(() -> {
-            synchronized (SchedulerManager.this) {
-                boolean isLeader = membershipManager.isLeader();
-                if (isLeader) {
-                    try {
-                        invokeScheduler();
-                    } catch (Exception e) {
-                        log.warn("Failed to invoke scheduler", e);
-                        throw e;
+        try {

Review comment:
       What are you thinking?  What is the complexity here?







[GitHub] [pulsar] srkukarni commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r439174636



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -70,33 +78,64 @@ public FunctionAssignmentTailer(
                             log.warn("Encountered error when assignment tailer is not running", th);
                         }
                     }
-
                 }
             }
+            log.info("tailer thread exiting...");
+            hasExited.complete(null);
         });
         this.tailerThread.setName("assignment-tailer-thread");
     }
 
-    public void start() {
-        isRunning = true;
-        tailerThread.start();
+    public CompletableFuture<Void> triggerReadToTheEndAndExit() {
+        exitOnEndOfTopic = true;
+        return this.hasExited;
+    }
+
+    public synchronized void start() throws PulsarClientException {
+        if (!isRunning) {
+            isRunning = true;
+            if (reader == null) {
+                reader = createReader();
+            }
+            tailerThread.start();
+        }
     }
+    
 
     @Override
-    public void close() {
-        log.info("Stopping function assignment tailer");
+    public synchronized void close() {
+        log.info("Closing function assignment tailer");
         try {
             isRunning = false;
-            if (tailerThread != null && tailerThread.isAlive()) {
-                tailerThread.interrupt();
-            }
+
+            if (tailerThread != null) {
+                while (true) {
+                    tailerThread.interrupt();
+
+                    try {
+                        tailerThread.join(5000, 0);
+                    } catch (InterruptedException e) {
+                        log.warn("Waiting for assignment tailer thread to stop is interrupted", e);
+                    }
+
+                    if (tailerThread.isAlive()) {
+                        log.warn("Assignment tailer thread is still alive.  Will attempt to interrupt again.");
+                    } else {
+                        break;
+                    }
+                }
+            }            
             if (reader != null) {
                 reader.close();
+                reader = null;
             }
+
+            hasExited = new CompletableFuture<>();

Review comment:
       this isn't right?







[GitHub] [pulsar] srkukarni commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r442642430



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -25,95 +25,157 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
-import org.apache.pulsar.functions.proto.Function.Assignment;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
+/**
+ * This class is responsible for reading assignments from the 'assignments' functions internal topic.
+ * Only functions worker leader writes to the topic while other workers read from the topic.
+ * When a worker become a leader, the worker will read to the end of the assignments topic and close its reader to the topic.
+ * Then the worker and new leader will be in charge of computing new assignments when necessary.
+ * The leader does not need to listen to the assignments topic because it can just update its in memory assignments map directly
+ * after it computes a new scheduling.  When a worker loses leadership, the worker is start reading from the assignments topic again.
+ */
 @Slf4j
 public class FunctionAssignmentTailer implements AutoCloseable {
 
     private final FunctionRuntimeManager functionRuntimeManager;
-    @Getter
-    private final Reader<byte[]> reader;
+    private final ReaderBuilder readerBuilder;
+    private final WorkerConfig workerConfig;
+    private final ErrorNotifier errorNotifier;
+    private Reader<byte[]> reader;
     private volatile boolean isRunning = false;
+    private volatile boolean exitOnEndOfTopic = false;
+    private CompletableFuture<Void> hasExited;
+    private Thread tailerThread;
 
-    private final Thread tailerThread;
+    @Getter
+    private MessageId lastMessageId = null;
     
     public FunctionAssignmentTailer(
             FunctionRuntimeManager functionRuntimeManager,
             ReaderBuilder readerBuilder,
             WorkerConfig workerConfig,
-            ErrorNotifier errorNotifier) throws PulsarClientException {
+            ErrorNotifier errorNotifier) {
         this.functionRuntimeManager = functionRuntimeManager;
-        
-        this.reader = readerBuilder
-          .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-runtime-manager")
-          .readerName(workerConfig.getWorkerId() + "-function-runtime-manager")
-          .topic(workerConfig.getFunctionAssignmentTopic())
-          .readCompacted(true)
-          .startMessageId(MessageId.earliest)
-          .create();
-        
-        this.tailerThread = new Thread(() -> {
-            while(isRunning) {
-                try {
-                    Message<byte[]> msg = reader.readNext();
-                    processAssignment(msg);
-                } catch (Throwable th) {
-                    if (isRunning) {
-                        log.error("Encountered error in assignment tailer", th);
-                        // trigger fatal error
-                        isRunning = false;
-                        errorNotifier.triggerError(th);
-                    } else {
-                        if (!(th instanceof InterruptedException || th.getCause() instanceof InterruptedException)) {
-                            log.warn("Encountered error when assignment tailer is not running", th);
-                        }
-                    }
+        this.hasExited = new CompletableFuture<>();
+        this.readerBuilder = readerBuilder;
+        this.workerConfig = workerConfig;
+        this.errorNotifier = errorNotifier;
+    }
 
-                }
+    public synchronized CompletableFuture<Void> triggerReadToTheEndAndExit() {
+        exitOnEndOfTopic = true;
+        return this.hasExited;
+    }
+
+    public void startFromMessage(MessageId startMessageId) throws PulsarClientException {
+        log.info("Function assignment tailer start reading from topic {} at {}",
+                workerConfig.getFunctionAssignmentTopic(), startMessageId);
+        if (!isRunning) {
+            isRunning = true;
+            if (reader == null) {
+                reader = createReader(startMessageId);
             }
-        });
-        this.tailerThread.setName("assignment-tailer-thread");
+            if (tailerThread == null || !tailerThread.isAlive()) {
+                tailerThread = getTailerThread();
+            }
+            hasExited = new CompletableFuture<>();
+            tailerThread.start();
+        }
     }
 
-    public void start() {
-        isRunning = true;
-        tailerThread.start();
+    public synchronized void start() throws PulsarClientException {

Review comment:
       I think it's cleaner to consolidate this and the above method into start(MessageId) { ... }
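
A minimal sketch of the consolidation suggested here, not the code from this PR. The class name `ConsolidatedStartSketch` is hypothetical, and a plain `long` position with an `EARLIEST` sentinel stands in for `MessageId` and `MessageId.earliest`. The no-argument `start()` only delegates, so the "already running" check lives in one synchronized method.

```java
// Hypothetical stand-in for the tailer's start logic; positions are plain
// longs rather than Pulsar MessageId objects.
class ConsolidatedStartSketch {
    static final long EARLIEST = -1L; // assumed sentinel, plays the role of MessageId.earliest

    private boolean running = false;
    private long startPosition = EARLIEST;

    // The no-argument variant delegates to the single real entry point.
    synchronized void start() {
        start(EARLIEST);
    }

    // Single entry point: a second call while running is a no-op.
    synchronized void start(long position) {
        if (running) {
            return;
        }
        running = true;
        startPosition = position;
    }

    synchronized boolean isRunning() {
        return running;
    }

    synchronized long startPosition() {
        return startPosition;
    }
}
```

In this sketch, calling `start(7)` and then `start()` leaves the start position at 7, because the second call sees that the tailer is already running.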







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r442659613



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -25,95 +25,157 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
-import org.apache.pulsar.functions.proto.Function.Assignment;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
+/**
+ * This class is responsible for reading assignments from the 'assignments' functions internal topic.
+ * Only functions worker leader writes to the topic while other workers read from the topic.
+ * When a worker become a leader, the worker will read to the end of the assignments topic and close its reader to the topic.
+ * Then the worker and new leader will be in charge of computing new assignments when necessary.
+ * The leader does not need to listen to the assignments topic because it can just update its in memory assignments map directly
+ * after it computes a new scheduling.  When a worker loses leadership, the worker is start reading from the assignments topic again.
+ */
 @Slf4j
 public class FunctionAssignmentTailer implements AutoCloseable {
 
     private final FunctionRuntimeManager functionRuntimeManager;
-    @Getter
-    private final Reader<byte[]> reader;
+    private final ReaderBuilder readerBuilder;
+    private final WorkerConfig workerConfig;
+    private final ErrorNotifier errorNotifier;
+    private Reader<byte[]> reader;
     private volatile boolean isRunning = false;
+    private volatile boolean exitOnEndOfTopic = false;
+    private CompletableFuture<Void> hasExited;
+    private Thread tailerThread;
 
-    private final Thread tailerThread;
+    @Getter
+    private MessageId lastMessageId = null;
     
     public FunctionAssignmentTailer(
             FunctionRuntimeManager functionRuntimeManager,
             ReaderBuilder readerBuilder,
             WorkerConfig workerConfig,
-            ErrorNotifier errorNotifier) throws PulsarClientException {
+            ErrorNotifier errorNotifier) {
         this.functionRuntimeManager = functionRuntimeManager;
-        
-        this.reader = readerBuilder
-          .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-runtime-manager")
-          .readerName(workerConfig.getWorkerId() + "-function-runtime-manager")
-          .topic(workerConfig.getFunctionAssignmentTopic())
-          .readCompacted(true)
-          .startMessageId(MessageId.earliest)
-          .create();
-        
-        this.tailerThread = new Thread(() -> {
-            while(isRunning) {
-                try {
-                    Message<byte[]> msg = reader.readNext();
-                    processAssignment(msg);
-                } catch (Throwable th) {
-                    if (isRunning) {
-                        log.error("Encountered error in assignment tailer", th);
-                        // trigger fatal error
-                        isRunning = false;
-                        errorNotifier.triggerError(th);
-                    } else {
-                        if (!(th instanceof InterruptedException || th.getCause() instanceof InterruptedException)) {
-                            log.warn("Encountered error when assignment tailer is not running", th);
-                        }
-                    }
+        this.hasExited = new CompletableFuture<>();
+        this.readerBuilder = readerBuilder;
+        this.workerConfig = workerConfig;
+        this.errorNotifier = errorNotifier;
+    }
 
-                }
+    public synchronized CompletableFuture<Void> triggerReadToTheEndAndExit() {
+        exitOnEndOfTopic = true;
+        return this.hasExited;
+    }
+
+    public void startFromMessage(MessageId startMessageId) throws PulsarClientException {
+        log.info("Function assignment tailer start reading from topic {} at {}",
+                workerConfig.getFunctionAssignmentTopic(), startMessageId);
+        if (!isRunning) {
+            isRunning = true;
+            if (reader == null) {
+                reader = createReader(startMessageId);
             }
-        });
-        this.tailerThread.setName("assignment-tailer-thread");
+            if (tailerThread == null || !tailerThread.isAlive()) {
+                tailerThread = getTailerThread();
+            }
+            hasExited = new CompletableFuture<>();
+            tailerThread.start();
+        }
     }
 
-    public void start() {
-        isRunning = true;
-        tailerThread.start();
+    public synchronized void start() throws PulsarClientException {

Review comment:
       > I also think that some logic will be simpler if we create Tailer object every time we go thru leadership transition
   
   That is not correct.  The functionAssignmentTailer is also responsible for keeping track of a message id.  If a worker becomes a leader and then loses leadership prior to creating any assignments, we shouldn't just start reading the assignment topic from the beginning.  We should resume from the message id stored in the functionAssignmentTailer 
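
The point about resuming can be sketched as follows. This is an illustration under stated assumptions, not the PR's implementation: `ResumableTailerSketch` is a hypothetical class, the topic is modeled as an in-memory list, and an integer index plays the role of `lastMessageId`. Because the same object survives a leadership transition, the second read picks up after the last processed entry instead of replaying from the beginning.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: list indexes stand in for Pulsar message ids.
class ResumableTailerSketch {
    private final List<String> topic;      // stand-in for the assignment topic
    private Integer lastReadIndex = null;  // plays the role of lastMessageId
    final List<String> processed = new ArrayList<>();

    ResumableTailerSketch(List<String> topic) {
        this.topic = topic;
    }

    // Read everything after the remembered position, then stop.
    void readToEnd() {
        int start = (lastReadIndex == null) ? 0 : lastReadIndex + 1;
        for (int i = start; i < topic.size(); i++) {
            processed.add(topic.get(i));
            lastReadIndex = i; // keep track of the last entry read
        }
    }
}
```

If the object were recreated on every transition, `lastReadIndex` would be lost and each entry would be processed again from index 0, unless the position were stored somewhere else.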

##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -25,95 +25,157 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
-import org.apache.pulsar.functions.proto.Function.Assignment;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
+/**
+ * This class is responsible for reading assignments from the 'assignments' functions internal topic.
+ * Only functions worker leader writes to the topic while other workers read from the topic.
+ * When a worker become a leader, the worker will read to the end of the assignments topic and close its reader to the topic.
+ * Then the worker and new leader will be in charge of computing new assignments when necessary.
+ * The leader does not need to listen to the assignments topic because it can just update its in memory assignments map directly
+ * after it computes a new scheduling.  When a worker loses leadership, the worker is start reading from the assignments topic again.
+ */
 @Slf4j
 public class FunctionAssignmentTailer implements AutoCloseable {
 
     private final FunctionRuntimeManager functionRuntimeManager;
-    @Getter
-    private final Reader<byte[]> reader;
+    private final ReaderBuilder readerBuilder;
+    private final WorkerConfig workerConfig;
+    private final ErrorNotifier errorNotifier;
+    private Reader<byte[]> reader;
     private volatile boolean isRunning = false;
+    private volatile boolean exitOnEndOfTopic = false;
+    private CompletableFuture<Void> hasExited;
+    private Thread tailerThread;
 
-    private final Thread tailerThread;
+    @Getter
+    private MessageId lastMessageId = null;

Review comment:
       No







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r439937515



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -70,33 +78,64 @@ public FunctionAssignmentTailer(
                             log.warn("Encountered error when assignment tailer is not running", th);
                         }
                     }
-
                 }
             }
+            log.info("tailer thread exiting...");
+            hasExited.complete(null);
         });
         this.tailerThread.setName("assignment-tailer-thread");
     }
 
-    public void start() {
-        isRunning = true;
-        tailerThread.start();
+    public CompletableFuture<Void> triggerReadToTheEndAndExit() {
+        exitOnEndOfTopic = true;
+        return this.hasExited;
+    }
+
+    public synchronized void start() throws PulsarClientException {
+        if (!isRunning) {
+            isRunning = true;
+            if (reader == null) {
+                reader = createReader();
+            }
+            tailerThread.start();
+        }
     }
+    
 
     @Override
-    public void close() {
-        log.info("Stopping function assignment tailer");
+    public synchronized void close() {
+        log.info("Closing function assignment tailer");
         try {
             isRunning = false;
-            if (tailerThread != null && tailerThread.isAlive()) {
-                tailerThread.interrupt();
-            }
+
+            if (tailerThread != null) {
+                while (true) {
+                    tailerThread.interrupt();
+
+                    try {
+                        tailerThread.join(5000, 0);
+                    } catch (InterruptedException e) {
+                        log.warn("Waiting for assignment tailer thread to stop is interrupted", e);
+                    }
+
+                    if (tailerThread.isAlive()) {
+                        log.warn("Assignment tailer thread is still alive.  Will attempt to interrupt again.");
+                    } else {
+                        break;
+                    }
+                }
+            }            
             if (reader != null) {
                 reader.close();
+                reader = null;
             }
+
+            hasExited = new CompletableFuture<>();

Review comment:
       Creating a new FunctionAssignmentTailer doesn't really simplify the logic much.  "hasExited" is needed regardless of whether we recreate the object from scratch or not.  We are also keeping track of the "lastMessageId" in FunctionAssignmentTailer.
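
To illustrate why an exit signal is needed either way, here is a hedged sketch of the pattern. It assumes a `BlockingQueue` in place of the Pulsar reader, and `ExitSignalSketch` and its method names other than `triggerReadToTheEndAndExit` are hypothetical. The caller has to wait until the reading thread has drained its input, and a future completed by that thread is one way to wait, however the surrounding object is created.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of "read to the end, then exit and tell the caller".
class ExitSignalSketch {
    private final BlockingQueue<String> input = new LinkedBlockingQueue<>();
    private final CompletableFuture<Void> hasExited = new CompletableFuture<>();
    private final AtomicInteger processedCount = new AtomicInteger();
    private volatile boolean exitOnEndOfInput = false;

    void offer(String item) {
        input.add(item);
    }

    int processedCount() {
        return processedCount.get();
    }

    void start() {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    String item = input.poll(50, TimeUnit.MILLISECONDS);
                    if (item != null) {
                        processedCount.incrementAndGet();
                    } else if (exitOnEndOfInput && input.isEmpty()) {
                        // Re-check emptiness after seeing the flag so that items
                        // queued before the trigger are not skipped.
                        break;
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                hasExited.complete(null); // signal the waiting caller
            }
        }, "exit-signal-sketch");
        t.setDaemon(true);
        t.start();
    }

    // Ask the thread to stop once the input is drained; the returned future
    // completes when the thread has actually exited.
    CompletableFuture<Void> triggerReadToTheEndAndExit() {
        exitOnEndOfInput = true;
        return hasExited;
    }
}
```

A caller that has just become leader would block on the returned future before touching the in-memory state, which is the role `hasExited` plays in the diff above.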







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r441888124



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
##########
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@Slf4j
+public class LeaderService implements AutoCloseable, ConsumerEventListener {
+
+    private final String consumerName;
+    private final FunctionRuntimeManager functionRuntimeManager;
+    private final ErrorNotifier errorNotifier;
+    private final SchedulerManager schedulerManager;
+    private ConsumerImpl<byte[]> consumer;
+    private final WorkerConfig workerConfig;
+    private final PulsarClient pulsarClient;
+    private final AtomicBoolean isLeader = new AtomicBoolean(false);
+
+    static final String COORDINATION_TOPIC_SUBSCRIPTION = "participants";
+
+    private static String WORKER_IDENTIFIER = "id";
+    
+    public LeaderService(WorkerService workerService,
+                         PulsarClient pulsarClient,
+                         FunctionRuntimeManager functionRuntimeManager,
+                         SchedulerManager schedulerManager,
+                         ErrorNotifier errorNotifier) {
+        this.workerConfig = workerService.getWorkerConfig();
+        this.pulsarClient = pulsarClient;
+        this.functionRuntimeManager = functionRuntimeManager;
+        this.schedulerManager = schedulerManager;
+        this.errorNotifier = errorNotifier;
+        consumerName = String.format(
+                "%s:%s:%d",
+                workerConfig.getWorkerId(),
+                workerConfig.getWorkerHostname(),
+                workerConfig.getWorkerPort()
+        );
+
+    }
+
+    public void start() throws PulsarClientException {
+        // the leaders service is using a `coordination` topic for leader election.
+        // we don't produce any messages into this topic, we only use the `failover` subscription
+        // to elect an active consumer as the leader worker. The leader worker will be responsible
+        // for scheduling snapshots for FMT and doing task assignment.
+        consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
+                .topic(workerConfig.getClusterCoordinationTopic())
+                .subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION)
+                .subscriptionType(SubscriptionType.Failover)
+                .consumerEventListener(this)
+                .property(WORKER_IDENTIFIER, consumerName)
+                .consumerName(consumerName)
+                .subscribe();
+
+    }
+
+    @Override
+    public void becameActive(Consumer<?> consumer, int partitionId) {
+        if (isLeader.compareAndSet(false, true)) {
+            log.info("Worker {} became the leader.", consumerName);
+            try {
+                // trigger read to the end of the topic and exit
+                // Since the leader can just update its in memory assignments cache directly
+                functionRuntimeManager.stopReadingAssignments();

Review comment:
       It doesn't make sense to call "schedulerManager.initialize();" there, or to add the SchedulerManager as a dependency in FunctionRuntimeManager just for this.







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r439939070



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -116,4 +147,51 @@ public void processAssignment(Message<byte[]> msg) {
             this.functionRuntimeManager.processAssignment(assignment);
         }
     }
+    
+    private Reader<byte[]> createReader() throws PulsarClientException {
+        MessageId startMessageId = lastMessageId == null ? MessageId.earliest : lastMessageId;
+        log.info("Assignment tailer will start reading from message id {}", startMessageId);
+
+        return readerBuilder
+                .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-assignment-tailer")
+                .readerName(workerConfig.getWorkerId() + "-function-assignment-tailer")
+                .topic(workerConfig.getFunctionAssignmentTopic())
+                .readCompacted(true)
+                .startMessageId(startMessageId)
+                .create();
+    }
+
+    private Thread getTailerThread() {
+        Thread t = new Thread(() -> {
+            while (isRunning) {
+                try {
+                    Message<byte[]> msg = reader.readNext(5, TimeUnit.SECONDS);
+                    if (msg == null) {
+                        if (exitOnEndOfTopic && !reader.hasMessageAvailable()) {
+                            break;
+                        }
+                    } else {
+                        processAssignment(msg);
+                        // keep track of the last message read
+                        lastMessageId = msg.getMessageId();
+                    }
+                } catch (Throwable th) {
+                    if (isRunning) {

Review comment:
       I don't think we need to, since even if "exitOnEndOfTopic" is set, "isRunning" will still be set to true and any error will be bubbled up as expected.
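
       A small sketch of that reasoning, under the assumption that the catch block only consults "isRunning". The class ErrorPathSketch and its "reported" field (standing in for the ErrorNotifier) are hypothetical and not part of the PR.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical reduction of the tailer loop to its error-handling branch.
class ErrorPathSketch {
    volatile boolean isRunning = false;
    volatile boolean exitOnEndOfTopic = false;
    // Assumption: stands in for errorNotifier.triggerError(th).
    final AtomicReference<Throwable> reported = new AtomicReference<>();

    void runLoop(Runnable read) {
        while (isRunning) {
            try {
                read.run();
                if (exitOnEndOfTopic) {
                    break; // drained, exit quietly
                }
            } catch (Throwable th) {
                if (isRunning) {
                    // Still running, so the failure is treated as fatal,
                    // whether or not exitOnEndOfTopic has been set.
                    isRunning = false;
                    reported.set(th);
                }
                // Otherwise the tailer is being closed and the error is ignored.
            }
        }
    }

    public static void main(String[] args) {
        ErrorPathSketch draining = new ErrorPathSketch();
        draining.isRunning = true;
        draining.exitOnEndOfTopic = true;
        draining.runLoop(() -> { throw new IllegalStateException("bad assignment"); });
        System.out.println("reported while draining: " + (draining.reported.get() != null)); // true

        ErrorPathSketch closing = new ErrorPathSketch();
        closing.isRunning = true;
        closing.runLoop(() -> {
            closing.isRunning = false; // simulates close() flipping the flag first
            throw new RuntimeException("interrupted during close");
        });
        System.out.println("reported during close: " + (closing.reported.get() != null)); // false
    }
}
```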







[GitHub] [pulsar] srkukarni commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r442643515



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -99,16 +98,20 @@ public void initialize() {
                 this.functionMetaDataTopicTailer.processRequest(this.functionMetaDataTopicTailer.getReader().readNext());
             }
             this.setInitializePhase(false);
-            // schedule functions if necessary
-            this.schedulerManager.schedule();
-            // start function metadata tailer
-            this.functionMetaDataTopicTailer.start();
+            
 
         } catch (Exception e) {
             log.error("Failed to initialize meta data store", e);
             throw new RuntimeException(e);
         }
     }
+    
+    public void start() {

Review comment:
       initialize and start? What cannot be done in the constructor?







[GitHub] [pulsar] srkukarni commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r442642745



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -25,95 +25,157 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
-import org.apache.pulsar.functions.proto.Function.Assignment;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
+/**
+ * This class is responsible for reading assignments from the 'assignments' functions internal topic.
+ * Only functions worker leader writes to the topic while other workers read from the topic.
+ * When a worker become a leader, the worker will read to the end of the assignments topic and close its reader to the topic.
+ * Then the worker and new leader will be in charge of computing new assignments when necessary.
+ * The leader does not need to listen to the assignments topic because it can just update its in memory assignments map directly
+ * after it computes a new scheduling.  When a worker loses leadership, the worker is start reading from the assignments topic again.
+ */
 @Slf4j
 public class FunctionAssignmentTailer implements AutoCloseable {
 
     private final FunctionRuntimeManager functionRuntimeManager;
-    @Getter
-    private final Reader<byte[]> reader;
+    private final ReaderBuilder readerBuilder;
+    private final WorkerConfig workerConfig;
+    private final ErrorNotifier errorNotifier;
+    private Reader<byte[]> reader;
     private volatile boolean isRunning = false;
+    private volatile boolean exitOnEndOfTopic = false;
+    private CompletableFuture<Void> hasExited;
+    private Thread tailerThread;
 
-    private final Thread tailerThread;
+    @Getter
+    private MessageId lastMessageId = null;
     
     public FunctionAssignmentTailer(
             FunctionRuntimeManager functionRuntimeManager,
             ReaderBuilder readerBuilder,
             WorkerConfig workerConfig,
-            ErrorNotifier errorNotifier) throws PulsarClientException {
+            ErrorNotifier errorNotifier) {
         this.functionRuntimeManager = functionRuntimeManager;
-        
-        this.reader = readerBuilder
-          .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-runtime-manager")
-          .readerName(workerConfig.getWorkerId() + "-function-runtime-manager")
-          .topic(workerConfig.getFunctionAssignmentTopic())
-          .readCompacted(true)
-          .startMessageId(MessageId.earliest)
-          .create();
-        
-        this.tailerThread = new Thread(() -> {
-            while(isRunning) {
-                try {
-                    Message<byte[]> msg = reader.readNext();
-                    processAssignment(msg);
-                } catch (Throwable th) {
-                    if (isRunning) {
-                        log.error("Encountered error in assignment tailer", th);
-                        // trigger fatal error
-                        isRunning = false;
-                        errorNotifier.triggerError(th);
-                    } else {
-                        if (!(th instanceof InterruptedException || th.getCause() instanceof InterruptedException)) {
-                            log.warn("Encountered error when assignment tailer is not running", th);
-                        }
-                    }
+        this.hasExited = new CompletableFuture<>();
+        this.readerBuilder = readerBuilder;
+        this.workerConfig = workerConfig;
+        this.errorNotifier = errorNotifier;
+    }
 
-                }
+    public synchronized CompletableFuture<Void> triggerReadToTheEndAndExit() {
+        exitOnEndOfTopic = true;
+        return this.hasExited;
+    }
+
+    public void startFromMessage(MessageId startMessageId) throws PulsarClientException {
+        log.info("Function assignment tailer start reading from topic {} at {}",
+                workerConfig.getFunctionAssignmentTopic(), startMessageId);
+        if (!isRunning) {
+            isRunning = true;
+            if (reader == null) {
+                reader = createReader(startMessageId);
             }
-        });
-        this.tailerThread.setName("assignment-tailer-thread");
+            if (tailerThread == null || !tailerThread.isAlive()) {
+                tailerThread = getTailerThread();
+            }
+            hasExited = new CompletableFuture<>();
+            tailerThread.start();
+        }
     }
 
-    public void start() {
-        isRunning = true;
-        tailerThread.start();
+    public synchronized void start() throws PulsarClientException {

Review comment:
       I also think that some of the logic will be simpler if we create a new Tailer object every time we go through a leadership transition.







[GitHub] [pulsar] srkukarni commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r442643303



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -25,95 +25,157 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
-import org.apache.pulsar.functions.proto.Function.Assignment;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
+/**
+ * This class is responsible for reading assignments from the 'assignments' functions internal topic.
+ * Only functions worker leader writes to the topic while other workers read from the topic.
+ * When a worker become a leader, the worker will read to the end of the assignments topic and close its reader to the topic.
+ * Then the worker and new leader will be in charge of computing new assignments when necessary.
+ * The leader does not need to listen to the assignments topic because it can just update its in memory assignments map directly
+ * after it computes a new scheduling.  When a worker loses leadership, the worker is start reading from the assignments topic again.
+ */
 @Slf4j
 public class FunctionAssignmentTailer implements AutoCloseable {
 
     private final FunctionRuntimeManager functionRuntimeManager;
-    @Getter
-    private final Reader<byte[]> reader;
+    private final ReaderBuilder readerBuilder;
+    private final WorkerConfig workerConfig;
+    private final ErrorNotifier errorNotifier;
+    private Reader<byte[]> reader;
     private volatile boolean isRunning = false;
+    private volatile boolean exitOnEndOfTopic = false;
+    private CompletableFuture<Void> hasExited;
+    private Thread tailerThread;
 
-    private final Thread tailerThread;
+    @Getter
+    private MessageId lastMessageId = null;
     
     public FunctionAssignmentTailer(
             FunctionRuntimeManager functionRuntimeManager,
             ReaderBuilder readerBuilder,
             WorkerConfig workerConfig,
-            ErrorNotifier errorNotifier) throws PulsarClientException {
+            ErrorNotifier errorNotifier) {
         this.functionRuntimeManager = functionRuntimeManager;
-        
-        this.reader = readerBuilder
-          .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-runtime-manager")
-          .readerName(workerConfig.getWorkerId() + "-function-runtime-manager")
-          .topic(workerConfig.getFunctionAssignmentTopic())
-          .readCompacted(true)
-          .startMessageId(MessageId.earliest)
-          .create();
-        
-        this.tailerThread = new Thread(() -> {
-            while(isRunning) {
-                try {
-                    Message<byte[]> msg = reader.readNext();
-                    processAssignment(msg);
-                } catch (Throwable th) {
-                    if (isRunning) {
-                        log.error("Encountered error in assignment tailer", th);
-                        // trigger fatal error
-                        isRunning = false;
-                        errorNotifier.triggerError(th);
-                    } else {
-                        if (!(th instanceof InterruptedException || th.getCause() instanceof InterruptedException)) {
-                            log.warn("Encountered error when assignment tailer is not running", th);
-                        }
-                    }
+        this.hasExited = new CompletableFuture<>();
+        this.readerBuilder = readerBuilder;
+        this.workerConfig = workerConfig;
+        this.errorNotifier = errorNotifier;
+    }
 
-                }
+    public synchronized CompletableFuture<Void> triggerReadToTheEndAndExit() {
+        exitOnEndOfTopic = true;
+        return this.hasExited;
+    }
+
+    public void startFromMessage(MessageId startMessageId) throws PulsarClientException {
+        log.info("Function assignment tailer start reading from topic {} at {}",
+                workerConfig.getFunctionAssignmentTopic(), startMessageId);
+        if (!isRunning) {
+            isRunning = true;
+            if (reader == null) {
+                reader = createReader(startMessageId);
             }
-        });
-        this.tailerThread.setName("assignment-tailer-thread");
+            if (tailerThread == null || !tailerThread.isAlive()) {
+                tailerThread = getTailerThread();
+            }
+            hasExited = new CompletableFuture<>();
+            tailerThread.start();
+        }
     }
 
-    public void start() {
-        isRunning = true;
-        tailerThread.start();
+    public synchronized void start() throws PulsarClientException {
+        MessageId startMessageId = lastMessageId == null ? MessageId.earliest : lastMessageId;
+        startFromMessage(startMessageId);
     }
 
     @Override
-    public void close() {
-        log.info("Stopping function assignment tailer");
+    public synchronized void close() {
+        log.info("Closing function assignment tailer");
         try {
             isRunning = false;
-            if (tailerThread != null && tailerThread.isAlive()) {
-                tailerThread.interrupt();
+
+            if (tailerThread != null) {
+                while (true) {
+                    tailerThread.interrupt();
+
+                    try {
+                        tailerThread.join(5000, 0);
+                    } catch (InterruptedException e) {
+                        log.warn("Waiting for assignment tailer thread to stop is interrupted", e);
+                    }
+
+                    if (tailerThread.isAlive()) {
+                        log.warn("Assignment tailer thread is still alive.  Will attempt to interrupt again.");
+                    } else {
+                        break;
+                    }
+                }
+                tailerThread = null;
             }
             if (reader != null) {
                 reader.close();
+                reader = null;
             }
+
+            hasExited = null;
+            exitOnEndOfTopic = false;
+            
         } catch (IOException e) {
             log.error("Failed to stop function assignment tailer", e);
         }
-        log.info("Stopped function assignment tailer");
+    }
+    
+    private Reader<byte[]> createReader(MessageId startMessageId) throws PulsarClientException {
+        log.info("Assignment tailer will start reading from message id {}", startMessageId);
+
+        return readerBuilder
+                .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-assignment-tailer")
+                .readerName(workerConfig.getWorkerId() + "-function-assignment-tailer")
+                .topic(workerConfig.getFunctionAssignmentTopic())
+                .readCompacted(true)
+                .startMessageId(startMessageId)
+                .create();
     }
 
-    public void processAssignment(Message<byte[]> msg) {
-
-        if(msg.getData()==null || (msg.getData().length==0)) {
-            log.info("Received assignment delete: {}", msg.getKey());
-            this.functionRuntimeManager.deleteAssignment(msg.getKey());
-        } else {
-            Assignment assignment;
-            try {
-                assignment = Assignment.parseFrom(msg.getData());
-            } catch (IOException e) {
-                log.error("[{}] Received bad assignment update at message {}", reader.getTopic(), msg.getMessageId(), e);
-                throw new RuntimeException(e);
+    private Thread getTailerThread() {
+        Thread t = new Thread(() -> {
+            while (isRunning) {
+                try {
+                    Message<byte[]> msg = reader.readNext(5, TimeUnit.SECONDS);
+                    if (msg == null) {
+                        if (exitOnEndOfTopic && !reader.hasMessageAvailable()) {
+                            break;
+                        }

Review comment:
       Would it be simpler if we did the following?
   while(isRunning) {
   if (exitOnEndOfTopic && !available) break;
   try { read message... }
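
       One way the suggested loop shape could look, as a self-contained sketch rather than the PR's code. A BlockingQueue stands in for the Pulsar Reader; "topic.isEmpty()" plays the role of "!reader.hasMessageAvailable()" and the timed "poll" plays the role of "reader.readNext(5, TimeUnit.SECONDS)". The class name TopOfLoopCheckSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical tailer loop with the exit check moved to the top of each iteration.
class TopOfLoopCheckSketch {
    volatile boolean isRunning = true;
    volatile boolean exitOnEndOfTopic = false;
    final BlockingQueue<String> topic = new LinkedBlockingQueue<>(); // stand-in for the reader
    final List<String> processed = new ArrayList<>();

    void tail() {
        while (isRunning) {
            // Exit check first: stop once asked to exit and nothing is left to read.
            if (exitOnEndOfTopic && topic.isEmpty()) {
                break;
            }
            try {
                // Timed read, so the loop comes back to the check above periodically.
                String msg = topic.poll(50, TimeUnit.MILLISECONDS);
                if (msg != null) {
                    processed.add(msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    public static void main(String[] args) {
        TopOfLoopCheckSketch sketch = new TopOfLoopCheckSketch();
        sketch.topic.add("m1");
        sketch.topic.add("m2");
        sketch.topic.add("m3");
        sketch.exitOnEndOfTopic = true;
        sketch.tail();
        System.out.println(sketch.processed); // [m1, m2, m3]
    }
}
```

       Even in this shape the read needs a timeout: a read that blocks indefinitely would never return to the top of the loop to notice that "exitOnEndOfTopic" was set after it started waiting.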







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r441890309



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -70,33 +78,64 @@ public FunctionAssignmentTailer(
                             log.warn("Encountered error when assignment tailer is not running", th);
                         }
                     }
-
                 }
             }
+            log.info("tailer thread exiting...");
+            hasExited.complete(null);
         });
         this.tailerThread.setName("assignment-tailer-thread");
     }
 
-    public void start() {
-        isRunning = true;
-        tailerThread.start();
+    public CompletableFuture<Void> triggerReadToTheEndAndExit() {
+        exitOnEndOfTopic = true;
+        return this.hasExited;
+    }
+
+    public synchronized void start() throws PulsarClientException {
+        if (!isRunning) {
+            isRunning = true;
+            if (reader == null) {
+                reader = createReader();
+            }
+            tailerThread.start();
+        }
     }
+    
 
     @Override
-    public void close() {
-        log.info("Stopping function assignment tailer");
+    public synchronized void close() {
+        log.info("Closing function assignment tailer");
         try {
             isRunning = false;
-            if (tailerThread != null && tailerThread.isAlive()) {
-                tailerThread.interrupt();
-            }
+
+            if (tailerThread != null) {
+                while (true) {
+                    tailerThread.interrupt();
+
+                    try {
+                        tailerThread.join(5000, 0);
+                    } catch (InterruptedException e) {
+                        log.warn("Waiting for assignment tailer thread to stop is interrupted", e);
+                    }
+
+                    if (tailerThread.isAlive()) {
+                        log.warn("Assignment tailer thread is still alive.  Will attempt to interrupt again.");
+                    } else {
+                        break;
+                    }
+                }
+            }            
             if (reader != null) {
                 reader.close();
+                reader = null;
             }
+
+            hasExited = new CompletableFuture<>();

Review comment:
       OK, I re-initialize it in the start method.







[GitHub] [pulsar] srkukarni commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r442641655



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -25,95 +25,157 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
-import org.apache.pulsar.functions.proto.Function.Assignment;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
+/**
+ * This class is responsible for reading assignments from the 'assignments' functions internal topic.
+ * Only functions worker leader writes to the topic while other workers read from the topic.
+ * When a worker become a leader, the worker will read to the end of the assignments topic and close its reader to the topic.
+ * Then the worker and new leader will be in charge of computing new assignments when necessary.
+ * The leader does not need to listen to the assignments topic because it can just update its in memory assignments map directly
+ * after it computes a new scheduling.  When a worker loses leadership, the worker is start reading from the assignments topic again.
+ */
 @Slf4j
 public class FunctionAssignmentTailer implements AutoCloseable {
 
     private final FunctionRuntimeManager functionRuntimeManager;
-    @Getter
-    private final Reader<byte[]> reader;
+    private final ReaderBuilder readerBuilder;
+    private final WorkerConfig workerConfig;
+    private final ErrorNotifier errorNotifier;
+    private Reader<byte[]> reader;
     private volatile boolean isRunning = false;
+    private volatile boolean exitOnEndOfTopic = false;
+    private CompletableFuture<Void> hasExited;

Review comment:
       "exitFuture" might be a better name.







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r442663082



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -25,95 +25,157 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
-import org.apache.pulsar.functions.proto.Function.Assignment;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
+/**
+ * This class is responsible for reading assignments from the 'assignments' functions internal topic.
+ * Only functions worker leader writes to the topic while other workers read from the topic.
+ * When a worker become a leader, the worker will read to the end of the assignments topic and close its reader to the topic.
+ * Then the worker and new leader will be in charge of computing new assignments when necessary.
+ * The leader does not need to listen to the assignments topic because it can just update its in memory assignments map directly
+ * after it computes a new scheduling.  When a worker loses leadership, the worker is start reading from the assignments topic again.
+ */
 @Slf4j
 public class FunctionAssignmentTailer implements AutoCloseable {
 
     private final FunctionRuntimeManager functionRuntimeManager;
-    @Getter
-    private final Reader<byte[]> reader;
+    private final ReaderBuilder readerBuilder;
+    private final WorkerConfig workerConfig;
+    private final ErrorNotifier errorNotifier;
+    private Reader<byte[]> reader;
     private volatile boolean isRunning = false;
+    private volatile boolean exitOnEndOfTopic = false;
+    private CompletableFuture<Void> hasExited;
+    private Thread tailerThread;
 
-    private final Thread tailerThread;
+    @Getter
+    private MessageId lastMessageId = null;
     
     public FunctionAssignmentTailer(
             FunctionRuntimeManager functionRuntimeManager,
             ReaderBuilder readerBuilder,
             WorkerConfig workerConfig,
-            ErrorNotifier errorNotifier) throws PulsarClientException {
+            ErrorNotifier errorNotifier) {
         this.functionRuntimeManager = functionRuntimeManager;
-        
-        this.reader = readerBuilder
-          .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-runtime-manager")
-          .readerName(workerConfig.getWorkerId() + "-function-runtime-manager")
-          .topic(workerConfig.getFunctionAssignmentTopic())
-          .readCompacted(true)
-          .startMessageId(MessageId.earliest)
-          .create();
-        
-        this.tailerThread = new Thread(() -> {
-            while(isRunning) {
-                try {
-                    Message<byte[]> msg = reader.readNext();
-                    processAssignment(msg);
-                } catch (Throwable th) {
-                    if (isRunning) {
-                        log.error("Encountered error in assignment tailer", th);
-                        // trigger fatal error
-                        isRunning = false;
-                        errorNotifier.triggerError(th);
-                    } else {
-                        if (!(th instanceof InterruptedException || th.getCause() instanceof InterruptedException)) {
-                            log.warn("Encountered error when assignment tailer is not running", th);
-                        }
-                    }
+        this.hasExited = new CompletableFuture<>();
+        this.readerBuilder = readerBuilder;
+        this.workerConfig = workerConfig;
+        this.errorNotifier = errorNotifier;
+    }
 
-                }
+    public synchronized CompletableFuture<Void> triggerReadToTheEndAndExit() {
+        exitOnEndOfTopic = true;
+        return this.hasExited;
+    }
+
+    public void startFromMessage(MessageId startMessageId) throws PulsarClientException {
+        log.info("Function assignment tailer start reading from topic {} at {}",
+                workerConfig.getFunctionAssignmentTopic(), startMessageId);
+        if (!isRunning) {
+            isRunning = true;
+            if (reader == null) {
+                reader = createReader(startMessageId);
             }
-        });
-        this.tailerThread.setName("assignment-tailer-thread");
+            if (tailerThread == null || !tailerThread.isAlive()) {
+                tailerThread = getTailerThread();
+            }
+            hasExited = new CompletableFuture<>();
+            tailerThread.start();
+        }
     }
 
-    public void start() {
-        isRunning = true;
-        tailerThread.start();
+    public synchronized void start() throws PulsarClientException {
+        MessageId startMessageId = lastMessageId == null ? MessageId.earliest : lastMessageId;
+        startFromMessage(startMessageId);
     }
 
     @Override
-    public void close() {
-        log.info("Stopping function assignment tailer");
+    public synchronized void close() {
+        log.info("Closing function assignment tailer");
         try {
             isRunning = false;
-            if (tailerThread != null && tailerThread.isAlive()) {
-                tailerThread.interrupt();
+
+            if (tailerThread != null) {
+                while (true) {
+                    tailerThread.interrupt();
+
+                    try {
+                        tailerThread.join(5000, 0);
+                    } catch (InterruptedException e) {
+                        log.warn("Waiting for assignment tailer thread to stop is interrupted", e);
+                    }
+
+                    if (tailerThread.isAlive()) {
+                        log.warn("Assignment tailer thread is still alive.  Will attempt to interrupt again.");
+                    } else {
+                        break;
+                    }
+                }
+                tailerThread = null;
             }
             if (reader != null) {
                 reader.close();
+                reader = null;
             }
+
+            hasExited = null;
+            exitOnEndOfTopic = false;
+            
         } catch (IOException e) {
             log.error("Failed to stop function assignment tailer", e);
         }
-        log.info("Stopped function assignment tailer");
+    }
+    
+    private Reader<byte[]> createReader(MessageId startMessageId) throws PulsarClientException {
+        log.info("Assignment tailer will start reading from message id {}", startMessageId);
+
+        return readerBuilder
+                .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-assignment-tailer")
+                .readerName(workerConfig.getWorkerId() + "-function-assignment-tailer")
+                .topic(workerConfig.getFunctionAssignmentTopic())
+                .readCompacted(true)
+                .startMessageId(startMessageId)
+                .create();
     }
 
-    public void processAssignment(Message<byte[]> msg) {
-
-        if(msg.getData()==null || (msg.getData().length==0)) {
-            log.info("Received assignment delete: {}", msg.getKey());
-            this.functionRuntimeManager.deleteAssignment(msg.getKey());
-        } else {
-            Assignment assignment;
-            try {
-                assignment = Assignment.parseFrom(msg.getData());
-            } catch (IOException e) {
-                log.error("[{}] Received bad assignment update at message {}", reader.getTopic(), msg.getMessageId(), e);
-                throw new RuntimeException(e);
+    private Thread getTailerThread() {
+        Thread t = new Thread(() -> {
+            while (isRunning) {
+                try {
+                    Message<byte[]> msg = reader.readNext(5, TimeUnit.SECONDS);
+                    if (msg == null) {
+                        if (exitOnEndOfTopic && !reader.hasMessageAvailable()) {
+                            break;
+                        }

Review comment:
       It's safer to wait out a timeout period before exiting, to make sure no messages are simply arriving late.
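
The reasoning in this comment can be illustrated with a minimal, self-contained sketch. It is not the PR's implementation: a `BlockingQueue` stands in for the Pulsar reader, `poll(timeout)` for `readNext(5, TimeUnit.SECONDS)`, and `isEmpty()` for `!reader.hasMessageAvailable()`. The class name `ReadToEndTailerSketch` and its `publish`, `processed`, and `start(timeoutMs)` members are hypothetical. The loop only considers exiting after a timed read has come back empty, so a message that arrives slightly late is still picked up.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a queue plays the role of the assignment topic.
class ReadToEndTailerSketch {
    private final BlockingQueue<String> topic = new LinkedBlockingQueue<>();
    private final List<String> processed = new CopyOnWriteArrayList<>();
    private final CompletableFuture<Void> hasExited = new CompletableFuture<>();
    private volatile boolean exitOnEndOfTopic = false;

    // Assumed helper: append a message to the stand-in topic.
    void publish(String msg) {
        topic.add(msg);
    }

    // Ask the tailer to drain the topic and stop; the future completes on exit.
    CompletableFuture<Void> triggerReadToTheEndAndExit() {
        exitOnEndOfTopic = true;
        return hasExited;
    }

    List<String> processed() {
        return processed;
    }

    void start(long timeoutMs) {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    // Timed read: returns null only after waiting the full timeout.
                    String msg = topic.poll(timeoutMs, TimeUnit.MILLISECONDS);
                    if (msg != null) {
                        processed.add(msg);
                        continue;
                    }
                    // Exit only once a full timeout elapsed with nothing to read
                    // and the topic still reports no pending messages.
                    if (exitOnEndOfTopic && topic.isEmpty()) {
                        break;
                    }
                }
                hasExited.complete(null);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                hasExited.completeExceptionally(e);
            }
        }, "assignment-tailer-sketch");
        t.setDaemon(true);
        t.start();
    }

    public static void main(String[] args) {
        ReadToEndTailerSketch tailer = new ReadToEndTailerSketch();
        tailer.publish("assignment-1");
        tailer.publish("assignment-2");
        tailer.start(100);
        // Blocks until the tailer has drained the topic and exited.
        tailer.triggerReadToTheEndAndExit().join();
        System.out.println(tailer.processed());
    }
}
```

The trade-off is that exit is delayed by at least one timeout period after the last message, which is presumably acceptable on a leadership change.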







[GitHub] [pulsar] jerrypeng commented on pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#issuecomment-642334423


   /pulsarbot run-failure-checks





[GitHub] [pulsar] srkukarni commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r439542630



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
##########
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@Slf4j
+public class LeaderService implements AutoCloseable, ConsumerEventListener {
+
+    private final String consumerName;
+    private final FunctionAssignmentTailer functionAssignmentTailer;
+    private final ErrorNotifier errorNotifier;
+    private ConsumerImpl<byte[]> consumer;
+    private final WorkerConfig workerConfig;
+    private final PulsarClient pulsarClient;
+    private final AtomicBoolean isLeader = new AtomicBoolean(false);
+
+    static final String COORDINATION_TOPIC_SUBSCRIPTION = "participants";
+
+    private static String WORKER_IDENTIFIER = "id";
+    
+    public LeaderService(WorkerService workerService,
+                         PulsarClient pulsarClient,
+                         FunctionAssignmentTailer functionAssignmentTailer,
+                         ErrorNotifier errorNotifier) {
+        this.workerConfig = workerService.getWorkerConfig();
+        this.pulsarClient = pulsarClient;
+        this.functionAssignmentTailer = functionAssignmentTailer;
+        this.errorNotifier = errorNotifier;
+        consumerName = String.format(
+                "%s:%s:%d",
+                workerConfig.getWorkerId(),
+                workerConfig.getWorkerHostname(),
+                workerConfig.getWorkerPort()
+        );
+
+    }
+
+    public void start() throws PulsarClientException {
+        // the leaders service is using a `coordination` topic for leader election.
+        // we don't produce any messages into this topic, we only use the `failover` subscription
+        // to elect an active consumer as the leader worker. The leader worker will be responsible
+        // for scheduling snapshots for FMT and doing task assignment.
+        consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
+                .topic(workerConfig.getClusterCoordinationTopic())
+                .subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION)
+                .subscriptionType(SubscriptionType.Failover)
+                .consumerEventListener(this)
+                .property(WORKER_IDENTIFIER, consumerName)
+                .subscribe();
+
+    }
+
+    @Override
+    public void becameActive(Consumer<?> consumer, int partitionId) {
+        if (isLeader.compareAndSet(false, true)) {
+            log.info("Worker {} became the leader.", consumerName);
+            try {
+                // trigger read to the end of the topic and exit
+                // Since the leader can just update its in memory assignments cache directly
+                functionAssignmentTailer.triggerReadToTheEndAndExit().get();

Review comment:
       Yup. The same pattern is used in https://github.com/apache/pulsar/pull/7255 as well.







[GitHub] [pulsar] srkukarni commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r442645561



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
##########
@@ -33,8 +33,11 @@
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.MessageId;

Review comment:
       remove?







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r439267776



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
##########
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@Slf4j
+public class LeaderService implements AutoCloseable, ConsumerEventListener {
+
+    private final String consumerName;
+    private final FunctionAssignmentTailer functionAssignmentTailer;
+    private final ErrorNotifier errorNotifier;
+    private ConsumerImpl<byte[]> consumer;
+    private final WorkerConfig workerConfig;
+    private final PulsarClient pulsarClient;
+    private final AtomicBoolean isLeader = new AtomicBoolean(false);
+
+    static final String COORDINATION_TOPIC_SUBSCRIPTION = "participants";
+
+    private static String WORKER_IDENTIFIER = "id";
+    
+    public LeaderService(WorkerService workerService,
+                         PulsarClient pulsarClient,
+                         FunctionAssignmentTailer functionAssignmentTailer,
+                         ErrorNotifier errorNotifier) {
+        this.workerConfig = workerService.getWorkerConfig();
+        this.pulsarClient = pulsarClient;
+        this.functionAssignmentTailer = functionAssignmentTailer;
+        this.errorNotifier = errorNotifier;
+        consumerName = String.format(
+                "%s:%s:%d",
+                workerConfig.getWorkerId(),
+                workerConfig.getWorkerHostname(),
+                workerConfig.getWorkerPort()
+        );
+
+    }
+
+    public void start() throws PulsarClientException {
+        // the leaders service is using a `coordination` topic for leader election.
+        // we don't produce any messages into this topic, we only use the `failover` subscription
+        // to elect an active consumer as the leader worker. The leader worker will be responsible
+        // for scheduling snapshots for FMT and doing task assignment.
+        consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
+                .topic(workerConfig.getClusterCoordinationTopic())
+                .subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION)
+                .subscriptionType(SubscriptionType.Failover)
+                .consumerEventListener(this)
+                .property(WORKER_IDENTIFIER, consumerName)
+                .subscribe();
+
+    }
+
+    @Override
+    public void becameActive(Consumer<?> consumer, int partitionId) {
+        if (isLeader.compareAndSet(false, true)) {
+            log.info("Worker {} became the leader.", consumerName);
+            try {
+                // trigger read to the end of the topic and exit
+                // Since the leader can just update its in memory assignments cache directly
+                functionAssignmentTailer.triggerReadToTheEndAndExit().get();

Review comment:
       > This should be abstracted out of LeaderService into the respective class (in this case FunctionRuntimeManager)
   
   Yup, done.
   
   > Also we need to create the producer here right?
   
   Why do we need to create a producer? To start producing to the assignment topic? We currently initialize the producer in the constructor. I guess we don't need to do that: the worker could create the producer only when it becomes the leader and close it when it loses leadership.
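
The alternative floated above (create the producer on gaining leadership, close it on losing it) might look roughly like the following sketch. Everything here is hypothetical: `LeaderProducerSketch`, the nested `AssignmentProducer` interface, and the `Supplier`-based factory are stand-ins, not the PR's code or the Pulsar `Producer` API. Only the `compareAndSet` guard mirrors the `isLeader` handling quoted in the diff.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical sketch of tying the producer's lifetime to leadership.
class LeaderProducerSketch {

    // Stand-in for a producer on the assignment topic (assumption, not Pulsar's API).
    interface AssignmentProducer {
        void close();
    }

    private final AtomicBoolean isLeader = new AtomicBoolean(false);
    private final Supplier<AssignmentProducer> producerFactory;
    private AssignmentProducer producer; // non-null only while this worker leads

    LeaderProducerSketch(Supplier<AssignmentProducer> producerFactory) {
        this.producerFactory = producerFactory;
    }

    // Called when the failover subscription makes this worker the active consumer.
    synchronized void becameActive() {
        if (isLeader.compareAndSet(false, true)) {
            producer = producerFactory.get();
        }
    }

    // Called when this worker stops being the active consumer.
    synchronized void becameInactive() {
        if (isLeader.compareAndSet(true, false)) {
            producer.close();
            producer = null;
        }
    }

    synchronized boolean hasProducer() {
        return producer != null;
    }

    public static void main(String[] args) {
        LeaderProducerSketch worker = new LeaderProducerSketch(
                () -> () -> System.out.println("producer closed"));
        worker.becameActive();
        System.out.println("has producer: " + worker.hasProducer());
        worker.becameInactive();
        System.out.println("has producer: " + worker.hasProducer());
    }
}
```

With this shape, non-leader workers never hold a producer on the assignment topic, and repeated `becameActive` callbacks do not create a second one.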







[GitHub] [pulsar] srkukarni commented on a change in pull request #7237: Fix leader/scheduler assignment processing lag problem

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r441875052



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
##########
@@ -130,6 +132,8 @@ public int size() {
     private final FunctionMetaDataManager functionMetaDataManager;
 
     private final ErrorNotifier errorNotifier;
+

Review comment:
       Why are we adding this here? I'm not seeing any usages.



