You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/19 00:49:18 UTC

[pulsar] 03/26: Improve error handling for triggering function when input data does not conform with input topic schema (#3995)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit dfd4cc2a9087314798b3152c8681708c2426115c
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Sun Apr 7 23:06:07 2019 -0700

    Improve error handling for triggering function when input data does not conform with input topic schema (#3995)
---
 .../org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java     | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index ecc5180..d844972 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -84,6 +84,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.FunctionState;
 import org.apache.pulsar.common.functions.WorkerInfo;
@@ -1136,6 +1137,8 @@ public abstract class ComponentImpl {
                 curTime = System.currentTimeMillis();
             }
             throw new RestException(Status.REQUEST_TIMEOUT, "Request Timed Out");
+        } catch (SchemaSerializationException e) {
+            throw new RestException(Status.BAD_REQUEST, String.format("Failed to serialize input with error: %s. Please check if input data conforms with the schema of the input topic.", e.getMessage()));
         } catch (IOException e) {
             throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
         } finally {