You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2020/05/16 11:00:29 UTC

[flink] branch master updated (85bfb00 -> 8bc7442)

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 85bfb00  [FLINK-17718][avro] Integrate avro to file system connector
     new d25af46  [hotfix][table-common] Reduce conversion classes of BINARY/VARBINARY
     new 96a02ab  [hotfix][table-common] Make the usage of structured types easier
     new c2e27f0  [hotfix][table-common] Fix structured type field order
     new b02e7c0  [hotfix][table-common] Fix conversion class of element array data types
     new 8bc7442  [FLINK-16999][table-runtime-blink] Add converters for all data types and conversion classes

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/table/catalog/DataTypeFactoryImpl.java   |   1 +
 .../org/apache/flink/table/data/ArrayData.java     |   3 +
 .../java/org/apache/flink/table/data/RowData.java  |   3 +
 .../flink/table/types/CollectionDataType.java      |  13 +-
 .../table/types/extraction/ExtractionUtils.java    |  34 +
 .../flink/table/types/logical/BinaryType.java      |  10 +-
 .../flink/table/types/logical/DistinctType.java    |   8 +-
 .../flink/table/types/logical/StructuredType.java  |  28 +-
 .../flink/table/types/logical/UserDefinedType.java |   2 +-
 .../flink/table/types/logical/VarBinaryType.java   |  10 +-
 .../types/logical/utils/LogicalTypeDuplicator.java |   4 +-
 .../apache/flink/table/types/DataTypesTest.java    |  25 +-
 .../apache/flink/table/types/LogicalTypesTest.java |  17 +-
 flink-table/flink-table-runtime-blink/pom.xml      |   8 +
 .../conversion/ArrayBooleanArrayConverter.java     |  43 ++
 .../data/conversion/ArrayByteArrayConverter.java   |  43 ++
 .../data/conversion/ArrayDoubleArrayConverter.java |  43 ++
 .../data/conversion/ArrayFloatArrayConverter.java  |  43 ++
 .../data/conversion/ArrayIntArrayConverter.java    |  43 ++
 .../data/conversion/ArrayLongArrayConverter.java   |  43 ++
 .../data/conversion/ArrayObjectArrayConverter.java | 206 ++++++
 .../data/conversion/ArrayShortArrayConverter.java  |  43 ++
 .../data/conversion/DataStructureConverter.java    |  80 +++
 .../data/conversion/DataStructureConverters.java   | 224 +++++++
 .../table/data/conversion/DateDateConverter.java   |  42 ++
 .../data/conversion/DateLocalDateConverter.java    |  42 ++
 .../DayTimeIntervalDurationConverter.java          |  43 ++
 .../conversion/DecimalBigDecimalConverter.java     |  63 ++
 .../table/data/conversion/IdentityConverter.java   |  40 ++
 .../LocalZonedTimestampInstantConverter.java       |  42 ++
 .../LocalZonedTimestampIntConverter.java           |  42 ++
 .../LocalZonedTimestampLongConverter.java          |  42 ++
 .../table/data/conversion/MapMapConverter.java     | 125 ++++
 .../data/conversion/RawByteArrayConverter.java     |  59 ++
 .../table/data/conversion/RawObjectConverter.java  |  59 ++
 .../table/data/conversion/RowRowConverter.java     |  95 +++
 .../data/conversion/StringByteArrayConverter.java  |  43 ++
 .../data/conversion/StringStringConverter.java     |  43 ++
 .../data/conversion/StructuredObjectConverter.java | 276 ++++++++
 .../data/conversion/TimeLocalTimeConverter.java    |  42 ++
 .../table/data/conversion/TimeLongConverter.java   |  41 ++
 .../table/data/conversion/TimeTimeConverter.java   |  42 ++
 .../TimestampLocalDateTimeConverter.java           |  42 ++
 .../conversion/TimestampTimestampConverter.java    |  42 ++
 .../YearMonthIntervalPeriodConverter.java          |  78 +++
 .../flink/table/data/writer/BinaryWriter.java      |   1 +
 .../table/runtime/types/InternalSerializers.java   |  50 +-
 .../table/data/DataStructureConvertersTest.java    | 730 +++++++++++++++++++++
 48 files changed, 3034 insertions(+), 67 deletions(-)
 create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayBooleanArrayConverter.java
 create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayByteArrayConverter.java
 create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayDoubleArrayConverter.java
 create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayFloatArrayConverter.java
 create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayIntArrayConverter.java
 create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayLongArrayConverter.java
 create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
 create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayShortArrayConverter.java
 create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverter.java
 create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverters.java
 create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DateDateConverter.java
 create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DateLocalDateConverter.java
 create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DayTimeIntervalDurationConverter.java
 create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DecimalBigDecimalConverter.java
 create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/IdentityConverter.java
 create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampInstantConverter.java
 create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampIntConverter.java
 create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampLongConverter.java
 create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/MapMapConverter.java
 create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RawByteArrayConverter.java
 create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RawObjectConverter.java
 create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RowRowConverter.java
 create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StringByteArrayConverter.java
 create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StringStringConverter.java
 create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StructuredObjectConverter.java
 create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeLocalTimeConverter.java
 create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeLongConverter.java
 create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeTimeConverter.java
 create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimestampLocalDateTimeConverter.java
 create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimestampTimestampConverter.java
 create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/YearMonthIntervalPeriodConverter.java
 create mode 100644 flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java


[flink] 05/05: [FLINK-16999][table-runtime-blink] Add converters for all data types and conversion classes

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8bc744285531bd0b650c799be0a591c3492124f2
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed May 13 18:59:11 2020 +0200

    [FLINK-16999][table-runtime-blink] Add converters for all data types and conversion classes
    
    This adds converters for all data types and all conversion classes. It refactors the
    DataFormatConverter for cleaner readability and adds a lot of tests.
    
    Among other conversions, it adds complete support for structured types
    (incl. immutable POJOs).
    
    The only not 100% unsupported types are TimeType and DayTimeIntervalType.
    
    This closes #12135.
---
 .../flink/table/catalog/DataTypeFactoryImpl.java   |   1 +
 .../org/apache/flink/table/data/ArrayData.java     |   3 +
 .../java/org/apache/flink/table/data/RowData.java  |   3 +
 .../table/types/extraction/ExtractionUtils.java    |  34 +
 flink-table/flink-table-runtime-blink/pom.xml      |   8 +
 .../conversion/ArrayBooleanArrayConverter.java     |  43 ++
 .../data/conversion/ArrayByteArrayConverter.java   |  43 ++
 .../data/conversion/ArrayDoubleArrayConverter.java |  43 ++
 .../data/conversion/ArrayFloatArrayConverter.java  |  43 ++
 .../data/conversion/ArrayIntArrayConverter.java    |  43 ++
 .../data/conversion/ArrayLongArrayConverter.java   |  43 ++
 .../data/conversion/ArrayObjectArrayConverter.java | 206 ++++++
 .../data/conversion/ArrayShortArrayConverter.java  |  43 ++
 .../data/conversion/DataStructureConverter.java    |  80 +++
 .../data/conversion/DataStructureConverters.java   | 224 +++++++
 .../table/data/conversion/DateDateConverter.java   |  42 ++
 .../data/conversion/DateLocalDateConverter.java    |  42 ++
 .../DayTimeIntervalDurationConverter.java          |  43 ++
 .../conversion/DecimalBigDecimalConverter.java     |  63 ++
 .../table/data/conversion/IdentityConverter.java   |  40 ++
 .../LocalZonedTimestampInstantConverter.java       |  42 ++
 .../LocalZonedTimestampIntConverter.java           |  42 ++
 .../LocalZonedTimestampLongConverter.java          |  42 ++
 .../table/data/conversion/MapMapConverter.java     | 125 ++++
 .../data/conversion/RawByteArrayConverter.java     |  59 ++
 .../table/data/conversion/RawObjectConverter.java  |  59 ++
 .../table/data/conversion/RowRowConverter.java     |  95 +++
 .../data/conversion/StringByteArrayConverter.java  |  43 ++
 .../data/conversion/StringStringConverter.java     |  43 ++
 .../data/conversion/StructuredObjectConverter.java | 276 ++++++++
 .../data/conversion/TimeLocalTimeConverter.java    |  42 ++
 .../table/data/conversion/TimeLongConverter.java   |  41 ++
 .../table/data/conversion/TimeTimeConverter.java   |  42 ++
 .../TimestampLocalDateTimeConverter.java           |  42 ++
 .../conversion/TimestampTimestampConverter.java    |  42 ++
 .../YearMonthIntervalPeriodConverter.java          |  78 +++
 .../flink/table/data/writer/BinaryWriter.java      |   1 +
 .../table/runtime/types/InternalSerializers.java   |  50 +-
 .../table/data/DataStructureConvertersTest.java    | 730 +++++++++++++++++++++
 39 files changed, 2961 insertions(+), 23 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java
