You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/12/19 11:20:38 UTC

[GitHub] sijie closed pull request #3210: Provide a flag to ignore Json format error in pulsar flink connector

sijie closed pull request #3210: Provide a flag to ignore Json format error in pulsar flink connector
URL: https://github.com/apache/pulsar/pull/3210
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowDeserializationSchema.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowDeserializationSchema.java
index dfc89b9be8..0235e61ebd 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowDeserializationSchema.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowDeserializationSchema.java
@@ -38,6 +38,30 @@
  */
 public class JsonRowDeserializationSchema implements DeserializationSchema<Row> {
 
+    /*
+        What to do when detecting that a json line cannot be deserialized :
+        (1).false : Throw A IOException and Terminate application。
+        (2).true  : Ignore the error line and add a null line。
+     */
+    private boolean ignoreJsonFormatError = false;
+
+
+    /**
+     *
+     * @return true or false
+     */
+    public boolean getIgnoreJsonFormatError() {
+        return ignoreJsonFormatError;
+    }
+
+    /**
+     * set ignoreJsonFormatError
+     * @param ignoreJsonFormatError
+     */
+    public void setIgnoreJsonFormatError(boolean ignoreJsonFormatError) {
+        this.ignoreJsonFormatError = ignoreJsonFormatError;
+    }
+
     /**
      * Type information describing the result type.
      */
@@ -102,7 +126,13 @@ public Row deserialize(byte[] message) throws IOException {
 
             return row;
         } catch (Throwable t) {
-            throw new IOException("Failed to deserialize JSON object.", t);
+            if (ignoreJsonFormatError) {
+                final int arity = typeInfo.getArity();
+                final Object[] nullsArray = new Object[arity];
+                return Row.of(nullsArray);
+            } else {
+                throw new IOException("Failed to deserialize JSON object.", t);
+            }
         }
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services