You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/05/08 08:11:14 UTC

[incubator-inlong] branch master updated: [INLONG-3817][Sort] Fix null point exception in canal-json format (#4125)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a1420a6c [INLONG-3817][Sort] Fix null point exception in canal-json format (#4125)
8a1420a6c is described below

commit 8a1420a6c38bff439b4ac67e135b29165e601fc3
Author: yunqingmoswu <44...@users.noreply.github.com>
AuthorDate: Sun May 8 16:11:09 2022 +0800

    [INLONG-3817][Sort] Fix null point exception in canal-json format (#4125)
---
 .../canal/CanalJsonEnhancedDecodingFormat.java     | 83 ++++++++++++----------
 .../canal/CanalJsonEnhancedEncodingFormat.java     | 39 ++++++++--
 2 files changed, 78 insertions(+), 44 deletions(-)

diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDecodingFormat.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDecodingFormat.java
index 7e77e1eb0..dfc40e548 100644
--- a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDecodingFormat.java
+++ b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDecodingFormat.java
@@ -18,13 +18,6 @@
 
 package org.apache.inlong.sort.formats.json.canal;
 
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import javax.annotation.Nullable;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.formats.common.TimestampFormat;
@@ -40,30 +33,35 @@ import org.apache.flink.table.types.utils.DataTypeUtils;
 import org.apache.flink.types.RowKind;
 import org.apache.inlong.sort.formats.json.canal.CanalJsonEnhancedDeserializationSchema.MetadataConverter;
 
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
 /**
  * {@link DecodingFormat} for Canal using JSON encoding.
  * Differs from the Flink 1.13.5 version: this one supports more metadata.
  */
 public class CanalJsonEnhancedDecodingFormat implements DecodingFormat<DeserializationSchema<RowData>> {
 
-    // --------------------------------------------------------------------------------------------
-    // Mutable attributes
-    // --------------------------------------------------------------------------------------------
-
-    private List<String> metadataKeys;
-
     // --------------------------------------------------------------------------------------------
     // Canal-specific attributes
     // --------------------------------------------------------------------------------------------
-
-    private final @Nullable String database;
-
-    private final @Nullable String table;
-
+    @Nullable
+    private final String database;
+    @Nullable
+    private final String table;
     private final boolean ignoreParseErrors;
-
     private final TimestampFormat timestampFormat;
 
+    // --------------------------------------------------------------------------------------------
+    // Mutable attributes
+    // --------------------------------------------------------------------------------------------
+    private List<String> metadataKeys;
+
     public CanalJsonEnhancedDecodingFormat(
             String database,
             String table,
@@ -79,25 +77,18 @@ public class CanalJsonEnhancedDecodingFormat implements DecodingFormat<Deseriali
     @Override
     public DeserializationSchema<RowData> createRuntimeDecoder(
             DynamicTableSource.Context context, DataType physicalDataType) {
-        final List<ReadableMetadata> readableMetadata =
-                metadataKeys.stream()
-                        .map(
-                                k ->
-                                        Stream.of(ReadableMetadata.values())
-                                                .filter(rm -> rm.key.equals(k))
-                                                .findFirst()
-                                                .<IllegalStateException>orElseThrow(IllegalStateException::new))
-                        .collect(Collectors.toList());
-        final List<DataTypes.Field> metadataFields =
-                readableMetadata.stream()
-                        .map(m -> DataTypes.FIELD(m.key, m.dataType))
-                        .collect(Collectors.toList());
-        final DataType producedDataType =
-                DataTypeUtils.appendRowFields(physicalDataType, metadataFields);
-        final TypeInformation<RowData> producedTypeInfo =
-                context.createTypeInformation(producedDataType);
-        return CanalJsonEnhancedDeserializationSchema.builder(
-                        physicalDataType, readableMetadata, producedTypeInfo)
+        final List<ReadableMetadata> readableMetadata = metadataKeys.stream()
+                .map(k -> Stream.of(ReadableMetadata.values())
+                        .filter(rm -> rm.key.equals(k))
+                        .findFirst()
+                        .orElseThrow(IllegalStateException::new))
+                .collect(Collectors.toList());
+        final List<DataTypes.Field> metadataFields = readableMetadata.stream()
+                .map(m -> DataTypes.FIELD(m.key, m.dataType))
+                .collect(Collectors.toList());
+        final DataType producedDataType = DataTypeUtils.appendRowFields(physicalDataType, metadataFields);
+        final TypeInformation<RowData> producedTypeInfo = context.createTypeInformation(producedDataType);
+        return CanalJsonEnhancedDeserializationSchema.builder(physicalDataType, readableMetadata, producedTypeInfo)
                 .setDatabase(database)
                 .setTable(table)
                 .setIgnoreParseErrors(ignoreParseErrors)
@@ -132,7 +123,9 @@ public class CanalJsonEnhancedDecodingFormat implements DecodingFormat<Deseriali
     // Metadata handling
     // --------------------------------------------------------------------------------------------
 
-    /** List of metadata that can be read with this format. */
+    /**
+     * List of metadata that can be read with this format.
+     */
     public enum ReadableMetadata {
         DATABASE(
                 "database",
@@ -143,6 +136,9 @@ public class CanalJsonEnhancedDecodingFormat implements DecodingFormat<Deseriali
 
                     @Override
                     public Object convert(GenericRowData row, int pos) {
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
                         return row.getString(pos);
                     }
                 }),
@@ -156,6 +152,9 @@ public class CanalJsonEnhancedDecodingFormat implements DecodingFormat<Deseriali
 
                     @Override
                     public Object convert(GenericRowData row, int pos) {
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
                         return row.getString(pos);
                     }
                 }),
@@ -171,6 +170,9 @@ public class CanalJsonEnhancedDecodingFormat implements DecodingFormat<Deseriali
 
                     @Override
                     public Object convert(GenericRowData row, int pos) {
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
                         return row.getMap(pos);
                     }
                 }),
@@ -184,6 +186,9 @@ public class CanalJsonEnhancedDecodingFormat implements DecodingFormat<Deseriali
 
                     @Override
                     public Object convert(GenericRowData row, int pos) {
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
                         return row.getArray(pos);
                     }
                 }),
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedEncodingFormat.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedEncodingFormat.java
index a74d9ecf0..2595fe477 100644
--- a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedEncodingFormat.java
+++ b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedEncodingFormat.java
@@ -44,14 +44,10 @@ import java.util.stream.Stream;
  */
 public class CanalJsonEnhancedEncodingFormat implements EncodingFormat<SerializationSchema<RowData>> {
 
-    private List<String> metadataKeys;
-
     private final TimestampFormat timestampFormat;
-
     private final JsonOptions.MapNullKeyMode mapNullKeyMode;
-
     private final String mapNullKeyLiteral;
-
+    private List<String> metadataKeys;
     private boolean encodeDecimalAsPlainNumber;
 
     public CanalJsonEnhancedEncodingFormat(
@@ -126,6 +122,9 @@ public class CanalJsonEnhancedEncodingFormat implements EncodingFormat<Serializa
 
                     @Override
                     public Object convert(RowData row, int pos) {
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
                         return row.getString(pos);
                     }
                 }),
