Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/12/22 03:54:26 UTC

[GitHub] [flink] wuchong commented on a change in pull request #14444: [FLINK-20091][avro] add ignore-parse-error for

wuchong commented on a change in pull request #14444:
URL: https://github.com/apache/flink/pull/14444#discussion_r547049153



##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java
##########
@@ -85,9 +88,27 @@ public AvroRowDataDeserializationSchema(
 			DeserializationSchema<GenericRecord> nestedSchema,
 			AvroToRowDataConverters.AvroToRowDataConverter runtimeConverter,
 			TypeInformation<RowData> typeInfo) {
+		this(nestedSchema, runtimeConverter, typeInfo, false);
+	}
+
+	/**
+	 * Creates a Avro deserialization schema for the given logical type.
+	 *
+	 * @param nestedSchema     Deserialization schema to deserialize as {@link GenericRecord}
+	 * @param runtimeConverter Converter that transforms a {@link GenericRecord} into {@link RowData}
+	 * @param typeInfo         The TypeInformation to be used by
+	 *                         {@link AvroRowDataDeserializationSchema#getProducedType()}
+	 * @param ignoreParseError Indicate whether to skip rows with parsing error
+	 */
+	public AvroRowDataDeserializationSchema(

Review comment:
       I would suggest refactoring this class to use the builder pattern. Adding a new constructor every time a parameter is introduced is hard to maintain.
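
A minimal sketch of what such a builder could look like. This is not the actual Flink class: `DeserializationSchemaSketch` and its `String` fields are hypothetical stand-ins for `AvroRowDataDeserializationSchema` and its real parameter types, used only to keep the example self-contained. The required arguments go through the factory method, and each optional flag gets a setter with a default.

```java
import java.util.Objects;

// Hypothetical, simplified stand-in for the deserialization schema.
// The String fields replace the real Flink types (assumption for illustration).
final class DeserializationSchemaSketch {
    private final String nestedSchema;
    private final String runtimeConverter;
    private final String typeInfo;
    private final boolean ignoreParseErrors;

    // Private: instances are only created through the builder.
    private DeserializationSchemaSketch(Builder builder) {
        this.nestedSchema = builder.nestedSchema;
        this.runtimeConverter = builder.runtimeConverter;
        this.typeInfo = builder.typeInfo;
        this.ignoreParseErrors = builder.ignoreParseErrors;
    }

    // Required parameters are passed up front; optional ones use setters.
    static Builder builder(String nestedSchema, String runtimeConverter, String typeInfo) {
        return new Builder(nestedSchema, runtimeConverter, typeInfo);
    }

    boolean isIgnoreParseErrors() {
        return ignoreParseErrors;
    }

    static final class Builder {
        private final String nestedSchema;
        private final String runtimeConverter;
        private final String typeInfo;
        // Optional flag with the same default as the existing constructor.
        private boolean ignoreParseErrors = false;

        private Builder(String nestedSchema, String runtimeConverter, String typeInfo) {
            this.nestedSchema = Objects.requireNonNull(nestedSchema);
            this.runtimeConverter = Objects.requireNonNull(runtimeConverter);
            this.typeInfo = Objects.requireNonNull(typeInfo);
        }

        Builder ignoreParseErrors(boolean ignoreParseErrors) {
            this.ignoreParseErrors = ignoreParseErrors;
            return this;
        }

        DeserializationSchemaSketch build() {
            return new DeserializationSchemaSketch(this);
        }
    }

    public static void main(String[] args) {
        DeserializationSchemaSketch schema =
                DeserializationSchemaSketch.builder("schema", "converter", "typeInfo")
                        .ignoreParseErrors(true)
                        .build();
        System.out.println(schema.isIgnoreParseErrors()); // prints "true"
    }
}
```

With this shape, a future option only needs one more field and setter on the builder, and existing callers stay unchanged.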

##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java
##########
@@ -59,18 +59,21 @@
 	 */
 	private final AvroToRowDataConverters.AvroToRowDataConverter runtimeConverter;
 
+	private final Boolean ignoreParseError;

Review comment:
       Use the primitive type `boolean`; otherwise, it may be `null`. 
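
To illustrate the concern, here is a small standalone sketch (the class `BoxedFlagSketch` is hypothetical and not part of the PR). A boxed `Boolean` field accepts `null`, and reading it in a condition then fails at runtime during auto-unboxing.

```java
// Hypothetical class showing why a boxed Boolean flag is risky.
final class BoxedFlagSketch {
    // Boxed type: a caller can legally pass null here.
    private final Boolean boxedFlag;

    BoxedFlagSketch(Boolean boxedFlag) {
        this.boxedFlag = boxedFlag;
    }

    // Returns true if using the flag in a condition throws NullPointerException.
    boolean readThrowsNpe() {
        try {
            if (boxedFlag) {
                // Auto-unboxing happens when the condition is evaluated.
            }
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(new BoxedFlagSketch(null).readThrowsNpe());         // prints "true"
        System.out.println(new BoxedFlagSketch(Boolean.TRUE).readThrowsNpe()); // prints "false"
    }
}
```

Declaring the field as `boolean` removes the `null` state entirely, so the condition can never throw.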

##########
File path: flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
##########
@@ -205,4 +207,26 @@ public void testSpecificType() throws Exception {
 		Assert.assertEquals("12:12:12", DataFormatConverters.LocalTimeConverter.INSTANCE.toExternal(
 				rowData.getInt(2)).toString());
 	}
+
+	@Test
+	public void testIgnoreParseErrors() throws Exception {
+		DataType serDataType = ROW(FIELD("type_int", INT().notNull())).notNull();
+		RowType serRowType = (RowType) serDataType.getLogicalType();
+		AvroRowDataSerializationSchema serializationSchema = new AvroRowDataSerializationSchema(serRowType);
+		serializationSchema.open(null);
+		RowData input = GenericRowData.of(1);
+		byte[] byteData = serializationSchema.serialize(input);
+
+		DataType deserDataType = ROW(FIELD("type_int", DOUBLE().notNull())).notNull();
+		RowType deserRowType = (RowType) deserDataType.getLogicalType();
+		AvroRowDataDeserializationSchema deserializationSchema = new AvroRowDataDeserializationSchema(
+				AvroDeserializationSchema.forGeneric(AvroSchemaConverter.convertToSchema(deserRowType)),
+				AvroToRowDataConverters.createRowConverter(deserRowType),
+				InternalTypeInfo.of(deserRowType),
+				true);
+
+		deserializationSchema.open(null);
+		RowData actual = deserializationSchema.deserialize(byteData);
+		assertThat(actual, org.hamcrest.core.IsNull.nullValue(RowData.class));

Review comment:
       Why not use `assertNull(actual)` here?

##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java
##########
@@ -85,9 +88,27 @@ public AvroRowDataDeserializationSchema(
 			DeserializationSchema<GenericRecord> nestedSchema,
 			AvroToRowDataConverters.AvroToRowDataConverter runtimeConverter,
 			TypeInformation<RowData> typeInfo) {
+		this(nestedSchema, runtimeConverter, typeInfo, false);
+	}
+
+	/**
+	 * Creates a Avro deserialization schema for the given logical type.
+	 *
+	 * @param nestedSchema     Deserialization schema to deserialize as {@link GenericRecord}
+	 * @param runtimeConverter Converter that transforms a {@link GenericRecord} into {@link RowData}
+	 * @param typeInfo         The TypeInformation to be used by
+	 *                         {@link AvroRowDataDeserializationSchema#getProducedType()}
+	 * @param ignoreParseError Indicate whether to skip rows with parsing error
+	 */
+	public AvroRowDataDeserializationSchema(

Review comment:
       Besides, this class should be `@Internal` and not `@PublicEvolving`. 

##########
File path: flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
##########
@@ -205,4 +207,26 @@ public void testSpecificType() throws Exception {
 		Assert.assertEquals("12:12:12", DataFormatConverters.LocalTimeConverter.INSTANCE.toExternal(
 				rowData.getInt(2)).toString());
 	}
+
+	@Test
+	public void testIgnoreParseErrors() throws Exception {
+		DataType serDataType = ROW(FIELD("type_int", INT().notNull())).notNull();
+		RowType serRowType = (RowType) serDataType.getLogicalType();
+		AvroRowDataSerializationSchema serializationSchema = new AvroRowDataSerializationSchema(serRowType);
+		serializationSchema.open(null);
+		RowData input = GenericRowData.of(1);
+		byte[] byteData = serializationSchema.serialize(input);

Review comment:
       It would be better to verify the parse error by deserializing a string value as a numeric type. For example:
   deserializing `[int: 1, string: abc]` with schema `(int, long)` should yield `Row(1, null)`.
   
   I don't think deserializing a null field is a strong enough test. 
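
The expected behaviour in the example above can be sketched without Avro or Flink. Everything here is an assumption for illustration: `LenientRowParserSketch` is a hypothetical class, the input is plain strings rather than Avro binary data, and the row is a plain `Object[]` rather than `RowData`. It only shows the field-level contract: a field that fails to parse becomes `null` when errors are ignored, and the exception propagates otherwise.

```java
import java.util.Arrays;

// Hypothetical sketch of field-level lenient parsing for a fixed (int, long) schema.
final class LenientRowParserSketch {

    // Parses two raw string fields as (int, long).
    // With ignoreParseErrors set, an unparseable field is replaced by null.
    static Object[] parseIntLongRow(String[] rawFields, boolean ignoreParseErrors) {
        Object[] row = new Object[2];
        for (int i = 0; i < row.length; i++) {
            try {
                if (i == 0) {
                    row[i] = Integer.valueOf(rawFields[i]);
                } else {
                    row[i] = Long.valueOf(rawFields[i]);
                }
            } catch (NumberFormatException e) {
                if (!ignoreParseErrors) {
                    throw e; // strict mode: surface the parse error
                }
                row[i] = null; // lenient mode: keep the row, null out the field
            }
        }
        return row;
    }

    public static void main(String[] args) {
        Object[] row = parseIntLongRow(new String[] {"1", "abc"}, true);
        System.out.println(Arrays.toString(row)); // prints "[1, null]"
    }
}
```

A test along these lines exercises a real type mismatch in one field while checking that the valid field still comes through.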