index 9b0e76e..6ceb3f6 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java
@@ -151,6 +151,7 @@ final class DataTypeFactoryImpl implements DataTypeFactory {
 
 	private LogicalType resolveType(UnresolvedIdentifier identifier) {
 		assert identifier != null;
+		// TODO validate implementation class of structured types when converting from LogicalType to DataType
 		throw new TableException("User-defined types are not supported yet.");
 	}
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/ArrayData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/ArrayData.java
index 59c6b2f..5132458 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/ArrayData.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/ArrayData.java
@@ -217,6 +217,9 @@ public interface ArrayData {
 				return array.getMap(pos);
 			case ROW:
 				return array.getRow(pos, ((RowType) elementType).getFieldCount());
+			case STRUCTURED_TYPE:
+				// not the most efficient code but ok for a deprecated method
+				return array.getRow(pos, getFieldCount(elementType));
 			case BINARY:
 			case VARBINARY:
 				return array.getBinary(pos);
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RowData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RowData.java
index e4f2c5a..e8c5afb 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RowData.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RowData.java
@@ -281,6 +281,9 @@ public interface RowData {
 				return row.getMap(pos);
 			case ROW:
 				return row.getRow(pos, ((RowType) fieldType).getFieldCount());
+			case STRUCTURED_TYPE:
+				// not the most efficient code but ok for a deprecated method
+				return row.getRow(pos, getFieldCount(fieldType));
 			case BINARY:
 			case VARBINARY:
 				return row.getBinary(pos);
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
index b3ed7a3..56e5266 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
@@ -146,6 +146,25 @@ public final class ExtractionUtils {
 	}
 
 	/**
+	 * Returns the field of a structured type. The logic is as broad as possible to support
+	 * both Java and Scala in different flavors.
+	 */
+	public static Field getStructuredField(Class<?> clazz, String fieldName) {
+		final String normalizedFieldName = fieldName.toUpperCase();
+
+		final List<Field> fields = collectStructuredFields(clazz);
+		for (Field field : fields) {
+			if (field.getName().toUpperCase().equals(normalizedFieldName)) {
+				return field;
+			}
+		}
+		throw extractionError(
+			"Could not to find a field named '%s' in class '%s' for structured type.",
+			fieldName,
+			clazz.getName());
+	}
+
+	/**
 	 * Checks for a field getter of a structured type. The logic is as broad as possible to support
 	 * both Java and Scala in different flavors.
 	 */
@@ -260,6 +279,21 @@ public final class ExtractionUtils {
 		return Modifier.isPublic(m);
 	}
 
+	/**
+	 * Checks whether a field is directly writable without a setter or constructor.
+	 */
+	public static boolean isStructuredFieldDirectlyWritable(Field field) {
+		final int m = field.getModifiers();
+
+		// field is immutable
+		if (Modifier.isFinal(m)) {
+			return false;
+		}
+
+		// field is directly writable
+		return Modifier.isPublic(m);
+	}
+
 	// --------------------------------------------------------------------------------------------
 	// Methods intended for this package
 	// --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-runtime-blink/pom.xml b/flink-table/flink-table-runtime-blink/pom.xml
index 62115b9..53d745c 100644
--- a/flink-table/flink-table-runtime-blink/pom.xml
+++ b/flink-table/flink-table-runtime-blink/pom.xml
@@ -153,6 +153,14 @@ under the License.
 			<type>test-jar</type>
 			<scope>test</scope>
 		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-common</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 
 	<build>
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayBooleanArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayBooleanArrayConverter.java
new file mode 100644
index 0000000..3939413
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayBooleanArrayConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.types.logical.ArrayType;
+
+/**
+ * Converter for {@link ArrayType} of {@code boolean[]} external type.
+ */
+@Internal
+class ArrayBooleanArrayConverter implements DataStructureConverter<ArrayData, boolean[]> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public ArrayData toInternal(boolean[] external) {
+		return new GenericArrayData(external);
+	}
+
+	@Override
+	public boolean[] toExternal(ArrayData internal) {
+		return internal.toBooleanArray();
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayByteArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayByteArrayConverter.java
new file mode 100644
index 0000000..6c8b665
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayByteArrayConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.types.logical.ArrayType;
+
+/**
+ * Converter for {@link ArrayType} of {@code byte[]} external type.
+ */
+@Internal
+class ArrayByteArrayConverter implements DataStructureConverter<ArrayData, byte[]> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public ArrayData toInternal(byte[] external) {
+		return new GenericArrayData(external);
+	}
+
+	@Override
+	public byte[] toExternal(ArrayData internal) {
+		return internal.toByteArray();
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayDoubleArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayDoubleArrayConverter.java
new file mode 100644
index 0000000..b442ff9
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayDoubleArrayConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.types.logical.ArrayType;
+
+/**
+ * Converter for {@link ArrayType} of {@code double[]} external type.
+ */
+@Internal
+class ArrayDoubleArrayConverter implements DataStructureConverter<ArrayData, double[]> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public ArrayData toInternal(double[] external) {
+		return new GenericArrayData(external);
+	}
+
+	@Override
+	public double[] toExternal(ArrayData internal) {
+		return internal.toDoubleArray();
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayFloatArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayFloatArrayConverter.java
new file mode 100644
index 0000000..3b8bf15
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayFloatArrayConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.types.logical.ArrayType;
+
+/**
+ * Converter for {@link ArrayType} of {@code float[]} external type.
+ */
+@Internal
+class ArrayFloatArrayConverter implements DataStructureConverter<ArrayData, float[]> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public ArrayData toInternal(float[] external) {
+		return new GenericArrayData(external);
+	}
+
+	@Override
+	public float[] toExternal(ArrayData internal) {
+		return internal.toFloatArray();
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayIntArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayIntArrayConverter.java
new file mode 100644
index 0000000..fe8880a
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayIntArrayConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.types.logical.ArrayType;
+
+/**
+ * Converter for {@link ArrayType} of {@code int[]} external type.
+ */
+@Internal
+class ArrayIntArrayConverter implements DataStructureConverter<ArrayData, int[]> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public ArrayData toInternal(int[] external) {
+		return new GenericArrayData(external);
+	}
+
+	@Override
+	public int[] toExternal(ArrayData internal) {
+		return internal.toIntArray();
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayLongArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayLongArrayConverter.java
new file mode 100644
index 0000000..963d146
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayLongArrayConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.types.logical.ArrayType;
+
+/**
+ * Converter for {@link ArrayType} of {@code long[]} external type.
+ */
+@Internal
+class ArrayLongArrayConverter implements DataStructureConverter<ArrayData, long[]> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public ArrayData toInternal(long[] external) {
+		return new GenericArrayData(external);
+	}
+
+	@Override
+	public long[] toExternal(ArrayData internal) {
+		return internal.toLongArray();
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
new file mode 100644
index 0000000..761d758
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.binary.BinaryArrayData;
+import org.apache.flink.table.data.writer.BinaryArrayWriter;
+import org.apache.flink.table.data.writer.BinaryWriter;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.Serializable;
+import java.lang.reflect.Array;
+
+/**
+ * Converter for {@link ArrayType} of nested primitive or object arrays external types.
+ */
+@Internal
+@SuppressWarnings("unchecked")
+class ArrayObjectArrayConverter<E> implements DataStructureConverter<ArrayData, E[]> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final Class<E> elementClass;
+
+	private final int elementSize;
+
+	private final BinaryArrayWriter.NullSetter  writerNullSetter;
+
+	private final BinaryWriter.ValueSetter writerValueSetter;
+
+	private final GenericToJavaArrayConverter<E> genericToJavaArrayConverter;
+
+	private transient BinaryArrayData reuseArray;
+
+	private transient BinaryArrayWriter reuseWriter;
+
+	final boolean hasInternalElements;
+
+	final ArrayData.ElementGetter elementGetter;
+
+	final DataStructureConverter<Object, E> elementConverter;
+
+	private ArrayObjectArrayConverter(
+			Class<E> elementClass,
+			int elementSize,
+			BinaryArrayWriter.NullSetter writerNullSetter,
+			BinaryWriter.ValueSetter writerValueSetter,
+			GenericToJavaArrayConverter<E> genericToJavaArrayConverter,
+			ArrayData.ElementGetter elementGetter,
+			DataStructureConverter<Object, E> elementConverter) {
+		this.elementClass = elementClass;
+		this.elementSize = elementSize;
+		this.writerNullSetter = writerNullSetter;
+		this.writerValueSetter = writerValueSetter;
+		this.genericToJavaArrayConverter = genericToJavaArrayConverter;
+		this.hasInternalElements = elementConverter instanceof IdentityConverter;
+		this.elementGetter = elementGetter;
+		this.elementConverter = elementConverter;
+	}
+
+	@Override
+	public void open(ClassLoader classLoader) {
+		reuseArray = new BinaryArrayData();
+		reuseWriter = new BinaryArrayWriter(reuseArray, 0, elementSize);
+		elementConverter.open(classLoader);
+	}
+
+	@Override
+	public ArrayData toInternal(E[] external) {
+		return hasInternalElements ? new GenericArrayData(external) : toBinaryArrayData(external);
+	}
+
+	@Override
+	public E[] toExternal(ArrayData internal) {
+		if (hasInternalElements && internal instanceof GenericArrayData) {
+			final GenericArrayData genericArray = (GenericArrayData) internal;
+			if (genericArray.isPrimitiveArray()) {
+				return genericToJavaArrayConverter.convert((GenericArrayData) internal);
+			}
+			return (E[]) genericArray.toObjectArray();
+		}
+		return toJavaArray(internal);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Runtime helper methods
+	// --------------------------------------------------------------------------------------------
+
+	private ArrayData toBinaryArrayData(E[] external) {
+		final int length = external.length;
+		allocateWriter(length);
+		for (int pos = 0; pos < length; pos++) {
+			writeElement(pos, external[pos]);
+		}
+		return completeWriter();
+	}
+
+	private E[] toJavaArray(ArrayData internal) {
+		final int size = internal.size();
+		final E[] values = (E[]) Array.newInstance(elementClass, size);
+		for (int pos = 0; pos < size; pos++) {
+			final Object value = elementGetter.getElementOrNull(internal, pos);
+			values[pos] = elementConverter.toExternalOrNull(value);
+		}
+		return values;
+	}
+
+	interface GenericToJavaArrayConverter<E> extends Serializable {
+		E[] convert(GenericArrayData internal);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Shared code
+	// --------------------------------------------------------------------------------------------
+
+	void allocateWriter(int length) {
+		if (reuseWriter.getNumElements() != length) {
+			reuseWriter = new BinaryArrayWriter(reuseArray, length, elementSize);
+		} else {
+			reuseWriter.reset();
+		}
+	}
+
+	void writeElement(int pos, E element) {
+		if (element == null) {
+			writerNullSetter.setNull(reuseWriter, pos);
+		} else {
+			writerValueSetter.setValue(reuseWriter, pos, elementConverter.toInternal(element));
+		}
+	}
+
+	BinaryArrayData completeWriter() {
+		reuseWriter.complete();
+		return reuseArray;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Factory method
+	// --------------------------------------------------------------------------------------------
+
+	public static ArrayObjectArrayConverter<?> create(DataType dataType) {
+		return createForElement(dataType.getChildren().get(0));
+	}
+
+	public static <E> ArrayObjectArrayConverter<E> createForElement(DataType elementDataType) {
+		final LogicalType elementType = elementDataType.getLogicalType();
+		return new ArrayObjectArrayConverter<>(
+			(Class<E>) elementDataType.getConversionClass(),
+			BinaryArrayData.calculateFixLengthPartSize(elementType),
+			BinaryArrayWriter.createNullSetter(elementType),
+			BinaryWriter.createValueSetter(elementType),
+			createGenericToJavaArrayConverter(elementType),
+			ArrayData.createElementGetter(elementType),
+			(DataStructureConverter<Object, E>) DataStructureConverters.getConverter(elementDataType)
+		);
+	}
+
+	@SuppressWarnings("unchecked")
+	private static <E> GenericToJavaArrayConverter<E> createGenericToJavaArrayConverter(LogicalType elementType) {
+		switch (elementType.getTypeRoot()) {
+			case BOOLEAN:
+				return internal -> (E[]) ArrayUtils.toObject(internal.toBooleanArray());
+			case TINYINT:
+				return internal -> (E[]) ArrayUtils.toObject(internal.toByteArray());
+			case SMALLINT:
+				return internal -> (E[]) ArrayUtils.toObject(internal.toShortArray());
+			case INTEGER:
+				return internal -> (E[]) ArrayUtils.toObject(internal.toIntArray());
+			case BIGINT:
+				return internal -> (E[]) ArrayUtils.toObject(internal.toLongArray());
+			case FLOAT:
+				return internal -> (E[]) ArrayUtils.toObject(internal.toFloatArray());
+			case DOUBLE:
+				return internal -> (E[]) ArrayUtils.toObject(internal.toDoubleArray());
+			case DISTINCT_TYPE:
+				return createGenericToJavaArrayConverter(((DistinctType) elementType).getSourceType());
+			default:
+				return internal -> {
+					throw new IllegalStateException();
+				};
+		}
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayShortArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayShortArrayConverter.java
new file mode 100644
index 0000000..3b48ea4
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayShortArrayConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.types.logical.ArrayType;
+
+/**
+ * Converter for {@link ArrayType} of {@code short[]} external type.
+ */
+@Internal
+class ArrayShortArrayConverter implements DataStructureConverter<ArrayData, short[]> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public ArrayData toInternal(short[] external) {
+		return new GenericArrayData(external);
+	}
+
+	@Override
+	public short[] toExternal(ArrayData internal) {
+		return internal.toShortArray();
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverter.java
new file mode 100644
index 0000000..d2df1e5
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverter.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import java.io.Serializable;
+
+/**
+ * Converter between internal and external data structure.
+ *
+ * <p>Converters are serializable and can be passed to runtime operators.
+ *
+ * @param <I> internal data structure (see {@link RowData})
+ * @param <E> external data structure (see {@link DataType#getConversionClass()})
+ */
+@Internal
+public interface DataStructureConverter<I, E> extends Serializable {
+
+	default void open(ClassLoader classLoader) {
+		assert classLoader != null;
+		// nothing to do
+	}
+
+	/**
+	 * Converts to internal data structure.
+	 *
+	 * <p>Note: Parameter must not be null. Output must not be null.
+	 */
+	I toInternal(E external);
+
+	/**
+	 * Converts to internal data structure or {@code null}.
+	 *
+	 * <p>The nullability could be derived from the data type. However, this method reduces null checks.
+	 */
+	default I toInternalOrNull(E external) {
+		if (external == null) {
+			return null;
+		}
+		return toInternal(external);
+	}
+
+	/**
+	 * Converts to external data structure.
+	 *
+	 * <p>Note: Parameter must not be null. Output must not be null.
+	 */
+	E toExternal(I internal);
+
+	/**
+	 * Converts to external data structure or {@code null}.
+	 *
+	 * <p>The nullability could be derived from the data type. However, this method reduces null checks.
+	 */
+	default E toExternalOrNull(I internal) {
+		if (internal == null) {
+			return null;
+		}
+		return toExternal(internal);
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverters.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverters.java
new file mode 100644
index 0000000..1d07d24
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverters.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.types.Row;
+
+import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+/**
+ * Registry of available data structure converters.
+ *
+ * <p>Data structure converters are used at the edges for the API for converting between internal
+ * structures (see {@link RowData}) and external structures (see {@link DataType#getConversionClass()}).
+ *
+ * <p>This is useful for UDFs, sources, sinks, or exposing data in the API (e.g. via a {@code collect()}).
+ *
+ * <p>Note: It is NOT the responsibility of a converter to normalize the data. Thus, a converter does
+ * neither change the precision of a timestamp nor prune/expand strings to their defined length. This
+ * might be the responsibility of data classes that are called transitively.
+ */
+@Internal
+public final class DataStructureConverters {
+
+	private static final Map<ConverterIdentifier<?>, DataStructureConverterFactory> converters = new HashMap<>();
+	static {
+		// ordered by type root and conversion class definition
+		putConverter(LogicalTypeRoot.CHAR, String.class, constructor(StringStringConverter::new));
+		putConverter(LogicalTypeRoot.CHAR, byte[].class, constructor(StringByteArrayConverter::new));
+		putConverter(LogicalTypeRoot.CHAR, StringData.class, identity());
+		putConverter(LogicalTypeRoot.VARCHAR, String.class, constructor(StringStringConverter::new));
+		putConverter(LogicalTypeRoot.VARCHAR, byte[].class, constructor(StringByteArrayConverter::new));
+		putConverter(LogicalTypeRoot.VARCHAR, StringData.class, identity());
+		putConverter(LogicalTypeRoot.BOOLEAN, Boolean.class, identity());
+		putConverter(LogicalTypeRoot.BOOLEAN, boolean.class, identity());
+		putConverter(LogicalTypeRoot.BINARY, byte[].class, identity());
+		putConverter(LogicalTypeRoot.VARBINARY, byte[].class, identity());
+		putConverter(LogicalTypeRoot.DECIMAL, BigDecimal.class, DecimalBigDecimalConverter::create);
+		putConverter(LogicalTypeRoot.DECIMAL, DecimalData.class, identity());
+		putConverter(LogicalTypeRoot.TINYINT, Byte.class, identity());
+		putConverter(LogicalTypeRoot.TINYINT, byte.class, identity());
+		putConverter(LogicalTypeRoot.SMALLINT, Short.class, identity());
+		putConverter(LogicalTypeRoot.SMALLINT, short.class, identity());
+		putConverter(LogicalTypeRoot.INTEGER, Integer.class, identity());
+		putConverter(LogicalTypeRoot.INTEGER, int.class, identity());
+		putConverter(LogicalTypeRoot.BIGINT, Long.class, identity());
+		putConverter(LogicalTypeRoot.BIGINT, long.class, identity());
+		putConverter(LogicalTypeRoot.FLOAT, Float.class, identity());
+		putConverter(LogicalTypeRoot.FLOAT, float.class, identity());
+		putConverter(LogicalTypeRoot.DOUBLE, Double.class, identity());
+		putConverter(LogicalTypeRoot.DOUBLE, double.class, identity());
+		putConverter(LogicalTypeRoot.DATE, java.sql.Date.class, constructor(DateDateConverter::new));
+		putConverter(LogicalTypeRoot.DATE, java.time.LocalDate.class, constructor(DateLocalDateConverter::new));
+		putConverter(LogicalTypeRoot.DATE, Integer.class, identity());
+		putConverter(LogicalTypeRoot.DATE, int.class, identity());
+		putConverter(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, java.sql.Time.class, constructor(TimeTimeConverter::new));
+		putConverter(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, java.time.LocalTime.class, constructor(TimeLocalTimeConverter::new));
+		putConverter(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, Integer.class, identity());
+		putConverter(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, int.class, identity());
+		putConverter(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, Long.class, constructor(TimeLongConverter::new));
+		putConverter(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, long.class, constructor(TimeLongConverter::new));
+		putConverter(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, java.sql.Timestamp.class, constructor(TimestampTimestampConverter::new));
+		putConverter(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, java.time.LocalDateTime.class, constructor(TimestampLocalDateTimeConverter::new));
+		putConverter(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, TimestampData.class, identity());
+		putConverter(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE, java.time.ZonedDateTime.class, unsupported());
+		putConverter(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE, java.time.OffsetDateTime.class, unsupported());
+		putConverter(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, java.time.Instant.class, constructor(LocalZonedTimestampInstantConverter::new));
+		putConverter(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, Integer.class, constructor(LocalZonedTimestampIntConverter::new));
+		putConverter(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, int.class, constructor(LocalZonedTimestampIntConverter::new));
+		putConverter(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, Long.class, constructor(LocalZonedTimestampLongConverter::new));
+		putConverter(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, long.class, constructor(LocalZonedTimestampLongConverter::new));
+		putConverter(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, TimestampData.class, identity());
+		putConverter(LogicalTypeRoot.INTERVAL_YEAR_MONTH, java.time.Period.class, YearMonthIntervalPeriodConverter::create);
+		putConverter(LogicalTypeRoot.INTERVAL_YEAR_MONTH, Integer.class, identity());
+		putConverter(LogicalTypeRoot.INTERVAL_YEAR_MONTH, int.class, identity());
+		putConverter(LogicalTypeRoot.INTERVAL_DAY_TIME, java.time.Duration.class, constructor(DayTimeIntervalDurationConverter::new));
+		putConverter(LogicalTypeRoot.INTERVAL_DAY_TIME, Long.class, identity());
+		putConverter(LogicalTypeRoot.INTERVAL_DAY_TIME, long.class, identity());
+		putConverter(LogicalTypeRoot.ARRAY, ArrayData.class, identity());
+		putConverter(LogicalTypeRoot.ARRAY, boolean[].class, constructor(ArrayBooleanArrayConverter::new));
+		putConverter(LogicalTypeRoot.ARRAY, byte[].class, constructor(ArrayByteArrayConverter::new));
+		putConverter(LogicalTypeRoot.ARRAY, short[].class, constructor(ArrayShortArrayConverter::new));
+		putConverter(LogicalTypeRoot.ARRAY, int[].class, constructor(ArrayIntArrayConverter::new));
+		putConverter(LogicalTypeRoot.ARRAY, long[].class, constructor(ArrayLongArrayConverter::new));
+		putConverter(LogicalTypeRoot.ARRAY, float[].class, constructor(ArrayFloatArrayConverter::new));
+		putConverter(LogicalTypeRoot.ARRAY, double[].class, constructor(ArrayDoubleArrayConverter::new));
+		putConverter(LogicalTypeRoot.MAP, Map.class, MapMapConverter::createForMapType);
+		putConverter(LogicalTypeRoot.MAP, MapData.class, identity());
+		putConverter(LogicalTypeRoot.MULTISET, Map.class, MapMapConverter::createForMultisetType);
+		putConverter(LogicalTypeRoot.MULTISET, MapData.class, identity());
+		putConverter(LogicalTypeRoot.ROW, Row.class, RowRowConverter::create);
+		putConverter(LogicalTypeRoot.ROW, RowData.class, identity());
+		putConverter(LogicalTypeRoot.STRUCTURED_TYPE, Row.class, RowRowConverter::create);
+		putConverter(LogicalTypeRoot.STRUCTURED_TYPE, RowData.class, identity());
+		putConverter(LogicalTypeRoot.RAW, byte[].class, RawByteArrayConverter::create);
+		putConverter(LogicalTypeRoot.RAW, RawValueData.class, identity());
+	}
+
+	/**
+	 * Returns a converter for the given {@link DataType}.
+	 */
+	@SuppressWarnings("unchecked")
+	public static DataStructureConverter<Object, Object> getConverter(DataType dataType) {
+		// cast to Object for ease of use
+		return (DataStructureConverter<Object, Object>) getConverterInternal(dataType);
+	}
+
+	private static DataStructureConverter<?, ?> getConverterInternal(DataType dataType) {
+		final LogicalType logicalType = dataType.getLogicalType();
+		final DataStructureConverterFactory factory = converters.get(
+			new ConverterIdentifier<>(
+				logicalType.getTypeRoot(),
+				dataType.getConversionClass()));
+		if (factory != null) {
+			return factory.createConverter(dataType);
+		}
+		// special cases
+		switch (logicalType.getTypeRoot()) {
+			case ARRAY:
+				return ArrayObjectArrayConverter.create(dataType);
+			case DISTINCT_TYPE:
+				return getConverterInternal(dataType.getChildren().get(0));
+			case STRUCTURED_TYPE:
+				return StructuredObjectConverter.create(dataType);
+			case RAW:
+				return RawObjectConverter.create(dataType);
+			default:
+				throw new TableException("Could not find converter for data type: " + dataType);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Helper methods
+	// --------------------------------------------------------------------------------------------
+
+	private static <E> void putConverter(
+			LogicalTypeRoot root,
+			Class<E> conversionClass,
+			DataStructureConverterFactory factory) {
+		converters.put(new ConverterIdentifier<>(root, conversionClass), factory);
+	}
+
+	private static DataStructureConverterFactory identity() {
+		return constructor(IdentityConverter::new);
+	}
+
+	private static DataStructureConverterFactory constructor(Supplier<DataStructureConverter<?, ?>> supplier) {
+		return dataType -> supplier.get();
+	}
+
+	private static DataStructureConverterFactory unsupported() {
+		return dataType -> {
+			throw new TableException("Unsupported data type: " + dataType);
+		};
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Helper classes
+	// --------------------------------------------------------------------------------------------
+
+	private static class ConverterIdentifier<E> {
+
+		final LogicalTypeRoot root;
+
+		final Class<E> conversionClass;
+
+		ConverterIdentifier(LogicalTypeRoot root, Class<E> conversionClass) {
+			this.root = root;
+			this.conversionClass = conversionClass;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			ConverterIdentifier<?> that = (ConverterIdentifier<?>) o;
+			return root == that.root && conversionClass.equals(that.conversionClass);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(root, conversionClass);
+		}
+	}
+
+	private interface DataStructureConverterFactory {
+		DataStructureConverter<?, ?> createConverter(DataType dt);
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DateDateConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DateDateConverter.java
new file mode 100644
index 0000000..e980891
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DateDateConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.runtime.functions.SqlDateTimeUtils;
+import org.apache.flink.table.types.logical.DateType;
+
+/**
+ * Converter for {@link DateType} of {@link java.sql.Date} external type.
+ */
+@Internal
+class DateDateConverter implements DataStructureConverter<Integer, java.sql.Date> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public Integer toInternal(java.sql.Date external) {
+		return SqlDateTimeUtils.dateToInternal(external);
+	}
+
+	@Override
+	public java.sql.Date toExternal(Integer internal) {
+		return SqlDateTimeUtils.internalToDate(internal);
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DateLocalDateConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DateLocalDateConverter.java
new file mode 100644
index 0000000..6df885d
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DateLocalDateConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.runtime.functions.SqlDateTimeUtils;
+import org.apache.flink.table.types.logical.DateType;
+
+/**
+ * Converter for {@link DateType} of {@link java.time.LocalDate} external type.
+ */
+@Internal
+class DateLocalDateConverter implements DataStructureConverter<Integer, java.time.LocalDate> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public Integer toInternal(java.time.LocalDate external) {
+		return SqlDateTimeUtils.localDateToUnixDate(external);
+	}
+
+	@Override
+	public java.time.LocalDate toExternal(Integer internal) {
+		return SqlDateTimeUtils.unixDateToLocalDate(internal);
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DayTimeIntervalDurationConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DayTimeIntervalDurationConverter.java
new file mode 100644
index 0000000..1ea01c5
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DayTimeIntervalDurationConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+
+import java.time.Duration;
+
+/**
+ * Converter for {@link DayTimeIntervalType} of {@link java.time.Duration} external type.
+ */
+@Internal
+class DayTimeIntervalDurationConverter implements DataStructureConverter<Long, java.time.Duration> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public Long toInternal(java.time.Duration external) {
+		return external.toMillis();
+	}
+
+	@Override
+	public java.time.Duration toExternal(Long internal) {
+		return Duration.ofMillis(internal);
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DecimalBigDecimalConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DecimalBigDecimalConverter.java
new file mode 100644
index 0000000..850b6a0
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DecimalBigDecimalConverter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+
+import java.math.BigDecimal;
+
+/**
+ * Converter for {@link DecimalType} of {@link BigDecimal} external type.
+ */
+@Internal
+class DecimalBigDecimalConverter implements DataStructureConverter<DecimalData, BigDecimal> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final int precision;
+
+	private final int scale;
+
+	private DecimalBigDecimalConverter(int precision, int scale) {
+		this.precision = precision;
+		this.scale = scale;
+	}
+
+	@Override
+	public DecimalData toInternal(BigDecimal external) {
+		return DecimalData.fromBigDecimal(external, precision, scale);
+	}
+
+	@Override
+	public BigDecimal toExternal(DecimalData internal) {
+		return internal.toBigDecimal();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Factory method
+	// --------------------------------------------------------------------------------------------
+
+	static DecimalBigDecimalConverter create(DataType dataType) {
+		final DecimalType decimalType = (DecimalType) dataType.getLogicalType();
+		return new DecimalBigDecimalConverter(decimalType.getPrecision(), decimalType.getScale());
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/IdentityConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/IdentityConverter.java
new file mode 100644
index 0000000..8d9c874
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/IdentityConverter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * No-op converter that just forwards its input.
+ */
+@Internal
+class IdentityConverter<I> implements DataStructureConverter<I, I> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public I toInternal(I external) {
+		return external;
+	}
+
+	@Override
+	public I toExternal(I internal) {
+		return internal;
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampInstantConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampInstantConverter.java
new file mode 100644
index 0000000..fffefbe
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampInstantConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+
+/**
+ * Converter for {@link LocalZonedTimestampType} of {@link java.time.Instant} external type.
+ */
+@Internal
+class LocalZonedTimestampInstantConverter implements DataStructureConverter<TimestampData, java.time.Instant> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public TimestampData toInternal(java.time.Instant external) {
+		return TimestampData.fromInstant(external);
+	}
+
+	@Override
+	public java.time.Instant toExternal(TimestampData internal) {
+		return internal.toInstant();
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampIntConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampIntConverter.java
new file mode 100644
index 0000000..349d34e
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampIntConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+
+/**
+ * Converter for {@link LocalZonedTimestampType} of {@link Integer} external type.
+ */
+@Internal
+class LocalZonedTimestampIntConverter implements DataStructureConverter<TimestampData, Integer> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public TimestampData toInternal(Integer external) {
+		return TimestampData.fromEpochMillis(((long) external) * 1000);
+	}
+
+	@Override
+	public Integer toExternal(TimestampData internal) {
+		return ((int) internal.getMillisecond() / 1000);
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampLongConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampLongConverter.java
new file mode 100644
index 0000000..6ddb7eb
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampLongConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+
+/**
+ * Converter for {@link LocalZonedTimestampType} of {@link Long} external type.
+ */
+@Internal
+class LocalZonedTimestampLongConverter implements DataStructureConverter<TimestampData, Long> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public TimestampData toInternal(Long external) {
+		return TimestampData.fromEpochMillis(external);
+	}
+
+	@Override
+	public Long toExternal(TimestampData internal) {
+		return internal.getMillisecond();
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/MapMapConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/MapMapConverter.java
new file mode 100644
index 0000000..24131b0
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/MapMapConverter.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.binary.BinaryMapData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Converter for {@link MapType}/{@link MultisetType} of {@link Map} external type.
+ */
+@Internal
+class MapMapConverter<K, V> implements DataStructureConverter<MapData, Map<K, V>> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final ArrayObjectArrayConverter<K> keyConverter;
+
+	private final ArrayObjectArrayConverter<V> valueConverter;
+
+	private final boolean hasInternalEntries;
+
+	private MapMapConverter(
+			ArrayObjectArrayConverter<K> keyConverter,
+			ArrayObjectArrayConverter<V> valueConverter) {
+		this.keyConverter = keyConverter;
+		this.valueConverter = valueConverter;
+		this.hasInternalEntries = keyConverter.hasInternalElements && valueConverter.hasInternalElements;
+	}
+
+	@Override
+	public void open(ClassLoader classLoader) {
+		keyConverter.open(classLoader);
+		valueConverter.open(classLoader);
+	}
+
+	@Override
+	public MapData toInternal(Map<K, V> external) {
+		if (hasInternalEntries) {
+			return new GenericMapData(external);
+		}
+		return toBinaryMapData(external);
+	}
+
+	@Override
+	public Map<K, V> toExternal(MapData internal) {
+		final ArrayData keyArray = internal.keyArray();
+		final ArrayData valueArray = internal.valueArray();
+
+		final int length = internal.size();
+		final Map<K, V> map = new HashMap<>();
+		for (int pos = 0; pos < length; pos++) {
+			final Object keyValue = keyConverter.elementGetter.getElementOrNull(keyArray, pos);
+			final Object valueValue = valueConverter.elementGetter.getElementOrNull(valueArray, pos);
+			map.put(
+				keyConverter.elementConverter.toExternalOrNull(keyValue),
+				valueConverter.elementConverter.toExternalOrNull(valueValue));
+		}
+		return map;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Runtime helper methods
+	// --------------------------------------------------------------------------------------------
+
+	private MapData toBinaryMapData(Map<K, V> external) {
+		final int length = external.size();
+		keyConverter.allocateWriter(length);
+		valueConverter.allocateWriter(length);
+		int pos = 0;
+		for (Map.Entry<K, V> entry : external.entrySet()) {
+			keyConverter.writeElement(pos, entry.getKey());
+			valueConverter.writeElement(pos, entry.getValue());
+			pos++;
+		}
+		return BinaryMapData.valueOf(keyConverter.completeWriter(), valueConverter.completeWriter());
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Factory method
+	// --------------------------------------------------------------------------------------------
+
+	public static MapMapConverter<?, ?> createForMapType(DataType dataType) {
+		final DataType keyDataType = dataType.getChildren().get(0);
+		final DataType valueDataType = dataType.getChildren().get(1);
+		return new MapMapConverter<>(
+			ArrayObjectArrayConverter.createForElement(keyDataType),
+			ArrayObjectArrayConverter.createForElement(valueDataType)
+		);
+	}
+
+	public static MapMapConverter<?, ?> createForMultisetType(DataType dataType) {
+		final DataType keyDataType = dataType.getChildren().get(0);
+		final DataType valueDataType = DataTypes.INT().notNull();
+		return new MapMapConverter<>(
+			ArrayObjectArrayConverter.createForElement(keyDataType),
+			ArrayObjectArrayConverter.createForElement(valueDataType)
+		);
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RawByteArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RawByteArrayConverter.java
new file mode 100644
index 0000000..db24407
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RawByteArrayConverter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RawType;
+
+/**
+ * Converter for {@link RawType} of {@code byte[]} external type.
+ */
+@Internal
+class RawByteArrayConverter<T> implements DataStructureConverter<RawValueData<T>, byte[]> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final TypeSerializer<T> serializer;
+
+	private RawByteArrayConverter(TypeSerializer<T> serializer) {
+		this.serializer = serializer;
+	}
+
+	@Override
+	public RawValueData<T> toInternal(byte[] external) {
+		return RawValueData.fromBytes(external);
+	}
+
+	@Override
+	public byte[] toExternal(RawValueData<T> internal) {
+		return internal.toBytes(serializer);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Factory method
+	// --------------------------------------------------------------------------------------------
+
+	public static RawByteArrayConverter<?> create(DataType dataType) {
+		final TypeSerializer<?> serializer = ((RawType<?>) dataType.getLogicalType()).getTypeSerializer();
+		return new RawByteArrayConverter<>(serializer);
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RawObjectConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RawObjectConverter.java
new file mode 100644
index 0000000..d93756a
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RawObjectConverter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RawType;
+
+/**
+ * Converter for {@link RawType} of object external type.
+ */
+@Internal
+class RawObjectConverter<T> implements DataStructureConverter<RawValueData<T>, T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final TypeSerializer<T> serializer;
+
+	private RawObjectConverter(TypeSerializer<T> serializer) {
+		this.serializer = serializer;
+	}
+
+	@Override
+	public RawValueData<T> toInternal(T external) {
+		return RawValueData.fromObject(external);
+	}
+
+	@Override
+	public T toExternal(RawValueData<T> internal) {
+		return internal.toObject(serializer);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Factory method
+	// --------------------------------------------------------------------------------------------
+
+	public static RawObjectConverter<?> create(DataType dataType) {
+		final TypeSerializer<?> serializer = ((RawType<?>) dataType.getLogicalType()).getTypeSerializer();
+		return new RawObjectConverter<>(serializer);
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RowRowConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RowRowConverter.java
new file mode 100644
index 0000000..468efcd
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RowRowConverter.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.stream.IntStream;
+
+/**
+ * Converter for {@link RowType} of {@link Row} external type.
+ */
+@Internal
+class RowRowConverter implements DataStructureConverter<RowData, Row> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final DataStructureConverter<Object, Object>[] fieldConverters;
+
+	private final RowData.FieldGetter[] fieldGetters;
+
+	private RowRowConverter(
+			DataStructureConverter<Object, Object>[] fieldConverters,
+			RowData.FieldGetter[] fieldGetters) {
+		this.fieldConverters = fieldConverters;
+		this.fieldGetters = fieldGetters;
+	}
+
+	@Override
+	public void open(ClassLoader classLoader) {
+		for (DataStructureConverter<Object, Object> fieldConverter : fieldConverters) {
+			fieldConverter.open(classLoader);
+		}
+	}
+
+	@Override
+	public RowData toInternal(Row external) {
+		final int length = fieldConverters.length;
+		final GenericRowData genericRow = new GenericRowData(length);
+		for (int pos = 0; pos < length; pos++) {
+			final Object value = external.getField(pos);
+			genericRow.setField(pos, fieldConverters[pos].toInternalOrNull(value));
+		}
+		return genericRow;
+	}
+
+	@Override
+	public Row toExternal(RowData internal) {
+		final int length = fieldConverters.length;
+		final Row row = new Row(length);
+		for (int pos = 0; pos < length; pos++) {
+			final Object value = fieldGetters[pos].getFieldOrNull(internal);
+			row.setField(pos, fieldConverters[pos].toExternalOrNull(value));
+		}
+		return row;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Factory method
+	// --------------------------------------------------------------------------------------------
+
+	@SuppressWarnings({"unchecked", "Convert2MethodRef"})
+	public static RowRowConverter create(DataType dataType) {
+		final List<DataType> fields = dataType.getChildren();
+		final DataStructureConverter<Object, Object>[] fieldConverters = fields.stream()
+			.map(dt -> DataStructureConverters.getConverter(dt))
+			.toArray(DataStructureConverter[]::new);
+		final RowData.FieldGetter[] fieldGetters = IntStream
+			.range(0, fields.size())
+			.mapToObj(pos -> RowData.createFieldGetter(fields.get(pos).getLogicalType(), pos))
+			.toArray(RowData.FieldGetter[]::new);
+		return new RowRowConverter(fieldConverters, fieldGetters);
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StringByteArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StringByteArrayConverter.java
new file mode 100644
index 0000000..3a7736c
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StringByteArrayConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+/**
+ * Converter for {@link CharType}/{@link VarCharType} of {@code byte[]} external type.
+ */
+@Internal
+class StringByteArrayConverter implements DataStructureConverter<StringData, byte[]> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public StringData toInternal(byte[] external) {
+		return StringData.fromBytes(external);
+	}
+
+	@Override
+	public byte[] toExternal(StringData internal) {
+		return internal.toBytes();
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StringStringConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StringStringConverter.java
new file mode 100644
index 0000000..290758b
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StringStringConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+/**
+ * Converter for {@link CharType}/{@link VarCharType} of {@link String} external type.
+ */
+@Internal
+class StringStringConverter implements DataStructureConverter<StringData, String> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public StringData toInternal(String external) {
+		return StringData.fromString(external);
+	}
+
+	@Override
+	public String toExternal(StringData internal) {
+		return internal.toString();
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StructuredObjectConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StructuredObjectConverter.java
new file mode 100644
index 0000000..2d21db8
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StructuredObjectConverter.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.CompileUtils;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.StructuredType;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.types.extraction.ExtractionUtils.getStructuredField;
+import static org.apache.flink.table.types.extraction.ExtractionUtils.getStructuredFieldGetter;
+import static org.apache.flink.table.types.extraction.ExtractionUtils.getStructuredFieldSetter;
+import static org.apache.flink.table.types.extraction.ExtractionUtils.hasInvokableConstructor;
+import static org.apache.flink.table.types.extraction.ExtractionUtils.isStructuredFieldDirectlyReadable;
+import static org.apache.flink.table.types.extraction.ExtractionUtils.isStructuredFieldDirectlyWritable;
+import static org.apache.flink.table.types.extraction.ExtractionUtils.primitiveToWrapper;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldNames;
+
+/**
+ * Converter for {@link StructuredType} of its implementation class.
+ */
+@Internal
+@SuppressWarnings("unchecked")
+class StructuredObjectConverter<T> implements DataStructureConverter<RowData, T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final DataStructureConverter<Object, Object>[] fieldConverters;
+
+	private final RowData.FieldGetter[] fieldGetters;
+
+	private final String generatedName;
+
+	private final String generatedCode;
+
+	private transient DataStructureConverter<RowData, T> generatedConverter;
+
+	private StructuredObjectConverter(
+			DataStructureConverter<Object, Object>[] fieldConverters,
+			RowData.FieldGetter[] fieldGetters,
+			String generatedName,
+			String generatedCode) {
+		this.fieldConverters = fieldConverters;
+		this.fieldGetters = fieldGetters;
+		this.generatedName = generatedName;
+		this.generatedCode = generatedCode;
+	}
+
+	@Override
+	public void open(ClassLoader classLoader) {
+		for (DataStructureConverter<Object, Object> fieldConverter : fieldConverters) {
+			fieldConverter.open(classLoader);
+		}
+		try {
+			final Class<?> compiledConverter = CompileUtils.compile(classLoader, generatedName, generatedCode);
+			generatedConverter = (DataStructureConverter<RowData, T>) compiledConverter
+				.getConstructor(RowData.FieldGetter[].class, DataStructureConverter[].class)
+				.newInstance(fieldGetters, fieldConverters);
+		} catch (Throwable t) {
+			throw new TableException("Error while generating structured type converter.", t);
+		}
+		generatedConverter.open(classLoader);
+	}
+
+	@Override
+	public RowData toInternal(T external) {
+		return generatedConverter.toInternal(external);
+	}
+
+	@Override
+	public T toExternal(RowData internal) {
+		return generatedConverter.toExternal(internal);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Factory method
+	// --------------------------------------------------------------------------------------------
+
+	public static StructuredObjectConverter<?> create(DataType dataType) {
+		try {
+			return createOrError(dataType);
+		} catch (Throwable t) {
+			throw new TableException(
+				String.format(
+					"Could not create converter for structured type '%s'.",
+					dataType),
+				t);
+		}
+	}
+
+	/**
+	 * Creates a {@link DataStructureConverter} for the given structured type.
+	 *
+	 * <p>Note: We do not perform validation if data type and structured type implementation match. This
+	 * must have been done earlier in the {@link DataTypeFactory}.
+	 */
+	@SuppressWarnings("RedundantCast")
+	private static StructuredObjectConverter<?> createOrError(DataType dataType) {
+		final List<DataType> fields = dataType.getChildren();
+
+		final DataStructureConverter<Object, Object>[] fieldConverters = fields.stream()
+			.map(dt -> (DataStructureConverter<Object, Object>) DataStructureConverters.getConverter(dt))
+			.toArray(DataStructureConverter[]::new);
+
+		final RowData.FieldGetter[] fieldGetters = IntStream
+			.range(0, fields.size())
+			.mapToObj(pos -> RowData.createFieldGetter(fields.get(pos).getLogicalType(), pos))
+			.toArray(RowData.FieldGetter[]::new);
+
+		final Class<?>[] fieldClasses = fields.stream()
+			.map(DataType::getConversionClass)
+			.toArray(Class[]::new);
+
+		final StructuredType structuredType = (StructuredType) dataType.getLogicalType();
+
+		final Class<?> implementationClass = structuredType.getImplementationClass()
+			.orElseThrow(IllegalStateException::new);
+
+		final String converterName = implementationClass.getName().replace('.', '$') + "$Converter";
+		final String converterCode = generateCode(
+				converterName,
+				implementationClass,
+				getFieldNames(structuredType).toArray(new String[0]),
+				fieldClasses);
+
+		return new StructuredObjectConverter<>(
+			fieldConverters,
+			fieldGetters,
+			converterName,
+			converterCode
+		);
+	}
+
+	private static String generateCode(
+			String converterName,
+			Class<?> clazz,
+			String[] fieldNames,
+			Class<?>[] fieldClasses) {
+		final int fieldCount = fieldClasses.length;
+		final StringBuilder sb = new StringBuilder();
+
+		// we ignore checkstyle here for readability and preserving indention
+
+		line(sb, "public class ", converterName, " implements ", DataStructureConverter.class, " {");
+		line(sb, "    private final ", RowData.FieldGetter.class, "[] fieldGetters;");
+		line(sb, "    private final ", DataStructureConverter.class, "[] fieldConverters;");
+
+		line(sb, "    public ", converterName, "(", RowData.FieldGetter.class, "[] fieldGetters, ", DataStructureConverter.class, "[] fieldConverters) {");
+		line(sb, "        this.fieldGetters = fieldGetters;");
+		line(sb, "        this.fieldConverters = fieldConverters;");
+		line(sb, "    }");
+
+		line(sb, "    public ", Object.class, " toInternal(", Object.class, " o) {");
+		line(sb, "        final ", clazz, " external = (", clazz, ") o;");
+		line(sb, "        final ", GenericRowData.class, " genericRow = new ", GenericRowData.class, "(", fieldCount, ");");
+		for (int pos = 0; pos < fieldCount; pos++) {
+		line(sb, "        ", getterExpr(clazz, pos, fieldNames[pos], fieldClasses[pos]), ";");
+		}
+		line(sb, "        return genericRow;");
+		line(sb, "    }");
+
+		line(sb, "    public ", Object.class, " toExternal(", Object.class, " o) {");
+		line(sb, "        final ", RowData.class, " internal = (", RowData.class, ") o;");
+		if (hasInvokableConstructor(clazz, fieldClasses)) {
+		line(sb, "        final ", clazz, " structured = new ", clazz, "(");
+		for (int pos = 0; pos < fieldCount; pos++) {
+		line(sb, "            ", parameterExpr(pos, fieldClasses[pos]), (pos < fieldCount - 1) ? ", " : "");
+		}
+		line(sb, "        );");
+		} else {
+		line(sb, "        final ", clazz, " structured = new ", clazz, "();");
+		for (int pos = 0; pos < fieldCount; pos++) {
+		line(sb, "        ", setterExpr(clazz, pos, fieldNames[pos]), ";");
+		}
+		}
+		line(sb, "        return structured;");
+		line(sb, "    }");
+		line(sb, "}");
+		return sb.toString();
+	}
+
+	private static String getterExpr(
+			Class<?> implementationClass,
+			int pos,
+			String fieldName,
+			Class<?> fieldClass) {
+		final Field field = getStructuredField(implementationClass, fieldName);
+		String accessExpr;
+		if (isStructuredFieldDirectlyReadable(field)) {
+			// field is accessible without getter
+			accessExpr = expr("external.", field.getName());
+		} else {
+			// field is accessible with a getter
+			final Method getter = getStructuredFieldGetter(implementationClass, field)
+				.orElseThrow(IllegalStateException::new);
+			accessExpr = expr("external.", getter.getName(), "()");
+		}
+		accessExpr = castExpr(accessExpr, fieldClass);
+		return expr("genericRow.setField(", pos, ", fieldConverters[", pos , "].toInternalOrNull(", accessExpr, "))");
+	}
+
+	private static String parameterExpr(int pos, Class<?> fieldClass) {
+		final String conversionExpr = expr("fieldConverters[", pos , "].toExternalOrNull(fieldGetters[", pos, "].getFieldOrNull(internal))");
+		return castExpr(conversionExpr, fieldClass);
+	}
+
+	private static String setterExpr(
+			Class<?> implementationClass,
+			int pos,
+			String fieldName) {
+		final Field field = getStructuredField(implementationClass, fieldName);
+		final String conversionExpr = expr("fieldConverters[", pos , "].toExternalOrNull(fieldGetters[", pos, "].getFieldOrNull(internal))");
+		if (isStructuredFieldDirectlyWritable(field)) {
+			// field is accessible without setter
+			return expr("structured.", field.getName(), " = ", castExpr(conversionExpr, field.getType()));
+		} else {
+			// field is accessible with a setter
+			final Method setter = getStructuredFieldSetter(implementationClass, field)
+				.orElseThrow(IllegalStateException::new);
+			return expr("structured.", setter.getName(), "(", castExpr(conversionExpr, setter.getParameterTypes()[0]), ")");
+		}
+	}
+
+	private static String castExpr(String expr, Class<?> clazz) {
+		// help Janino to box primitive types and fix missing generics
+		return expr("((", primitiveToWrapper(clazz), ") ", expr, ")");
+	}
+
+	private static String expr(Object... parts) {
+		final StringBuilder sb = new StringBuilder();
+		for (Object part : parts) {
+			if (part instanceof Class) {
+				sb.append(((Class<?>) part).getCanonicalName());
+			} else {
+				sb.append(part);
+			}
+		}
+		return sb.toString();
+	}
+
+	private static void line(StringBuilder sb, Object... parts) {
+		for (Object part : parts) {
+			if (part instanceof Class) {
+				sb.append(((Class<?>) part).getCanonicalName());
+			} else {
+				sb.append(part);
+			}
+		}
+		sb.append("\n");
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeLocalTimeConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeLocalTimeConverter.java
new file mode 100644
index 0000000..d418f25
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeLocalTimeConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.runtime.functions.SqlDateTimeUtils;
+import org.apache.flink.table.types.logical.TimeType;
+
+/**
+ * Converter for {@link TimeType} of {@link java.time.LocalTime} external type.
+ */
+@Internal
+class TimeLocalTimeConverter implements DataStructureConverter<Integer, java.time.LocalTime> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public Integer toInternal(java.time.LocalTime external) {
+		return SqlDateTimeUtils.localTimeToUnixDate(external);
+	}
+
+	@Override
+	public java.time.LocalTime toExternal(Integer internal) {
+		return SqlDateTimeUtils.unixTimeToLocalTime(internal);
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeLongConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeLongConverter.java
new file mode 100644
index 0000000..6cc79fb
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeLongConverter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.types.logical.TimeType;
+
+/**
+ * Converter for {@link TimeType} of {@link Long} external type.
+ */
+@Internal
+class TimeLongConverter implements DataStructureConverter<Integer, Long> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public Integer toInternal(Long external) {
+		return (int) (external / 1000 / 1000);
+	}
+
+	@Override
+	public Long toExternal(Integer internal) {
+		return ((long) internal) * 1000 * 1000;
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeTimeConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeTimeConverter.java
new file mode 100644
index 0000000..1c8b34a
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeTimeConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.runtime.functions.SqlDateTimeUtils;
+import org.apache.flink.table.types.logical.TimeType;
+
+/**
+ * Converter for {@link TimeType} of {@link java.sql.Time} external type.
+ */
+@Internal
+class TimeTimeConverter implements DataStructureConverter<Integer, java.sql.Time> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public Integer toInternal(java.sql.Time external) {
+		return SqlDateTimeUtils.timeToInternal(external);
+	}
+
+	@Override
+	public java.sql.Time toExternal(Integer internal) {
+		return SqlDateTimeUtils.internalToTime(internal);
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimestampLocalDateTimeConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimestampLocalDateTimeConverter.java
new file mode 100644
index 0000000..c156715
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimestampLocalDateTimeConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.TimestampType;
+
+/**
+ * Converter for {@link TimestampType} of {@link java.time.LocalDateTime} external type.
+ */
+@Internal
+class TimestampLocalDateTimeConverter implements DataStructureConverter<TimestampData, java.time.LocalDateTime> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public TimestampData toInternal(java.time.LocalDateTime external) {
+		return TimestampData.fromLocalDateTime(external);
+	}
+
+	@Override
+	public java.time.LocalDateTime toExternal(TimestampData internal) {
+		return internal.toLocalDateTime();
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimestampTimestampConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimestampTimestampConverter.java
new file mode 100644
index 0000000..f9a72f0
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimestampTimestampConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.TimestampType;
+
+/**
+ * Converter for {@link TimestampType} of {@link java.sql.Timestamp} external type.
+ */
+@Internal
+class TimestampTimestampConverter implements DataStructureConverter<TimestampData, java.sql.Timestamp> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public TimestampData toInternal(java.sql.Timestamp external) {
+		return TimestampData.fromTimestamp(external);
+	}
+
+	@Override
+	public java.sql.Timestamp toExternal(TimestampData internal) {
+		return internal.toTimestamp();
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/YearMonthIntervalPeriodConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/YearMonthIntervalPeriodConverter.java
new file mode 100644
index 0000000..50f8d01
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/YearMonthIntervalPeriodConverter.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution;
+
+import java.io.Serializable;
+import java.time.Period;
+
+/**
+ * Converter for {@link YearMonthIntervalType} of {@link java.time.Period} external type.
+ */
+@Internal
+class YearMonthIntervalPeriodConverter implements DataStructureConverter<Integer, java.time.Period> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final PeriodConstructor periodConstructor;
+
+	private YearMonthIntervalPeriodConverter(PeriodConstructor periodConstructor) {
+		this.periodConstructor = periodConstructor;
+	}
+
+	@Override
+	public Integer toInternal(java.time.Period external) {
+		return (int) external.toTotalMonths();
+	}
+
+	@Override
+	public java.time.Period toExternal(Integer internal) {
+		return periodConstructor.construct(internal);
+	}
+
+	private interface PeriodConstructor extends Serializable {
+		java.time.Period construct(Integer internal);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Factory method
+	// --------------------------------------------------------------------------------------------
+
+	public static YearMonthIntervalPeriodConverter create(DataType dataType) {
+		final YearMonthIntervalType intervalType = (YearMonthIntervalType) dataType.getLogicalType();
+		return new YearMonthIntervalPeriodConverter(createPeriodConstructor(intervalType.getResolution()));
+	}
+
+	private static PeriodConstructor createPeriodConstructor(YearMonthResolution resolution) {
+		switch (resolution) {
+			case YEAR:
+				return internal -> java.time.Period.ofYears(internal / 12);
+			case YEAR_TO_MONTH:
+				return internal -> java.time.Period.of(internal / 12, internal % 12, 0);
+			case MONTH:
+				return Period::ofMonths;
+			default:
+				throw new IllegalStateException();
+		}
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryWriter.java
index 24d903c..cae1232 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryWriter.java
@@ -153,6 +153,7 @@ public interface BinaryWriter {
 				writer.writeMap(pos, (MapData) o, (MapDataSerializer) serializer);
 				break;
 			case ROW:
+			case STRUCTURED_TYPE:
 				writer.writeRow(pos, (RowData) o, (RowDataSerializer) serializer);
 				break;
 			case RAW:
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/InternalSerializers.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/InternalSerializers.java
index 462a1ae..7779e98 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/InternalSerializers.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/InternalSerializers.java
@@ -36,17 +36,17 @@ import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.runtime.typeutils.StringDataSerializer;
 import org.apache.flink.table.runtime.typeutils.TimestampDataSerializer;
 import org.apache.flink.table.types.logical.ArrayType;
-import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DistinctType;
 import org.apache.flink.table.types.logical.IntType;
-import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.MultisetType;
 import org.apache.flink.table.types.logical.RawType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.TypeInformationRawType;
 
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale;
+
 /**
  * {@link TypeSerializer} of {@link LogicalType} for internal sql engine execution data formats.
  */
@@ -63,9 +63,18 @@ public class InternalSerializers {
 	 * Creates a {@link TypeSerializer} for internal data structures of the given {@link LogicalType}.
 	 */
 	public static TypeSerializer create(LogicalType type, ExecutionConfig config) {
+		// ordered by type root definition
 		switch (type.getTypeRoot()) {
+			case CHAR:
+			case VARCHAR:
+				return StringDataSerializer.INSTANCE;
 			case BOOLEAN:
 				return BooleanSerializer.INSTANCE;
+			case BINARY:
+			case VARBINARY:
+				return BytePrimitiveArraySerializer.INSTANCE;
+			case DECIMAL:
+				return new DecimalDataSerializer(getPrecision(type), getScale(type));
 			case TINYINT:
 				return ByteSerializer.INSTANCE;
 			case SMALLINT:
@@ -78,35 +87,27 @@ public class InternalSerializers {
 			case BIGINT:
 			case INTERVAL_DAY_TIME:
 				return LongSerializer.INSTANCE;
-			case TIMESTAMP_WITHOUT_TIME_ZONE:
-				TimestampType timestampType = (TimestampType) type;
-				return new TimestampDataSerializer(timestampType.getPrecision());
-			case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-				LocalZonedTimestampType lzTs = (LocalZonedTimestampType) type;
-				return new TimestampDataSerializer(lzTs.getPrecision());
 			case FLOAT:
 				return FloatSerializer.INSTANCE;
 			case DOUBLE:
 				return DoubleSerializer.INSTANCE;
-			case CHAR:
-			case VARCHAR:
-				return StringDataSerializer.INSTANCE;
-			case DECIMAL:
-				DecimalType decimalType = (DecimalType) type;
-				return new DecimalDataSerializer(decimalType.getPrecision(), decimalType.getScale());
+			case TIMESTAMP_WITHOUT_TIME_ZONE:
+			case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+				return new TimestampDataSerializer(getPrecision(type));
+			case TIMESTAMP_WITH_TIME_ZONE:
+				throw new UnsupportedOperationException();
 			case ARRAY:
 				return new ArrayDataSerializer(((ArrayType) type).getElementType(), config);
+			case MULTISET:
+				return new MapDataSerializer(((MultisetType) type).getElementType(), new IntType(false), config);
 			case MAP:
 				MapType mapType = (MapType) type;
 				return new MapDataSerializer(mapType.getKeyType(), mapType.getValueType(), config);
-			case MULTISET:
-				return new MapDataSerializer(((MultisetType) type).getElementType(), new IntType(), config);
 			case ROW:
-				RowType rowType = (RowType) type;
-				return new RowDataSerializer(config, rowType);
-			case BINARY:
-			case VARBINARY:
-				return BytePrimitiveArraySerializer.INSTANCE;
+			case STRUCTURED_TYPE:
+				return new RowDataSerializer(config, type.getChildren().toArray(new LogicalType[0]));
+			case DISTINCT_TYPE:
+				return create(((DistinctType) type).getSourceType(), config);
 			case RAW:
 				if (type instanceof RawType) {
 					final RawType<?> rawType = (RawType<?>) type;
@@ -114,6 +115,9 @@ public class InternalSerializers {
 				}
 				return new RawValueDataSerializer<>(
 					((TypeInformationRawType<?>) type).getTypeInformation().createSerializer(config));
+			case NULL:
+			case SYMBOL:
+			case UNRESOLVED:
 			default:
 				throw new UnsupportedOperationException(
 					"Unsupported type '" + type + "' to get internal serializer");
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
new file mode 100644
index 0000000..8af45a4
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
@@ -0,0 +1,730 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeFactoryMock;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+import javax.annotation.Nullable;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.Period;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+import static java.util.Arrays.asList;
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.CHAR;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DAY;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.INTERVAL;
+import static org.apache.flink.table.api.DataTypes.MAP;
+import static org.apache.flink.table.api.DataTypes.MONTH;
+import static org.apache.flink.table.api.DataTypes.MULTISET;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.SECOND;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_TIME_ZONE;
+import static org.apache.flink.table.api.DataTypes.VARBINARY;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
+import static org.apache.flink.table.api.DataTypes.YEAR;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * Tests for {@link DataStructureConverters}.
+ */
+@RunWith(Parameterized.class)
+public class DataStructureConvertersTest {
+
+	@Parameters(name = "{index}: {0}")
+	public static List<TestSpec> testData() {
+		// ordered by definition in DataStructureConverters
+		return asList(
+			TestSpec
+				.forDataType(CHAR(5))
+				.convertedTo(String.class, "12345")
+				.convertedTo(byte[].class, "12345".getBytes(StandardCharsets.UTF_8))
+				.convertedTo(StringData.class, StringData.fromString("12345")),
+
+			TestSpec
+				.forDataType(VARCHAR(100))
+				.convertedTo(String.class, "12345")
+				.convertedTo(byte[].class, "12345".getBytes(StandardCharsets.UTF_8))
+				.convertedTo(StringData.class, StringData.fromString("12345")),
+
+			TestSpec
+				.forDataType(BOOLEAN().notNull())
+				.convertedTo(Boolean.class, true)
+				.convertedTo(boolean.class, true),
+
+			TestSpec
+				.forDataType(BINARY(5))
+				.convertedTo(byte[].class, new byte[]{1, 2, 3, 4, 5}),
+
+			TestSpec
+				.forDataType(VARBINARY(100))
+				.convertedTo(byte[].class, new byte[]{1, 2, 3, 4, 5}),
+
+			TestSpec
+				.forDataType(DECIMAL(3, 2))
+				.convertedTo(BigDecimal.class, new BigDecimal("1.23"))
+				.convertedTo(DecimalData.class, DecimalData.fromUnscaledLong(123, 3, 2)),
+
+			// TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE are skipped for simplicity
+
+			TestSpec
+				.forDataType(DATE())
+				.convertedTo(Date.class, Date.valueOf("2010-11-12"))
+				.convertedTo(LocalDate.class, LocalDate.parse("2010-11-12"))
+				.convertedTo(Integer.class, 14_925),
+
+			TestSpec
+				.forDataType(TIME(0))
+				.convertedTo(java.sql.Time.class, java.sql.Time.valueOf("12:34:56"))
+				.convertedTo(LocalTime.class, LocalTime.parse("12:34:56"))
+				.convertedTo(Integer.class, 45_296_000)
+				.convertedTo(Long.class, 45_296_000_000_000L),
+
+			TestSpec
+				.forDataType(TIME(3)) // TODO support precision of 9
+				.convertedTo(LocalTime.class, LocalTime.parse("12:34:56.001"))
+				.convertedTo(Integer.class, 45_296_001),
+
+			TestSpec
+				.forDataType(TIMESTAMP(9))
+				.convertedTo(Timestamp.class, Timestamp.valueOf("2010-11-12 12:34:56.000000001"))
+				.convertedTo(LocalDateTime.class, LocalDateTime.parse("2010-11-12T12:34:56.000000001"))
+				.convertedTo(TimestampData.class, TimestampData.fromEpochMillis(1_289_565_296_000L, 1)),
+
+			TestSpec
+				.forDataType(TIMESTAMP_WITH_TIME_ZONE(0))
+				.convertedTo(
+					ZonedDateTime.class,
+					ZonedDateTime.ofInstant(Instant.EPOCH, ZoneId.of("UTC")))
+				.convertedTo(
+					java.time.OffsetDateTime.class,
+					ZonedDateTime.ofInstant(Instant.EPOCH, ZoneId.of("UTC")).toOffsetDateTime())
+				.expectErrorMessage("Unsupported data type: TIMESTAMP(0) WITH TIME ZONE"),
+
+			TestSpec
+				.forDataType(TIMESTAMP_WITH_LOCAL_TIME_ZONE(0))
+				.convertedTo(Instant.class, Instant.ofEpochSecond(12_345))
+				.convertedTo(Integer.class, 12_345)
+				.convertedTo(Long.class, 12_345_000L)
+				.convertedTo(TimestampData.class, TimestampData.fromEpochMillis(12_345_000)),
+
+			TestSpec
+				.forDataType(TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))
+				.convertedTo(Instant.class, Instant.ofEpochSecond(12_345, 1_000_000))
+				.convertedTo(Long.class, 12_345_001L)
+				.convertedTo(TimestampData.class, TimestampData.fromEpochMillis(12_345_001)),
+
+			TestSpec
+				.forDataType(TIMESTAMP_WITH_LOCAL_TIME_ZONE(9))
+				.convertedTo(Instant.class, Instant.ofEpochSecond(12_345, 1))
+				.convertedTo(TimestampData.class, TimestampData.fromEpochMillis(12_345_000, 1)),
+
+			TestSpec
+				.forDataType(INTERVAL(YEAR(2), MONTH()))
+				.convertedTo(Period.class, Period.of(2, 6, 0))
+				.convertedTo(Integer.class, 30),
+
+			TestSpec
+				.forDataType(INTERVAL(MONTH()))
+				.convertedTo(Period.class, Period.of(0, 30, 0))
+				.convertedTo(Integer.class, 30),
+
+			TestSpec
+				.forDataType(INTERVAL(DAY(), SECOND(3))) // TODO support precision of 9
+				.convertedTo(Duration.class, Duration.ofMillis(123))
+				.convertedTo(Long.class, 123L),
+
+			TestSpec
+				.forDataType(ARRAY(BOOLEAN()))
+				.convertedTo(boolean[].class, new boolean[]{true, false, true, true})
+				.convertedTo(ArrayData.class, new GenericArrayData(new boolean[]{true, false, true, true})),
+
+			// arrays of TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE are skipped for simplicity
+
+			TestSpec
+				.forDataType(ARRAY(DATE()))
+				.convertedTo(
+					LocalDate[].class,
+					new LocalDate[]{null, LocalDate.parse("2010-11-12"), null, LocalDate.parse("2010-11-12")}),
+
+			TestSpec
+				.forDataType(MAP(INT(), BOOLEAN()))
+				.convertedTo(Map.class, createIdentityMap())
+				.convertedTo(MapData.class, new GenericMapData(createIdentityMap())),
+
+			TestSpec
+				.forDataType(MAP(DATE(), BOOLEAN()))
+				.convertedTo(Map.class, createLocalDateMap()),
+
+			TestSpec
+				.forDataType(MULTISET(BOOLEAN()))
+				.convertedTo(Map.class, createIdentityMultiset())
+				.convertedTo(MapData.class, new GenericMapData(createIdentityMultiset())),
+
+			TestSpec
+				.forDataType(MULTISET(DATE()))
+				.convertedTo(Map.class, createLocalDateMultiset()),
+
+			TestSpec
+				.forDataType(
+					ROW(
+						FIELD("a", INT()),
+						FIELD("b",
+							ROW(
+								FIELD("b_1", DOUBLE()),
+								FIELD("b_2", BOOLEAN())))))
+				.convertedTo(Row.class, Row.of(12, Row.of(2.0, null)))
+				.convertedTo(RowData.class, GenericRowData.of(12, GenericRowData.of(2.0, null))),
+
+			TestSpec
+				.forDataType(
+					ROW(
+						FIELD("a", INT()),
+						FIELD("b",
+							ROW(
+								FIELD("b_1", DATE()),
+								FIELD("b_2", DATE())))))
+				.convertedTo(Row.class, Row.of(12, Row.of(LocalDate.ofEpochDay(1), null))),
+
+			TestSpec
+				.forClass(PojoWithMutableFields.class)
+				.convertedToSupplier(
+					PojoWithMutableFields.class,
+					() -> {
+						final PojoWithMutableFields pojo = new PojoWithMutableFields();
+						pojo.age = 42;
+						pojo.name = "Bob";
+						return pojo;
+					})
+				.convertedTo(Row.class, Row.of(42, "Bob"))
+				.convertedTo(RowData.class, GenericRowData.of(42, StringData.fromString("Bob"))),
+
+			TestSpec
+				.forClass(PojoWithImmutableFields.class)
+				.convertedTo(PojoWithImmutableFields.class, new PojoWithImmutableFields(42, "Bob"))
+				.convertedTo(Row.class, Row.of(42, "Bob"))
+				.convertedTo(RowData.class, GenericRowData.of(42, StringData.fromString("Bob"))),
+
+			TestSpec
+				.forClass(PojoWithGettersAndSetters.class)
+				.convertedToSupplier(
+					PojoWithGettersAndSetters.class,
+					() -> {
+						final PojoWithGettersAndSetters pojo = new PojoWithGettersAndSetters();
+						pojo.setAge(42);
+						pojo.setName("Bob");
+						return pojo;
+					})
+				.convertedTo(Row.class, Row.of(42, "Bob"))
+				.convertedTo(RowData.class, GenericRowData.of(42, StringData.fromString("Bob"))),
+
+			TestSpec
+				.forClass(ComplexPojo.class)
+				.convertedToSupplier(
+					ComplexPojo.class,
+					() -> {
+						final ComplexPojo pojo = new ComplexPojo();
+						pojo.setTimestamp(Timestamp.valueOf("2010-11-12 13:14:15.000000001"));
+						pojo.setPreferences(Row.of(42, "Bob", new Boolean[]{true, null, false}));
+						pojo.setBalance(new BigDecimal("1.23"));
+						return pojo;
+					})
+				.convertedTo(
+					Row.class,
+					Row.of(
+						Timestamp.valueOf("2010-11-12 13:14:15.000000001"),
+						Row.of(42, "Bob", new Boolean[]{true, null, false}),
+						new BigDecimal("1.23"))),
+
+			TestSpec
+				.forClass(PojoAsSuperclass.class)
+				.convertedToSupplier(
+					PojoWithMutableFields.class,
+					() -> {
+						final PojoWithMutableFields pojo = new PojoWithMutableFields();
+						pojo.age = 42;
+						pojo.name = "Bob";
+						return pojo;
+					})
+				.convertedTo(Row.class, Row.of(42)),
+
+			TestSpec
+				.forDataType(MAP(STRING(), DataTypes.of(PojoWithImmutableFields.class)))
+				.convertedTo(Map.class, createPojoWithImmutableFieldsMap()),
+
+			TestSpec
+				.forDataType(ARRAY(DataTypes.of(PojoWithNestedPojo.class)))
+				.convertedTo(PojoWithNestedPojo[].class, createPojoWithNestedPojoArray())
+				.convertedTo(
+					Row[].class,
+					new Row[]{
+						Row.of(
+							new PojoWithImmutableFields(42, "Bob"),
+							new PojoWithImmutableFields[]{new PojoWithImmutableFields(42, "Bob"), null}
+						),
+						null,
+						Row.of(
+							null,
+							new PojoWithImmutableFields[3]
+						),
+						Row.of(
+							null,
+							null
+						)
+					})
+		);
+	}
+
+	@Parameter
+	public TestSpec testSpec;
+
+	@Rule
+	public ExpectedException thrown = ExpectedException.none();
+
+	@Test
+	public void testConversions() {
+		if (testSpec.expectedErrorMessage != null) {
+			thrown.expect(TableException.class);
+			thrown.expectMessage(equalTo(testSpec.expectedErrorMessage));
+		}
+		for (Map.Entry<Class<?>, Object> from : testSpec.conversions.entrySet()) {
+			final DataType fromDataType = testSpec.dataType.bridgedTo(from.getKey());
+
+			final DataStructureConverter<Object, Object> fromConverter =
+				simulateSerialization(DataStructureConverters.getConverter(fromDataType));
+			fromConverter.open(DataStructureConvertersTest.class.getClassLoader());
+
+			final Object internalValue = fromConverter.toInternalOrNull(from.getValue());
+
+			for (Map.Entry<Class<?>, Object> to : testSpec.conversions.entrySet()) {
+				final DataType toDataType = testSpec.dataType.bridgedTo(to.getKey());
+
+				final DataStructureConverter<Object, Object> toConverter =
+					simulateSerialization(DataStructureConverters.getConverter(toDataType));
+				toConverter.open(DataStructureConvertersTest.class.getClassLoader());
+
+				assertArrayEquals(
+					new Object[]{to.getValue()},
+					new Object[]{toConverter.toExternalOrNull(internalValue)});
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Test utilities
+	// --------------------------------------------------------------------------------------------
+
+	private static class TestSpec {
+
+		private final String description;
+
+		private final DataType dataType;
+
+		private final Map<Class<?>, Object> conversions;
+
+		private @Nullable String expectedErrorMessage;
+
+		private TestSpec(String description, DataType dataType) {
+			this.description = description;
+			this.dataType = dataType;
+			this.conversions = new LinkedHashMap<>();
+		}
+
+		static TestSpec forDataType(AbstractDataType<?> dataType) {
+			final DataTypeFactoryMock factoryMock = new DataTypeFactoryMock();
+			final DataType resolvedDataType = factoryMock.createDataType(dataType);
+			return new TestSpec(
+				resolvedDataType.toString(),
+				resolvedDataType);
+		}
+
+		static TestSpec forClass(Class<?> clazz) {
+			return forDataType(DataTypes.of(clazz));
+		}
+
+		<T> TestSpec convertedTo(Class<T> clazz, T value) {
+			conversions.put(clazz, value);
+			return this;
+		}
+
+		<T> TestSpec convertedToSupplier(Class<T> clazz, Supplier<T> supplier) {
+			conversions.put(clazz, supplier.get());
+			return this;
+		}
+
+		TestSpec expectErrorMessage(String expectedErrorMessage) {
+			this.expectedErrorMessage = expectedErrorMessage;
+			return this;
+		}
+
+		@Override
+		public String toString() {
+			return description;
+		}
+	}
+
+	private static DataStructureConverter<Object, Object> simulateSerialization(DataStructureConverter<Object, Object> converter) {
+		try {
+			final byte[] bytes = InstantiationUtil.serializeObject(converter);
+			return InstantiationUtil.deserializeObject(bytes, DataStructureConverter.class.getClassLoader());
+		} catch (Exception e) {
+			throw new AssertionError("Serialization failed.", e);
+		}
+	}
+
+	private static Map<Integer, Boolean> createIdentityMap() {
+		final Map<Integer, Boolean> map = new HashMap<>();
+		map.put(1, true);
+		map.put(2, false);
+		map.put(3, null);
+		map.put(null, true);
+		return map;
+	}
+
+	private static Map<LocalDate, Boolean> createLocalDateMap() {
+		final Map<LocalDate, Boolean> map = new HashMap<>();
+		map.put(LocalDate.ofEpochDay(0), true);
+		map.put(LocalDate.ofEpochDay(1), false);
+		map.put(LocalDate.ofEpochDay(3), null);
+		map.put(null, true);
+		return map;
+	}
+
+	private static Map<String, PojoWithImmutableFields> createPojoWithImmutableFieldsMap() {
+		final Map<String, PojoWithImmutableFields> map = new HashMap<>();
+		map.put("Alice", new PojoWithImmutableFields(12, "Alice"));
+		map.put("Bob", new PojoWithImmutableFields(42, "Bob"));
+		map.put("Unknown", null);
+		return map;
+	}
+
+	private static PojoWithNestedPojo[] createPojoWithNestedPojoArray() {
+		final PojoWithNestedPojo pojo1 = new PojoWithNestedPojo();
+		pojo1.inner = new PojoWithImmutableFields(42, "Bob");
+		pojo1.innerArray = new PojoWithImmutableFields[]{new PojoWithImmutableFields(42, "Bob"), null};
+
+		final PojoWithNestedPojo pojo2 = new PojoWithNestedPojo();
+		pojo2.inner = null;
+		pojo2.innerArray = new PojoWithImmutableFields[3];
+
+		final PojoWithNestedPojo pojo3 = new PojoWithNestedPojo();
+
+		return new PojoWithNestedPojo[]{
+			pojo1,
+			null,
+			pojo2,
+			pojo3
+		};
+	}
+
+	private static Map<Boolean, Integer> createIdentityMultiset() {
+		final Map<Boolean, Integer> map = new HashMap<>();
+		map.put(true, 1);
+		map.put(false, 2);
+		map.put(null, 3);
+		return map;
+	}
+
+	private static Map<LocalDate, Integer> createLocalDateMultiset() {
+		final Map<LocalDate, Integer> map = new HashMap<>();
+		map.put(LocalDate.ofEpochDay(0), 1);
+		map.put(LocalDate.ofEpochDay(1), 2);
+		map.put(null, 3);
+		return map;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Structured types
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * POJO as superclass.
+	 */
+	public static class PojoAsSuperclass {
+		public int age;
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (!(o instanceof PojoAsSuperclass)) {
+				return false;
+			}
+			PojoAsSuperclass that = (PojoAsSuperclass) o;
+			return age == that.age;
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(age);
+		}
+	}
+
+	/**
+	 * POJO with public mutable fields.
+	 */
+	public static class PojoWithMutableFields extends PojoAsSuperclass {
+		public String name;
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null) {
+				return false;
+			}
+			if (!super.equals(o)) {
+				return false;
+			}
+			// modified to support PojoAsSuperclass
+			if (o.getClass() == PojoAsSuperclass.class) {
+				return true;
+			}
+			PojoWithMutableFields that = (PojoWithMutableFields) o;
+			return Objects.equals(name, that.name);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(super.hashCode(), name);
+		}
+	}
+
+	/**
+	 * POJO with immutable fields.
+	 */
+	public static class PojoWithImmutableFields {
+		public final int age;
+		public final String name;
+
+		public PojoWithImmutableFields(int age, String name) {
+			this.age = age;
+			this.name = name;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			PojoWithImmutableFields that = (PojoWithImmutableFields) o;
+			return age == that.age && Objects.equals(name, that.name);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(age, name);
+		}
+	}
+
+	/**
+	 * POJO with default constructor and private fields.
+	 */
+	public static class PojoWithGettersAndSetters {
+		private int age;
+		private String name;
+
+		public int getAge() {
+			return age;
+		}
+
+		public void setAge(int age) {
+			this.age = age;
+		}
+
+		public String getName() {
+			return name;
+		}
+
+		public void setName(String name) {
+			this.name = name;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			PojoWithGettersAndSetters that = (PojoWithGettersAndSetters) o;
+			return age == that.age && Objects.equals(name, that.name);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(age, name);
+		}
+	}
+
+	/**
+	 * POJO with annotations, nested types, and custom field order.
+	 */
+	public static class ComplexPojo {
+		private Timestamp timestamp;
+		private @DataTypeHint("ROW<age INT, name STRING, mask ARRAY<BOOLEAN>>") Row preferences;
+		private @DataTypeHint("DECIMAL(3, 2)") BigDecimal balance;
+
+		public ComplexPojo() {
+			// default constructor
+		}
+
+		// determines the order of the fields
+		public ComplexPojo(Timestamp timestamp, Row preferences, BigDecimal balance) {
+			this.timestamp = timestamp;
+			this.preferences = preferences;
+			this.balance = balance;
+		}
+
+		public Timestamp getTimestamp() {
+			return timestamp;
+		}
+
+		public void setTimestamp(Timestamp timestamp) {
+			this.timestamp = timestamp;
+		}
+
+		public Row getPreferences() {
+			return preferences;
+		}
+
+		public void setPreferences(Row preferences) {
+			this.preferences = preferences;
+		}
+
+		public BigDecimal getBalance() {
+			return balance;
+		}
+
+		public void setBalance(BigDecimal balance) {
+			this.balance = balance;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			ComplexPojo that = (ComplexPojo) o;
+			return Objects.equals(timestamp, that.timestamp) &&
+				Objects.equals(preferences, that.preferences) &&
+				Objects.equals(balance, that.balance);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(timestamp, preferences, balance);
+		}
+	}
+
+	/**
+	 * POJO with nested fields.
+	 */
+	public static class PojoWithNestedPojo {
+		public PojoWithImmutableFields inner;
+
+		public PojoWithImmutableFields[] innerArray;
+
+		public  PojoWithNestedPojo() {
+			// default constructor
+		}
+
+		public PojoWithNestedPojo(
+				PojoWithImmutableFields inner,
+				PojoWithImmutableFields[] innerArray) {
+			this.inner = inner;
+			this.innerArray = innerArray;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			PojoWithNestedPojo that = (PojoWithNestedPojo) o;
+			return Objects.equals(inner, that.inner) &&
+				Arrays.equals(innerArray, that.innerArray);
+		}
+
+		@Override
+		public int hashCode() {
+			int result = Objects.hash(inner);
+			result = 31 * result + Arrays.hashCode(innerArray);
+			return result;
+		}
+	}
+}


[flink] 04/05: [hotfix][table-common] Fix conversion class of element array data types

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b02e7c0937248a5b5348fe176b166a313313abcf
Author: Timo Walther <tw...@apache.org>
AuthorDate: Fri May 15 09:50:08 2020 +0200

    [hotfix][table-common] Fix conversion class of element array data types
---
 .../flink/table/types/CollectionDataType.java      | 13 ++++++++++-
 .../apache/flink/table/types/DataTypesTest.java    | 25 +++++++++++++++++++++-
 2 files changed, 36 insertions(+), 2 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
index b69feaf..f896a94 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
@@ -80,7 +80,7 @@ public final class CollectionDataType extends DataType {
 		return new CollectionDataType(
 			logicalType,
 			Preconditions.checkNotNull(newConversionClass, "New conversion class must not be null."),
-			elementDataType);
+			ensureElementConversionClass(elementDataType, newConversionClass));
 	}
 
 	@Override
@@ -126,4 +126,15 @@ public final class CollectionDataType extends DataType {
 		}
 		return clazz;
 	}
+
+	private DataType ensureElementConversionClass(
+			DataType elementDataType,
+			Class<?> clazz) {
+		// arrays are a special case because their element conversion class depends on the
+		// outer conversion class
+		if (logicalType.getTypeRoot() == LogicalTypeRoot.ARRAY && clazz.isArray()) {
+			return elementDataType.bridgedTo(clazz.getComponentType());
+		}
+		return elementDataType;
+	}
 }
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java
index ec37360..072534b 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java
@@ -247,6 +247,12 @@ public class DataTypesTest {
 				.expectConversionClass(Integer[][].class),
 
 			TestSpec
+				.forDataType(ARRAY(ARRAY(INT().notNull())).bridgedTo(int[][].class))
+				.expectLogicalType(new ArrayType(new ArrayType(new IntType(false))))
+				.expectConversionClass(int[][].class)
+				.expectChildren(DataTypes.ARRAY(INT().notNull()).bridgedTo(int[].class)),
+
+			TestSpec
 				.forDataType(MULTISET(MULTISET(INT())))
 				.expectLogicalType(new MultisetType(new MultisetType(new IntType())))
 				.expectConversionClass(Map.class),
@@ -365,7 +371,9 @@ public class DataTypesTest {
 
 			assertThat(dataType, hasLogicalType(testSpec.expectedLogicalType));
 
-			assertThat(toDataType(testSpec.expectedLogicalType), equalTo(dataType));
+			assertThat(
+				toDataType(testSpec.expectedLogicalType).bridgedTo(dataType.getConversionClass()),
+				equalTo(dataType));
 
 			assertThat(toLogicalType(dataType), equalTo(testSpec.expectedLogicalType));
 		}
@@ -380,6 +388,14 @@ public class DataTypesTest {
 	}
 
 	@Test
+	public void testChildren() {
+		if (testSpec.expectedChildren != null) {
+			final DataType dataType = testSpec.typeFactory.createDataType(testSpec.abstractDataType);
+			assertThat(dataType.getChildren(), equalTo(testSpec.expectedChildren));
+		}
+	}
+
+	@Test
 	public void testUnresolvedString() {
 		if (testSpec.expectedUnresolvedString != null) {
 			assertThat(testSpec.abstractDataType.toString(), equalTo(testSpec.expectedUnresolvedString));
@@ -406,6 +422,8 @@ public class DataTypesTest {
 
 		private @Nullable Class<?> expectedConversionClass;
 
+		private @Nullable List<DataType> expectedChildren;
+
 		private @Nullable String expectedUnresolvedString;
 
 		private @Nullable DataType expectedResolvedDataType;
@@ -432,6 +450,11 @@ public class DataTypesTest {
 			return this;
 		}
 
+		TestSpec expectChildren(DataType... expectedChildren) {
+			this.expectedChildren = Arrays.asList(expectedChildren);
+			return this;
+		}
+
 		TestSpec expectUnresolvedString(String expectedUnresolvedString) {
 			this.expectedUnresolvedString = expectedUnresolvedString;
 			return this;


[flink] 02/05: [hotfix][table-common] Make the usage of structured types easier

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 96a02ab0d016f915cd6cc5c3952948bfc1387990
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed May 13 17:07:10 2020 +0200

    [hotfix][table-common] Make the usage of structured types easier
---
 .../org/apache/flink/table/types/logical/DistinctType.java     |  8 +-------
 .../org/apache/flink/table/types/logical/StructuredType.java   | 10 ++++++----
 .../org/apache/flink/table/types/logical/UserDefinedType.java  |  2 +-
 .../flink/table/types/logical/utils/LogicalTypeDuplicator.java |  4 ++--
 .../java/org/apache/flink/table/types/LogicalTypesTest.java    |  2 +-
 5 files changed, 11 insertions(+), 15 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DistinctType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DistinctType.java
index 3c6f261..54c4b93 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DistinctType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DistinctType.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.types.logical;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.util.Preconditions;
 
@@ -104,15 +103,10 @@ public final class DistinctType extends UserDefinedType {
 		return sourceType;
 	}
 
-	public ObjectIdentifier getObjectIdentifier() {
-		return getOptionalObjectIdentifier()
-			.orElseThrow(() -> new TableException("Object identifier expected."));
-	}
-
 	@Override
 	public LogicalType copy(boolean isNullable) {
 		return new DistinctType(
-			getObjectIdentifier(),
+			getObjectIdentifier().orElseThrow(IllegalStateException::new),
 			sourceType.copy(isNullable),
 			getDescription().orElse(null));
 	}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/StructuredType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/StructuredType.java
index cf8cfda..add2c62 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/StructuredType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/StructuredType.java
@@ -319,8 +319,8 @@ public final class StructuredType extends UserDefinedType {
 	public LogicalType copy(boolean isNullable) {
 		return new StructuredType(
 			isNullable,
-			getOptionalObjectIdentifier().orElse(null),
-			attributes.stream().map(StructuredAttribute::copy).collect(Collectors.toList()),
+			getObjectIdentifier().orElse(null),
+			attributes,
 			isFinal(),
 			isInstantiable,
 			comparision,
@@ -331,11 +331,13 @@ public final class StructuredType extends UserDefinedType {
 
 	@Override
 	public String asSummaryString() {
-		if (getOptionalObjectIdentifier().isPresent()) {
+		if (getObjectIdentifier().isPresent()) {
 			return asSerializableString();
 		}
 		assert implementationClass != null;
-		return implementationClass.getName();
+		// we use *class* to make it visible that this type is unregistered and not confuse it
+		// with catalog types
+		return "*" + implementationClass.getName() + "*";
 	}
 
 	@Override
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/UserDefinedType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/UserDefinedType.java
index f727cbd..cb1babd 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/UserDefinedType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/UserDefinedType.java
@@ -62,7 +62,7 @@ public abstract class UserDefinedType extends LogicalType {
 		this.description = description;
 	}
 
-	public Optional<ObjectIdentifier> getOptionalObjectIdentifier() {
+	public Optional<ObjectIdentifier> getObjectIdentifier() {
 		return Optional.ofNullable(objectIdentifier);
 	}
 
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeDuplicator.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeDuplicator.java
index 341669d..f6088d8 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeDuplicator.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeDuplicator.java
@@ -88,7 +88,7 @@ public class LogicalTypeDuplicator extends LogicalTypeDefaultVisitor<LogicalType
 	@Override
 	public LogicalType visit(DistinctType distinctType) {
 		final DistinctType.Builder builder = DistinctType.newBuilder(
-			distinctType.getObjectIdentifier(),
+			distinctType.getObjectIdentifier().orElseThrow(IllegalStateException::new),
 			distinctType.getSourceType().accept(this));
 		distinctType.getDescription().ifPresent(builder::description);
 		return builder.build();
@@ -121,7 +121,7 @@ public class LogicalTypeDuplicator extends LogicalTypeDefaultVisitor<LogicalType
 	// --------------------------------------------------------------------------------------------
 
 	private StructuredType.Builder instantiateStructuredBuilder(StructuredType structuredType) {
-		final Optional<ObjectIdentifier> identifier = structuredType.getOptionalObjectIdentifier();
+		final Optional<ObjectIdentifier> identifier = structuredType.getObjectIdentifier();
 		final Optional<Class<?>> implementationClass = structuredType.getImplementationClass();
 		if (identifier.isPresent() && implementationClass.isPresent()) {
 			return StructuredType.newBuilder(identifier.get(), implementationClass.get());
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
index d60059e..b79e2ae 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
@@ -674,7 +674,7 @@ public class LogicalTypesTest {
 
 		testInvalidStringSerializability(structuredType);
 
-		testStringSummary(structuredType, User.class.getName());
+		testStringSummary(structuredType, "*" + User.class.getName() + "*");
 
 		testConversions(
 			structuredType,


[flink] 01/05: [hotfix][table-common] Reduce conversion classes of BINARY/VARBINARY

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d25af465f696ee8767bb9f6f8e5dd6c60f713cb4
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed May 13 16:54:39 2020 +0200

    [hotfix][table-common] Reduce conversion classes of BINARY/VARBINARY
---
 .../java/org/apache/flink/table/types/logical/BinaryType.java  | 10 +++-------
 .../org/apache/flink/table/types/logical/VarBinaryType.java    | 10 +++-------
 2 files changed, 6 insertions(+), 14 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/BinaryType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/BinaryType.java
index c8844c0..bf4d0c4 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/BinaryType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/BinaryType.java
@@ -21,12 +21,10 @@ package org.apache.flink.table.types.logical;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.data.ArrayData;
 
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
-import java.util.Set;
 
 /**
  * Logical type of a fixed-length binary string (=a sequence of bytes).
@@ -51,9 +49,7 @@ public final class BinaryType extends LogicalType {
 
 	private static final String FORMAT = "BINARY(%d)";
 
-	private static final Set<String> INPUT_OUTPUT_CONVERSION = conversionSet(
-		byte[].class.getName(),
-		ArrayData.class.getName());
+	private static final Class<?> INPUT_OUTPUT_CONVERSION = byte[].class;
 
 	private static final Class<?> DEFAULT_CONVERSION = byte[].class;
 
@@ -125,12 +121,12 @@ public final class BinaryType extends LogicalType {
 
 	@Override
 	public boolean supportsInputConversion(Class<?> clazz) {
-		return INPUT_OUTPUT_CONVERSION.contains(clazz.getName());
+		return INPUT_OUTPUT_CONVERSION == clazz;
 	}
 
 	@Override
 	public boolean supportsOutputConversion(Class<?> clazz) {
-		return INPUT_OUTPUT_CONVERSION.contains(clazz.getName());
+		return INPUT_OUTPUT_CONVERSION == clazz;
 	}
 
 	@Override
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/VarBinaryType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/VarBinaryType.java
index 2253f7e..77b063a 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/VarBinaryType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/VarBinaryType.java
@@ -21,12 +21,10 @@ package org.apache.flink.table.types.logical;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.data.ArrayData;
 
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
-import java.util.Set;
 
 /**
  * Logical type of a variable-length binary string (=a sequence of bytes).
@@ -54,9 +52,7 @@ public final class VarBinaryType extends LogicalType {
 
 	private static final String MAX_FORMAT = "BYTES";
 
-	private static final Set<String> INPUT_OUTPUT_CONVERSION = conversionSet(
-		byte[].class.getName(),
-		ArrayData.class.getName());
+	private static final Class<?> INPUT_OUTPUT_CONVERSION = byte[].class;
 
 	private static final Class<?> DEFAULT_CONVERSION = byte[].class;
 
@@ -133,12 +129,12 @@ public final class VarBinaryType extends LogicalType {
 
 	@Override
 	public boolean supportsInputConversion(Class<?> clazz) {
-		return INPUT_OUTPUT_CONVERSION.contains(clazz.getName());
+		return INPUT_OUTPUT_CONVERSION == clazz;
 	}
 
 	@Override
 	public boolean supportsOutputConversion(Class<?> clazz) {
-		return INPUT_OUTPUT_CONVERSION.contains(clazz.getName());
+		return INPUT_OUTPUT_CONVERSION == clazz;
 	}
 
 	@Override


[flink] 03/05: [hotfix][table-common] Fix structured type field order

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c2e27f01874fca33a7b8aab1eb463e725cf54744
Author: Timo Walther <tw...@apache.org>
AuthorDate: Thu May 14 18:53:27 2020 +0200

    [hotfix][table-common] Fix structured type field order
---
 .../flink/table/types/logical/StructuredType.java      | 18 +++++++++---------
 .../org/apache/flink/table/types/LogicalTypesTest.java | 15 +++++++++++----
 2 files changed, 20 insertions(+), 13 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/StructuredType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/StructuredType.java
index add2c62..d8aa593 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/StructuredType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/StructuredType.java
@@ -368,16 +368,16 @@ public final class StructuredType extends UserDefinedType {
 
 	@Override
 	public List<LogicalType> getChildren() {
-		final ArrayList<LogicalType> children = new ArrayList<>();
-		StructuredType currentType = this;
-		while (currentType != null) {
-			children.addAll(
-				currentType.attributes.stream()
-					.map(StructuredAttribute::getType)
-					.collect(Collectors.toList()));
-			currentType = currentType.superType;
+		final List<LogicalType> children = new ArrayList<>();
+		// add super fields first
+		if (superType != null) {
+			children.addAll(superType.getChildren());
 		}
-		Collections.reverse(children);
+		// then specific fields
+		children.addAll(
+			attributes.stream()
+				.map(StructuredAttribute::getType)
+				.collect(Collectors.toList()));
 		return Collections.unmodifiableList(children);
 	}
 
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
index b79e2ae..eac4014 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
@@ -67,6 +67,7 @@ import org.apache.flink.util.InstantiationUtil;
 import org.junit.Test;
 
 import java.math.BigDecimal;
+import java.time.LocalDateTime;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -520,7 +521,7 @@ public class LogicalTypesTest {
 			"`cat`.`db`.`User`",
 			new Class[]{Row.class, User.class},
 			new Class[]{Row.class, Human.class, User.class},
-			new LogicalType[]{UDT_NAME_TYPE, UDT_SETTING_TYPE},
+			new LogicalType[]{UDT_NAME_TYPE, UDT_SETTING_TYPE, UDT_TIMESTAMP_TYPE},
 			createUserType(true, false)
 		);
 
@@ -681,7 +682,9 @@ public class LogicalTypesTest {
 			new Class[]{Row.class, User.class},
 			new Class[]{Row.class, Human.class, User.class});
 
-		testChildren(structuredType, new LogicalType[]{UDT_NAME_TYPE, UDT_SETTING_TYPE});
+		testChildren(
+			structuredType,
+			new LogicalType[]{UDT_NAME_TYPE, UDT_SETTING_TYPE, UDT_TIMESTAMP_TYPE});
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -793,6 +796,8 @@ public class LogicalTypesTest {
 
 	private static final LogicalType UDT_SETTING_TYPE = new IntType();
 
+	private static final LogicalType UDT_TIMESTAMP_TYPE = new TimestampType();
+
 	private StructuredType createHumanType(boolean useDifferentImplementation) {
 		return StructuredType.newBuilder(
 				ObjectIdentifier.of("cat", "db", "Human"),
@@ -819,8 +824,9 @@ public class LogicalTypesTest {
 		}
 		return builder
 			.attributes(
-				Collections.singletonList(
-					new StructuredType.StructuredAttribute("setting", UDT_SETTING_TYPE)))
+				Arrays.asList(
+					new StructuredType.StructuredAttribute("setting", UDT_SETTING_TYPE),
+					new StructuredType.StructuredAttribute("timestamp", UDT_TIMESTAMP_TYPE)))
 			.description("User type desc.")
 			.setFinal(isFinal)
 			.setInstantiable(true)
@@ -838,5 +844,6 @@ public class LogicalTypesTest {
 
 	private static final class User extends Human {
 		public int setting;
+		public LocalDateTime timestamp;
 	}
 }