@@ -138,6 +137,9 @@ public class CanalJsonEnhancedEncodingFormat implements EncodingFormat<Serializa
 
                     @Override
                     public Object convert(RowData row, int pos) {
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
                         return row.getString(pos);
                     }
                 }),
@@ -152,6 +154,9 @@ public class CanalJsonEnhancedEncodingFormat implements EncodingFormat<Serializa
 
                     @Override
                     public Object convert(RowData row, int pos) {
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
                         return row.getMap(pos);
                     }
                 }),
@@ -164,6 +169,9 @@ public class CanalJsonEnhancedEncodingFormat implements EncodingFormat<Serializa
 
                     @Override
                     public Object convert(RowData row, int pos) {
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
                         return row.getArray(pos);
                     }
                 }),
@@ -176,6 +184,9 @@ public class CanalJsonEnhancedEncodingFormat implements EncodingFormat<Serializa
 
                     @Override
                     public Object convert(RowData row, int pos) {
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
                         return row.getTimestamp(pos, 3).getMillisecond();
                     }
                 }),
@@ -188,6 +199,9 @@ public class CanalJsonEnhancedEncodingFormat implements EncodingFormat<Serializa
 
                     @Override
                     public Object convert(RowData row, int pos) {
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
                         return row.getTimestamp(pos, 3).getMillisecond();
                     }
                 }),
@@ -201,6 +215,9 @@ public class CanalJsonEnhancedEncodingFormat implements EncodingFormat<Serializa
 
                     @Override
                     public Object convert(RowData row, int pos) {
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
                         return row.getString(pos);
                     }
                 }),
@@ -213,6 +230,9 @@ public class CanalJsonEnhancedEncodingFormat implements EncodingFormat<Serializa
 
                     @Override
                     public Object convert(RowData row, int pos) {
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
                         return row.getBoolean(pos);
                     }
                 }),
@@ -226,6 +246,9 @@ public class CanalJsonEnhancedEncodingFormat implements EncodingFormat<Serializa
 
                     @Override
                     public Object convert(RowData row, int pos) {
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
                         return row.getMap(pos);
                     }
                 }),
@@ -238,6 +261,9 @@ public class CanalJsonEnhancedEncodingFormat implements EncodingFormat<Serializa
 
                     @Override
                     public Object convert(RowData row, int pos) {
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
                         return row.getLong(pos);
                     }
                 }),
@@ -254,6 +280,9 @@ public class CanalJsonEnhancedEncodingFormat implements EncodingFormat<Serializa
 
                     @Override
                     public Object convert(RowData row, int pos) {
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
                         return row.getArray(pos);
                     }
                 });