Posted to commits@flink.apache.org by ja...@apache.org on 2020/09/22 03:52:03 UTC

[flink] branch release-1.11 updated: [FLINK-19244][csv] Fix CSV format can't deserialize null ROW field

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 1ba6c7e  [FLINK-19244][csv] Fix CSV format can't deserialize null ROW field
1ba6c7e is described below

commit 1ba6c7e591d838beaf74e932e919575a78ef9302
Author: yingshin <iz...@163.com>
AuthorDate: Tue Sep 22 11:47:43 2020 +0800

    [FLINK-19244][csv] Fix CSV format can't deserialize null ROW field
    
    This closes #13444
---
 .../csv/CsvRowDataDeserializationSchema.java       |  6 ++-
 .../formats/csv/CsvRowDeserializationSchema.java   |  6 ++-
 .../formats/csv/CsvRowDataSerDeSchemaTest.java     | 44 ++++++++++++++++++++++
 .../csv/CsvRowDeSerializationSchemaTest.java       | 34 +++++++++++++++++
 4 files changed, 88 insertions(+), 2 deletions(-)

diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
index 8805e98..e238037 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
@@ -241,7 +241,11 @@ public final class CsvRowDataDeserializationSchema implements DeserializationSch
 		return jsonNode -> {
 			int nodeSize = jsonNode.size();
 
-			validateArity(arity, nodeSize, ignoreParseErrors);
+			if (nodeSize != 0) {
+				validateArity(arity, nodeSize, ignoreParseErrors);
+			} else {
+				return null;
+			}
 
 			GenericRowData row = new GenericRowData(arity);
 			for (int i = 0; i < arity; i++) {
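
Why this hunk fixes the bug: a null nested ROW is written out as an empty CSV field, so on the read side the nested converter receives an empty node. Previously the converter always called validateArity() and threw for that empty node; with this change it returns null for the nested row instead. Below is a minimal sketch of the fixed RowData path, assuming the Flink 1.11 classes used by the tests further down and the format's default delimiters (',' between top-level fields, ';' between nested-row elements); the input literal is illustrative, not taken from the commit.

    import static org.apache.flink.table.api.DataTypes.FIELD;
    import static org.apache.flink.table.api.DataTypes.INT;
    import static org.apache.flink.table.api.DataTypes.ROW;
    import static org.apache.flink.table.api.DataTypes.STRING;

    import org.apache.flink.formats.csv.CsvRowDataDeserializationSchema;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
    import org.apache.flink.table.types.logical.RowType;

    public class NullNestedRowDataSketch {
        public static void main(String[] args) throws Exception {
            // Two nested ROW fields, mirroring the shape of the new test below.
            RowType rowType = (RowType) ROW(
                    FIELD("f0", ROW(FIELD("c0", STRING()), FIELD("c1", INT()))),
                    FIELD("f1", ROW(FIELD("c0", STRING()), FIELD("c1", INT()))))
                .getLogicalType();

            CsvRowDataDeserializationSchema schema =
                new CsvRowDataDeserializationSchema.Builder(rowType, new RowDataTypeInfo(rowType))
                    .build();

            // The leading empty field is the null nested ROW: its node has
            // size 0, so the converter patched above now returns null for it
            // instead of failing arity validation.
            RowData row = schema.deserialize(",world;2".getBytes());
            System.out.println(row.isNullAt(0)); // prints: true
        }
    }
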
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
index 684bea7..ab32a98 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
@@ -248,7 +248,11 @@ public final class CsvRowDeserializationSchema implements DeserializationSchema<
 		return (node) -> {
 			final int nodeSize = node.size();
 
-			validateArity(rowArity, nodeSize, ignoreParseErrors);
+			if (nodeSize != 0) {
+				validateArity(rowArity, nodeSize, ignoreParseErrors);
+			} else {
+				return null;
+			}
 
 			final Row row = new Row(rowArity);
 			for (int i = 0; i < Math.min(rowArity, nodeSize); i++) {
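
The Row-based schema gets the same guard. A matching sketch against its public API, under the same assumptions (default ',' field delimiter, ';' between nested-row elements; illustrative input):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.formats.csv.CsvRowDeserializationSchema;
    import org.apache.flink.types.Row;

    public class NullNestedRowSketch {
        public static void main(String[] args) throws Exception {
            // Two nested ROW(STRING, INT, STRING) fields, as in the new test below.
            TypeInformation<Row> nested = Types.ROW(Types.STRING, Types.INT, Types.STRING);
            TypeInformation<Row> rowInfo = Types.ROW(nested, nested);

            CsvRowDeserializationSchema schema =
                new CsvRowDeserializationSchema.Builder(rowInfo).build();

            // Empty first field -> empty nested node -> null nested row
            // (previously: an exception from validateArity()).
            Row row = schema.deserialize(",world;2;hi".getBytes());
            System.out.println(row.getField(0) == null); // prints: true
        }
    }
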
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java
index 92b10ef..b6bfa61 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java
@@ -250,6 +250,39 @@ public class CsvRowDataSerDeSchemaTest {
 			new java.util.Date());
 	}
 
+	@Test
+	public void testSerializeDeserializeNestedTypes() throws Exception {
+		DataType subDataType0 = ROW(
+			FIELD("f0c0", STRING()),
+			FIELD("f0c1", INT()),
+			FIELD("f0c2", STRING()));
+		DataType subDataType1 = ROW(
+			FIELD("f1c0", STRING()),
+			FIELD("f1c1", INT()),
+			FIELD("f1c2", STRING()));
+		DataType dataType = ROW(
+			FIELD("f0", subDataType0),
+			FIELD("f1", subDataType1));
+		RowType rowType = (RowType) dataType.getLogicalType();
+
+		// serialization
+		CsvRowDataSerializationSchema.Builder serSchemaBuilder =
+			new CsvRowDataSerializationSchema.Builder(rowType);
+		// deserialization
+		CsvRowDataDeserializationSchema.Builder deserSchemaBuilder =
+			new CsvRowDataDeserializationSchema.Builder(rowType, new RowDataTypeInfo(rowType));
+
+		RowData normalRow = GenericRowData.of(
+			rowData("hello", 1, "This is 1st top column"),
+			rowData("world", 2, "This is 2nd top column"));
+		testSerDeConsistency(normalRow, serSchemaBuilder, deserSchemaBuilder);
+
+		RowData nullRow = GenericRowData.of(
+			null,
+			rowData("world", 2, "This is 2nd top column after null"));
+		testSerDeConsistency(nullRow, serSchemaBuilder, deserSchemaBuilder);
+	}
+
 	private void testNullableField(DataType fieldType, String string, Object value) throws Exception {
 		testField(
 			fieldType,
@@ -331,6 +364,16 @@ public class CsvRowDataSerDeSchemaTest {
 			.toExternal(deserializedRow);
 	}
 
+	private void testSerDeConsistency(
+			RowData originalRow,
+			CsvRowDataSerializationSchema.Builder serSchemaBuilder,
+			CsvRowDataDeserializationSchema.Builder deserSchemaBuilder) throws Exception {
+		RowData deserializedRow = deserialize(
+			deserSchemaBuilder,
+			new String(serialize(serSchemaBuilder, originalRow)));
+		assertEquals(deserializedRow, originalRow);
+	}
+
 	private static byte[] serialize(CsvRowDataSerializationSchema.Builder serSchemaBuilder, RowData row) throws Exception {
 		// we serialize and deserialize the schema to test runtime behavior
 		// when the schema is shipped to the cluster
@@ -352,4 +395,5 @@ public class CsvRowDataSerDeSchemaTest {
 	private static RowData rowData(String str1, int integer, String str2) {
 		return GenericRowData.of(fromString(str1), integer, fromString(str2));
 	}
+
 }
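
For intuition, the two test rows added above would serialize to roughly the following CSV lines under the default delimiters, the leading empty field on the second line being the null nested ROW (illustrative, not output captured from the commit):

    hello;1;This is 1st top column,world;2;This is 2nd top column
    ,world;2;This is 2nd top column after null
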
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java
index b449283..c1dd477 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java
@@ -207,6 +207,30 @@ public class CsvRowDeSerializationSchemaTest {
 		testNullableField(Types.GENERIC(java.util.Date.class), "FAIL", new java.util.Date());
 	}
 
+	@Test
+	public void testSerializeDeserializeNestedTypes() throws Exception {
+		final TypeInformation<Row> subDataType0 = Types.ROW(Types.STRING, Types.INT, Types.STRING);
+		final TypeInformation<Row> subDataType1 = Types.ROW(Types.STRING, Types.INT, Types.STRING);
+		final TypeInformation<Row> rowInfo = Types.ROW(subDataType0, subDataType1);
+
+		// serialization
+		CsvRowSerializationSchema.Builder serSchemaBuilder =
+			new CsvRowSerializationSchema.Builder(rowInfo);
+		// deserialization
+		CsvRowDeserializationSchema.Builder deserSchemaBuilder =
+			new CsvRowDeserializationSchema.Builder(rowInfo);
+
+		Row normalRow = Row.of(
+			Row.of("hello", 1, "This is 1st top column"),
+			Row.of("world", 2, "This is 2nd top column"));
+		testSerDeConsistency(normalRow, serSchemaBuilder, deserSchemaBuilder);
+
+		Row nullRow = Row.of(
+			null,
+			Row.of("world", 2, "This is 2nd top column after null"));
+		testSerDeConsistency(nullRow, serSchemaBuilder, deserSchemaBuilder);
+	}
+
 	private <T> void testNullableField(TypeInformation<T> fieldInfo, String string, T value) throws Exception {
 		testField(
 			fieldInfo,
@@ -269,6 +293,16 @@ public class CsvRowDeSerializationSchemaTest {
 		return deserialize(deserSchemaBuilder, string);
 	}
 
+	private void testSerDeConsistency(
+			Row originalRow,
+			CsvRowSerializationSchema.Builder serSchemaBuilder,
+			CsvRowDeserializationSchema.Builder deserSchemaBuilder) throws Exception {
+		Row deserializedRow = deserialize(
+			deserSchemaBuilder,
+			new String(serialize(serSchemaBuilder, originalRow)));
+		assertEquals(deserializedRow, originalRow);
+	}
+
 	private static byte[] serialize(CsvRowSerializationSchema.Builder serSchemaBuilder, Row row) throws Exception {
 		// we serialize and deserialize the schema to test runtime behavior
 		// when the schema is shipped to the cluster