You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/04/22 06:26:42 UTC

[GitHub] [pulsar] eolivelli commented on a change in pull request #10270: [Issue-10269] [pulsar-io] Pulsar IO Sink errors are not bubbled up correctly

eolivelli commented on a change in pull request #10270:
URL: https://github.com/apache/pulsar/pull/10270#discussion_r618112537



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
##########
@@ -65,64 +65,92 @@ public JavaInstance(ContextImpl contextImpl, Object userClassObject, InstanceCon
             this.javaUtilFunction = (java.util.function.Function) userClassObject;
         }
     }
-
-    public CompletableFuture<JavaExecutionResult> handleMessage(Record<?> record, Object input) {
-        if (context != null) {
-            context.setCurrentMessageContext(record);
+    
+    /**
+     * Invokes the function code against the given input data.
+     * 
+     * @param input - The input data provided to the Function code
+     * @return An ExecutionResult object that contains the function result along with any user exceptions
+     * that occurred when executing the Function code.
+     */
+    @SuppressWarnings("unchecked")
+	private JavaExecutionResult executeFunction(Object input) {
+    	JavaExecutionResult executionResult = new JavaExecutionResult();
+    	
+        try { 	
+        	Object result = (function != null) ? function.process(input, context) :
+        		javaUtilFunction.apply(input);
+        	
+            executionResult.setResult(result);  
+        } catch (Exception ex) {
+            executionResult.setUserException(ex);

Review comment:
       Should we handle InterruptedException explicitly and call Thread.currentThread().interrupt()?

##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##########
@@ -290,63 +318,63 @@ public void run() {
         }
     }
 
-    private void setupStateStore() throws Exception {
-        this.stateManager = new InstanceStateManager();
-
-        if (null == stateStorageServiceUrl) {
-            stateStoreProvider = StateStoreProvider.NULL;
-        } else {
-            stateStoreProvider = new BKStateStoreProviderImpl();
-            Map<String, Object> stateStoreProviderConfig = new HashMap();
-            stateStoreProviderConfig.put(BKStateStoreProviderImpl.STATE_STORAGE_SERVICE_URL, stateStorageServiceUrl);
-            stateStoreProvider.init(stateStoreProviderConfig, instanceConfig.getFunctionDetails());
-
-            StateStore store = stateStoreProvider.getStateStore(
-                instanceConfig.getFunctionDetails().getTenant(),
-                instanceConfig.getFunctionDetails().getNamespace(),
-                instanceConfig.getFunctionDetails().getName()
-            );
-            StateStoreContext context = new StateStoreContextImpl();
-            store.init(context);
-
-            stateManager.registerStore(store);
-        }
-    }
-
-    private void processResult(Record srcRecord,
-                               CompletableFuture<JavaExecutionResult> result) throws Exception {
-        result.whenComplete((result1, throwable) -> {
-            if (throwable != null || result1.getUserException() != null) {
-                Throwable t = throwable != null ? throwable : result1.getUserException();
-                log.warn("Encountered exception when processing message {}",
-                        srcRecord, t);
-                stats.incrUserExceptions(t);
-                srcRecord.fail();
-            } else {
-                if (result1.getResult() != null) {
-                    sendOutputMessage(srcRecord, result1.getResult());
-                } else {
-                    if (instanceConfig.getFunctionDetails().getAutoAck()) {
-                        // the function doesn't produce any result or the user doesn't want the result.
-                        srcRecord.ack();
-                    }
-                }
-                // increment total successfully processed
-                stats.incrTotalProcessedSuccessfully();
+    @SuppressWarnings("rawtypes")
+	private void processResult(Record srcRecord, JavaExecutionResult result) throws Exception {
+    	
+    	if (result.getUserException() != null) {
+    		log.warn("Encountered exception when processing message {}",
+                    srcRecord, result.getUserException());
+            stats.incrUserExceptions(result.getUserException());
+            throw result.getUserException();
+    	} 
+    	
+    	if (result.getSystemException() != null) {
+    		log.warn("Encountered exception when processing message {}",
+                    srcRecord, result.getSystemException());
+            stats.incrSysExceptions(result.getSystemException());
+            throw result.getSystemException();
+    	}
+    	
+    	if (result.getResult() == null) {
+    		if (instanceConfig.getFunctionDetails().getAutoAck()) {
+                // the function doesn't produce any result or the user doesn't want the result.
+                srcRecord.ack();
             }
-        });
+    	} else { 
+    		
+    		try {
+				Object output = (result.getResult() instanceof CompletableFuture) ?
+					((CompletableFuture)result.getResult()).get() : result.getResult();
+				
+				sendOutputMessage(srcRecord, output);
+				
+				if (instanceConfig.getFunctionDetails().getAutoAck()) {
+					srcRecord.ack();
+				}
+				stats.incrTotalProcessedSuccessfully();
+				
+			} catch (InterruptedException | ExecutionException e) {
+				log.warn("Encountered exception when processing message {}",

Review comment:
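       Please handle InterruptedException separately from ExecutionException here, e.g. by calling Thread.currentThread().interrupt() to restore the interrupt status.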

##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
##########
@@ -65,64 +65,92 @@ public JavaInstance(ContextImpl contextImpl, Object userClassObject, InstanceCon
             this.javaUtilFunction = (java.util.function.Function) userClassObject;
         }
     }
-
-    public CompletableFuture<JavaExecutionResult> handleMessage(Record<?> record, Object input) {
-        if (context != null) {
-            context.setCurrentMessageContext(record);
+    
+    /**
+     * Invokes the function code against the given input data.
+     * 
+     * @param input - The input data provided to the Function code
+     * @return An ExecutionResult object that contains the function result along with any user exceptions
+     * that occurred when executing the Function code.
+     */
+    @SuppressWarnings("unchecked")
+	private JavaExecutionResult executeFunction(Object input) {
+    	JavaExecutionResult executionResult = new JavaExecutionResult();
+    	
+        try { 	
+        	Object result = (function != null) ? function.process(input, context) :
+        		javaUtilFunction.apply(input);
+        	
+            executionResult.setResult(result);  
+        } catch (Exception ex) {
+            executionResult.setUserException(ex);
+        } 
+    	
+    	return executionResult;
+    }
+    
+    /**
+     * If the Function code returns a CompletableFuture object, then this method is used to handle that by
+     * adding the async request to the internal pendingAsyncRequests queue, specifying the logic to be executed
+     * once the function execution is complete.
+     * 
+     * @param future
+     * @param executionResult
+     */
+    @SuppressWarnings("unchecked")
+	private void handleAsync(@SuppressWarnings("rawtypes") CompletableFuture future, JavaExecutionResult executionResult) {
+    	try {
+            pendingAsyncRequests.put(future);
+        } catch (InterruptedException ie) {
+            log.warn("Exception while put Async requests", ie);

Review comment:
       Should we call Thread.currentThread().interrupt() here to restore the interrupt status?

##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##########
@@ -269,9 +270,11 @@ public void run() {
 
                 try {
                     processResult(currentRecord, result);
-                } catch (Exception e) {
+                    result.join();
+                } catch (CompletionException e) {
                     log.warn("Failed to process result of message {}", currentRecord, e);
                     currentRecord.fail();
+                    throw e.getCause();

Review comment:
       Please handle InterruptedException here as well.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org