You are viewing a plain-text version of this content. The canonical link is available in the original HTML version of this page.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/09/22 03:52:03 UTC
[flink] branch release-1.11 updated: [FLINK-19244][csv] Fix CSV
format can't deserialize null ROW field
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 1ba6c7e [FLINK-19244][csv] Fix CSV format can't deserialize null ROW field
1ba6c7e is described below
commit 1ba6c7e591d838beaf74e932e919575a78ef9302
Author: yingshin <iz...@163.com>
AuthorDate: Tue Sep 22 11:47:43 2020 +0800
[FLINK-19244][csv] Fix CSV format can't deserialize null ROW field
This closes #13444
---
.../csv/CsvRowDataDeserializationSchema.java | 6 ++-
.../formats/csv/CsvRowDeserializationSchema.java | 6 ++-
.../formats/csv/CsvRowDataSerDeSchemaTest.java | 44 ++++++++++++++++++++++
.../csv/CsvRowDeSerializationSchemaTest.java | 34 +++++++++++++++++
4 files changed, 88 insertions(+), 2 deletions(-)
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
index 8805e98..e238037 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
@@ -241,7 +241,11 @@ public final class CsvRowDataDeserializationSchema implements DeserializationSch
return jsonNode -> {
int nodeSize = jsonNode.size();
- validateArity(arity, nodeSize, ignoreParseErrors);
+ if (nodeSize != 0) {
+ validateArity(arity, nodeSize, ignoreParseErrors);
+ } else {
+ return null;
+ }
GenericRowData row = new GenericRowData(arity);
for (int i = 0; i < arity; i++) {
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
index 684bea7..ab32a98 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
@@ -248,7 +248,11 @@ public final class CsvRowDeserializationSchema implements DeserializationSchema<
return (node) -> {
final int nodeSize = node.size();
- validateArity(rowArity, nodeSize, ignoreParseErrors);
+ if (nodeSize != 0) {
+ validateArity(rowArity, nodeSize, ignoreParseErrors);
+ } else {
+ return null;
+ }
final Row row = new Row(rowArity);
for (int i = 0; i < Math.min(rowArity, nodeSize); i++) {
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java
index 92b10ef..b6bfa61 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java
@@ -250,6 +250,39 @@ public class CsvRowDataSerDeSchemaTest {
new java.util.Date());
}
+ @Test
+ public void testSerializeDeserializeNestedTypes() throws Exception {
+ DataType subDataType0 = ROW(
+ FIELD("f0c0", STRING()),
+ FIELD("f0c1", INT()),
+ FIELD("f0c2", STRING()));
+ DataType subDataType1 = ROW(
+ FIELD("f1c0", STRING()),
+ FIELD("f1c1", INT()),
+ FIELD("f1c2", STRING()));
+ DataType dataType = ROW(
+ FIELD("f0", subDataType0),
+ FIELD("f1", subDataType1));
+ RowType rowType = (RowType) dataType.getLogicalType();
+
+ // serialization
+ CsvRowDataSerializationSchema.Builder serSchemaBuilder =
+ new CsvRowDataSerializationSchema.Builder(rowType);
+ // deserialization
+ CsvRowDataDeserializationSchema.Builder deserSchemaBuilder =
+ new CsvRowDataDeserializationSchema.Builder(rowType, new RowDataTypeInfo(rowType));
+
+ RowData normalRow = GenericRowData.of(
+ rowData("hello", 1, "This is 1st top column"),
+ rowData("world", 2, "This is 2nd top column"));
+ testSerDeConsistency(normalRow, serSchemaBuilder, deserSchemaBuilder);
+
+ RowData nullRow = GenericRowData.of(
+ null,
+ rowData("world", 2, "This is 2nd top column after null"));
+ testSerDeConsistency(nullRow, serSchemaBuilder, deserSchemaBuilder);
+ }
+
private void testNullableField(DataType fieldType, String string, Object value) throws Exception {
testField(
fieldType,
@@ -331,6 +364,16 @@ public class CsvRowDataSerDeSchemaTest {
.toExternal(deserializedRow);
}
+ private void testSerDeConsistency(
+ RowData originalRow,
+ CsvRowDataSerializationSchema.Builder serSchemaBuilder,
+ CsvRowDataDeserializationSchema.Builder deserSchemaBuilder) throws Exception {
+ RowData deserializedRow = deserialize(
+ deserSchemaBuilder,
+ new String(serialize(serSchemaBuilder, originalRow)));
+ assertEquals(deserializedRow, originalRow);
+ }
+
private static byte[] serialize(CsvRowDataSerializationSchema.Builder serSchemaBuilder, RowData row) throws Exception {
// we serialize and deserialize the schema to test runtime behavior
// when the schema is shipped to the cluster
@@ -352,4 +395,5 @@ public class CsvRowDataSerDeSchemaTest {
private static RowData rowData(String str1, int integer, String str2) {
return GenericRowData.of(fromString(str1), integer, fromString(str2));
}
+
}
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java
index b449283..c1dd477 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java
@@ -207,6 +207,30 @@ public class CsvRowDeSerializationSchemaTest {
testNullableField(Types.GENERIC(java.util.Date.class), "FAIL", new java.util.Date());
}
+ @Test
+ public void testSerializeDeserializeNestedTypes() throws Exception {
+ final TypeInformation<Row> subDataType0 = Types.ROW(Types.STRING, Types.INT, Types.STRING);
+ final TypeInformation<Row> subDataType1 = Types.ROW(Types.STRING, Types.INT, Types.STRING);
+ final TypeInformation<Row> rowInfo = Types.ROW(subDataType0, subDataType1);
+
+ // serialization
+ CsvRowSerializationSchema.Builder serSchemaBuilder =
+ new CsvRowSerializationSchema.Builder(rowInfo);
+ // deserialization
+ CsvRowDeserializationSchema.Builder deserSchemaBuilder =
+ new CsvRowDeserializationSchema.Builder(rowInfo);
+
+ Row normalRow = Row.of(
+ Row.of("hello", 1, "This is 1st top column"),
+ Row.of("world", 2, "This is 2nd top column"));
+ testSerDeConsistency(normalRow, serSchemaBuilder, deserSchemaBuilder);
+
+ Row nullRow = Row.of(
+ null,
+ Row.of("world", 2, "This is 2nd top column after null"));
+ testSerDeConsistency(nullRow, serSchemaBuilder, deserSchemaBuilder);
+ }
+
private <T> void testNullableField(TypeInformation<T> fieldInfo, String string, T value) throws Exception {
testField(
fieldInfo,
@@ -269,6 +293,16 @@ public class CsvRowDeSerializationSchemaTest {
return deserialize(deserSchemaBuilder, string);
}
+ private void testSerDeConsistency(
+ Row originalRow,
+ CsvRowSerializationSchema.Builder serSchemaBuilder,
+ CsvRowDeserializationSchema.Builder deserSchemaBuilder) throws Exception {
+ Row deserializedRow = deserialize(
+ deserSchemaBuilder,
+ new String(serialize(serSchemaBuilder, originalRow)));
+ assertEquals(deserializedRow, originalRow);
+ }
+
private static byte[] serialize(CsvRowSerializationSchema.Builder serSchemaBuilder, Row row) throws Exception {
// we serialize and deserialize the schema to test runtime behavior
// when the schema is shipped to the cluster