Posted to commits@flink.apache.org by ja...@apache.org on 2020/06/08 02:05:01 UTC

[flink] 01/03: [FLINK-18073][avro] Fix AvroRowDataSerializationSchema is not serializable

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8f858f398de3c47fe5045fbe0f1497022bfb1c15
Author: Jark Wu <ja...@apache.org>
AuthorDate: Thu Jun 4 20:01:47 2020 +0800

    [FLINK-18073][avro] Fix AvroRowDataSerializationSchema is not serializable
    
    This closes #12471
---
 .../formats/avro/AvroFileSystemFormatFactory.java  |  3 +-
 .../avro/AvroRowDataSerializationSchema.java       | 92 +++++++++++++++++-----
 .../avro/typeutils/AvroSchemaConverter.java        | 23 +++---
 .../avro/typeutils/AvroSchemaConverterTest.java    | 42 ++++++++++
 4 files changed, 127 insertions(+), 33 deletions(-)
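
Why the class failed to serialize: SerializationRuntimeConverter extends Serializable, but the lambda returned by the old createRowConverter() captured an org.apache.avro.Schema, which is not Serializable in the Avro version used here, so serializing the converter (and with it the whole SerializationSchema) failed. The fix below passes the Schema into convert(schema, object) at call time and keeps only a transient field that open() rebuilds on the task side. A minimal sketch of that pattern (hypothetical class, not Flink code):

    import java.io.Serializable;

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;

    public class TransientSchemaHolder implements Serializable {

        private static final long serialVersionUID = 1L;

        // org.apache.avro.Schema is not Serializable, so the field must be
        // transient and rebuilt after deserialization instead of shipped.
        private transient Schema schema;

        // Mirrors SerializationSchema#open(): derive the schema again on
        // the worker rather than serializing the Schema object itself.
        public void open() {
            this.schema = SchemaBuilder.record("ExampleRecord").fields()
                .requiredString("name")
                .requiredLong("id")
                .endRecord();
        }

        public Schema schema() {
            return schema;
        }
    }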

diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileSystemFormatFactory.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileSystemFormatFactory.java
index a033739..c60c42c 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileSystemFormatFactory.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileSystemFormatFactory.java
@@ -243,11 +243,12 @@ public class AvroFileSystemFormatFactory implements FileSystemFormatFactory {
 			BulkWriter<GenericRecord> writer = factory.create(out);
 			AvroRowDataSerializationSchema.SerializationRuntimeConverter converter =
 					AvroRowDataSerializationSchema.createRowConverter(rowType);
+			Schema schema = AvroSchemaConverter.convertToSchema(rowType);
 			return new BulkWriter<RowData>() {
 
 				@Override
 				public void addElement(RowData element) throws IOException {
-					GenericRecord record = (GenericRecord) converter.convert(element);
+					GenericRecord record = (GenericRecord) converter.convert(schema, element);
 					writer.addElement(record);
 				}
 
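With the new interface, callers derive the Avro Schema once, next to the converter, and hand it to every convert() call, as the BulkWriter above does per element. A sketch of the calling convention (the converter API is package-private, so this assumes code living in org.apache.flink.formats.avro; the helper class and method names are hypothetical):

    package org.apache.flink.formats.avro;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;

    import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.types.logical.RowType;

    class ConverterUsageSketch {

        // In real code the converter and schema are created once per writer
        // (as in the factory above), not per element; inlined here for brevity.
        static GenericRecord toRecord(RowType rowType, RowData row) {
            AvroRowDataSerializationSchema.SerializationRuntimeConverter converter =
                    AvroRowDataSerializationSchema.createRowConverter(rowType);
            Schema schema = AvroSchemaConverter.convertToSchema(rowType);
            // The schema argument lets nested converters pick the matching
            // (possibly nullable) field, element, or value sub-schema.
            return (GenericRecord) converter.convert(schema, row);
        }
    }
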
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java
index 00b7ac5..5b1fbbe 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java
@@ -75,6 +75,11 @@ public class AvroRowDataSerializationSchema implements SerializationSchema<RowDa
 	private final SerializationRuntimeConverter runtimeConverter;
 
 	/**
+	 * Avro serialization schema.
+	 */
+	private transient Schema schema;
+
+	/**
 	 * Writer to serialize an Avro record into Avro bytes.
 	 */
 	private transient DatumWriter<IndexedRecord> datumWriter;
@@ -99,7 +104,7 @@ public class AvroRowDataSerializationSchema implements SerializationSchema<RowDa
 
 	@Override
 	public void open(InitializationContext context) throws Exception {
-		final Schema schema = AvroSchemaConverter.convertToSchema(rowType);
+		this.schema = AvroSchemaConverter.convertToSchema(rowType);
 		datumWriter = new SpecificDatumWriter<>(schema);
 		arrayOutputStream = new ByteArrayOutputStream();
 		encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
@@ -109,7 +114,7 @@ public class AvroRowDataSerializationSchema implements SerializationSchema<RowDa
 	public byte[] serialize(RowData row) {
 		try {
 			// convert to record
-			final GenericRecord record = (GenericRecord) runtimeConverter.convert(row);
+			final GenericRecord record = (GenericRecord) runtimeConverter.convert(schema, row);
 			arrayOutputStream.reset();
 			datumWriter.write(record, encoder);
 			encoder.flush();
@@ -145,33 +150,43 @@ public class AvroRowDataSerializationSchema implements SerializationSchema<RowDa
 	 * to corresponding Avro data structures.
 	 */
 	interface SerializationRuntimeConverter extends Serializable {
-		Object convert(Object object);
+		Object convert(Schema schema, Object object);
 	}
 
 	static SerializationRuntimeConverter createRowConverter(RowType rowType) {
 		final SerializationRuntimeConverter[] fieldConverters = rowType.getChildren().stream()
 			.map(AvroRowDataSerializationSchema::createConverter)
 			.toArray(SerializationRuntimeConverter[]::new);
-		final Schema schema = AvroSchemaConverter.convertToSchema(rowType);
 		final LogicalType[] fieldTypes = rowType.getFields().stream()
 			.map(RowType.RowField::getType)
 			.toArray(LogicalType[]::new);
+		final RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[fieldTypes.length];
+		for (int i = 0; i < fieldTypes.length; i++) {
+			fieldGetters[i] = RowData.createFieldGetter(fieldTypes[i], i);
+		}
 		final int length = rowType.getFieldCount();
 
-		return object -> {
+		return (schema, object) -> {
 			final RowData row = (RowData) object;
+			final List<Schema.Field> fields = schema.getFields();
 			final GenericRecord record = new GenericData.Record(schema);
 			for (int i = 0; i < length; ++i) {
-				record.put(i, fieldConverters[i].convert(RowData.get(row, i, fieldTypes[i])));
+				final Schema.Field schemaField = fields.get(i);
+				Object avroObject = fieldConverters[i].convert(
+					schemaField.schema(),
+					fieldGetters[i].getFieldOrNull(row));
+				record.put(i, avroObject);
 			}
 			return record;
 		};
 	}
 
 	private static SerializationRuntimeConverter createConverter(LogicalType type) {
+		final SerializationRuntimeConverter converter;
 		switch (type.getTypeRoot()) {
 			case NULL:
-				return object -> null;
+				converter = (schema, object) -> null;
+				break;
 			case BOOLEAN: // boolean
 			case INTEGER: // int
 			case INTERVAL_YEAR_MONTH: // long
@@ -181,39 +196,74 @@ public class AvroRowDataSerializationSchema implements SerializationSchema<RowDa
 			case DOUBLE: // double
 			case TIME_WITHOUT_TIME_ZONE: // int
 			case DATE: // int
-				return avroObject -> avroObject;
+				converter = (schema, object) -> object;
+				break;
 			case CHAR:
 			case VARCHAR:
-				return object -> new Utf8(object.toString());
+				converter = (schema, object) -> new Utf8(object.toString());
+				break;
 			case BINARY:
 			case VARBINARY:
-				return object -> ByteBuffer.wrap((byte[]) object);
+				converter = (schema, object) -> ByteBuffer.wrap((byte[]) object);
+				break;
 			case TIMESTAMP_WITHOUT_TIME_ZONE:
-				return object -> ((TimestampData) object).toTimestamp().getTime();
+				converter = (schema, object) -> ((TimestampData) object).toTimestamp().getTime();
+				break;
 			case DECIMAL:
-				return object -> ByteBuffer.wrap(((DecimalData) object).toUnscaledBytes());
+				converter = (schema, object) -> ByteBuffer.wrap(((DecimalData) object).toUnscaledBytes());
+				break;
 			case ARRAY:
-				return createArrayConverter((ArrayType) type);
+				converter = createArrayConverter((ArrayType) type);
+				break;
 			case ROW:
-				return createRowConverter((RowType) type);
+				converter = createRowConverter((RowType) type);
+				break;
 			case MAP:
 			case MULTISET:
-				return createMapConverter(type);
+				converter = createMapConverter(type);
+				break;
 			case RAW:
 			default:
 				throw new UnsupportedOperationException("Unsupported type: " + type);
 		}
+
+		// wrap into nullable converter
+		return (schema, object) -> {
+			if (object == null) {
+				return null;
+			}
+
+			// get actual schema if it is a nullable schema
+			Schema actualSchema;
+			if (schema.getType() == Schema.Type.UNION) {
+				List<Schema> types = schema.getTypes();
+				int size = types.size();
+				if (size == 2 && types.get(1).getType() == Schema.Type.NULL) {
+					actualSchema = types.get(0);
+				} else if (size == 2 && types.get(0).getType() == Schema.Type.NULL) {
+					actualSchema = types.get(1);
+				} else {
+					throw new IllegalArgumentException(
+						"The Avro schema is not a nullable type: " + schema.toString());
+				}
+			} else {
+				actualSchema = schema;
+			}
+			return converter.convert(actualSchema, object);
+		};
 	}
 
 	private static SerializationRuntimeConverter createArrayConverter(ArrayType arrayType) {
+		LogicalType elementType = arrayType.getElementType();
+		final ArrayData.ElementGetter elementGetter = ArrayData.createElementGetter(elementType);
 		final SerializationRuntimeConverter elementConverter = createConverter(arrayType.getElementType());
-		final LogicalType elementType = arrayType.getElementType();
 
-		return object -> {
+		return (schema, object) -> {
+			final Schema elementSchema = schema.getElementType();
 			ArrayData arrayData = (ArrayData) object;
 			List<Object> list = new ArrayList<>();
 			for (int i = 0; i < arrayData.size(); ++i) {
-				list.add(elementConverter.convert(ArrayData.get(arrayData, i, elementType)));
+				list.add(elementConverter.convert(elementSchema, elementGetter.getElementOrNull(arrayData, i)));
 			}
 			return list;
 		};
@@ -221,16 +271,18 @@ public class AvroRowDataSerializationSchema implements SerializationSchema<RowDa
 
 	private static SerializationRuntimeConverter createMapConverter(LogicalType type) {
 		LogicalType valueType = extractValueTypeToAvroMap(type);
+		final ArrayData.ElementGetter valueGetter = ArrayData.createElementGetter(valueType);
 		final SerializationRuntimeConverter valueConverter = createConverter(valueType);
 
-		return object -> {
+		return (schema, object) -> {
+			final Schema valueSchema = schema.getValueType();
 			final MapData mapData = (MapData) object;
 			final ArrayData keyArray = mapData.keyArray();
 			final ArrayData valueArray = mapData.valueArray();
 			final Map<Object, Object> map = new HashMap<>(mapData.size());
 			for (int i = 0; i < mapData.size(); ++i) {
 				final String key = keyArray.getString(i).toString();
-				final Object value = valueConverter.convert(ArrayData.get(valueArray, i, valueType));
+				final Object value = valueConverter.convert(valueSchema, valueGetter.getElementOrNull(valueArray, i));
 				map.put(key, value);
 			}
 			return map;
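
The wrapper added in createConverter() treats a two-branch union containing NULL as a nullable type and unwraps it before delegating, so each inner converter always sees the concrete schema. A small standalone sketch of the union shapes this logic accepts (plain Avro API, hypothetical class name):

    import java.util.List;

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;

    public class NullableUnionSketch {
        public static void main(String[] args) {
            // A nullable string in Avro is a two-branch union with NULL.
            Schema nullableString = SchemaBuilder.builder()
                .unionOf().nullType().and().stringType().endUnion();

            // The wrapper accepts both branch orders, ["null","string"] and
            // ["string","null"], and rejects any other union shape.
            List<Schema> branches = nullableString.getTypes();
            Schema actual = branches.get(0).getType() == Schema.Type.NULL
                ? branches.get(1)
                : branches.get(0);

            System.out.println(actual);  // prints "string"
        }
    }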
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
index 774fadf..37745e5 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.MultisetType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimeType;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 import org.apache.flink.types.Row;
@@ -179,6 +180,7 @@ public class AvroSchemaConverter {
 	}
 
 	public static Schema convertToSchema(LogicalType logicalType, int rowTypeCounter) {
+		int precision;
 		switch (logicalType.getTypeRoot()) {
 			case NULL:
 				return SchemaBuilder.builder().nullType();
@@ -201,20 +203,25 @@ public class AvroSchemaConverter {
 			case TIMESTAMP_WITHOUT_TIME_ZONE:
 				// use long to represent Timestamp
 				final TimestampType timestampType = (TimestampType) logicalType;
-				int precision = timestampType.getPrecision();
+				precision = timestampType.getPrecision();
 				org.apache.avro.LogicalType avroLogicalType;
 				if (precision <= 3) {
 					avroLogicalType = LogicalTypes.timestampMillis();
 				} else {
-					throw new IllegalArgumentException("Avro Timestamp does not support Timestamp with precision: " +
-						precision +
-						", it only supports precision of 3 or 9.");
+					throw new IllegalArgumentException("Avro does not support TIMESTAMP type " +
+						"with precision: " + precision + ", it only supports precision less than or equal to 3.");
 				}
 				return avroLogicalType.addToSchema(SchemaBuilder.builder().longType());
 			case DATE:
 				// use int to represent Date
 				return LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
 			case TIME_WITHOUT_TIME_ZONE:
+				precision = ((TimeType) logicalType).getPrecision();
+				if (precision > 3) {
+					throw new IllegalArgumentException(
+						"Avro does not support TIME type with precision: " + precision +
+						", it only supports precision less than or equal to 3.");
+				}
 				// use int to represent Time, we only support millisecond precision during deserialization
 				return LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType());
 			case DECIMAL:
@@ -254,14 +261,6 @@ public class AvroSchemaConverter {
 					.array()
 					.items(convertToSchema(arrayType.getElementType(), rowTypeCounter));
 			case RAW:
-				// if the union type has more than 2 types, it will be recognized a generic type
-				// see AvroRowDeserializationSchema#convertAvroType and AvroRowSerializationSchema#convertFlinkType
-				return SchemaBuilder.builder().unionOf()
-					.nullType().and()
-					.booleanType().and()
-					.longType().and()
-					.doubleType()
-					.endUnion();
 			case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
 			default:
 				throw new UnsupportedOperationException("Unsupported to derive Schema for type: " + logicalType);
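
The converter now fails fast on TIMESTAMP and TIME precisions beyond what Avro's millisecond logical types can carry. A sketch of the observable behaviour, using the public two-argument convertToSchema shown above (hypothetical class name; the printed schema is what typical Avro output looks like, shown as an assumption):

    import org.apache.avro.Schema;

    import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
    import org.apache.flink.table.types.logical.TimestampType;

    public class PrecisionCheckSketch {
        public static void main(String[] args) {
            // TIMESTAMP(3) fits Avro's timestamp-millis: a long with the
            // logical type attached. The int argument is the rowTypeCounter
            // from the signature above; 0 at the top level.
            Schema millis = AvroSchemaConverter.convertToSchema(new TimestampType(3), 0);
            System.out.println(millis);
            // e.g. {"type":"long","logicalType":"timestamp-millis"}

            // TIMESTAMP(9) (nanoseconds) has no counterpart here and throws.
            try {
                AvroSchemaConverter.convertToSchema(new TimestampType(9), 0);
            } catch (IllegalArgumentException expected) {
                System.out.println(expected.getMessage());
            }
        }
    }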
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
index be0ddc4..fa499b7 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
@@ -22,9 +22,14 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.Row;
 
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -34,6 +39,9 @@ import static org.junit.Assert.assertTrue;
  */
 public class AvroSchemaConverterTest {
 
+	@Rule
+	public ExpectedException thrown = ExpectedException.none();
+
 	@Test
 	public void testAvroClassConversion() {
 		validateUserSchema(AvroSchemaConverter.convertToTypeInfo(User.class));
@@ -45,6 +53,40 @@ public class AvroSchemaConverterTest {
 		validateUserSchema(AvroSchemaConverter.convertToTypeInfo(schema));
 	}
 
+	@Test
+	public void testInvalidRawTypeAvroSchemaConversion() {
+		RowType rowType = (RowType) TableSchema.builder()
+			.field("a", DataTypes.STRING())
+			.field("b", DataTypes.RAW(Types.GENERIC(AvroSchemaConverterTest.class)))
+			.build().toRowDataType().getLogicalType();
+		thrown.expect(UnsupportedOperationException.class);
+		thrown.expectMessage("Unsupported to derive Schema for type: RAW");
+		AvroSchemaConverter.convertToSchema(rowType);
+	}
+
+	@Test
+	public void testInvalidTimestampTypeAvroSchemaConversion() {
+		RowType rowType = (RowType) TableSchema.builder()
+			.field("a", DataTypes.STRING())
+			.field("b", DataTypes.TIMESTAMP(9))
+			.build().toRowDataType().getLogicalType();
+		thrown.expect(IllegalArgumentException.class);
+		thrown.expectMessage("Avro does not support TIMESTAMP type with precision: 9, " +
+			"it only supports precision less than or equal to 3.");
+		AvroSchemaConverter.convertToSchema(rowType);
+	}
+
+	@Test
+	public void testInvalidTimeTypeAvroSchemaConversion() {
+		RowType rowType = (RowType) TableSchema.builder()
+			.field("a", DataTypes.STRING())
+			.field("b", DataTypes.TIME(6))
+			.build().toRowDataType().getLogicalType();
+		thrown.expect(IllegalArgumentException.class);
+		thrown.expectMessage("Avro does not support TIME type with precision: 6, it only supports precision less than or equal to 3.");
+		AvroSchemaConverter.convertToSchema(rowType);
+	}
+
 	private void validateUserSchema(TypeInformation<?> actual) {
 		final TypeInformation<Row> address = Types.ROW_NAMED(
 			new String[]{