You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/06/05 06:54:29 UTC

[GitHub] [pulsar] jerrypeng opened a new pull request #7180: FunctionAssignmentTailer should use its own thread

jerrypeng opened a new pull request #7180:
URL: https://github.com/apache/pulsar/pull/7180


   
   ### Motivation
   
   Currently, the FunctionAssignmentTailer reads assignments and processes them using the pulsar-external-listener thread part of the pulsar-client. Processing assignments may take a "long" time and should not block pulsar-external-listener. Thus, FunctionAssignmentTailer should use its own thread for reading and processing assignments.
   
   
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jerrypeng commented on a change in pull request #7180: FunctionAssignmentTailer should use its own thread

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7180:
URL: https://github.com/apache/pulsar/pull/7180#discussion_r436121183



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -18,56 +18,85 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import java.io.IOException;
-import java.util.function.Function;
-
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 
-import lombok.extern.slf4j.Slf4j;
+import java.io.IOException;
 
 @Slf4j
-public class FunctionAssignmentTailer
-    implements java.util.function.Consumer<Message<byte[]>>, Function<Throwable, Void>, AutoCloseable {
+public class FunctionAssignmentTailer implements AutoCloseable {
 
     private final FunctionRuntimeManager functionRuntimeManager;
+    @Getter
     private final Reader<byte[]> reader;
-    private boolean closed = false;
+    private volatile boolean isRunning = false;
 
-    public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, Reader<byte[]> reader) {
+    private final Thread tailerThread;
+    
+    public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, ReaderBuilder readerBuilder, WorkerConfig workerConfig) throws PulsarClientException {
         this.functionRuntimeManager = functionRuntimeManager;
-        this.reader = reader;
-    }
+        
+        this.reader = readerBuilder
+          .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-runtime-manager")
+          .readerName(workerConfig.getWorkerId() + "-function-runtime-manager")
+          .topic(workerConfig.getFunctionAssignmentTopic())
+          .readCompacted(true)
+          .startMessageId(MessageId.earliest)
+          .create();
 
-    public void start() {
-        receiveOne();
+        this.tailerThread = new Thread(() -> {
+            while(isRunning) {
+                try {
+                    Message<byte[]> msg = reader.readNext();
+                    processAssignment(msg);
+                } catch (Exception e) {
+                    if (isRunning) {
+                        log.error("Encountered error in assignment tailer", e);
+
+                        // trigger fatal error
+                        // TODO add mechanism to notify main thread
+                    } else {
+                        if (!(e instanceof InterruptedException)) {
+                            log.warn("Encountered error when assignment tailer is not running", e);
+                        }
+                    }
+
+                }
+            }
+        });
+        this.tailerThread.setName("assignment-tailer-thread");
     }
 
-    private void receiveOne() {
-        reader.readNextAsync()
-                .thenAccept(this)
-                .exceptionally(this);
+    public void start() {
+        isRunning = true;
+        tailerThread.start();
     }
 
     @Override
     public void close() {
-        if (closed) {
-            return;
-        }
-        log.info("Stopping function state consumer");
+        log.info("Stopping function assignment tailer");
         try {
-            closed = true;
-            reader.close();
+            isRunning = false;
+            if (tailerThread != null && tailerThread.isAlive()) {
+                tailerThread.interrupt();
+            }
+            if (reader != null) {
+                reader.close();
+            }

Review comment:
       Is there a use case for that right now?  




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] srkukarni commented on a change in pull request #7180: FunctionAssignmentTailer should use its own thread

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7180:
URL: https://github.com/apache/pulsar/pull/7180#discussion_r435740515



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -18,56 +18,90 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import java.io.IOException;
-import java.util.function.Function;
-
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 
-import lombok.extern.slf4j.Slf4j;
+import java.io.IOException;
 
 @Slf4j
-public class FunctionAssignmentTailer
-    implements java.util.function.Consumer<Message<byte[]>>, Function<Throwable, Void>, AutoCloseable {
+public class FunctionAssignmentTailer implements AutoCloseable {
 
     private final FunctionRuntimeManager functionRuntimeManager;
+    @Getter
     private final Reader<byte[]> reader;
-    private boolean closed = false;
+    private volatile boolean isRunning = false;
 
-    public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, Reader<byte[]> reader) {
+    private final Thread tailerThread;
+    
+    public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, ReaderBuilder readerBuilder, WorkerConfig workerConfig) throws PulsarClientException {
         this.functionRuntimeManager = functionRuntimeManager;
-        this.reader = reader;
-    }
+        
+        this.reader = readerBuilder
+          .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-runtime-manager")
+          .readerName(workerConfig.getWorkerId() + "-function-runtime-manager")
+          .topic(workerConfig.getFunctionAssignmentTopic())
+          .readCompacted(true)
+          .startMessageId(MessageId.earliest)
+          .create();
 
-    public void start() {
-        receiveOne();
+        this.tailerThread = new Thread(() -> {
+            while(true) {
+                try {
+                    while(isRunning) {
+                        Message<byte[]> msg = reader.readNext();
+                        processAssignment(msg);
+                    }
+                } catch (Exception e) {
+                    if (isRunning) {
+                        log.error("Encountered error in assignment tailer", e);
+
+                        // trigger fatal error
+                        // TODO add mechanism to notify main thread
+                    } else {
+                        if (!(e instanceof InterruptedException)) {
+                            log.warn("Encountered error when assignment tailer is not running", e);
+                        }
+                    }
+
+                }
+            }
+        });
+        this.tailerThread.setName("assignment-tailer-thread");
     }
 
-    private void receiveOne() {
-        reader.readNextAsync()
-                .thenAccept(this)
-                .exceptionally(this);
+    public void start() {
+        isRunning = true;
+        tailerThread.start();
     }
 
     @Override
     public void close() {
-        if (closed) {
+        if (!isRunning) {
             return;
         }
-        log.info("Stopping function state consumer");
+        log.info("Stopping function assignment tailer");
         try {
-            closed = true;
-            reader.close();
+            isRunning = false;
+            if (tailerThread != null && tailerThread.isAlive()) {
+                tailerThread.interrupt();

Review comment:
       probably should wait as well?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] srkukarni commented on a change in pull request #7180: FunctionAssignmentTailer should use its own thread

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7180:
URL: https://github.com/apache/pulsar/pull/7180#discussion_r436052317



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -18,56 +18,91 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import java.io.IOException;
-import java.util.function.Function;
-
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 
-import lombok.extern.slf4j.Slf4j;
+import java.io.IOException;
 
 @Slf4j
-public class FunctionAssignmentTailer
-    implements java.util.function.Consumer<Message<byte[]>>, Function<Throwable, Void>, AutoCloseable {
+public class FunctionAssignmentTailer implements AutoCloseable {
 
     private final FunctionRuntimeManager functionRuntimeManager;
+    @Getter
     private final Reader<byte[]> reader;
-    private boolean closed = false;
+    private volatile boolean isRunning = false;

Review comment:
       instead of isRunning, better to word it as stopRunning




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] srkukarni commented on a change in pull request #7180: FunctionAssignmentTailer should use its own thread

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7180:
URL: https://github.com/apache/pulsar/pull/7180#discussion_r435741992



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -18,56 +18,90 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import java.io.IOException;
-import java.util.function.Function;
-
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 
-import lombok.extern.slf4j.Slf4j;
+import java.io.IOException;
 
 @Slf4j
-public class FunctionAssignmentTailer
-    implements java.util.function.Consumer<Message<byte[]>>, Function<Throwable, Void>, AutoCloseable {
+public class FunctionAssignmentTailer implements AutoCloseable {
 
     private final FunctionRuntimeManager functionRuntimeManager;
+    @Getter
     private final Reader<byte[]> reader;
-    private boolean closed = false;
+    private volatile boolean isRunning = false;
 
-    public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, Reader<byte[]> reader) {
+    private final Thread tailerThread;
+    
+    public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, ReaderBuilder readerBuilder, WorkerConfig workerConfig) throws PulsarClientException {
         this.functionRuntimeManager = functionRuntimeManager;
-        this.reader = reader;
-    }
+        
+        this.reader = readerBuilder
+          .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-runtime-manager")
+          .readerName(workerConfig.getWorkerId() + "-function-runtime-manager")
+          .topic(workerConfig.getFunctionAssignmentTopic())
+          .readCompacted(true)
+          .startMessageId(MessageId.earliest)
+          .create();
 
-    public void start() {
-        receiveOne();
+        this.tailerThread = new Thread(() -> {
+            while(true) {

Review comment:
       instead of two while loop, maybe you can do
   while(isRunning) {
      try { readNext(); processAssignment(); } catch (Exeepton e) { see what kind of exception 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jerrypeng commented on a change in pull request #7180: FunctionAssignmentTailer should use its own thread

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7180:
URL: https://github.com/apache/pulsar/pull/7180#discussion_r436070345



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -18,56 +18,91 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import java.io.IOException;
-import java.util.function.Function;
-
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 
-import lombok.extern.slf4j.Slf4j;
+import java.io.IOException;
 
 @Slf4j
-public class FunctionAssignmentTailer
-    implements java.util.function.Consumer<Message<byte[]>>, Function<Throwable, Void>, AutoCloseable {
+public class FunctionAssignmentTailer implements AutoCloseable {
 
     private final FunctionRuntimeManager functionRuntimeManager;
+    @Getter
     private final Reader<byte[]> reader;
-    private boolean closed = false;
+    private volatile boolean isRunning = false;

Review comment:
       why do we need to change it?  We would also need to reverse the boolean logic checks to be semantically correct?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jerrypeng commented on a change in pull request #7180: FunctionAssignmentTailer should use its own thread

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7180:
URL: https://github.com/apache/pulsar/pull/7180#discussion_r436049886



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -18,56 +18,90 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import java.io.IOException;
-import java.util.function.Function;
-
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 
-import lombok.extern.slf4j.Slf4j;
+import java.io.IOException;
 
 @Slf4j
-public class FunctionAssignmentTailer
-    implements java.util.function.Consumer<Message<byte[]>>, Function<Throwable, Void>, AutoCloseable {
+public class FunctionAssignmentTailer implements AutoCloseable {
 
     private final FunctionRuntimeManager functionRuntimeManager;
+    @Getter
     private final Reader<byte[]> reader;
-    private boolean closed = false;
+    private volatile boolean isRunning = false;
 
-    public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, Reader<byte[]> reader) {
+    private final Thread tailerThread;
+    
+    public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, ReaderBuilder readerBuilder, WorkerConfig workerConfig) throws PulsarClientException {
         this.functionRuntimeManager = functionRuntimeManager;
-        this.reader = reader;
-    }
+        
+        this.reader = readerBuilder
+          .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-runtime-manager")
+          .readerName(workerConfig.getWorkerId() + "-function-runtime-manager")
+          .topic(workerConfig.getFunctionAssignmentTopic())
+          .readCompacted(true)
+          .startMessageId(MessageId.earliest)
+          .create();
 
-    public void start() {
-        receiveOne();
+        this.tailerThread = new Thread(() -> {
+            while(true) {
+                try {
+                    while(isRunning) {
+                        Message<byte[]> msg = reader.readNext();
+                        processAssignment(msg);
+                    }
+                } catch (Exception e) {
+                    if (isRunning) {
+                        log.error("Encountered error in assignment tailer", e);
+
+                        // trigger fatal error
+                        // TODO add mechanism to notify main thread
+                    } else {
+                        if (!(e instanceof InterruptedException)) {
+                            log.warn("Encountered error when assignment tailer is not running", e);
+                        }
+                    }
+
+                }
+            }
+        });
+        this.tailerThread.setName("assignment-tailer-thread");
     }
 
-    private void receiveOne() {
-        reader.readNextAsync()
-                .thenAccept(this)
-                .exceptionally(this);
+    public void start() {
+        isRunning = true;
+        tailerThread.start();
     }
 
     @Override
     public void close() {
-        if (closed) {
+        if (!isRunning) {
             return;
         }
-        log.info("Stopping function state consumer");
+        log.info("Stopping function assignment tailer");
         try {
-            closed = true;
-            reader.close();
+            isRunning = false;
+            if (tailerThread != null && tailerThread.isAlive()) {
+                tailerThread.interrupt();

Review comment:
       we shouldn't need to 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] srkukarni commented on a change in pull request #7180: FunctionAssignmentTailer should use its own thread

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7180:
URL: https://github.com/apache/pulsar/pull/7180#discussion_r436077415



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -18,56 +18,91 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import java.io.IOException;
-import java.util.function.Function;
-
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 
-import lombok.extern.slf4j.Slf4j;
+import java.io.IOException;
 
 @Slf4j
-public class FunctionAssignmentTailer
-    implements java.util.function.Consumer<Message<byte[]>>, Function<Throwable, Void>, AutoCloseable {
+public class FunctionAssignmentTailer implements AutoCloseable {
 
     private final FunctionRuntimeManager functionRuntimeManager;
+    @Getter
     private final Reader<byte[]> reader;
-    private boolean closed = false;
+    private volatile boolean isRunning = false;

Review comment:
       Yes. isRunning implies that its a state. stopRunning implies its a command.
   What we really have here is that the thread should keep running until its commanded by the main thread to stop. Hence the stopRunning instead of isRunning




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] srkukarni commented on a change in pull request #7180: FunctionAssignmentTailer should use its own thread

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7180:
URL: https://github.com/apache/pulsar/pull/7180#discussion_r436078046



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -18,56 +18,85 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import java.io.IOException;
-import java.util.function.Function;
-
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 
-import lombok.extern.slf4j.Slf4j;
+import java.io.IOException;
 
 @Slf4j
-public class FunctionAssignmentTailer
-    implements java.util.function.Consumer<Message<byte[]>>, Function<Throwable, Void>, AutoCloseable {
+public class FunctionAssignmentTailer implements AutoCloseable {
 
     private final FunctionRuntimeManager functionRuntimeManager;
+    @Getter
     private final Reader<byte[]> reader;
-    private boolean closed = false;
+    private volatile boolean isRunning = false;
 
-    public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, Reader<byte[]> reader) {
+    private final Thread tailerThread;
+    
+    public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, ReaderBuilder readerBuilder, WorkerConfig workerConfig) throws PulsarClientException {
         this.functionRuntimeManager = functionRuntimeManager;
-        this.reader = reader;
-    }
+        
+        this.reader = readerBuilder
+          .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-runtime-manager")
+          .readerName(workerConfig.getWorkerId() + "-function-runtime-manager")
+          .topic(workerConfig.getFunctionAssignmentTopic())
+          .readCompacted(true)
+          .startMessageId(MessageId.earliest)
+          .create();
 
-    public void start() {
-        receiveOne();
+        this.tailerThread = new Thread(() -> {
+            while(isRunning) {
+                try {
+                    Message<byte[]> msg = reader.readNext();
+                    processAssignment(msg);
+                } catch (Exception e) {
+                    if (isRunning) {
+                        log.error("Encountered error in assignment tailer", e);
+
+                        // trigger fatal error
+                        // TODO add mechanism to notify main thread
+                    } else {
+                        if (!(e instanceof InterruptedException)) {
+                            log.warn("Encountered error when assignment tailer is not running", e);
+                        }
+                    }
+
+                }
+            }
+        });
+        this.tailerThread.setName("assignment-tailer-thread");
     }
 
-    private void receiveOne() {
-        reader.readNextAsync()
-                .thenAccept(this)
-                .exceptionally(this);
+    public void start() {
+        isRunning = true;
+        tailerThread.start();
     }
 
     @Override
     public void close() {
-        if (closed) {
-            return;
-        }
-        log.info("Stopping function state consumer");
+        log.info("Stopping function assignment tailer");
         try {
-            closed = true;
-            reader.close();
+            isRunning = false;
+            if (tailerThread != null && tailerThread.isAlive()) {
+                tailerThread.interrupt();
+            }
+            if (reader != null) {
+                reader.close();

Review comment:
       should make it null here. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] srkukarni commented on a change in pull request #7180: FunctionAssignmentTailer should use its own thread

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7180:
URL: https://github.com/apache/pulsar/pull/7180#discussion_r435739838



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -18,56 +18,90 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import java.io.IOException;
-import java.util.function.Function;
-
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 
-import lombok.extern.slf4j.Slf4j;
+import java.io.IOException;
 
 @Slf4j
-public class FunctionAssignmentTailer
-    implements java.util.function.Consumer<Message<byte[]>>, Function<Throwable, Void>, AutoCloseable {
+public class FunctionAssignmentTailer implements AutoCloseable {
 
     private final FunctionRuntimeManager functionRuntimeManager;
+    @Getter
     private final Reader<byte[]> reader;
-    private boolean closed = false;
+    private volatile boolean isRunning = false;
 
-    public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, Reader<byte[]> reader) {
+    private final Thread tailerThread;
+    
+    public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, ReaderBuilder readerBuilder, WorkerConfig workerConfig) throws PulsarClientException {
         this.functionRuntimeManager = functionRuntimeManager;
-        this.reader = reader;
-    }
+        
+        this.reader = readerBuilder
+          .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-runtime-manager")
+          .readerName(workerConfig.getWorkerId() + "-function-runtime-manager")
+          .topic(workerConfig.getFunctionAssignmentTopic())
+          .readCompacted(true)
+          .startMessageId(MessageId.earliest)
+          .create();
 
-    public void start() {
-        receiveOne();
+        this.tailerThread = new Thread(() -> {
+            while(true) {
+                try {
+                    while(isRunning) {
+                        Message<byte[]> msg = reader.readNext();
+                        processAssignment(msg);
+                    }
+                } catch (Exception e) {
+                    if (isRunning) {
+                        log.error("Encountered error in assignment tailer", e);
+
+                        // trigger fatal error
+                        // TODO add mechanism to notify main thread
+                    } else {
+                        if (!(e instanceof InterruptedException)) {
+                            log.warn("Encountered error when assignment tailer is not running", e);
+                        }
+                    }
+
+                }
+            }
+        });
+        this.tailerThread.setName("assignment-tailer-thread");
     }
 
-    private void receiveOne() {
-        reader.readNextAsync()
-                .thenAccept(this)
-                .exceptionally(this);
+    public void start() {
+        isRunning = true;
+        tailerThread.start();
     }
 
     @Override
     public void close() {
-        if (closed) {
+        if (!isRunning) {

Review comment:
       reader is still not closed here




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] srkukarni commented on a change in pull request #7180: FunctionAssignmentTailer should use its own thread

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7180:
URL: https://github.com/apache/pulsar/pull/7180#discussion_r436156605



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -18,56 +18,91 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import java.io.IOException;
-import java.util.function.Function;
-
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 
-import lombok.extern.slf4j.Slf4j;
+import java.io.IOException;
 
 @Slf4j
-public class FunctionAssignmentTailer
-    implements java.util.function.Consumer<Message<byte[]>>, Function<Throwable, Void>, AutoCloseable {
+public class FunctionAssignmentTailer implements AutoCloseable {
 
     private final FunctionRuntimeManager functionRuntimeManager;
+    @Getter
     private final Reader<byte[]> reader;
-    private boolean closed = false;
+    private volatile boolean isRunning = false;

Review comment:
       At start() {
   shouldStop = false;
   create the thread and start
   }
   At close() {
   set shoutStop = true, interrupt thread, join and then set it to null;
   close reader and set it to null
   }
   
   This way I can do start() /close() multiple times




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jerrypeng commented on a change in pull request #7180: FunctionAssignmentTailer should use its own thread

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7180:
URL: https://github.com/apache/pulsar/pull/7180#discussion_r436048569



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -18,56 +18,90 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import java.io.IOException;
-import java.util.function.Function;
-
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 
-import lombok.extern.slf4j.Slf4j;
+import java.io.IOException;
 
 @Slf4j
-public class FunctionAssignmentTailer
-    implements java.util.function.Consumer<Message<byte[]>>, Function<Throwable, Void>, AutoCloseable {
+public class FunctionAssignmentTailer implements AutoCloseable {
 
     private final FunctionRuntimeManager functionRuntimeManager;
+    @Getter
     private final Reader<byte[]> reader;
-    private boolean closed = false;
+    private volatile boolean isRunning = false;
 
-    public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, Reader<byte[]> reader) {
+    private final Thread tailerThread;
+    
+    public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, ReaderBuilder readerBuilder, WorkerConfig workerConfig) throws PulsarClientException {
         this.functionRuntimeManager = functionRuntimeManager;
-        this.reader = reader;
-    }
+        
+        this.reader = readerBuilder
+          .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-runtime-manager")
+          .readerName(workerConfig.getWorkerId() + "-function-runtime-manager")
+          .topic(workerConfig.getFunctionAssignmentTopic())
+          .readCompacted(true)
+          .startMessageId(MessageId.earliest)
+          .create();
 
-    public void start() {
-        receiveOne();
+        this.tailerThread = new Thread(() -> {
+            while(true) {

Review comment:
       ya there shouldn't be two while loops




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] srkukarni commented on a change in pull request #7180: FunctionAssignmentTailer should use its own thread

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7180:
URL: https://github.com/apache/pulsar/pull/7180#discussion_r436078520



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -18,56 +18,85 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import java.io.IOException;
-import java.util.function.Function;
-
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 
-import lombok.extern.slf4j.Slf4j;
+import java.io.IOException;
 
 @Slf4j
-public class FunctionAssignmentTailer
-    implements java.util.function.Consumer<Message<byte[]>>, Function<Throwable, Void>, AutoCloseable {
+public class FunctionAssignmentTailer implements AutoCloseable {
 
     private final FunctionRuntimeManager functionRuntimeManager;
+    @Getter
     private final Reader<byte[]> reader;
-    private boolean closed = false;
+    private volatile boolean isRunning = false;
 
-    public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, Reader<byte[]> reader) {
+    private final Thread tailerThread;
+    
+    public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, ReaderBuilder readerBuilder, WorkerConfig workerConfig) throws PulsarClientException {
         this.functionRuntimeManager = functionRuntimeManager;
-        this.reader = reader;
-    }
+        
+        this.reader = readerBuilder
+          .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-runtime-manager")
+          .readerName(workerConfig.getWorkerId() + "-function-runtime-manager")
+          .topic(workerConfig.getFunctionAssignmentTopic())
+          .readCompacted(true)
+          .startMessageId(MessageId.earliest)
+          .create();
 
-    public void start() {
-        receiveOne();
+        this.tailerThread = new Thread(() -> {
+            while(isRunning) {
+                try {
+                    Message<byte[]> msg = reader.readNext();
+                    processAssignment(msg);
+                } catch (Exception e) {
+                    if (isRunning) {
+                        log.error("Encountered error in assignment tailer", e);
+
+                        // trigger fatal error
+                        // TODO add mechanism to notify main thread
+                    } else {
+                        if (!(e instanceof InterruptedException)) {
+                            log.warn("Encountered error when assignment tailer is not running", e);
+                        }
+                    }
+
+                }
+            }
+        });
+        this.tailerThread.setName("assignment-tailer-thread");
     }
 
-    private void receiveOne() {
-        reader.readNextAsync()
-                .thenAccept(this)
-                .exceptionally(this);
+    public void start() {
+        isRunning = true;
+        tailerThread.start();
     }
 
     @Override
     public void close() {
-        if (closed) {
-            return;
-        }
-        log.info("Stopping function state consumer");
+        log.info("Stopping function assignment tailer");
         try {
-            closed = true;
-            reader.close();
+            isRunning = false;
+            if (tailerThread != null && tailerThread.isAlive()) {
+                tailerThread.interrupt();
+            }
+            if (reader != null) {
+                reader.close();
+            }

Review comment:
       also one benefit of doing a join here would be that once the thread is done, we can make it null again.
   This way we could actually do a start/close multiple times.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jerrypeng commented on a change in pull request #7180: FunctionAssignmentTailer should use its own thread

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7180:
URL: https://github.com/apache/pulsar/pull/7180#discussion_r436124733



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -18,56 +18,85 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import java.io.IOException;
-import java.util.function.Function;
-
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 
-import lombok.extern.slf4j.Slf4j;
+import java.io.IOException;
 
 @Slf4j
-public class FunctionAssignmentTailer
-    implements java.util.function.Consumer<Message<byte[]>>, Function<Throwable, Void>, AutoCloseable {
+public class FunctionAssignmentTailer implements AutoCloseable {
 
     private final FunctionRuntimeManager functionRuntimeManager;
+    @Getter
     private final Reader<byte[]> reader;
-    private boolean closed = false;
+    private volatile boolean isRunning = false;
 
-    public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, Reader<byte[]> reader) {
+    private final Thread tailerThread;
+    
+    public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, ReaderBuilder readerBuilder, WorkerConfig workerConfig) throws PulsarClientException {
         this.functionRuntimeManager = functionRuntimeManager;
-        this.reader = reader;
-    }
+        
+        this.reader = readerBuilder
+          .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-runtime-manager")
+          .readerName(workerConfig.getWorkerId() + "-function-runtime-manager")
+          .topic(workerConfig.getFunctionAssignmentTopic())
+          .readCompacted(true)
+          .startMessageId(MessageId.earliest)
+          .create();
 
-    public void start() {
-        receiveOne();
+        this.tailerThread = new Thread(() -> {
+            while(isRunning) {
+                try {
+                    Message<byte[]> msg = reader.readNext();
+                    processAssignment(msg);
+                } catch (Exception e) {
+                    if (isRunning) {
+                        log.error("Encountered error in assignment tailer", e);
+
+                        // trigger fatal error
+                        // TODO add mechanism to notify main thread
+                    } else {
+                        if (!(e instanceof InterruptedException)) {
+                            log.warn("Encountered error when assignment tailer is not running", e);
+                        }
+                    }
+
+                }
+            }
+        });
+        this.tailerThread.setName("assignment-tailer-thread");
     }
 
-    private void receiveOne() {
-        reader.readNextAsync()
-                .thenAccept(this)
-                .exceptionally(this);
+    public void start() {
+        isRunning = true;
+        tailerThread.start();
     }
 
     @Override
     public void close() {
-        if (closed) {
-            return;
-        }
-        log.info("Stopping function state consumer");
+        log.info("Stopping function assignment tailer");
         try {
-            closed = true;
-            reader.close();
+            isRunning = false;
+            if (tailerThread != null && tailerThread.isAlive()) {
+                tailerThread.interrupt();
+            }
+            if (reader != null) {
+                reader.close();
+            }

Review comment:
       Doing this right now makes the code more complicated since the reader needs to be used in FuntcionRuntimeManager




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] srkukarni commented on a change in pull request #7180: FunctionAssignmentTailer should use its own thread

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7180:
URL: https://github.com/apache/pulsar/pull/7180#discussion_r436052083



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -18,56 +18,90 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import java.io.IOException;
-import java.util.function.Function;
-
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 
-import lombok.extern.slf4j.Slf4j;
+import java.io.IOException;
 
 @Slf4j
-public class FunctionAssignmentTailer
-    implements java.util.function.Consumer<Message<byte[]>>, Function<Throwable, Void>, AutoCloseable {
+public class FunctionAssignmentTailer implements AutoCloseable {
 
     private final FunctionRuntimeManager functionRuntimeManager;
+    @Getter
     private final Reader<byte[]> reader;
-    private boolean closed = false;
+    private volatile boolean isRunning = false;
 
-    public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, Reader<byte[]> reader) {
+    private final Thread tailerThread;
+    
+    public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, ReaderBuilder readerBuilder, WorkerConfig workerConfig) throws PulsarClientException {
         this.functionRuntimeManager = functionRuntimeManager;
-        this.reader = reader;
-    }
+        
+        this.reader = readerBuilder
+          .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-runtime-manager")
+          .readerName(workerConfig.getWorkerId() + "-function-runtime-manager")
+          .topic(workerConfig.getFunctionAssignmentTopic())
+          .readCompacted(true)
+          .startMessageId(MessageId.earliest)
+          .create();
 
-    public void start() {
-        receiveOne();
+        this.tailerThread = new Thread(() -> {
+            while(true) {
+                try {
+                    while(isRunning) {
+                        Message<byte[]> msg = reader.readNext();
+                        processAssignment(msg);
+                    }
+                } catch (Exception e) {
+                    if (isRunning) {
+                        log.error("Encountered error in assignment tailer", e);
+
+                        // trigger fatal error
+                        // TODO add mechanism to notify main thread
+                    } else {
+                        if (!(e instanceof InterruptedException)) {
+                            log.warn("Encountered error when assignment tailer is not running", e);
+                        }
+                    }
+
+                }
+            }
+        });
+        this.tailerThread.setName("assignment-tailer-thread");
     }
 
-    private void receiveOne() {
-        reader.readNextAsync()
-                .thenAccept(this)
-                .exceptionally(this);
+    public void start() {
+        isRunning = true;
+        tailerThread.start();
     }
 
     @Override
     public void close() {
-        if (closed) {
+        if (!isRunning) {
             return;
         }
-        log.info("Stopping function state consumer");
+        log.info("Stopping function assignment tailer");
         try {
-            closed = true;
-            reader.close();
+            isRunning = false;
+            if (tailerThread != null && tailerThread.isAlive()) {
+                tailerThread.interrupt();

Review comment:
       Should we just do
   isRunning = false;
   if (thread is alive) { interrupt(); }
   reader.close();
   much simpler




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jerrypeng commented on a change in pull request #7180: FunctionAssignmentTailer should use its own thread

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7180:
URL: https://github.com/apache/pulsar/pull/7180#discussion_r436071639



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -18,56 +18,90 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import java.io.IOException;
-import java.util.function.Function;
-
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 
-import lombok.extern.slf4j.Slf4j;
+import java.io.IOException;
 
 @Slf4j
-public class FunctionAssignmentTailer
-    implements java.util.function.Consumer<Message<byte[]>>, Function<Throwable, Void>, AutoCloseable {
+public class FunctionAssignmentTailer implements AutoCloseable {
 
     private final FunctionRuntimeManager functionRuntimeManager;
+    @Getter
     private final Reader<byte[]> reader;
-    private boolean closed = false;
+    private volatile boolean isRunning = false;
 
-    public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, Reader<byte[]> reader) {
+    private final Thread tailerThread;
+    
+    public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, ReaderBuilder readerBuilder, WorkerConfig workerConfig) throws PulsarClientException {
         this.functionRuntimeManager = functionRuntimeManager;
-        this.reader = reader;
-    }
+        
+        this.reader = readerBuilder
+          .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-runtime-manager")
+          .readerName(workerConfig.getWorkerId() + "-function-runtime-manager")
+          .topic(workerConfig.getFunctionAssignmentTopic())
+          .readCompacted(true)
+          .startMessageId(MessageId.earliest)
+          .create();
 
-    public void start() {
-        receiveOne();
+        this.tailerThread = new Thread(() -> {
+            while(true) {
+                try {
+                    while(isRunning) {
+                        Message<byte[]> msg = reader.readNext();
+                        processAssignment(msg);
+                    }
+                } catch (Exception e) {
+                    if (isRunning) {
+                        log.error("Encountered error in assignment tailer", e);
+
+                        // trigger fatal error
+                        // TODO add mechanism to notify main thread
+                    } else {
+                        if (!(e instanceof InterruptedException)) {
+                            log.warn("Encountered error when assignment tailer is not running", e);
+                        }
+                    }
+
+                }
+            }
+        });
+        this.tailerThread.setName("assignment-tailer-thread");
     }
 
-    private void receiveOne() {
-        reader.readNextAsync()
-                .thenAccept(this)
-                .exceptionally(this);
+    public void start() {
+        isRunning = true;
+        tailerThread.start();
     }
 
     @Override
     public void close() {
-        if (closed) {
+        if (!isRunning) {
             return;
         }
-        log.info("Stopping function state consumer");
+        log.info("Stopping function assignment tailer");
         try {
-            closed = true;
-            reader.close();
+            isRunning = false;
+            if (tailerThread != null && tailerThread.isAlive()) {
+                tailerThread.interrupt();

Review comment:
       sure




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jerrypeng merged pull request #7180: FunctionAssignmentTailer should use its own thread

Posted by GitBox <gi...@apache.org>.
jerrypeng merged pull request #7180:
URL: https://github.com/apache/pulsar/pull/7180


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jerrypeng commented on a change in pull request #7180: FunctionAssignmentTailer should use its own thread

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7180:
URL: https://github.com/apache/pulsar/pull/7180#discussion_r436118161



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -18,56 +18,91 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import java.io.IOException;
-import java.util.function.Function;
-
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 
-import lombok.extern.slf4j.Slf4j;
+import java.io.IOException;
 
 @Slf4j
-public class FunctionAssignmentTailer
-    implements java.util.function.Consumer<Message<byte[]>>, Function<Throwable, Void>, AutoCloseable {
+public class FunctionAssignmentTailer implements AutoCloseable {
 
     private final FunctionRuntimeManager functionRuntimeManager;
+    @Getter
     private final Reader<byte[]> reader;
-    private boolean closed = false;
+    private volatile boolean isRunning = false;

Review comment:
       But we also have a method called "start"?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org