You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/05/08 08:11:14 UTC
[incubator-inlong] branch master updated: [INLONG-3817][Sort] Fix null point exception in canal-json format (#4125)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 8a1420a6c [INLONG-3817][Sort] Fix null point exception in canal-json format (#4125)
8a1420a6c is described below
commit 8a1420a6c38bff439b4ac67e135b29165e601fc3
Author: yunqingmoswu <44...@users.noreply.github.com>
AuthorDate: Sun May 8 16:11:09 2022 +0800
[INLONG-3817][Sort] Fix null point exception in canal-json format (#4125)
---
.../canal/CanalJsonEnhancedDecodingFormat.java | 83 ++++++++++++----------
.../canal/CanalJsonEnhancedEncodingFormat.java | 39 ++++++++--
2 files changed, 78 insertions(+), 44 deletions(-)
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDecodingFormat.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDecodingFormat.java
index 7e77e1eb0..dfc40e548 100644
--- a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDecodingFormat.java
+++ b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDecodingFormat.java
@@ -18,13 +18,6 @@
package org.apache.inlong.sort.formats.json.canal;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import javax.annotation.Nullable;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.common.TimestampFormat;
@@ -40,30 +33,35 @@ import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.types.RowKind;
import org.apache.inlong.sort.formats.json.canal.CanalJsonEnhancedDeserializationSchema.MetadataConverter;
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
/**
* {@link DecodingFormat} for Canal using JSON encoding.
* different from flink:1.13.5. This support more metadata.
*/
public class CanalJsonEnhancedDecodingFormat implements DecodingFormat<DeserializationSchema<RowData>> {
- // --------------------------------------------------------------------------------------------
- // Mutable attributes
- // --------------------------------------------------------------------------------------------
-
- private List<String> metadataKeys;
-
// --------------------------------------------------------------------------------------------
// Canal-specific attributes
// --------------------------------------------------------------------------------------------
-
- private final @Nullable String database;
-
- private final @Nullable String table;
-
+ @Nullable
+ private final String database;
+ @Nullable
+ private final String table;
private final boolean ignoreParseErrors;
-
private final TimestampFormat timestampFormat;
+ // --------------------------------------------------------------------------------------------
+ // Mutable attributes
+ // --------------------------------------------------------------------------------------------
+ private List<String> metadataKeys;
+
public CanalJsonEnhancedDecodingFormat(
String database,
String table,
@@ -79,25 +77,18 @@ public class CanalJsonEnhancedDecodingFormat implements DecodingFormat<Deseriali
@Override
public DeserializationSchema<RowData> createRuntimeDecoder(
DynamicTableSource.Context context, DataType physicalDataType) {
- final List<ReadableMetadata> readableMetadata =
- metadataKeys.stream()
- .map(
- k ->
- Stream.of(ReadableMetadata.values())
- .filter(rm -> rm.key.equals(k))
- .findFirst()
- .<IllegalStateException>orElseThrow(IllegalStateException::new))
- .collect(Collectors.toList());
- final List<DataTypes.Field> metadataFields =
- readableMetadata.stream()
- .map(m -> DataTypes.FIELD(m.key, m.dataType))
- .collect(Collectors.toList());
- final DataType producedDataType =
- DataTypeUtils.appendRowFields(physicalDataType, metadataFields);
- final TypeInformation<RowData> producedTypeInfo =
- context.createTypeInformation(producedDataType);
- return CanalJsonEnhancedDeserializationSchema.builder(
- physicalDataType, readableMetadata, producedTypeInfo)
+ final List<ReadableMetadata> readableMetadata = metadataKeys.stream()
+ .map(k -> Stream.of(ReadableMetadata.values())
+ .filter(rm -> rm.key.equals(k))
+ .findFirst()
+ .orElseThrow(IllegalStateException::new))
+ .collect(Collectors.toList());
+ final List<DataTypes.Field> metadataFields = readableMetadata.stream()
+ .map(m -> DataTypes.FIELD(m.key, m.dataType))
+ .collect(Collectors.toList());
+ final DataType producedDataType = DataTypeUtils.appendRowFields(physicalDataType, metadataFields);
+ final TypeInformation<RowData> producedTypeInfo = context.createTypeInformation(producedDataType);
+ return CanalJsonEnhancedDeserializationSchema.builder(physicalDataType, readableMetadata, producedTypeInfo)
.setDatabase(database)
.setTable(table)
.setIgnoreParseErrors(ignoreParseErrors)
@@ -132,7 +123,9 @@ public class CanalJsonEnhancedDecodingFormat implements DecodingFormat<Deseriali
// Metadata handling
// --------------------------------------------------------------------------------------------
- /** List of metadata that can be read with this format. */
+ /**
+ * List of metadata that can be read with this format.
+ */
public enum ReadableMetadata {
DATABASE(
"database",
@@ -143,6 +136,9 @@ public class CanalJsonEnhancedDecodingFormat implements DecodingFormat<Deseriali
@Override
public Object convert(GenericRowData row, int pos) {
+ if (row.isNullAt(pos)) {
+ return null;
+ }
return row.getString(pos);
}
}),
@@ -156,6 +152,9 @@ public class CanalJsonEnhancedDecodingFormat implements DecodingFormat<Deseriali
@Override
public Object convert(GenericRowData row, int pos) {
+ if (row.isNullAt(pos)) {
+ return null;
+ }
return row.getString(pos);
}
}),
@@ -171,6 +170,9 @@ public class CanalJsonEnhancedDecodingFormat implements DecodingFormat<Deseriali
@Override
public Object convert(GenericRowData row, int pos) {
+ if (row.isNullAt(pos)) {
+ return null;
+ }
return row.getMap(pos);
}
}),
@@ -184,6 +186,9 @@ public class CanalJsonEnhancedDecodingFormat implements DecodingFormat<Deseriali
@Override
public Object convert(GenericRowData row, int pos) {
+ if (row.isNullAt(pos)) {
+ return null;
+ }
return row.getArray(pos);
}
}),
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedEncodingFormat.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedEncodingFormat.java
index a74d9ecf0..2595fe477 100644
--- a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedEncodingFormat.java
+++ b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedEncodingFormat.java
@@ -44,14 +44,10 @@ import java.util.stream.Stream;
*/
public class CanalJsonEnhancedEncodingFormat implements EncodingFormat<SerializationSchema<RowData>> {
- private List<String> metadataKeys;
-
private final TimestampFormat timestampFormat;
-
private final JsonOptions.MapNullKeyMode mapNullKeyMode;
-
private final String mapNullKeyLiteral;
-
+ private List<String> metadataKeys;
private boolean encodeDecimalAsPlainNumber;
public CanalJsonEnhancedEncodingFormat(
@@ -126,6 +122,9 @@ public class CanalJsonEnhancedEncodingFormat implements EncodingFormat<Serializa
@Override
public Object convert(RowData row, int pos) {
+ if (row.isNullAt(pos)) {
+ return null;
+ }
return row.getString(pos);
}
}),
@@ -138,6 +137,9 @@ public class CanalJsonEnhancedEncodingFormat implements EncodingFormat<Serializa
@Override
public Object convert(RowData row, int pos) {
+ if (row.isNullAt(pos)) {
+ return null;
+ }
return row.getString(pos);
}
}),
@@ -152,6 +154,9 @@ public class CanalJsonEnhancedEncodingFormat implements EncodingFormat<Serializa
@Override
public Object convert(RowData row, int pos) {
+ if (row.isNullAt(pos)) {
+ return null;
+ }
return row.getMap(pos);
}
}),
@@ -164,6 +169,9 @@ public class CanalJsonEnhancedEncodingFormat implements EncodingFormat<Serializa
@Override
public Object convert(RowData row, int pos) {
+ if (row.isNullAt(pos)) {
+ return null;
+ }
return row.getArray(pos);
}
}),
@@ -176,6 +184,9 @@ public class CanalJsonEnhancedEncodingFormat implements EncodingFormat<Serializa
@Override
public Object convert(RowData row, int pos) {
+ if (row.isNullAt(pos)) {
+ return null;
+ }
return row.getTimestamp(pos, 3).getMillisecond();
}
}),
@@ -188,6 +199,9 @@ public class CanalJsonEnhancedEncodingFormat implements EncodingFormat<Serializa
@Override
public Object convert(RowData row, int pos) {
+ if (row.isNullAt(pos)) {
+ return null;
+ }
return row.getTimestamp(pos, 3).getMillisecond();
}
}),
@@ -201,6 +215,9 @@ public class CanalJsonEnhancedEncodingFormat implements EncodingFormat<Serializa
@Override
public Object convert(RowData row, int pos) {
+ if (row.isNullAt(pos)) {
+ return null;
+ }
return row.getString(pos);
}
}),
@@ -213,6 +230,9 @@ public class CanalJsonEnhancedEncodingFormat implements EncodingFormat<Serializa
@Override
public Object convert(RowData row, int pos) {
+ if (row.isNullAt(pos)) {
+ return null;
+ }
return row.getBoolean(pos);
}
}),
@@ -226,6 +246,9 @@ public class CanalJsonEnhancedEncodingFormat implements EncodingFormat<Serializa
@Override
public Object convert(RowData row, int pos) {
+ if (row.isNullAt(pos)) {
+ return null;
+ }
return row.getMap(pos);
}
}),
@@ -238,6 +261,9 @@ public class CanalJsonEnhancedEncodingFormat implements EncodingFormat<Serializa
@Override
public Object convert(RowData row, int pos) {
+ if (row.isNullAt(pos)) {
+ return null;
+ }
return row.getLong(pos);
}
}),
@@ -254,6 +280,9 @@ public class CanalJsonEnhancedEncodingFormat implements EncodingFormat<Serializa
@Override
public Object convert(RowData row, int pos) {
+ if (row.isNullAt(pos)) {
+ return null;
+ }
return row.getArray(pos);
}
});