You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/05/13 07:46:04 UTC
[pulsar] 10/46: Always return from trigger even if read from output
topic times out
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch 2.7.2_ds_tmp
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b56a6d86f9dd3f728f0e33e8b157060580dd1ef6
Author: Chris Bartholomew <c_...@yahoo.com>
AuthorDate: Sat Nov 23 18:49:49 2019 -0500
Always return from trigger even if read from output topic times out
(cherry picked from commit d5b70adb742b02f3f3ea3ef31aa956994cbea9be)
---
.../functions/worker/rest/api/ComponentImpl.java | 35 +++++++++++-----------
1 file changed, 18 insertions(+), 17 deletions(-)
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 3bcea1b..41db4c9 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -958,6 +958,7 @@ public abstract class ComponentImpl {
log.error("Function in trigger function is not ready @ /{}/{}/{}", tenant, namespace, functionName);
throw new RestException(Status.BAD_REQUEST, "Function in trigger function is not ready");
}
+
String outputTopic = functionMetaData.getFunctionDetails().getSink().getTopic();
Reader<byte[]> reader = null;
Producer<byte[]> producer = null;
@@ -986,25 +987,22 @@ public abstract class ComponentImpl {
if (reader == null) {
return null;
}
- long curTime = System.currentTimeMillis();
- long maxTime = curTime + 1000;
- while (curTime < maxTime) {
- Message msg = reader.readNext(10000, TimeUnit.MILLISECONDS);
- if (msg == null)
- break;
- if (msg.getProperties().containsKey("__pfn_input_msg_id__")
- && msg.getProperties().containsKey("__pfn_input_topic__")) {
- MessageId newMsgId = MessageId.fromByteArray(
- Base64.getDecoder().decode((String) msg.getProperties().get("__pfn_input_msg_id__")));
-
- if (msgId.equals(newMsgId)
- && msg.getProperties().get("__pfn_input_topic__").equals(TopicName.get(inputTopicToWrite).toString())) {
- return new String(msg.getData());
- }
+
+ Message msg = reader.readNext(2500, TimeUnit.MILLISECONDS);
+
+ if (msg == null) {
+ return new String("No Message On Output Topic");
+ }
+
+ if (msg.getProperties().containsKey("__pfn_input_msg_id__")
+ && msg.getProperties().containsKey("__pfn_input_topic__")) {
+ MessageId newMsgId = MessageId.fromByteArray(
+ Base64.getDecoder().decode((String) msg.getProperties().get("__pfn_input_msg_id__")));
+ if (msgId.equals(newMsgId)
+ && msg.getProperties().get("__pfn_input_topic__").equals(TopicName.get(inputTopicToWrite).toString())) {
+ return new String(msg.getData());
}
- curTime = System.currentTimeMillis();
}
- throw new RestException(Status.REQUEST_TIMEOUT, "Request Timed Out");
} catch (SchemaSerializationException e) {
throw new RestException(Status.BAD_REQUEST, String.format("Failed to serialize input with error: %s. Please check if input data conforms with the schema of the input topic.", e.getMessage()));
} catch (IOException e) {
@@ -1017,6 +1015,9 @@ public abstract class ComponentImpl {
producer.closeAsync();
}
}
+
+ return null;
+
}
public FunctionState getFunctionState(final String tenant,