Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/04/19 20:41:23 UTC

[GitHub] [pulsar] david-streamlio opened a new pull request #10270: [Issue-10269] [pulsar-io] Pulsar IO Sink errors are not bubbled up correctly

david-streamlio opened a new pull request #10270:
URL: https://github.com/apache/pulsar/pull/10270


   Fixes #10269 
   
   ### Motivation
   
   When an Error or Exception occurs in the write() method of the sink, the error is logged but no action is taken. We need the exception to be re-thrown so that the user/framework is made aware of the underlying issue, rather than having the Sink keep failing silently, e.g. zombie sink instances that are running but not making any progress.
   
   ### Modifications
   
   Added logic to re-throw an exception if one is encountered inside the processResult() method of the JavaInstanceRunnable class.
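
For illustration only, here is a minimal sketch of the failure mode described under Motivation. It uses a plain `CompletableFuture` rather than the actual Pulsar classes, and the class and method names are hypothetical. An exception thrown inside a `whenComplete` callback completes the dependent stage exceptionally, but nothing reaches the calling thread unless that stage is observed:

```java
import java.util.concurrent.CompletableFuture;

final class SilentCallbackDemo {

    /**
     * Returns true only if the exception thrown inside the callback
     * propagates to the thread that registered the callback.
     */
    static boolean failureReachesCaller() {
        CompletableFuture<String> result = CompletableFuture.completedFuture("payload");
        try {
            // The stage returned by whenComplete is discarded, so its
            // exceptional completion is never observed by anyone.
            result.whenComplete((value, error) -> {
                throw new IllegalStateException("sink write failed");
            });
            return false; // reached: the caller saw no exception
        } catch (RuntimeException e) {
            return true;
        }
    }

    /** The original future itself is unaffected by the failing callback. */
    static boolean originalStillLooksHealthy() {
        CompletableFuture<String> result = CompletableFuture.completedFuture("payload");
        result.whenComplete((value, error) -> {
            throw new IllegalStateException("sink write failed");
        });
        return !result.isCompletedExceptionally();
    }
}
```

In this sketch both methods report that the caller cannot see the failure, which matches the "running but not making any progress" symptom.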
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API: (no)
     - The schema: (no)
     - The default values of configurations: (no)
     - The wire protocol: (no)
     - The rest endpoints: (no)
     - The admin cli options: (no)
     - Anything that affects deployment: (no)
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (not applicable)
     - If a feature is not applicable for documentation, explain why? This is a bug fix to correct the behavior of a Sink when an exception is thrown
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] david-streamlio commented on a change in pull request #10270: [Issue-10269] [pulsar-io] Pulsar IO Sink errors are not bubbled up correctly

Posted by GitBox <gi...@apache.org>.
david-streamlio commented on a change in pull request #10270:
URL: https://github.com/apache/pulsar/pull/10270#discussion_r616229860



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##########
@@ -322,6 +322,7 @@ private void processResult(Record srcRecord,
                         srcRecord, t);
                 stats.incrUserExceptions(t);
                 srcRecord.fail();
+                throw new RuntimeException(t);

Review comment:
       The call to the Sink write method is wrapped in a try/catch block that re-throws any exception it encounters. 
   https://github.com/apache/pulsar/blob/7a89212afabd044c922a039c897df966557dff56/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java#L345 







[GitHub] [pulsar] david-streamlio commented on a change in pull request #10270: [Issue-10269] [pulsar-io] Pulsar IO Sink errors are not bubbled up correctly

Posted by GitBox <gi...@apache.org>.
david-streamlio commented on a change in pull request #10270:
URL: https://github.com/apache/pulsar/pull/10270#discussion_r616257032



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##########
@@ -337,16 +344,18 @@ private void processResult(Record srcRecord,
         });
     }
 
-    private void sendOutputMessage(Record srcRecord, Object output) {
+    private void sendOutputMessage(Record srcRecord, Object output) throws Exception {
         if (!(this.sink instanceof PulsarSink)) {
             Thread.currentThread().setContextClassLoader(functionClassLoader);
         }
         try {
             this.sink.write(new SinkRecord<>(srcRecord, output));
         } catch (Exception e) {
+        	if (stats != null) {

Review comment:
       Added null check to make it consistent with the `readInput()` method exception handling







[GitHub] [pulsar] david-streamlio commented on a change in pull request #10270: [Issue-10269] [pulsar-io] Pulsar IO Sink errors are not bubbled up correctly

Posted by GitBox <gi...@apache.org>.
david-streamlio commented on a change in pull request #10270:
URL: https://github.com/apache/pulsar/pull/10270#discussion_r616257284



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##########
@@ -337,16 +344,18 @@ private void processResult(Record srcRecord,
         });
     }
 
-    private void sendOutputMessage(Record srcRecord, Object output) {
+    private void sendOutputMessage(Record srcRecord, Object output) throws Exception {
         if (!(this.sink instanceof PulsarSink)) {
             Thread.currentThread().setContextClassLoader(functionClassLoader);
         }
         try {
             this.sink.write(new SinkRecord<>(srcRecord, output));
         } catch (Exception e) {
+        	if (stats != null) {
+              stats.incrSinkExceptions(e);
+        	}
             log.info("Encountered exception in sink write: ", e);
-            stats.incrSinkExceptions(e);
-            throw new RuntimeException(e);
+            throw e;

Review comment:
       Avoid wrapping the real exception in a new Exception type. It just hides the real issue.
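
A small sketch of the difference, assuming a hypothetical `failingWrite()` in place of `this.sink.write(...)` (none of these names come from the Pulsar code base). With wrapping, the caller sees a `RuntimeException` and has to dig for the cause; with a plain rethrow, the caller can catch the original type directly:

```java
import java.io.IOException;

final class RethrowDemo {

    /** Hypothetical stand-in for a sink write that fails with a checked exception. */
    static void failingWrite() throws IOException {
        throw new IOException("disk full");
    }

    /** Old behavior: the real exception is hidden behind a RuntimeException. */
    static void writeWrapped() {
        try {
            failingWrite();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** New behavior: the original exception is rethrown unchanged. */
    static void writeRethrown() throws Exception {
        try {
            failingWrite();
        } catch (Exception e) {
            throw e;
        }
    }
}
```

The trade-off is that the rethrowing variant needs `throws Exception` on its signature, as in the diff above.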







[GitHub] [pulsar] david-streamlio commented on a change in pull request #10270: [Issue-10269] [pulsar-io] Pulsar IO Sink errors are not bubbled up correctly

Posted by GitBox <gi...@apache.org>.
david-streamlio commented on a change in pull request #10270:
URL: https://github.com/apache/pulsar/pull/10270#discussion_r616998062



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##########
@@ -269,9 +270,11 @@ public void run() {
 
                 try {
                     processResult(currentRecord, result);
-                } catch (Exception e) {
+                    result.join();

Review comment:
       @jerrypeng Agreed on both points. However, we still need to address the async use case. I am thinking that a shared data structure could be used by the Futures to communicate exceptions, and the main thread could check that structure for errors and raise the exception if one is found?
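
One possible shape for that idea, as a rough sketch only. `AsyncErrorSlot`, `watch` and `throwIfFailed` are hypothetical names, and this is not what the PR implements. Callbacks record the first failure in an `AtomicReference`, and the main loop polls it between records:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class AsyncErrorSlot {

    // First failure reported by any callback; null while everything is healthy.
    private final AtomicReference<Throwable> firstError = new AtomicReference<>();

    /** Registers a callback that records a failure instead of throwing it. */
    <T> void watch(CompletableFuture<T> future) {
        future.whenComplete((value, error) -> {
            if (error != null) {
                // Keep only the first failure; later ones are ignored.
                firstError.compareAndSet(null, error);
            }
        });
    }

    /** Called from the main thread: rethrows the recorded failure, if any. */
    void throwIfFailed() throws Exception {
        Throwable t = firstError.get();
        if (t instanceof Exception) {
            throw (Exception) t;
        }
        if (t instanceof Error) {
            throw (Error) t;
        }
        if (t != null) {
            throw new RuntimeException(t);
        }
    }
}
```

The main thread never blocks on a future here, which is the point of the suggestion, at the cost of noticing a failure only on its next check.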







[GitHub] [pulsar] david-streamlio commented on a change in pull request #10270: [Issue-10269] [pulsar-io] Pulsar IO Sink errors are not bubbled up correctly

Posted by GitBox <gi...@apache.org>.
david-streamlio commented on a change in pull request #10270:
URL: https://github.com/apache/pulsar/pull/10270#discussion_r618603122



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##########
@@ -290,63 +318,63 @@ public void run() {
         }
     }
 
-    private void setupStateStore() throws Exception {
-        this.stateManager = new InstanceStateManager();
-
-        if (null == stateStorageServiceUrl) {
-            stateStoreProvider = StateStoreProvider.NULL;
-        } else {
-            stateStoreProvider = new BKStateStoreProviderImpl();
-            Map<String, Object> stateStoreProviderConfig = new HashMap();
-            stateStoreProviderConfig.put(BKStateStoreProviderImpl.STATE_STORAGE_SERVICE_URL, stateStorageServiceUrl);
-            stateStoreProvider.init(stateStoreProviderConfig, instanceConfig.getFunctionDetails());
-
-            StateStore store = stateStoreProvider.getStateStore(
-                instanceConfig.getFunctionDetails().getTenant(),
-                instanceConfig.getFunctionDetails().getNamespace(),
-                instanceConfig.getFunctionDetails().getName()
-            );
-            StateStoreContext context = new StateStoreContextImpl();
-            store.init(context);
-
-            stateManager.registerStore(store);
-        }
-    }
-
-    private void processResult(Record srcRecord,
-                               CompletableFuture<JavaExecutionResult> result) throws Exception {
-        result.whenComplete((result1, throwable) -> {
-            if (throwable != null || result1.getUserException() != null) {
-                Throwable t = throwable != null ? throwable : result1.getUserException();
-                log.warn("Encountered exception when processing message {}",
-                        srcRecord, t);
-                stats.incrUserExceptions(t);
-                srcRecord.fail();
-            } else {
-                if (result1.getResult() != null) {
-                    sendOutputMessage(srcRecord, result1.getResult());
-                } else {
-                    if (instanceConfig.getFunctionDetails().getAutoAck()) {
-                        // the function doesn't produce any result or the user doesn't want the result.
-                        srcRecord.ack();
-                    }
-                }
-                // increment total successfully processed
-                stats.incrTotalProcessedSuccessfully();
+    @SuppressWarnings("rawtypes")
+	private void processResult(Record srcRecord, JavaExecutionResult result) throws Exception {
+    	
+    	if (result.getUserException() != null) {
+    		log.warn("Encountered exception when processing message {}",
+                    srcRecord, result.getUserException());
+            stats.incrUserExceptions(result.getUserException());
+            throw result.getUserException();
+    	} 
+    	
+    	if (result.getSystemException() != null) {
+    		log.warn("Encountered exception when processing message {}",
+                    srcRecord, result.getSystemException());
+            stats.incrSysExceptions(result.getSystemException());
+            throw result.getSystemException();
+    	}
+    	
+    	if (result.getResult() == null) {
+    		if (instanceConfig.getFunctionDetails().getAutoAck()) {
+                // the function doesn't produce any result or the user doesn't want the result.
+                srcRecord.ack();
             }
-        });
+    	} else { 
+    		
+    		try {
+				Object output = (result.getResult() instanceof CompletableFuture) ?
+					((CompletableFuture)result.getResult()).get() : result.getResult();
+				
+				sendOutputMessage(srcRecord, output);
+				
+				if (instanceConfig.getFunctionDetails().getAutoAck()) {
+					srcRecord.ack();
+				}
+				stats.incrTotalProcessedSuccessfully();
+				
+			} catch (InterruptedException | ExecutionException e) {
+				log.warn("Encountered exception when processing message {}",

Review comment:
       @eolivelli Just to clarify, you want to call `Thread.currentThread().interrupt()` when we encounter an `InterruptedException`? Why? And why only for that exception and not the `ExecutionException`?







[GitHub] [pulsar] jerrypeng commented on a change in pull request #10270: [Issue-10269] [pulsar-io] Pulsar IO Sink errors are not bubbled up correctly

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #10270:
URL: https://github.com/apache/pulsar/pull/10270#discussion_r616985234



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##########
@@ -269,9 +270,11 @@ public void run() {
 
                 try {
                     processResult(currentRecord, result);
-                } catch (Exception e) {
+                    result.join();

Review comment:
       It is also pointless to create new CompletableFutures for non-async functions here:
   
   https://github.com/apache/pulsar/blob/master/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java#L74
   
   It leads to more GC.







[GitHub] [pulsar] david-streamlio commented on a change in pull request #10270: [Issue-10269] [pulsar-io] Pulsar IO Sink errors are not bubbled up correctly

Posted by GitBox <gi...@apache.org>.
david-streamlio commented on a change in pull request #10270:
URL: https://github.com/apache/pulsar/pull/10270#discussion_r616936511



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##########
@@ -269,9 +270,11 @@ public void run() {
 
                 try {
                     processResult(currentRecord, result);
-                } catch (Exception e) {
+                    result.join();

Review comment:
       Yes, but we need a way to interrogate the Future task to determine if an exception was thrown. Otherwise, we won't be able to tell if an exception occurred at all.







[GitHub] [pulsar] eolivelli commented on a change in pull request #10270: [Issue-10269] [pulsar-io] Pulsar IO Sink errors are not bubbled up correctly

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10270:
URL: https://github.com/apache/pulsar/pull/10270#discussion_r618639963



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##########
@@ -290,63 +318,63 @@ public void run() {
         }
     }
 
-    private void setupStateStore() throws Exception {
-        this.stateManager = new InstanceStateManager();
-
-        if (null == stateStorageServiceUrl) {
-            stateStoreProvider = StateStoreProvider.NULL;
-        } else {
-            stateStoreProvider = new BKStateStoreProviderImpl();
-            Map<String, Object> stateStoreProviderConfig = new HashMap();
-            stateStoreProviderConfig.put(BKStateStoreProviderImpl.STATE_STORAGE_SERVICE_URL, stateStorageServiceUrl);
-            stateStoreProvider.init(stateStoreProviderConfig, instanceConfig.getFunctionDetails());
-
-            StateStore store = stateStoreProvider.getStateStore(
-                instanceConfig.getFunctionDetails().getTenant(),
-                instanceConfig.getFunctionDetails().getNamespace(),
-                instanceConfig.getFunctionDetails().getName()
-            );
-            StateStoreContext context = new StateStoreContextImpl();
-            store.init(context);
-
-            stateManager.registerStore(store);
-        }
-    }
-
-    private void processResult(Record srcRecord,
-                               CompletableFuture<JavaExecutionResult> result) throws Exception {
-        result.whenComplete((result1, throwable) -> {
-            if (throwable != null || result1.getUserException() != null) {
-                Throwable t = throwable != null ? throwable : result1.getUserException();
-                log.warn("Encountered exception when processing message {}",
-                        srcRecord, t);
-                stats.incrUserExceptions(t);
-                srcRecord.fail();
-            } else {
-                if (result1.getResult() != null) {
-                    sendOutputMessage(srcRecord, result1.getResult());
-                } else {
-                    if (instanceConfig.getFunctionDetails().getAutoAck()) {
-                        // the function doesn't produce any result or the user doesn't want the result.
-                        srcRecord.ack();
-                    }
-                }
-                // increment total successfully processed
-                stats.incrTotalProcessedSuccessfully();
+    @SuppressWarnings("rawtypes")
+	private void processResult(Record srcRecord, JavaExecutionResult result) throws Exception {
+    	
+    	if (result.getUserException() != null) {
+    		log.warn("Encountered exception when processing message {}",
+                    srcRecord, result.getUserException());
+            stats.incrUserExceptions(result.getUserException());
+            throw result.getUserException();
+    	} 
+    	
+    	if (result.getSystemException() != null) {
+    		log.warn("Encountered exception when processing message {}",
+                    srcRecord, result.getSystemException());
+            stats.incrSysExceptions(result.getSystemException());
+            throw result.getSystemException();
+    	}
+    	
+    	if (result.getResult() == null) {
+    		if (instanceConfig.getFunctionDetails().getAutoAck()) {
+                // the function doesn't produce any result or the user doesn't want the result.
+                srcRecord.ack();
             }
-        });
+    	} else { 
+    		
+    		try {
+				Object output = (result.getResult() instanceof CompletableFuture) ?
+					((CompletableFuture)result.getResult()).get() : result.getResult();
+				
+				sendOutputMessage(srcRecord, output);
+				
+				if (instanceConfig.getFunctionDetails().getAutoAck()) {
+					srcRecord.ack();
+				}
+				stats.incrTotalProcessedSuccessfully();
+				
+			} catch (InterruptedException | ExecutionException e) {
+				log.warn("Encountered exception when processing message {}",

Review comment:
       Because it is the standard Java practice.
   When you catch an InterruptedException, the "interrupted" flag on the Thread has already been cleared, and you have to set it again in order to let the information bubble up to the caller.
   
   > why only for that exception and not the ExecutionException
   
   Because ExecutionException is not the special (and annoying) InterruptedException.
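
As a hedged sketch of that practice (the `InterruptDemo` class and `awaitResult` method are hypothetical, not part of the PR), the two exception types can be given separate catch blocks so that only the interrupt restores the flag:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class InterruptDemo {

    /** Waits for the future and surfaces failures to the caller. */
    static Object awaitResult(CompletableFuture<?> future) throws Exception {
        try {
            return future.get();
        } catch (InterruptedException e) {
            // The flag was cleared when the exception was thrown;
            // set it again so callers can still see the interrupt.
            Thread.currentThread().interrupt();
            throw e;
        } catch (ExecutionException e) {
            // No thread state to restore here; just surface the real cause.
            Throwable cause = e.getCause();
            throw (cause instanceof Exception) ? (Exception) cause : e;
        }
    }
}
```

After `awaitResult` throws the `InterruptedException`, `Thread.currentThread().isInterrupted()` is true again for the caller.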







[GitHub] [pulsar] david-streamlio closed pull request #10270: [Issue-10269] [pulsar-io] Pulsar IO Sink errors are not bubbled up correctly

Posted by GitBox <gi...@apache.org>.
david-streamlio closed pull request #10270:
URL: https://github.com/apache/pulsar/pull/10270


   






[GitHub] [pulsar] david-streamlio commented on a change in pull request #10270: [Issue-10269] [pulsar-io] Pulsar IO Sink errors are not bubbled up correctly

Posted by GitBox <gi...@apache.org>.
david-streamlio commented on a change in pull request #10270:
URL: https://github.com/apache/pulsar/pull/10270#discussion_r616257832



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##########
@@ -324,7 +327,11 @@ private void processResult(Record srcRecord,
                 srcRecord.fail();
             } else {
                 if (result1.getResult() != null) {
-                    sendOutputMessage(srcRecord, result1.getResult());
+                    try {
+						sendOutputMessage(srcRecord, result1.getResult());
+					} catch (Exception ex) {
+						throw new CompletionException(ex);

Review comment:
       Solution based roughly on [this](https://stackoverflow.com/questions/44409962/throwing-exception-from-completablefuture) suggestion. 
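
For readers unfamiliar with the pattern, a minimal sketch follows, with a hypothetical `write` method standing in for `sendOutputMessage`. A lambda passed to `whenComplete` cannot throw checked exceptions, so the checked exception is wrapped in an unchecked `CompletionException` and unwrapped again by whoever joins the stage:

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class CompletionWrapDemo {

    /** Hypothetical stand-in for a sink write that throws a checked exception. */
    static void write(String value) throws IOException {
        throw new IOException("cannot write " + value);
    }

    /** Returns the unwrapped failure, or null if processing succeeded. */
    static Throwable processAndUnwrap(CompletableFuture<String> result) {
        CompletableFuture<String> dependent = result.whenComplete((value, error) -> {
            try {
                write(value);
            } catch (IOException ex) {
                // Checked exceptions cannot escape the lambda directly.
                throw new CompletionException(ex);
            }
        });
        try {
            // Join the stage returned by whenComplete: the callback's
            // failure is recorded there, not on the original future.
            dependent.join();
            return null;
        } catch (CompletionException e) {
            return e.getCause();
        }
    }
}
```

Note that in this sketch the failure is only visible on the stage returned by `whenComplete`; joining the original `result` would not surface it.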







[GitHub] [pulsar] eolivelli commented on a change in pull request #10270: [Issue-10269] [pulsar-io] Pulsar IO Sink errors are not bubbled up correctly

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10270:
URL: https://github.com/apache/pulsar/pull/10270#discussion_r618112537



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
##########
@@ -65,64 +65,92 @@ public JavaInstance(ContextImpl contextImpl, Object userClassObject, InstanceCon
             this.javaUtilFunction = (java.util.function.Function) userClassObject;
         }
     }
-
-    public CompletableFuture<JavaExecutionResult> handleMessage(Record<?> record, Object input) {
-        if (context != null) {
-            context.setCurrentMessageContext(record);
+    
+    /**
+     * Invokes the function code against the given input data.
+     * 
+     * @param input - The input data provided to the Function code
+     * @return An ExecutionResult object that contains the function result along with any user exceptions
+     * that occurred when executing the Function code.
+     */
+    @SuppressWarnings("unchecked")
+	private JavaExecutionResult executeFunction(Object input) {
+    	JavaExecutionResult executionResult = new JavaExecutionResult();
+    	
+        try { 	
+        	Object result = (function != null) ? function.process(input, context) :
+        		javaUtilFunction.apply(input);
+        	
+            executionResult.setResult(result);  
+        } catch (Exception ex) {
+            executionResult.setUserException(ex);

Review comment:
       Should we handle InterruptedException explicitly and call Thread.currentThread().interrupt()?

##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##########
@@ -290,63 +318,63 @@ public void run() {
         }
     }
 
-    private void setupStateStore() throws Exception {
-        this.stateManager = new InstanceStateManager();
-
-        if (null == stateStorageServiceUrl) {
-            stateStoreProvider = StateStoreProvider.NULL;
-        } else {
-            stateStoreProvider = new BKStateStoreProviderImpl();
-            Map<String, Object> stateStoreProviderConfig = new HashMap();
-            stateStoreProviderConfig.put(BKStateStoreProviderImpl.STATE_STORAGE_SERVICE_URL, stateStorageServiceUrl);
-            stateStoreProvider.init(stateStoreProviderConfig, instanceConfig.getFunctionDetails());
-
-            StateStore store = stateStoreProvider.getStateStore(
-                instanceConfig.getFunctionDetails().getTenant(),
-                instanceConfig.getFunctionDetails().getNamespace(),
-                instanceConfig.getFunctionDetails().getName()
-            );
-            StateStoreContext context = new StateStoreContextImpl();
-            store.init(context);
-
-            stateManager.registerStore(store);
-        }
-    }
-
-    private void processResult(Record srcRecord,
-                               CompletableFuture<JavaExecutionResult> result) throws Exception {
-        result.whenComplete((result1, throwable) -> {
-            if (throwable != null || result1.getUserException() != null) {
-                Throwable t = throwable != null ? throwable : result1.getUserException();
-                log.warn("Encountered exception when processing message {}",
-                        srcRecord, t);
-                stats.incrUserExceptions(t);
-                srcRecord.fail();
-            } else {
-                if (result1.getResult() != null) {
-                    sendOutputMessage(srcRecord, result1.getResult());
-                } else {
-                    if (instanceConfig.getFunctionDetails().getAutoAck()) {
-                        // the function doesn't produce any result or the user doesn't want the result.
-                        srcRecord.ack();
-                    }
-                }
-                // increment total successfully processed
-                stats.incrTotalProcessedSuccessfully();
+    @SuppressWarnings("rawtypes")
+	private void processResult(Record srcRecord, JavaExecutionResult result) throws Exception {
+    	
+    	if (result.getUserException() != null) {
+    		log.warn("Encountered exception when processing message {}",
+                    srcRecord, result.getUserException());
+            stats.incrUserExceptions(result.getUserException());
+            throw result.getUserException();
+    	} 
+    	
+    	if (result.getSystemException() != null) {
+    		log.warn("Encountered exception when processing message {}",
+                    srcRecord, result.getSystemException());
+            stats.incrSysExceptions(result.getSystemException());
+            throw result.getSystemException();
+    	}
+    	
+    	if (result.getResult() == null) {
+    		if (instanceConfig.getFunctionDetails().getAutoAck()) {
+                // the function doesn't produce any result or the user doesn't want the result.
+                srcRecord.ack();
             }
-        });
+    	} else { 
+    		
+    		try {
+				Object output = (result.getResult() instanceof CompletableFuture) ?
+					((CompletableFuture)result.getResult()).get() : result.getResult();
+				
+				sendOutputMessage(srcRecord, output);
+				
+				if (instanceConfig.getFunctionDetails().getAutoAck()) {
+					srcRecord.ack();
+				}
+				stats.incrTotalProcessedSuccessfully();
+				
+			} catch (InterruptedException | ExecutionException e) {
+				log.warn("Encountered exception when processing message {}",

Review comment:
       Please handle InterruptedException.

##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
##########
@@ -65,64 +65,92 @@ public JavaInstance(ContextImpl contextImpl, Object userClassObject, InstanceCon
             this.javaUtilFunction = (java.util.function.Function) userClassObject;
         }
     }
-
-    public CompletableFuture<JavaExecutionResult> handleMessage(Record<?> record, Object input) {
-        if (context != null) {
-            context.setCurrentMessageContext(record);
+    
+    /**
+     * Invokes the function code against the given input data.
+     * 
+     * @param input - The input data provided to the Function code
+     * @return An ExecutionResult object that contains the function result along with any user exceptions
+     * that occurred when executing the Function code.
+     */
+    @SuppressWarnings("unchecked")
+	private JavaExecutionResult executeFunction(Object input) {
+    	JavaExecutionResult executionResult = new JavaExecutionResult();
+    	
+        try { 	
+        	Object result = (function != null) ? function.process(input, context) :
+        		javaUtilFunction.apply(input);
+        	
+            executionResult.setResult(result);  
+        } catch (Exception ex) {
+            executionResult.setUserException(ex);
+        } 
+    	
+    	return executionResult;
+    }
+    
+    /**
+     * If the Function code returns a CompletableFuture object, then this method is used to handle that by
+     * adding the async request to the internal pendingAsyncRequests queue, specifying the logic to be executed
+     * once the function execution is complete.
+     * 
+     * @param future
+     * @param executionResult
+     */
+    @SuppressWarnings("unchecked")
+	private void handleAsync(@SuppressWarnings("rawtypes") CompletableFuture future, JavaExecutionResult executionResult) {
+    	try {
+            pendingAsyncRequests.put(future);
+        } catch (InterruptedException ie) {
+            log.warn("Exception while put Async requests", ie);

Review comment:
       Should this call Thread.currentThread().interrupt()?

##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##########
@@ -269,9 +270,11 @@ public void run() {
 
                 try {
                     processResult(currentRecord, result);
-                } catch (Exception e) {
+                    result.join();
+                } catch (CompletionException e) {
                     log.warn("Failed to process result of message {}", currentRecord, e);
                     currentRecord.fail();
+                    throw e.getCause();

Review comment:
       Please handle InterruptedException here as well.







[GitHub] [pulsar] eolivelli commented on pull request #10270: [Issue-10269] [pulsar-io] Pulsar IO Sink errors are not bubbled up correctly

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #10270:
URL: https://github.com/apache/pulsar/pull/10270#issuecomment-824575600


   Integration tests are failing with errors that I have never seen before; they are probably due to this patch.





[GitHub] [pulsar] sijie commented on a change in pull request #10270: [Issue-10269] [pulsar-io] Pulsar IO Sink errors are not bubbled up correctly

Posted by GitBox <gi...@apache.org>.
sijie commented on a change in pull request #10270:
URL: https://github.com/apache/pulsar/pull/10270#discussion_r616429782



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##########
@@ -269,9 +270,11 @@ public void run() {
 
                 try {
                     processResult(currentRecord, result);
-                } catch (Exception e) {
+                    result.join();

Review comment:
       This seems to have a huge impact on performance, no?







[GitHub] [pulsar] jerrypeng commented on a change in pull request #10270: [Issue-10269] [pulsar-io] Pulsar IO Sink errors are not bubbled up correctly

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #10270:
URL: https://github.com/apache/pulsar/pull/10270#discussion_r616984700



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##########
@@ -269,9 +270,11 @@ public void run() {
 
                 try {
                     processResult(currentRecord, result);
-                } catch (Exception e) {
+                    result.join();

Review comment:
       @david-streamlio I would recommend changing JavaInstance.handleMessage():
   
   https://github.com/apache/pulsar/blob/master/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java#L69
   
   I don't think it is appropriate to always return a CompletableFuture<JavaExecutionResult>. We do not need to return a CompletableFuture for functions that are not implemented as async. I would recommend modifying the existing JavaExecutionResult class so that it can return either a CompletableFuture result or just the plain result.
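
   One possible shape for such a result type is sketched below, purely as an illustration. ExecutionResultSketch and all of its methods are hypothetical and are not the actual Pulsar JavaExecutionResult; the sketch only shows how a single class could carry either a plain value or a CompletableFuture, so that synchronous functions never allocate a future.

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch; not the real org.apache.pulsar JavaExecutionResult.
final class ExecutionResultSketch<T> {
    private final T value;                     // set for synchronous functions
    private final CompletableFuture<T> future; // set for async functions

    private ExecutionResultSketch(T value, CompletableFuture<T> future) {
        this.value = value;
        this.future = future;
    }

    /** Result of a synchronous function: no future is created. */
    static <T> ExecutionResultSketch<T> ofValue(T value) {
        return new ExecutionResultSketch<>(value, null);
    }

    /** Result of an async function: wraps the future the function returned. */
    static <T> ExecutionResultSketch<T> ofFuture(CompletableFuture<T> future) {
        return new ExecutionResultSketch<>(null, Objects.requireNonNull(future));
    }

    boolean isAsync() {
        return future != null;
    }

    /** Plain value; only valid for synchronous results. */
    T valueNow() {
        if (isAsync()) {
            throw new IllegalStateException("async result; use asFuture()");
        }
        return value;
    }

    /** Uniform view for callers that want a future in both cases. */
    CompletableFuture<T> asFuture() {
        return isAsync() ? future : CompletableFuture.completedFuture(value);
    }
}
```

   With a type like this, the caller can branch on isAsync() and handle a synchronous result inline, without blocking on a future.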







[GitHub] [pulsar] david-streamlio commented on pull request #10270: [Issue-10269] [pulsar-io] Pulsar IO Sink errors are not bubbled up correctly

Posted by GitBox <gi...@apache.org>.
david-streamlio commented on pull request #10270:
URL: https://github.com/apache/pulsar/pull/10270#issuecomment-824463594


   /pulsarbot run-failure-checks





[GitHub] [pulsar] david-streamlio closed pull request #10270: [Issue-10269] [pulsar-io] Pulsar IO Sink errors are not bubbled up correctly

Posted by GitBox <gi...@apache.org>.
david-streamlio closed pull request #10270:
URL: https://github.com/apache/pulsar/pull/10270


   





[GitHub] [pulsar] david-streamlio commented on a change in pull request #10270: [Issue-10269] [pulsar-io] Pulsar IO Sink errors are not bubbled up correctly

Posted by GitBox <gi...@apache.org>.
david-streamlio commented on a change in pull request #10270:
URL: https://github.com/apache/pulsar/pull/10270#discussion_r616257540



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##########
@@ -337,16 +344,18 @@ private void processResult(Record srcRecord,
         });
     }
 
-    private void sendOutputMessage(Record srcRecord, Object output) {
+    private void sendOutputMessage(Record srcRecord, Object output) throws Exception {

Review comment:
       Make it explicit that the method can throw an exception.







[GitHub] [pulsar] jerrypeng commented on a change in pull request #10270: [Issue-10269] [pulsar-io] Pulsar IO Sink errors are not bubbled up correctly

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #10270:
URL: https://github.com/apache/pulsar/pull/10270#discussion_r616207484



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##########
@@ -322,6 +322,7 @@ private void processResult(Record srcRecord,
                         srcRecord, t);
                 stats.incrUserExceptions(t);
                 srcRecord.fail();
+                throw new RuntimeException(t);

Review comment:
       The error here is an exception thrown by the user function, not by the sink. Also, because this code runs inside the result.whenComplete callback, anything thrown there will not bubble up to the main thread.
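
       The behaviour described here can be reproduced with a plain CompletableFuture, outside Pulsar. In the sketch below, WhenCompleteDemo and failWithThrowingCallback are hypothetical names; whenComplete and completeExceptionally are standard JDK calls. The exception thrown inside the callback is captured by the future that whenComplete returns and never reaches the code that completes or registers on the source future.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; not part of Pulsar.
class WhenCompleteDemo {
    /** Completes a future exceptionally while a throwing callback is attached. */
    static CompletableFuture<String> failWithThrowingCallback() {
        CompletableFuture<String> result = new CompletableFuture<>();

        // Mirrors the pattern in the diff: re-throwing from inside whenComplete.
        CompletableFuture<String> derived = result.whenComplete((value, t) -> {
            if (t != null) {
                throw new RuntimeException(t);
            }
        });

        // The callback runs as part of this call, but its exception is
        // swallowed by the CompletableFuture machinery instead of
        // propagating to this thread.
        result.completeExceptionally(new IllegalStateException("simulated failure"));

        // The failure is only visible on the derived future.
        return derived;
    }

    public static void main(String[] args) {
        CompletableFuture<String> derived = failWithThrowingCallback();
        System.out.println("caller continued; derived failed = "
                + derived.isCompletedExceptionally());
    }
}
```

       If the derived future is discarded, as it is when the return value of whenComplete is ignored, the failure is lost unless something else inspects the source future.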







[GitHub] [pulsar] eolivelli commented on a change in pull request #10270: [Issue-10269] [pulsar-io] Pulsar IO Sink errors are not bubbled up correctly

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10270:
URL: https://github.com/apache/pulsar/pull/10270#discussion_r618703404



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
##########
@@ -65,64 +66,96 @@ public JavaInstance(ContextImpl contextImpl, Object userClassObject, InstanceCon
             this.javaUtilFunction = (java.util.function.Function) userClassObject;
         }
     }
-
-    public CompletableFuture<JavaExecutionResult> handleMessage(Record<?> record, Object input) {
-        if (context != null) {
-            context.setCurrentMessageContext(record);
-        }
-
-        final CompletableFuture<JavaExecutionResult> future = new CompletableFuture<>();
-        JavaExecutionResult executionResult = new JavaExecutionResult();
-
-        final Object output;
-
-        try {
-            if (function != null) {
-                output = function.process(input, context);
-            } else {
-                output = javaUtilFunction.apply(input);
-            }
+    
+    /**
+     * Invokes the function code against the given input data.
+     * 
+     * @param input - The input data provided to the Function code
+     * @return An ExecutionResult object that contains the function result along with any user exceptions
+     * that occurred when executing the Function code.
+     */
+    @SuppressWarnings("unchecked")
+	private JavaExecutionResult executeFunction(Object input) {
+    	JavaExecutionResult executionResult = new JavaExecutionResult();
+    	
+        try { 	
+        	final Object result = (function != null) ? function.process(input, context) :
+        		javaUtilFunction.apply(input);
+        	
+            executionResult.setResult(result);  
         } catch (Exception ex) {
             executionResult.setUserException(ex);
-            future.complete(executionResult);
-            return future;
+        } 
+    	
+    	return executionResult;
+    }
+    
+    /**
+     * Used to handle asynchronous function requests.
+     * 
+     * @param future - The CompleteableFuture returned from the async function call.
+     * @param executionResult 
+     */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+	private void handleAsync(final CompletableFuture future, JavaExecutionResult executionResult) {
+    	try {
+            pendingAsyncRequests.put(future);
+        } catch (InterruptedException ie) {
+            log.warn("Exception while put Async requests", ie);
+            executionResult.setUserException(ie);
+            Thread.currentThread().interrupt();
         }
 
-        if (output instanceof CompletableFuture) {
-            // Function is in format: Function<I, CompletableFuture<O>>
-            try {
-                pendingAsyncRequests.put((CompletableFuture) output);
-            } catch (InterruptedException ie) {
-                log.warn("Exception while put Async requests", ie);
-                executionResult.setUserException(ie);
-                future.complete(executionResult);
-                return future;
+        future.whenCompleteAsync((functionResult, throwable) -> {
+            if (log.isDebugEnabled()) {
+              log.debug("Got result async: object: {}, throwable: {}", functionResult, throwable);
+            }
+            
+            if (throwable != null) {
+              executionResult.setUserException(new Exception((Throwable)throwable));      
             }
+          
+            pendingAsyncRequests.remove(future);
+            future.complete(functionResult);
+            
+        }, executor);
+        
+        
+		try {
+			executionResult.setResult(future.get());
+		} catch (InterruptedException iEx) {
+			Thread.currentThread().interrupt();

Review comment:
       I am not sure about this case.
   
   Should we call `executionResult.setUserException(iEx);` here?
   
   In theory it is not user code that threw this exception. Do we have a way to report it as a system exception?
   Also, we should "break" here and throw an error, or at least log it.

##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
##########
@@ -65,64 +66,96 @@ public JavaInstance(ContextImpl contextImpl, Object userClassObject, InstanceCon
             this.javaUtilFunction = (java.util.function.Function) userClassObject;
         }
     }
-
-    public CompletableFuture<JavaExecutionResult> handleMessage(Record<?> record, Object input) {
-        if (context != null) {
-            context.setCurrentMessageContext(record);
-        }
-
-        final CompletableFuture<JavaExecutionResult> future = new CompletableFuture<>();
-        JavaExecutionResult executionResult = new JavaExecutionResult();
-
-        final Object output;
-
-        try {
-            if (function != null) {
-                output = function.process(input, context);
-            } else {
-                output = javaUtilFunction.apply(input);
-            }
+    
+    /**
+     * Invokes the function code against the given input data.
+     * 
+     * @param input - The input data provided to the Function code
+     * @return An ExecutionResult object that contains the function result along with any user exceptions
+     * that occurred when executing the Function code.
+     */
+    @SuppressWarnings("unchecked")
+	private JavaExecutionResult executeFunction(Object input) {
+    	JavaExecutionResult executionResult = new JavaExecutionResult();
+    	
+        try { 	
+        	final Object result = (function != null) ? function.process(input, context) :
+        		javaUtilFunction.apply(input);
+        	
+            executionResult.setResult(result);  
         } catch (Exception ex) {
             executionResult.setUserException(ex);
-            future.complete(executionResult);
-            return future;
+        } 
+    	
+    	return executionResult;
+    }
+    
+    /**
+     * Used to handle asynchronous function requests.
+     * 
+     * @param future - The CompleteableFuture returned from the async function call.
+     * @param executionResult 
+     */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+	private void handleAsync(final CompletableFuture future, JavaExecutionResult executionResult) {
+    	try {
+            pendingAsyncRequests.put(future);
+        } catch (InterruptedException ie) {
+            log.warn("Exception while put Async requests", ie);
+            executionResult.setUserException(ie);
+            Thread.currentThread().interrupt();
         }
 
-        if (output instanceof CompletableFuture) {
-            // Function is in format: Function<I, CompletableFuture<O>>
-            try {
-                pendingAsyncRequests.put((CompletableFuture) output);
-            } catch (InterruptedException ie) {
-                log.warn("Exception while put Async requests", ie);
-                executionResult.setUserException(ie);
-                future.complete(executionResult);
-                return future;
+        future.whenCompleteAsync((functionResult, throwable) -> {
+            if (log.isDebugEnabled()) {
+              log.debug("Got result async: object: {}, throwable: {}", functionResult, throwable);
+            }
+            
+            if (throwable != null) {
+              executionResult.setUserException(new Exception((Throwable)throwable));      
             }
+          
+            pendingAsyncRequests.remove(future);
+            future.complete(functionResult);
+            
+        }, executor);
+        
+        
+		try {
+			executionResult.setResult(future.get());
+		} catch (InterruptedException iEx) {
+			Thread.currentThread().interrupt();
+		} catch (ExecutionException eEx) {
+			executionResult.setUserException(eEx);

Review comment:
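       Should this pass `eEx.getCause()` instead, so that the original exception is recorded rather than the ExecutionException wrapper?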
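
       For context, a small sketch of the unwrapping this comment suggests. UnwrapDemo and failureOf are hypothetical names used only for illustration; Future.get() wrapping the original failure in an ExecutionException is standard JDK behaviour.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical helper for illustration; not part of Pulsar.
class UnwrapDemo {
    /** Returns the underlying failure of a future, or null if it succeeded. */
    static Throwable failureOf(CompletableFuture<?> future) {
        try {
            future.get();
            return null;
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report the interruption itself.
            Thread.currentThread().interrupt();
            return e;
        } catch (ExecutionException e) {
            // get() wraps the real failure; prefer the cause when present.
            return e.getCause() != null ? e.getCause() : e;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalArgumentException("bad input"));
        System.out.println(failureOf(failed));
    }
}
```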







[GitHub] [pulsar] david-streamlio commented on a change in pull request #10270: [Issue-10269] [pulsar-io] Pulsar IO Sink errors are not bubbled up correctly

Posted by GitBox <gi...@apache.org>.
david-streamlio commented on a change in pull request #10270:
URL: https://github.com/apache/pulsar/pull/10270#discussion_r616257932



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##########
@@ -269,9 +270,11 @@ public void run() {
 
                 try {
                     processResult(currentRecord, result);
-                } catch (Exception e) {
+                    result.join();
+                } catch (CompletionException e) {
                     log.warn("Failed to process result of message {}", currentRecord, e);
                     currentRecord.fail();
+                    throw e.getCause();

Review comment:
       Re-throw the underlying exception rather than the CompletionException wrapper.



