You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/05/17 22:51:15 UTC

[GitHub] [pulsar] sijie opened a new pull request #10618: [Functions] Process async results in the same Java runnable thread

sijie opened a new pull request #10618:
URL: https://github.com/apache/pulsar/pull/10618


   *Motivation*
   
   After introducing the support for async functions, the java function processing semantic is not enforced.
   For example, if it fails to write a sink, it doesn't fail the java instance or fail the message. Hence it keeps
   receiving messages but never ack or nack.
   
   *Modification*
   
   Change the way how aysnc function requests are processed to fix the issues we have seen in Kinesis connector.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sijie commented on a change in pull request #10618: [Functions] Process async results in the same Java runnable thread

Posted by GitBox <gi...@apache.org>.
sijie commented on a change in pull request #10618:
URL: https://github.com/apache/pulsar/pull/10618#discussion_r633949502



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##########
@@ -272,11 +273,12 @@ public void run() {
 
                 removeLogTopicHandler();
 
-                try {
-                    processResult(currentRecord, result);
-                } catch (Exception e) {
-                    log.warn("Failed to process result of message {}", currentRecord, e);
-                    currentRecord.fail();
+                // process the synchronous results
+                if (result != null) {
+                    handleResult(currentRecord, result);
+                } else {
+                    // process the asynchronous results
+                    processAsyncResults();

Review comment:
       @nlu90 Nice catch! I have addressed your comment in commit 326ae15




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] nlu90 commented on a change in pull request #10618: [Functions] Process async results in the same Java runnable thread

Posted by GitBox <gi...@apache.org>.
nlu90 commented on a change in pull request #10618:
URL: https://github.com/apache/pulsar/pull/10618#discussion_r634010528



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##########
@@ -318,28 +321,29 @@ private void setupStateStore() throws Exception {
         }
     }
 
-    private void processResult(Record srcRecord,
-                               CompletableFuture<JavaExecutionResult> result) throws Exception {
-        result.whenComplete((result1, throwable) -> {
-            if (throwable != null || result1.getUserException() != null) {
-                Throwable t = throwable != null ? throwable : result1.getUserException();
-                log.warn("Encountered exception when processing message {}",
-                        srcRecord, t);
-                stats.incrUserExceptions(t);
-                srcRecord.fail();
+    private void processAsyncResults() throws InterruptedException {

Review comment:
       no need for this empty method




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] nlu90 commented on a change in pull request #10618: [Functions] Process async results in the same Java runnable thread

Posted by GitBox <gi...@apache.org>.
nlu90 commented on a change in pull request #10618:
URL: https://github.com/apache/pulsar/pull/10618#discussion_r633929657



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##########
@@ -272,11 +273,12 @@ public void run() {
 
                 removeLogTopicHandler();
 
-                try {
-                    processResult(currentRecord, result);
-                } catch (Exception e) {
-                    log.warn("Failed to process result of message {}", currentRecord, e);
-                    currentRecord.fail();
+                // process the synchronous results
+                if (result != null) {
+                    handleResult(currentRecord, result);
+                } else {
+                    // process the asynchronous results
+                    processAsyncResults();

Review comment:
       The `null` result indicates the current record is placed into the queue for async processing.
   
   The `processAsyncResults()` method takes the first element from the queue and checks/proceeds with the result.
   
   1. It seems a newly added record triggers the check of whether the oldest added record is done. This behavior is undesirable since records are independent. 
   2. I'm wondering there'll always be one element left in the queue regardless whether it's done or not.
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui merged pull request #10618: [Functions] Process async results in the same Java runnable thread

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #10618:
URL: https://github.com/apache/pulsar/pull/10618


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org