Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/14 06:37:29 UTC

[GitHub] [flink] wuchong commented on a change in pull request #12133: [FLINK-17526][avro] Support AVRO serialization and deseriazation sche…

wuchong commented on a change in pull request #12133:
URL: https://github.com/apache/flink/pull/12133#discussion_r424864692



##########
File path: flink-formats/flink-avro/pom.xml
##########
@@ -93,6 +93,14 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<!-- Arvo RowData schema test dependency -->

Review comment:
       ```suggestion
   		<!-- Avro RowData schema test dependency -->
   ```

##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TimeZone;
+
+/**
+ * Deserialization schema from Avro bytes to {@link RowData}.
+ *
+ * <p>Deserializes the <code>byte[]</code> messages into (nested) Flink RowData. It converts Avro types
+ * into types that are compatible with Flink's Table & SQL API.
+ *
+ * <p>Projects with Avro records containing logical date/time types need to add a JodaTime
+ * dependency.
+ *
+ * <p>Note: Changes in this class need to be kept in sync with the corresponding runtime
+ * class {@link AvroRowDataSerializationSchema} and schema converter {@link AvroSchemaConverter}.
+ */
+@PublicEvolving
+public class AvroRowDataDeserializationSchema extends AbstractDeserializationSchema<RowData> {

Review comment:
       Please add `serialVersionUID`.
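       For example:
   ```java
   private static final long serialVersionUID = 1L;
   ```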

##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java
##########
@@ -0,0 +1,292 @@
+	[... license header, imports, and Javadoc identical to the first hunk of this file above ...]
+@PublicEvolving
+public class AvroRowDataDeserializationSchema extends AbstractDeserializationSchema<RowData> {
+
+	/**
+	 * Used for time conversions into SQL types.
+	 */
+	private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+	/**
+	 * Logical type describing the result type.
+	 */
+	private RowType rowType;
+
+	/**
+	 * Type information describing the result type.
+	 */
+	private TypeInformation<RowData> typeInfo;
+
+	/**
+	 * Avro serialization schema.
+	 */
+	private transient Schema schema;
+
+	/**
+	 * Record to deserialize byte array.
+	 */
+	private transient IndexedRecord record;
+
+	/**
+	 * Reader that deserializes byte array into a record.
+	 */
+	private transient DatumReader<IndexedRecord> datumReader;
+
+	/**
+	 * Input stream to read message from.
+	 */
+	private transient MutableByteArrayInputStream inputStream;
+
+	/**
+	 * Avro decoder that decodes binary data.
+	 */
+	private transient Decoder decoder;
+
+	/**
+	 * Creates a Avro deserialization schema for the given logical type.
+	 *
+	 * @param rowType The logical type used to deserialize the data.
+	 * @param typeInfo The TypeInformation to be used by {@link AvroRowDataDeserializationSchema#getProducedType()}.
+	 */
+	public AvroRowDataDeserializationSchema(
+			RowType rowType,
+			TypeInformation<RowData> typeInfo) {
+		this.rowType = rowType;
+		this.typeInfo = typeInfo;
+		this.schema = AvroSchemaConverter.convertToSchema(rowType);
+		this.record = new GenericData.Record(schema);
+		this.datumReader = new SpecificDatumReader<>(schema);
+		this.inputStream = new MutableByteArrayInputStream();
+		this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+	}
+
+	@Override
+	public RowData deserialize(byte[] message) throws IOException {
+		try {
+			inputStream.setBuffer(message);
+			record = datumReader.read(record, decoder);
+			return convertAvroRecordToRowData(schema, rowType, record);
+		} catch (Exception e) {
+			throw new IOException("Failed to deserialize Avro record.", e);
+		}
+	}
+
+	@Override
+	public TypeInformation<RowData> getProducedType() {
+		return typeInfo;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		final AvroRowDataDeserializationSchema that = (AvroRowDataDeserializationSchema) o;
+		return Objects.equals(rowType, that.rowType) &&
+			Objects.equals(typeInfo, that.typeInfo);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(rowType, typeInfo);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private RowData convertAvroRecordToRowData(Schema schema, RowType rowType, IndexedRecord record) {
+		final List<Schema.Field> fields = schema.getFields();
+		final int length = fields.size();
+		final GenericRowData row = new GenericRowData(length);
+		for (int i = 0; i < length; i++) {
+			final Schema.Field field = fields.get(i);
+			row.setField(i, convertAvroType(field.schema(), rowType.getTypeAt(i), record.get(i)));
+		}
+		return row;
+	}
+
+	private Object convertAvroType(Schema schema, LogicalType logicalType, Object object) {
+		if (object == null) {
+			return null;
+		}
+		switch (schema.getType()) {
+			case RECORD:
+				if (object instanceof IndexedRecord) {
+					return convertAvroRecordToRowData(schema, (RowType) logicalType, (IndexedRecord) object);
+				}
+				throw new IllegalStateException("IndexedRecord expected but was: " + object.getClass());
+			case ENUM:
+			case STRING:
+				return StringData.fromString(object.toString());
+			case ARRAY:
+					final LogicalType elementType = ((ArrayType) logicalType).getElementType();
+					return convertToObjectArray(schema.getElementType(), elementType, object);
+			case MAP:
+				final Map<?, ?> mapData = (Map<?, ?>) object;
+				final Map<StringData, Object> convertedMap = new HashMap<>();
+
+				for (Map.Entry<?, ?> entry : mapData.entrySet()) {
+					convertedMap.put(
+						StringData.fromString(entry.getKey().toString()), // key must be String.
+						convertAvroType(
+							schema.getValueType(),
+							((MapType) logicalType).getValueType(),
+							entry.getValue()));
+				}
+				return new GenericMapData(convertedMap);
+			case UNION:
+				final List<Schema> types = schema.getTypes();
+				final int size = types.size();
+				if (size == 2 && types.get(0).getType() == Schema.Type.NULL) {
+					return convertAvroType(types.get(1), logicalType, object);
+				} else if (size == 2 && types.get(1).getType() == Schema.Type.NULL) {
+					return convertAvroType(types.get(0), logicalType, object);
+				} else if (size == 1) {
+					return convertAvroType(types.get(0), logicalType, object);
+				} else {
+					throw new FlinkRuntimeException("Avro union is not supported.");
+				}
+			case FIXED:
+				final byte[] fixedBytes = ((GenericFixed) object).bytes();
+				if (LogicalTypeChecks.hasRoot(logicalType, LogicalTypeRoot.DECIMAL)) {
+					return convertToDecimal(schema, fixedBytes);
+				}
+				return fixedBytes;
+			case BYTES:
+				final ByteBuffer byteBuffer = (ByteBuffer) object;
+				final byte[] bytes = new byte[byteBuffer.remaining()];
+				byteBuffer.get(bytes);
+				if (LogicalTypeChecks.hasRoot(logicalType, LogicalTypeRoot.DECIMAL)) {
+					return convertToDecimal(schema, bytes);
+				}
+				return bytes;
+			case LONG:
+				if (LogicalTypeChecks.hasRoot(logicalType, LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) {
+					return convertToTimestamp(object);
+				}
+				return object;
+			case INT:
+			case FLOAT:
+			case DOUBLE:
+			case BOOLEAN:
+				return object;
+		}
+		throw new RuntimeException("Unsupported Avro type:" + schema);
+	}
+
+	private DecimalData convertToDecimal(Schema schema, byte[] bytes) {
+		final LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) schema.getLogicalType();
+		return DecimalData.fromUnscaledBytes(bytes, decimalType.getPrecision(), decimalType.getScale());
+	}
+
+	private TimestampData convertToTimestamp(Object object) {
+		final long millis;
+		if (object instanceof Long) {
+			millis = (Long) object;
+		} else {
+			// use 'provided' Joda time
+			final DateTime value = (DateTime) object;
+			millis = value.toDate().getTime();
+		}
+		return TimestampData.fromEpochMillis(millis - LOCAL_TZ.getOffset(millis));

Review comment:
       We don't need to subtract the LOCAL_TZ offset for `TimestampData`; the `millis` value is already in epoch milliseconds.
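       A minimal sketch of the corrected conversion, under that reading:
   ```java
   private TimestampData convertToTimestamp(Object object) {
   	final long millis;
   	if (object instanceof Long) {
   		millis = (Long) object;
   	} else {
   		// use 'provided' Joda time
   		millis = ((DateTime) object).toDate().getTime();
   	}
   	// millis is already in epoch milliseconds, so no LOCAL_TZ adjustment is needed
   	return TimestampData.fromEpochMillis(millis);
   }
   ```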

##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
##########
@@ -157,4 +167,204 @@ private AvroSchemaConverter() {
 		}
 		throw new IllegalArgumentException("Unsupported Avro type '" + schema.getType() + "'.");
 	}
+
+	/**
+	 * Converts an Avro class into a nested row structure with deterministic field order and data
+	 * types that are compatible with Flink's Table & SQL API.
+	 *
+	 * @param avroClass Avro specific record that contains schema information
+	 * @return logical type matching the schema
+	 */
+	public static <T extends SpecificRecord> LogicalType convertToLogicalType(Class<T> avroClass) {
+		Preconditions.checkNotNull(avroClass, "Avro specific record class must not be null.");
+		// determine schema to retrieve deterministic field order
+		final Schema schema = SpecificData.get().getSchema(avroClass);
+		return convertToLogicalType(schema);
+	}
+
+	/**
+	 * Converts an Avro schema string into a nested row structure with deterministic
+	 * field order and data types that are compatible with Flink's Table & SQL API.
+	 *
+	 * @param avroSchemaString Avro schema definition string
+	 * @return logical type matching the schema
+	 */
+	public static LogicalType convertToLogicalType(String avroSchemaString) {

Review comment:
       We don't need this for the new Avro RowData schema, because the schema is always derived from the table schema. We can introduce it later if needed.

##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java
##########
@@ -0,0 +1,292 @@
+	[... identical to the hunk of this file quoted above ...]
+	private RowData convertAvroRecordToRowData(Schema schema, RowType rowType, IndexedRecord record) {

Review comment:
       We should avoid performing analysis on the `LogicalType` at runtime. The type information is fixed, so we can create a runtime converter ahead of time that doesn't need to go through the big switch for every record. You can take `CsvRowDataDeserializationSchema` as an example.
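       A rough sketch of that pattern (the interface and method names here are illustrative; see `CsvRowDataDeserializationSchema` for the real example):
   ```java
   // resolve the LogicalType switch once, then reuse the converter for every record
   @FunctionalInterface
   interface DeserializationRuntimeConverter extends java.io.Serializable {
   	Object convert(Object avroObject);
   }

   static DeserializationRuntimeConverter createConverter(LogicalType type) {
   	switch (type.getTypeRoot()) {
   		case CHAR:
   		case VARCHAR:
   			return o -> o == null ? null : StringData.fromString(o.toString());
   		case INTEGER:
   		case FLOAT:
   		case DOUBLE:
   		case BOOLEAN:
   			return o -> o;
   		// ... the remaining cases are resolved here, ahead of time
   		default:
   			throw new UnsupportedOperationException("Unsupported type: " + type);
   	}
   }
   ```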

##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java
##########
@@ -0,0 +1,292 @@
+	[... identical to the hunk of this file quoted above ...]
+	private Object convertAvroType(Schema schema, LogicalType logicalType, Object object) {

Review comment:
       I think we don't need the hint from `schema`; it's just used to get the `IndexedRecord`. The schema information is fully contained in `logicalType`.
   It seems we are also missing some types like DATE and TIME.
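       A hedged sketch of those branches, keyed on the Flink type as suggested above (Avro's `date` is days since epoch and `time-millis` is milliseconds of day, both ints, which matches Flink's internal representation for DATE and TIME(3)):
   ```java
   // hypothetical additional branches for a converter switched on LogicalTypeRoot
   case DATE:
   case TIME_WITHOUT_TIME_ZONE:
   	// the Avro int value already matches Flink's internal representation
   	return object;
   ```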
   

##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
##########
@@ -157,4 +167,204 @@ private AvroSchemaConverter() {
 		}
 		throw new IllegalArgumentException("Unsupported Avro type '" + schema.getType() + "'.");
 	}
+
+	/**
+	 * Converts an Avro class into a nested row structure with deterministic field order and data
+	 * types that are compatible with Flink's Table & SQL API.
+	 *
+	 * @param avroClass Avro specific record that contains schema information
+	 * @return logical type matching the schema
+	 */
+	public static <T extends SpecificRecord> LogicalType convertToLogicalType(Class<T> avroClass) {

Review comment:
       We don't need this for the new Avro RowData schema, because the schema is always derived from the table schema. We can introduce it later if needed.

##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
##########
@@ -157,4 +167,204 @@ private AvroSchemaConverter() {
 		}
 		throw new IllegalArgumentException("Unsupported Avro type '" + schema.getType() + "'.");
 	}
+
+	/**
+	 * Converts an Avro class into a nested row structure with deterministic field order and data
+	 * types that are compatible with Flink's Table & SQL API.
+	 *
+	 * @param avroClass Avro specific record that contains schema information
+	 * @return logical type matching the schema
+	 */
+	public static <T extends SpecificRecord> LogicalType convertToLogicalType(Class<T> avroClass) {
+		Preconditions.checkNotNull(avroClass, "Avro specific record class must not be null.");
+		// determine schema to retrieve deterministic field order
+		final Schema schema = SpecificData.get().getSchema(avroClass);
+		return convertToLogicalType(schema);
+	}
+
+	/**
+	 * Converts an Avro schema string into a nested row structure with deterministic
+	 * field order and data types that are compatible with Flink's Table & SQL API.
+	 *
+	 * @param avroSchemaString Avro schema definition string
+	 * @return logical type matching the schema
+	 */
+	public static LogicalType convertToLogicalType(String avroSchemaString) {
+		Preconditions.checkNotNull(avroSchemaString, "Avro schema must not be null.");
+		final Schema schema;
+		try {
+			schema = new Schema.Parser().parse(avroSchemaString);
+		} catch (SchemaParseException e) {
+			throw new IllegalArgumentException("Could not parse Avro schema string.", e);
+		}
+		return convertToLogicalType(schema);
+	}
+
+	/**
+	 * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema.
+	 *
+	 * @param logicalType logical type
+	 * @return Avro's {@link Schema} matching this logical type.
+	 */
+	public static Schema convertToSchema(LogicalType logicalType) {
+		return convertToSchema(logicalType, 0);
+	}
+
+	private static LogicalType convertToLogicalType(Schema schema) {
+		return convertToDataType(schema).getLogicalType();
+	}
+
+	private static DataType convertToDataType(Schema schema) {
+		switch (schema.getType()) {
+			case RECORD:
+				final List<Schema.Field> fields = schema.getFields();
+				final DataTypes.Field[] dataTypeFields = new DataTypes.Field[fields.size()];
+				for (int i = 0; i < fields.size(); i++) {
+					final Schema.Field field = fields.get(i);
+					dataTypeFields[i] = DataTypes.FIELD(
+						field.name(),
+						convertToDataType(field.schema()));
+				}
+				return DataTypes.ROW(dataTypeFields);
+			case ENUM:
+			case STRING:
+				// convert Avro's Utf8/CharSequence to String
+				return DataTypes.STRING();
+			case ARRAY:
+				// result type might either be ObjectArrayTypeInfo or BasicArrayTypeInfo for Strings
+				return DataTypes.ARRAY(convertToDataType(schema.getElementType()));
+			case MAP:
+				return DataTypes.MAP(DataTypes.STRING(), convertToDataType(schema.getValueType()));
+			case UNION:
+				final Schema actualSchema;
+				if (schema.getTypes().size() == 2 && schema.getTypes().get(0).getType() == Schema.Type.NULL) {
+					actualSchema = schema.getTypes().get(1);
+				} else if (schema.getTypes().size() == 2 && schema.getTypes().get(1).getType() == Schema.Type.NULL) {
+					actualSchema = schema.getTypes().get(0);
+				} else if (schema.getTypes().size() == 1) {
+					actualSchema = schema.getTypes().get(0);
+				} else {
+					// use Kryo for serialization
+					return DataTypes.RAW(Types.GENERIC(Object.class));
+				}
+				return convertToDataType(actualSchema);
+			case FIXED:
+			case BYTES:
+				// logical decimal type
+				if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
+					LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) schema.getLogicalType();
+					return DataTypes.DECIMAL(decimalType.getPrecision(), decimalType.getScale());
+				}
+				// convert fixed size binary data to primitive byte arrays
+				return DataTypes.BYTES();
+			case INT:
+				// logical date and time type
+				final org.apache.avro.LogicalType logicalType = schema.getLogicalType();
+				if (logicalType == LogicalTypes.date()) {
+					return DataTypes.DATE();
+				} else if (logicalType == LogicalTypes.timeMillis()) {
+					return DataTypes.TIME(3);
+				}
+				return DataTypes.INT();
+			case LONG:
+				// logical timestamp type
+				if (schema.getLogicalType() == LogicalTypes.timestampMillis()) {
+					return DataTypes.TIMESTAMP(3);
+				}
+				return DataTypes.BIGINT();
+			case FLOAT:
+				return DataTypes.FLOAT();
+			case DOUBLE:
+				return DataTypes.DOUBLE();
+			case BOOLEAN:
+				return DataTypes.BOOLEAN();
+			case NULL:
+				return DataTypes.NULL();
+		}
+		throw new IllegalArgumentException("Unsupported Avro type '" + schema.getType() + "'.");
+	}
+
+	private static Schema convertToSchema(LogicalType logicalType, int rowTypeCounter) {
+		switch (logicalType.getTypeRoot()) {
+			case NULL:
+				return SchemaBuilder.builder().nullType();
+			case BOOLEAN:
+				return SchemaBuilder.builder().nullable().booleanType();

Review comment:
       We don't need to hard-code every Avro type as nullable. We can use the nullability in `LogicalType` to call `notNull()` or `nullable()`. We can move this part to a separate method.
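       A minimal sketch of such a helper (the name is illustrative):
   ```java
   // wrap the non-null Avro schema in a union with null only if the Flink type is nullable
   // (requires java.util.Arrays)
   private static Schema nullableSchema(LogicalType flinkType, Schema schema) {
   	return flinkType.isNullable()
   		? Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), schema))
   		: schema;
   }
   ```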

##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.util.Utf8;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TimeZone;
+
+/**
+ * Serialization schema that serializes {@link RowData} into Avro bytes.
+ *
+ * <p>Serializes objects that are represented in (nested) Flink RowData. It support types that
+ * are compatible with Flink's Table & SQL API.
+ *
+ * <p>Note: Changes in this class need to be kept in sync with the corresponding runtime
+ * class {@link AvroRowDataDeserializationSchema} and schema converter {@link AvroSchemaConverter}.
+ */
+public class AvroRowDataSerializationSchema implements SerializationSchema<RowData> {

Review comment:
       Please add `serialVersionUID`.

##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
##########
@@ -157,4 +167,204 @@ private AvroSchemaConverter() {
 		}
 		throw new IllegalArgumentException("Unsupported Avro type '" + schema.getType() + "'.");
 	}
+	[... identical to the hunk of this file quoted above ...]
+
+	private static Schema convertToSchema(LogicalType logicalType, int rowTypeCounter) {
+		switch (logicalType.getTypeRoot()) {
+			case NULL:
+				return SchemaBuilder.builder().nullType();
+			case BOOLEAN:
+				return SchemaBuilder.builder().nullable().booleanType();
+			case INTEGER:
+				return SchemaBuilder.builder().nullable().intType();
+			case BIGINT:
+				return SchemaBuilder.builder().nullable().longType();
+			case FLOAT:
+				return SchemaBuilder.builder().nullable().floatType();
+			case DOUBLE:
+				return SchemaBuilder.builder().nullable().doubleType();
+			case CHAR:
+			case VARCHAR:
+				return SchemaBuilder.builder().nullable().stringType();
+			case BINARY:
+			case VARBINARY:
+				return SchemaBuilder.builder().nullable().bytesType();
+			case TIMESTAMP_WITHOUT_TIME_ZONE:
+				// use long to represents Timestamp
+				return LogicalTypes.timestampMillis().addToSchema(SchemaBuilder.builder().longType());
+			case DATE:
+				// use int to represents Date
+				return LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
+			case TIME_WITHOUT_TIME_ZONE:
+				// use int to represents Time, we only support millisecond when deserialization
+				return LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType());
+			case DECIMAL:
+				DecimalType decimalType = (DecimalType) logicalType;
+				// store BigDecimal as byte[]
+				return LogicalTypes
+					.decimal(decimalType.getPrecision(), decimalType.getScale())
+					.addToSchema(SchemaBuilder.builder().bytesType());
+			case ROW:
+				RowType rowType = (RowType) logicalType;
+				List<String> fieldNames = rowType.getFieldNames();
+				// we have to make sure the record name is different in a Schema
+				SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder
+					.builder()
+					.record("row_" + rowTypeCounter)
+					.fields();
+				rowTypeCounter++;
+				for (int i = 0; i < rowType.getFieldCount(); i++) {
+					builder = builder
+						.name(fieldNames.get(i))
+						.type(convertToSchema(rowType.getTypeAt(i), rowTypeCounter))
+						.noDefault();
+				}
+				return builder.endRecord();
+			case MAP:
+				MapType mapType = (MapType) logicalType;
+				if (!hasFamily(mapType.getKeyType(), LogicalTypeFamily.CHARACTER_STRING)) {
+					throw new IllegalArgumentException(
+						"Avro assumes map keys are strings, " + mapType.getKeyType() +
+							" type as key type is not supported.");
+				}
+				return SchemaBuilder
+					.builder()
+					.nullable() // will be UNION of Array and null
+					.map()
+					.values(convertToSchema(mapType.getValueType(), rowTypeCounter));
+			case ARRAY:
+				ArrayType arrayType = (ArrayType) logicalType;
+				return SchemaBuilder
+					.builder()
+					.nullable() // wil be union of array and null
+					.array()
+					.items(convertToSchema(arrayType.getElementType(), rowTypeCounter));
+			case RAW:
+				// if the union type has more than 2 types, it will be recognized a generic type
+				// see AvroRowDeserializationSchema#convertAvroType and AvroRowSerializationSchema#convertFlinkType
+				return SchemaBuilder.builder().unionOf()
+					.nullType().and()
+					.booleanType().and()
+					.longType().and()
+					.doubleType()
+					.endUnion();
+			default:
+				throw new UnsupportedOperationException("Unsupport to derive Schema for type: " + logicalType);

Review comment:
       ```suggestion
   				throw new UnsupportedOperationException("Unsupported to derive Schema for type: " + logicalType);
   ```

##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
##########
@@ -157,4 +167,204 @@ private AvroSchemaConverter() {
 		}
 		throw new IllegalArgumentException("Unsupported Avro type '" + schema.getType() + "'.");
 	}
+	[... identical to the hunk of this file quoted above, up to the following lines ...]
+			case TIMESTAMP_WITHOUT_TIME_ZONE:
+				// use long to represents Timestamp
+				return LogicalTypes.timestampMillis().addToSchema(SchemaBuilder.builder().longType());

Review comment:
       Should we throw an exception when the precision is larger than 3, since we only support millisecond precision?
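       A hedged sketch of such a guard, using Flink's `TimestampType`:
   ```java
   case TIMESTAMP_WITHOUT_TIME_ZONE:
   	final TimestampType timestampType = (TimestampType) logicalType;
   	if (timestampType.getPrecision() > 3) {
   		throw new IllegalArgumentException(
   			"Avro timestamp-millis only supports a precision of up to 3, but was: "
   				+ timestampType.getPrecision());
   	}
   	// use long to represent Timestamp
   	return LogicalTypes.timestampMillis().addToSchema(SchemaBuilder.builder().longType());
   ```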

##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java
##########
@@ -0,0 +1,292 @@
+	[... license header, imports, and Javadoc identical to the first hunk of this file above ...]
+@PublicEvolving
+public class AvroRowDataDeserializationSchema extends AbstractDeserializationSchema<RowData> {

Review comment:
       We don't need to extend `AbstractDeserializationSchema`; please implement `DeserializationSchema` instead, because we always provide the produced type manually rather than extracting it from the generic type.
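       A minimal sketch of the suggested shape (the conversion logic is elided; this is illustrative, not the final implementation):
   ```java
   public class AvroRowDataDeserializationSchema implements DeserializationSchema<RowData> {

   	private static final long serialVersionUID = 1L;

   	private final RowType rowType;
   	private final TypeInformation<RowData> typeInfo;

   	public AvroRowDataDeserializationSchema(RowType rowType, TypeInformation<RowData> typeInfo) {
   		this.rowType = rowType;
   		this.typeInfo = typeInfo;
   	}

   	@Override
   	public RowData deserialize(byte[] message) throws IOException {
   		// decode the Avro bytes and convert them to RowData (elided in this sketch)
   		throw new UnsupportedOperationException("elided in this sketch");
   	}

   	@Override
   	public boolean isEndOfStream(RowData nextElement) {
   		return false;
   	}

   	@Override
   	public TypeInformation<RowData> getProducedType() {
   		// the produced type is provided manually, not extracted from the generic type
   		return typeInfo;
   	}
   }
   ```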

##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.util.Utf8;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TimeZone;
+
+/**
+ * Serialization schema that serializes {@link RowData} into Avro bytes.
+ *
+ * <p>Serializes objects that are represented in (nested) Flink RowData. It supports types that
+ * are compatible with Flink's Table & SQL API.
+ *
+ * <p>Note: Changes in this class need to be kept in sync with the corresponding runtime
+ * class {@link AvroRowDataDeserializationSchema} and schema converter {@link AvroSchemaConverter}.
+ */
+public class AvroRowDataSerializationSchema implements SerializationSchema<RowData> {

Review comment:
       The review comments for `AvroRowDataDeserializationSchema` also apply to this class.

##########
File path: flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.formats.avro.utils.AvroTestUtils;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for the Avro serialization and deserialization schema.
+ */
+public class AvroRowDataDeSerializationSchemaTest {
+
+	@Test
+	public void testSerializeDeserialize() throws IOException {
+		final Tuple2<String, RowData> testData = AvroTestUtils.getGenericTestRowData();
+
+		final RowType rowType = (RowType) AvroSchemaConverter.convertToLogicalType(testData.f0);

Review comment:
       The schema should not come from a schema string but from a table schema. You can manually create a `RowType`, as `JsonRowDataSerDeSchemaTest#testSerDe` does.
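
       Something like this (the field names and types here are only illustrative, not the ones from `AvroTestUtils`):

```java
final RowType rowType = RowType.of(
	new LogicalType[] {
		new BigIntType(),
		new VarCharType(VarCharType.MAX_LENGTH),
		new DoubleType()},
	new String[] {"id", "name", "score"});
final TypeInformation<RowData> typeInfo = new RowDataTypeInfo(rowType);
```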

##########
File path: flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.formats.avro.utils.AvroTestUtils;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for the Avro serialization and deserialization schema.
+ */
+public class AvroRowDataDeSerializationSchemaTest {
+
+	@Test
+	public void testSerializeDeserialize() throws IOException {
+		final Tuple2<String, RowData> testData = AvroTestUtils.getGenericTestRowData();
+
+		final RowType rowType = (RowType) AvroSchemaConverter.convertToLogicalType(testData.f0);
+		final TypeInformation<RowData> typeInfo = new RowDataTypeInfo(rowType);
+
+		final AvroRowDataSerializationSchema serializationSchema = new AvroRowDataSerializationSchema(rowType);
+		final AvroRowDataDeserializationSchema deserializationSchema =
+			new AvroRowDataDeserializationSchema(rowType, typeInfo);
+
+		final byte[] bytes = serializationSchema.serialize(testData.f1);
+		final RowData actual = deserializationSchema.deserialize(bytes);
+
+		assertEquals(testData.f1, actual);

Review comment:
       I don't suggest asserting equality on `RowData`; it is very tricky to perform equality checks on `RowData`. We can't support equality for `RawValueData`, and an `int` may map to values of many different types. I would suggest asserting on the Avro bytes or on the `Row` type instead.
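
       For example, asserting on a byte round trip avoids `RowData` equality entirely (a sketch reusing the variables from this test, with `org.junit.Assert#assertArrayEquals`):

```java
final byte[] bytes = serializationSchema.serialize(testData.f1);
final RowData actual = deserializationSchema.deserialize(bytes);

// re-serialize and compare the Avro bytes instead of comparing RowData instances
assertArrayEquals(bytes, serializationSchema.serialize(actual));
```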

##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TimeZone;
+
+/**
+ * Deserialization schema from Avro bytes to {@link RowData}.
+ *
+ * <p>Deserializes the <code>byte[]</code> messages into (nested) Flink RowData. It converts Avro types
+ * into types that are compatible with Flink's Table & SQL API.
+ *
+ * <p>Projects with Avro records containing logical date/time types need to add a JodaTime
+ * dependency.
+ *
+ * <p>Note: Changes in this class need to be kept in sync with the corresponding runtime
+ * class {@link AvroRowDataSerializationSchema} and schema converter {@link AvroSchemaConverter}.
+ */
+@PublicEvolving
+public class AvroRowDataDeserializationSchema extends AbstractDeserializationSchema<RowData> {
+
+	/**
+	 * Used for time conversions into SQL types.
+	 */
+	private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+	/**
+	 * Logical type describing the result type.
+	 */
+	private RowType rowType;
+
+	/**
+	 * Type information describing the result type.
+	 */
+	private TypeInformation<RowData> typeInfo;
+
+	/**
+	 * Avro serialization schema.
+	 */
+	private transient Schema schema;
+
+	/**
+	 * Record to deserialize byte array.
+	 */
+	private transient IndexedRecord record;
+
+	/**
+	 * Reader that deserializes byte array into a record.
+	 */
+	private transient DatumReader<IndexedRecord> datumReader;
+
+	/**
+	 * Input stream to read message from.
+	 */
+	private transient MutableByteArrayInputStream inputStream;
+
+	/**
+	 * Avro decoder that decodes binary data.
+	 */
+	private transient Decoder decoder;
+
+	/**
+	 * Creates an Avro deserialization schema for the given logical type.
+	 *
+	 * @param rowType The logical type used to deserialize the data.
+	 * @param typeInfo The TypeInformation to be used by {@link AvroRowDataDeserializationSchema#getProducedType()}.
+	 */
+	public AvroRowDataDeserializationSchema(
+			RowType rowType,
+			TypeInformation<RowData> typeInfo) {
+		this.rowType = rowType;
+		this.typeInfo = typeInfo;
+		this.schema = AvroSchemaConverter.convertToSchema(rowType);
+		this.record = new GenericData.Record(schema);
+		this.datumReader = new SpecificDatumReader<>(schema);
+		this.inputStream = new MutableByteArrayInputStream();
+		this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+	}
+
+	@Override
+	public RowData deserialize(byte[] message) throws IOException {
+		try {
+			inputStream.setBuffer(message);
+			record = datumReader.read(record, decoder);
+			return convertAvroRecordToRowData(schema, rowType, record);
+		} catch (Exception e) {
+			throw new IOException("Failed to deserialize Avro record.", e);
+		}
+	}
+
+	@Override
+	public TypeInformation<RowData> getProducedType() {
+		return typeInfo;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		final AvroRowDataDeserializationSchema that = (AvroRowDataDeserializationSchema) o;
+		return Objects.equals(rowType, that.rowType) &&
+			Objects.equals(typeInfo, that.typeInfo);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(rowType, typeInfo);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private RowData convertAvroRecordToRowData(Schema schema, RowType rowType, IndexedRecord record) {
+		final List<Schema.Field> fields = schema.getFields();
+		final int length = fields.size();
+		final GenericRowData row = new GenericRowData(length);
+		for (int i = 0; i < length; i++) {
+			final Schema.Field field = fields.get(i);
+			row.setField(i, convertAvroType(field.schema(), rowType.getTypeAt(i), record.get(i)));
+		}
+		return row;
+	}
+
+	private Object convertAvroType(Schema schema, LogicalType logicalType, Object object) {
+		if (object == null) {
+			return null;
+		}
+		switch (schema.getType()) {
+			case RECORD:
+				if (object instanceof IndexedRecord) {
+					return convertAvroRecordToRowData(schema, (RowType) logicalType, (IndexedRecord) object);
+				}
+				throw new IllegalStateException("IndexedRecord expected but was: " + object.getClass());
+			case ENUM:
+			case STRING:
+				return StringData.fromString(object.toString());
+			case ARRAY:
+				final LogicalType elementType = ((ArrayType) logicalType).getElementType();
+				return convertToObjectArray(schema.getElementType(), elementType, object);
+			case MAP:
+				final Map<?, ?> mapData = (Map<?, ?>) object;
+				final Map<StringData, Object> convertedMap = new HashMap<>();
+
+				for (Map.Entry<?, ?> entry : mapData.entrySet()) {
+					convertedMap.put(
+						StringData.fromString(entry.getKey().toString()), // key must be String.
+						convertAvroType(
+							schema.getValueType(),
+							((MapType) logicalType).getValueType(),
+							entry.getValue()));
+				}
+				return new GenericMapData(convertedMap);
+			case UNION:
+				final List<Schema> types = schema.getTypes();
+				final int size = types.size();
+				if (size == 2 && types.get(0).getType() == Schema.Type.NULL) {
+					return convertAvroType(types.get(1), logicalType, object);
+				} else if (size == 2 && types.get(1).getType() == Schema.Type.NULL) {
+					return convertAvroType(types.get(0), logicalType, object);
+				} else if (size == 1) {
+					return convertAvroType(types.get(0), logicalType, object);
+				} else {
+					throw new FlinkRuntimeException("Avro union is not supported.");
+				}
+			case FIXED:
+				final byte[] fixedBytes = ((GenericFixed) object).bytes();
+				if (LogicalTypeChecks.hasRoot(logicalType, LogicalTypeRoot.DECIMAL)) {
+					return convertToDecimal(schema, fixedBytes);
+				}
+				return fixedBytes;
+			case BYTES:
+				final ByteBuffer byteBuffer = (ByteBuffer) object;
+				final byte[] bytes = new byte[byteBuffer.remaining()];
+				byteBuffer.get(bytes);
+				if (LogicalTypeChecks.hasRoot(logicalType, LogicalTypeRoot.DECIMAL)) {
+					return convertToDecimal(schema, bytes);
+				}
+				return bytes;
+			case LONG:
+				if (LogicalTypeChecks.hasRoot(logicalType, LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) {
+					return convertToTimestamp(object);
+				}
+				return object;
+			case INT:
+			case FLOAT:
+			case DOUBLE:
+			case BOOLEAN:
+				return object;
+		}
+		throw new RuntimeException("Unsupported Avro type: " + schema);
+	}
+
+	private DecimalData convertToDecimal(Schema schema, byte[] bytes) {
+		final LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) schema.getLogicalType();
+		return DecimalData.fromUnscaledBytes(bytes, decimalType.getPrecision(), decimalType.getScale());
+	}
+
+	private TimestampData convertToTimestamp(Object object) {
+		final long millis;
+		if (object instanceof Long) {
+			millis = (Long) object;
+		} else {
+			// use 'provided' Joda time
+			final DateTime value = (DateTime) object;
+			millis = value.toDate().getTime();
+		}
+		return TimestampData.fromEpochMillis(millis - LOCAL_TZ.getOffset(millis));
+	}
+
+	private ArrayData convertToObjectArray(Schema elementSchema, LogicalType elementType, Object object) {
+		final List<?> list = (List<?>) object;
+		final Object[] convertedArray = new Object[list.size()];
+		for (int i = 0; i < list.size(); i++) {
+			convertedArray[i] = convertAvroType(elementSchema, elementType, list.get(i));
+		}
+		return new GenericArrayData(convertedArray);
+	}
+
+	private void writeObject(ObjectOutputStream outputStream) throws IOException {
+		outputStream.writeObject(rowType);
+		outputStream.writeObject(typeInfo);
+	}
+
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream inputStream) throws ClassNotFoundException, IOException {

Review comment:
       `DeserializationSchema` already supports `open` now, so we don't need this Java-serialization hack to initialize the member fields.
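
       i.e. keep only `rowType` and `typeInfo` as (final) serializable fields and build the transient ones in `open`, roughly (same calls as in this PR's constructor):

```java
@Override
public void open(InitializationContext context) throws Exception {
	this.schema = AvroSchemaConverter.convertToSchema(rowType);
	this.record = new GenericData.Record(schema);
	this.datumReader = new SpecificDatumReader<>(schema);
	this.inputStream = new MutableByteArrayInputStream();
	this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
}
```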




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org