You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/05/13 07:46:04 UTC

[pulsar] 10/46: Always return from trigger even if read from output topic times out

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch 2.7.2_ds_tmp
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b56a6d86f9dd3f728f0e33e8b157060580dd1ef6
Author: Chris Bartholomew <c_...@yahoo.com>
AuthorDate: Sat Nov 23 18:49:49 2019 -0500

    Always return from trigger even if read from output topic times out
    
    (cherry picked from commit d5b70adb742b02f3f3ea3ef31aa956994cbea9be)
---
 .../functions/worker/rest/api/ComponentImpl.java   | 35 +++++++++++-----------
 1 file changed, 18 insertions(+), 17 deletions(-)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 3bcea1b..41db4c9 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -958,6 +958,7 @@ public abstract class ComponentImpl {
             log.error("Function in trigger function is not ready @ /{}/{}/{}", tenant, namespace, functionName);
             throw new RestException(Status.BAD_REQUEST, "Function in trigger function is not ready");
         }
+
         String outputTopic = functionMetaData.getFunctionDetails().getSink().getTopic();
         Reader<byte[]> reader = null;
         Producer<byte[]> producer = null;
@@ -986,25 +987,22 @@ public abstract class ComponentImpl {
             if (reader == null) {
                 return null;
             }
-            long curTime = System.currentTimeMillis();
-            long maxTime = curTime + 1000;
-            while (curTime < maxTime) {
-                Message msg = reader.readNext(10000, TimeUnit.MILLISECONDS);
-                if (msg == null)
-                    break;
-                if (msg.getProperties().containsKey("__pfn_input_msg_id__")
-                        && msg.getProperties().containsKey("__pfn_input_topic__")) {
-                    MessageId newMsgId = MessageId.fromByteArray(
-                            Base64.getDecoder().decode((String) msg.getProperties().get("__pfn_input_msg_id__")));
-
-                    if (msgId.equals(newMsgId)
-                            && msg.getProperties().get("__pfn_input_topic__").equals(TopicName.get(inputTopicToWrite).toString())) {
-                       return new String(msg.getData());
-                    }
+
+            Message msg = reader.readNext(2500, TimeUnit.MILLISECONDS);
+
+            if (msg == null) {
+                return new String("No Message On Output Topic");
+            }
+
+            if (msg.getProperties().containsKey("__pfn_input_msg_id__")
+                    && msg.getProperties().containsKey("__pfn_input_topic__")) {
+                MessageId newMsgId = MessageId.fromByteArray(
+                        Base64.getDecoder().decode((String) msg.getProperties().get("__pfn_input_msg_id__")));
+                if (msgId.equals(newMsgId)
+                        && msg.getProperties().get("__pfn_input_topic__").equals(TopicName.get(inputTopicToWrite).toString())) {
+                    return new String(msg.getData());
                 }
-                curTime = System.currentTimeMillis();
             }
-            throw new RestException(Status.REQUEST_TIMEOUT, "Request Timed Out");
         } catch (SchemaSerializationException e) {
             throw new RestException(Status.BAD_REQUEST, String.format("Failed to serialize input with error: %s. Please check if input data conforms with the schema of the input topic.", e.getMessage()));
         } catch (IOException e) {
@@ -1017,6 +1015,9 @@ public abstract class ComponentImpl {
                 producer.closeAsync();
             }
         }
+
+        return null;
+
     }
 
     public FunctionState getFunctionState(final String tenant,