You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2020/05/16 11:00:34 UTC

[flink] 05/05: [FLINK-16999][table-runtime-blink] Add converters for all data types and conversion classes

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8bc744285531bd0b650c799be0a591c3492124f2
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed May 13 18:59:11 2020 +0200

    [FLINK-16999][table-runtime-blink] Add converters for all data types and conversion classes
    
    This adds converters for all data types and all conversion classes. It refactors the
    DataFormatConverter for cleaner readability and adds a lot of tests.
    
    Among other conversions, it adds complete support for structured types
    (incl. immutable POJOs).
    
The only types that are not yet 100% supported are TimeType and DayTimeIntervalType.
    
    This closes #12135.
---
 .../flink/table/catalog/DataTypeFactoryImpl.java   |   1 +
 .../org/apache/flink/table/data/ArrayData.java     |   3 +
 .../java/org/apache/flink/table/data/RowData.java  |   3 +
 .../table/types/extraction/ExtractionUtils.java    |  34 +
 flink-table/flink-table-runtime-blink/pom.xml      |   8 +
 .../conversion/ArrayBooleanArrayConverter.java     |  43 ++
 .../data/conversion/ArrayByteArrayConverter.java   |  43 ++
 .../data/conversion/ArrayDoubleArrayConverter.java |  43 ++
 .../data/conversion/ArrayFloatArrayConverter.java  |  43 ++
 .../data/conversion/ArrayIntArrayConverter.java    |  43 ++
 .../data/conversion/ArrayLongArrayConverter.java   |  43 ++
 .../data/conversion/ArrayObjectArrayConverter.java | 206 ++++++
 .../data/conversion/ArrayShortArrayConverter.java  |  43 ++
 .../data/conversion/DataStructureConverter.java    |  80 +++
 .../data/conversion/DataStructureConverters.java   | 224 +++++++
 .../table/data/conversion/DateDateConverter.java   |  42 ++
 .../data/conversion/DateLocalDateConverter.java    |  42 ++
 .../DayTimeIntervalDurationConverter.java          |  43 ++
 .../conversion/DecimalBigDecimalConverter.java     |  63 ++
 .../table/data/conversion/IdentityConverter.java   |  40 ++
 .../LocalZonedTimestampInstantConverter.java       |  42 ++
 .../LocalZonedTimestampIntConverter.java           |  42 ++
 .../LocalZonedTimestampLongConverter.java          |  42 ++
 .../table/data/conversion/MapMapConverter.java     | 125 ++++
 .../data/conversion/RawByteArrayConverter.java     |  59 ++
 .../table/data/conversion/RawObjectConverter.java  |  59 ++
 .../table/data/conversion/RowRowConverter.java     |  95 +++
 .../data/conversion/StringByteArrayConverter.java  |  43 ++
 .../data/conversion/StringStringConverter.java     |  43 ++
 .../data/conversion/StructuredObjectConverter.java | 276 ++++++++
 .../data/conversion/TimeLocalTimeConverter.java    |  42 ++
 .../table/data/conversion/TimeLongConverter.java   |  41 ++
 .../table/data/conversion/TimeTimeConverter.java   |  42 ++
 .../TimestampLocalDateTimeConverter.java           |  42 ++
 .../conversion/TimestampTimestampConverter.java    |  42 ++
 .../YearMonthIntervalPeriodConverter.java          |  78 +++
 .../flink/table/data/writer/BinaryWriter.java      |   1 +
 .../table/runtime/types/InternalSerializers.java   |  50 +-
 .../table/data/DataStructureConvertersTest.java    | 730 +++++++++++++++++++++
 39 files changed, 2961 insertions(+), 23 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java
index 9b0e76e..6ceb3f6 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java
@@ -151,6 +151,7 @@ final class DataTypeFactoryImpl implements DataTypeFactory {
 
 	private LogicalType resolveType(UnresolvedIdentifier identifier) {
 		assert identifier != null;
+		// TODO validate implementation class of structured types when converting from LogicalType to DataType
 		throw new TableException("User-defined types are not supported yet.");
 	}
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/ArrayData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/ArrayData.java
index 59c6b2f..5132458 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/ArrayData.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/ArrayData.java
@@ -217,6 +217,9 @@ public interface ArrayData {
 				return array.getMap(pos);
 			case ROW:
 				return array.getRow(pos, ((RowType) elementType).getFieldCount());
+			case STRUCTURED_TYPE:
+				// not the most efficient code but ok for a deprecated method
+				return array.getRow(pos, getFieldCount(elementType));
 			case BINARY:
 			case VARBINARY:
 				return array.getBinary(pos);
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RowData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RowData.java
index e4f2c5a..e8c5afb 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RowData.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RowData.java
@@ -281,6 +281,9 @@ public interface RowData {
 				return row.getMap(pos);
 			case ROW:
 				return row.getRow(pos, ((RowType) fieldType).getFieldCount());
+			case STRUCTURED_TYPE:
+				// not the most efficient code but ok for a deprecated method
+				return row.getRow(pos, getFieldCount(fieldType));
 			case BINARY:
 			case VARBINARY:
 				return row.getBinary(pos);
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
index b3ed7a3..56e5266 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
@@ -146,6 +146,25 @@ public final class ExtractionUtils {
 	}
 
 	/**
+	 * Returns the field of a structured type. The logic is as broad as possible to support
+	 * both Java and Scala in different flavors.
+	 */
+	public static Field getStructuredField(Class<?> clazz, String fieldName) {
+		final String normalizedFieldName = fieldName.toUpperCase();
+
+		final List<Field> fields = collectStructuredFields(clazz);
+		for (Field field : fields) {
+			if (field.getName().toUpperCase().equals(normalizedFieldName)) {
+				return field;
+			}
+		}
+		throw extractionError(
+			"Could not to find a field named '%s' in class '%s' for structured type.",
+			fieldName,
+			clazz.getName());
+	}
+
+	/**
 	 * Checks for a field getter of a structured type. The logic is as broad as possible to support
 	 * both Java and Scala in different flavors.
 	 */
@@ -260,6 +279,21 @@ public final class ExtractionUtils {
 		return Modifier.isPublic(m);
 	}
 
+	/**
+	 * Checks whether a field is directly writable without a setter or constructor.
+	 */
+	public static boolean isStructuredFieldDirectlyWritable(Field field) {
+		final int m = field.getModifiers();
+
+		// field is immutable
+		if (Modifier.isFinal(m)) {
+			return false;
+		}
+
+		// field is directly writable
+		return Modifier.isPublic(m);
+	}
+
 	// --------------------------------------------------------------------------------------------
 	// Methods intended for this package
 	// --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-runtime-blink/pom.xml b/flink-table/flink-table-runtime-blink/pom.xml
index 62115b9..53d745c 100644
--- a/flink-table/flink-table-runtime-blink/pom.xml
+++ b/flink-table/flink-table-runtime-blink/pom.xml
@@ -153,6 +153,14 @@ under the License.
 			<type>test-jar</type>
 			<scope>test</scope>
 		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-common</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 
 	<build>
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayBooleanArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayBooleanArrayConverter.java
new file mode 100644
index 0000000..3939413
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayBooleanArrayConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.types.logical.ArrayType;
+
+/**
+ * Converter for {@link ArrayType} of {@code boolean[]} external type.
+ */
+@Internal
+class ArrayBooleanArrayConverter implements DataStructureConverter<ArrayData, boolean[]> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public ArrayData toInternal(boolean[] external) {
+		return new GenericArrayData(external);
+	}
+
+	@Override
+	public boolean[] toExternal(ArrayData internal) {
+		return internal.toBooleanArray();
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayByteArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayByteArrayConverter.java
new file mode 100644
index 0000000..6c8b665
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayByteArrayConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.types.logical.ArrayType;
+
+/**
+ * Converter for {@link ArrayType} of {@code byte[]} external type.
+ */
+@Internal
+class ArrayByteArrayConverter implements DataStructureConverter<ArrayData, byte[]> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public ArrayData toInternal(byte[] external) {
+		return new GenericArrayData(external);
+	}
+
+	@Override
+	public byte[] toExternal(ArrayData internal) {
+		return internal.toByteArray();
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayDoubleArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayDoubleArrayConverter.java
new file mode 100644
index 0000000..b442ff9
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayDoubleArrayConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.types.logical.ArrayType;
+
+/**
+ * Converter for {@link ArrayType} of {@code double[]} external type.
+ */
+@Internal
+class ArrayDoubleArrayConverter implements DataStructureConverter<ArrayData, double[]> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public ArrayData toInternal(double[] external) {
+		return new GenericArrayData(external);
+	}
+
+	@Override
+	public double[] toExternal(ArrayData internal) {
+		return internal.toDoubleArray();
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayFloatArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayFloatArrayConverter.java
new file mode 100644
index 0000000..3b8bf15
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayFloatArrayConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.types.logical.ArrayType;
+
+/**
+ * Converter for {@link ArrayType} of {@code float[]} external type.
+ */
+@Internal
+class ArrayFloatArrayConverter implements DataStructureConverter<ArrayData, float[]> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public ArrayData toInternal(float[] external) {
+		return new GenericArrayData(external);
+	}
+
+	@Override
+	public float[] toExternal(ArrayData internal) {
+		return internal.toFloatArray();
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayIntArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayIntArrayConverter.java
new file mode 100644
index 0000000..fe8880a
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayIntArrayConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.types.logical.ArrayType;
+
+/**
+ * Converter for {@link ArrayType} of {@code int[]} external type.
+ */
+@Internal
+class ArrayIntArrayConverter implements DataStructureConverter<ArrayData, int[]> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public ArrayData toInternal(int[] external) {
+		return new GenericArrayData(external);
+	}
+
+	@Override
+	public int[] toExternal(ArrayData internal) {
+		return internal.toIntArray();
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayLongArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayLongArrayConverter.java
new file mode 100644
index 0000000..963d146
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayLongArrayConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.types.logical.ArrayType;
+
+/**
+ * Converter for {@link ArrayType} of {@code long[]} external type.
+ */
+@Internal
+class ArrayLongArrayConverter implements DataStructureConverter<ArrayData, long[]> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public ArrayData toInternal(long[] external) {
+		return new GenericArrayData(external);
+	}
+
+	@Override
+	public long[] toExternal(ArrayData internal) {
+		return internal.toLongArray();
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
new file mode 100644
index 0000000..761d758
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.binary.BinaryArrayData;
+import org.apache.flink.table.data.writer.BinaryArrayWriter;
+import org.apache.flink.table.data.writer.BinaryWriter;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.Serializable;
+import java.lang.reflect.Array;
+
+/**
+ * Converter for {@link ArrayType} of nested primitive or object arrays external types.
+ */
+@Internal
+@SuppressWarnings("unchecked")
+class ArrayObjectArrayConverter<E> implements DataStructureConverter<ArrayData, E[]> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final Class<E> elementClass;
+
+	private final int elementSize;
+
+	private final BinaryArrayWriter.NullSetter  writerNullSetter;
+
+	private final BinaryWriter.ValueSetter writerValueSetter;
+
+	private final GenericToJavaArrayConverter<E> genericToJavaArrayConverter;
+
+	private transient BinaryArrayData reuseArray;
+
+	private transient BinaryArrayWriter reuseWriter;
+
+	final boolean hasInternalElements;
+
+	final ArrayData.ElementGetter elementGetter;
+
+	final DataStructureConverter<Object, E> elementConverter;
+
+	private ArrayObjectArrayConverter(
+			Class<E> elementClass,
+			int elementSize,
+			BinaryArrayWriter.NullSetter writerNullSetter,
+			BinaryWriter.ValueSetter writerValueSetter,
+			GenericToJavaArrayConverter<E> genericToJavaArrayConverter,
+			ArrayData.ElementGetter elementGetter,
+			DataStructureConverter<Object, E> elementConverter) {
+		this.elementClass = elementClass;
+		this.elementSize = elementSize;
+		this.writerNullSetter = writerNullSetter;
+		this.writerValueSetter = writerValueSetter;
+		this.genericToJavaArrayConverter = genericToJavaArrayConverter;
+		this.hasInternalElements = elementConverter instanceof IdentityConverter;
+		this.elementGetter = elementGetter;
+		this.elementConverter = elementConverter;
+	}
+
+	@Override
+	public void open(ClassLoader classLoader) {
+		reuseArray = new BinaryArrayData();
+		reuseWriter = new BinaryArrayWriter(reuseArray, 0, elementSize);
+		elementConverter.open(classLoader);
+	}
+
+	@Override
+	public ArrayData toInternal(E[] external) {
+		return hasInternalElements ? new GenericArrayData(external) : toBinaryArrayData(external);
+	}
+
+	@Override
+	public E[] toExternal(ArrayData internal) {
+		if (hasInternalElements && internal instanceof GenericArrayData) {
+			final GenericArrayData genericArray = (GenericArrayData) internal;
+			if (genericArray.isPrimitiveArray()) {
+				return genericToJavaArrayConverter.convert((GenericArrayData) internal);
+			}
+			return (E[]) genericArray.toObjectArray();
+		}
+		return toJavaArray(internal);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Runtime helper methods
+	// --------------------------------------------------------------------------------------------
+
+	private ArrayData toBinaryArrayData(E[] external) {
+		final int length = external.length;
+		allocateWriter(length);
+		for (int pos = 0; pos < length; pos++) {
+			writeElement(pos, external[pos]);
+		}
+		return completeWriter();
+	}
+
+	private E[] toJavaArray(ArrayData internal) {
+		final int size = internal.size();
+		final E[] values = (E[]) Array.newInstance(elementClass, size);
+		for (int pos = 0; pos < size; pos++) {
+			final Object value = elementGetter.getElementOrNull(internal, pos);
+			values[pos] = elementConverter.toExternalOrNull(value);
+		}
+		return values;
+	}
+
+	interface GenericToJavaArrayConverter<E> extends Serializable {
+		E[] convert(GenericArrayData internal);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Shared code
+	// --------------------------------------------------------------------------------------------
+
+	void allocateWriter(int length) {
+		if (reuseWriter.getNumElements() != length) {
+			reuseWriter = new BinaryArrayWriter(reuseArray, length, elementSize);
+		} else {
+			reuseWriter.reset();
+		}
+	}
+
+	void writeElement(int pos, E element) {
+		if (element == null) {
+			writerNullSetter.setNull(reuseWriter, pos);
+		} else {
+			writerValueSetter.setValue(reuseWriter, pos, elementConverter.toInternal(element));
+		}
+	}
+
+	BinaryArrayData completeWriter() {
+		reuseWriter.complete();
+		return reuseArray;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Factory method
+	// --------------------------------------------------------------------------------------------
+
+	public static ArrayObjectArrayConverter<?> create(DataType dataType) {
+		return createForElement(dataType.getChildren().get(0));
+	}
+
+	public static <E> ArrayObjectArrayConverter<E> createForElement(DataType elementDataType) {
+		final LogicalType elementType = elementDataType.getLogicalType();
+		return new ArrayObjectArrayConverter<>(
+			(Class<E>) elementDataType.getConversionClass(),
+			BinaryArrayData.calculateFixLengthPartSize(elementType),
+			BinaryArrayWriter.createNullSetter(elementType),
+			BinaryWriter.createValueSetter(elementType),
+			createGenericToJavaArrayConverter(elementType),
+			ArrayData.createElementGetter(elementType),
+			(DataStructureConverter<Object, E>) DataStructureConverters.getConverter(elementDataType)
+		);
+	}
+
+	@SuppressWarnings("unchecked")
+	private static <E> GenericToJavaArrayConverter<E> createGenericToJavaArrayConverter(LogicalType elementType) {
+		switch (elementType.getTypeRoot()) {
+			case BOOLEAN:
+				return internal -> (E[]) ArrayUtils.toObject(internal.toBooleanArray());
+			case TINYINT:
+				return internal -> (E[]) ArrayUtils.toObject(internal.toByteArray());
+			case SMALLINT:
+				return internal -> (E[]) ArrayUtils.toObject(internal.toShortArray());
+			case INTEGER:
+				return internal -> (E[]) ArrayUtils.toObject(internal.toIntArray());
+			case BIGINT:
+				return internal -> (E[]) ArrayUtils.toObject(internal.toLongArray());
+			case FLOAT:
+				return internal -> (E[]) ArrayUtils.toObject(internal.toFloatArray());
+			case DOUBLE:
+				return internal -> (E[]) ArrayUtils.toObject(internal.toDoubleArray());
+			case DISTINCT_TYPE:
+				return createGenericToJavaArrayConverter(((DistinctType) elementType).getSourceType());
+			default:
+				return internal -> {
+					throw new IllegalStateException();
+				};
+		}
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayShortArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayShortArrayConverter.java
new file mode 100644
index 0000000..3b48ea4
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayShortArrayConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.types.logical.ArrayType;
+
+/**
+ * Converter for {@link ArrayType} of {@code short[]} external type.
+ */
+@Internal
+class ArrayShortArrayConverter implements DataStructureConverter<ArrayData, short[]> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public ArrayData toInternal(short[] external) {
+		return new GenericArrayData(external);
+	}
+
+	@Override
+	public short[] toExternal(ArrayData internal) {
+		return internal.toShortArray();
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverter.java
new file mode 100644
index 0000000..d2df1e5
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverter.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import java.io.Serializable;
+
+/**
+ * Converter between internal and external data structure.
+ *
+ * <p>Converters are serializable and can be passed to runtime operators.
+ *
+ * @param <I> internal data structure (see {@link RowData})
+ * @param <E> external data structure (see {@link DataType#getConversionClass()})
+ */
+@Internal
+public interface DataStructureConverter<I, E> extends Serializable {
+
+	default void open(ClassLoader classLoader) {
+		assert classLoader != null;
+		// nothing to do
+	}
+
+	/**
+	 * Converts to internal data structure.
+	 *
+	 * <p>Note: Parameter must not be null. Output must not be null.
+	 */
+	I toInternal(E external);
+
+	/**
+	 * Converts to internal data structure or {@code null}.
+	 *
+	 * <p>The nullability could be derived from the data type. However, this method reduces null checks.
+	 */
+	default I toInternalOrNull(E external) {
+		if (external == null) {
+			return null;
+		}
+		return toInternal(external);
+	}
+
+	/**
+	 * Converts to external data structure.
+	 *
+	 * <p>Note: Parameter must not be null. Output must not be null.
+	 */
+	E toExternal(I internal);
+
+	/**
+	 * Converts to external data structure or {@code null}.
+	 *
+	 * <p>The nullability could be derived from the data type. However, this method reduces null checks.
+	 */
+	default E toExternalOrNull(I internal) {
+		if (internal == null) {
+			return null;
+		}
+		return toExternal(internal);
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverters.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverters.java
new file mode 100644
index 0000000..1d07d24
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverters.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.types.Row;
+
+import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Supplier;
+
/**
 * Registry of available data structure converters.
 *
 * <p>Data structure converters are used at the edges for the API for converting between internal
 * structures (see {@link RowData}) and external structures (see {@link DataType#getConversionClass()}).
 *
 * <p>This is useful for UDFs, sources, sinks, or exposing data in the API (e.g. via a {@code collect()}).
 *
 * <p>Note: It is NOT the responsibility of a converter to normalize the data. Thus, a converter does
 * neither change the precision of a timestamp nor prune/expand strings to their defined length. This
 * might be the responsibility of data classes that are called transitively.
 */
@Internal
public final class DataStructureConverters {

	// Lookup table keyed by (logical type root, conversion class). Populated once in the
	// static initializer below and never mutated afterwards, so unsynchronized reads are safe.
	private static final Map<ConverterIdentifier<?>, DataStructureConverterFactory> converters = new HashMap<>();
	static {
		// ordered by type root and conversion class definition
		putConverter(LogicalTypeRoot.CHAR, String.class, constructor(StringStringConverter::new));
		putConverter(LogicalTypeRoot.CHAR, byte[].class, constructor(StringByteArrayConverter::new));
		putConverter(LogicalTypeRoot.CHAR, StringData.class, identity());
		putConverter(LogicalTypeRoot.VARCHAR, String.class, constructor(StringStringConverter::new));
		putConverter(LogicalTypeRoot.VARCHAR, byte[].class, constructor(StringByteArrayConverter::new));
		putConverter(LogicalTypeRoot.VARCHAR, StringData.class, identity());
		putConverter(LogicalTypeRoot.BOOLEAN, Boolean.class, identity());
		putConverter(LogicalTypeRoot.BOOLEAN, boolean.class, identity());
		putConverter(LogicalTypeRoot.BINARY, byte[].class, identity());
		putConverter(LogicalTypeRoot.VARBINARY, byte[].class, identity());
		putConverter(LogicalTypeRoot.DECIMAL, BigDecimal.class, DecimalBigDecimalConverter::create);
		putConverter(LogicalTypeRoot.DECIMAL, DecimalData.class, identity());
		putConverter(LogicalTypeRoot.TINYINT, Byte.class, identity());
		putConverter(LogicalTypeRoot.TINYINT, byte.class, identity());
		putConverter(LogicalTypeRoot.SMALLINT, Short.class, identity());
		putConverter(LogicalTypeRoot.SMALLINT, short.class, identity());
		putConverter(LogicalTypeRoot.INTEGER, Integer.class, identity());
		putConverter(LogicalTypeRoot.INTEGER, int.class, identity());
		putConverter(LogicalTypeRoot.BIGINT, Long.class, identity());
		putConverter(LogicalTypeRoot.BIGINT, long.class, identity());
		putConverter(LogicalTypeRoot.FLOAT, Float.class, identity());
		putConverter(LogicalTypeRoot.FLOAT, float.class, identity());
		putConverter(LogicalTypeRoot.DOUBLE, Double.class, identity());
		putConverter(LogicalTypeRoot.DOUBLE, double.class, identity());
		putConverter(LogicalTypeRoot.DATE, java.sql.Date.class, constructor(DateDateConverter::new));
		putConverter(LogicalTypeRoot.DATE, java.time.LocalDate.class, constructor(DateLocalDateConverter::new));
		putConverter(LogicalTypeRoot.DATE, Integer.class, identity());
		putConverter(LogicalTypeRoot.DATE, int.class, identity());
		putConverter(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, java.sql.Time.class, constructor(TimeTimeConverter::new));
		putConverter(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, java.time.LocalTime.class, constructor(TimeLocalTimeConverter::new));
		putConverter(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, Integer.class, identity());
		putConverter(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, int.class, identity());
		putConverter(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, Long.class, constructor(TimeLongConverter::new));
		putConverter(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, long.class, constructor(TimeLongConverter::new));
		putConverter(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, java.sql.Timestamp.class, constructor(TimestampTimestampConverter::new));
		putConverter(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, java.time.LocalDateTime.class, constructor(TimestampLocalDateTimeConverter::new));
		putConverter(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, TimestampData.class, identity());
		// TIMESTAMP WITH TIME ZONE is registered but not yet implemented; requesting it
		// fails eagerly with a descriptive exception instead of a lookup miss.
		putConverter(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE, java.time.ZonedDateTime.class, unsupported());
		putConverter(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE, java.time.OffsetDateTime.class, unsupported());
		putConverter(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, java.time.Instant.class, constructor(LocalZonedTimestampInstantConverter::new));
		putConverter(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, Integer.class, constructor(LocalZonedTimestampIntConverter::new));
		putConverter(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, int.class, constructor(LocalZonedTimestampIntConverter::new));
		putConverter(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, Long.class, constructor(LocalZonedTimestampLongConverter::new));
		putConverter(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, long.class, constructor(LocalZonedTimestampLongConverter::new));
		putConverter(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, TimestampData.class, identity());
		putConverter(LogicalTypeRoot.INTERVAL_YEAR_MONTH, java.time.Period.class, YearMonthIntervalPeriodConverter::create);
		putConverter(LogicalTypeRoot.INTERVAL_YEAR_MONTH, Integer.class, identity());
		putConverter(LogicalTypeRoot.INTERVAL_YEAR_MONTH, int.class, identity());
		putConverter(LogicalTypeRoot.INTERVAL_DAY_TIME, java.time.Duration.class, constructor(DayTimeIntervalDurationConverter::new));
		putConverter(LogicalTypeRoot.INTERVAL_DAY_TIME, Long.class, identity());
		putConverter(LogicalTypeRoot.INTERVAL_DAY_TIME, long.class, identity());
		putConverter(LogicalTypeRoot.ARRAY, ArrayData.class, identity());
		putConverter(LogicalTypeRoot.ARRAY, boolean[].class, constructor(ArrayBooleanArrayConverter::new));
		putConverter(LogicalTypeRoot.ARRAY, byte[].class, constructor(ArrayByteArrayConverter::new));
		putConverter(LogicalTypeRoot.ARRAY, short[].class, constructor(ArrayShortArrayConverter::new));
		putConverter(LogicalTypeRoot.ARRAY, int[].class, constructor(ArrayIntArrayConverter::new));
		putConverter(LogicalTypeRoot.ARRAY, long[].class, constructor(ArrayLongArrayConverter::new));
		putConverter(LogicalTypeRoot.ARRAY, float[].class, constructor(ArrayFloatArrayConverter::new));
		putConverter(LogicalTypeRoot.ARRAY, double[].class, constructor(ArrayDoubleArrayConverter::new));
		putConverter(LogicalTypeRoot.MAP, Map.class, MapMapConverter::createForMapType);
		putConverter(LogicalTypeRoot.MAP, MapData.class, identity());
		putConverter(LogicalTypeRoot.MULTISET, Map.class, MapMapConverter::createForMultisetType);
		putConverter(LogicalTypeRoot.MULTISET, MapData.class, identity());
		putConverter(LogicalTypeRoot.ROW, Row.class, RowRowConverter::create);
		putConverter(LogicalTypeRoot.ROW, RowData.class, identity());
		putConverter(LogicalTypeRoot.STRUCTURED_TYPE, Row.class, RowRowConverter::create);
		putConverter(LogicalTypeRoot.STRUCTURED_TYPE, RowData.class, identity());
		putConverter(LogicalTypeRoot.RAW, byte[].class, RawByteArrayConverter::create);
		putConverter(LogicalTypeRoot.RAW, RawValueData.class, identity());
	}

	/**
	 * Returns a converter for the given {@link DataType}.
	 */
	@SuppressWarnings("unchecked")
	public static DataStructureConverter<Object, Object> getConverter(DataType dataType) {
		// cast to Object for ease of use
		return (DataStructureConverter<Object, Object>) getConverterInternal(dataType);
	}

	// Resolves a converter for the given data type. Looks up the (type root, conversion
	// class) registry first; falls back to special-cased type roots whose converters
	// depend on the full data type (e.g. element/field types) rather than the class alone.
	private static DataStructureConverter<?, ?> getConverterInternal(DataType dataType) {
		final LogicalType logicalType = dataType.getLogicalType();
		final DataStructureConverterFactory factory = converters.get(
			new ConverterIdentifier<>(
				logicalType.getTypeRoot(),
				dataType.getConversionClass()));
		if (factory != null) {
			return factory.createConverter(dataType);
		}
		// special cases
		switch (logicalType.getTypeRoot()) {
			case ARRAY:
				// object arrays need element-type-aware conversion
				return ArrayObjectArrayConverter.create(dataType);
			case DISTINCT_TYPE:
				// a distinct type converts like its single source type
				return getConverterInternal(dataType.getChildren().get(0));
			case STRUCTURED_TYPE:
				// structured types with custom conversion classes (e.g. POJOs)
				return StructuredObjectConverter.create(dataType);
			case RAW:
				return RawObjectConverter.create(dataType);
			default:
				throw new TableException("Could not find converter for data type: " + dataType);
		}
	}

	// --------------------------------------------------------------------------------------------
	// Helper methods
	// --------------------------------------------------------------------------------------------

	// Registers a factory under the (type root, conversion class) key.
	private static <E> void putConverter(
			LogicalTypeRoot root,
			Class<E> conversionClass,
			DataStructureConverterFactory factory) {
		converters.put(new ConverterIdentifier<>(root, conversionClass), factory);
	}

	// Factory for types whose internal and external representations coincide.
	private static DataStructureConverterFactory identity() {
		return constructor(IdentityConverter::new);
	}

	// Adapts a no-arg converter constructor to a factory that ignores the data type.
	private static DataStructureConverterFactory constructor(Supplier<DataStructureConverter<?, ?>> supplier) {
		return dataType -> supplier.get();
	}

	// Factory placeholder for registered-but-unimplemented conversions; throws on use.
	private static DataStructureConverterFactory unsupported() {
		return dataType -> {
			throw new TableException("Unsupported data type: " + dataType);
		};
	}

	// --------------------------------------------------------------------------------------------
	// Helper classes
	// --------------------------------------------------------------------------------------------

	// Immutable map key combining a logical type root with its external conversion class.
	private static class ConverterIdentifier<E> {

		final LogicalTypeRoot root;

		final Class<E> conversionClass;

		ConverterIdentifier(LogicalTypeRoot root, Class<E> conversionClass) {
			this.root = root;
			this.conversionClass = conversionClass;
		}

		@Override
		public boolean equals(Object o) {
			if (this == o) {
				return true;
			}
			if (o == null || getClass() != o.getClass()) {
				return false;
			}
			ConverterIdentifier<?> that = (ConverterIdentifier<?>) o;
			return root == that.root && conversionClass.equals(that.conversionClass);
		}

		@Override
		public int hashCode() {
			return Objects.hash(root, conversionClass);
		}
	}

	// Creates a converter for a concrete data type; allows type-parameterized converters.
	private interface DataStructureConverterFactory {
		DataStructureConverter<?, ?> createConverter(DataType dt);
	}
}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DateDateConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DateDateConverter.java
new file mode 100644
index 0000000..e980891
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DateDateConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.runtime.functions.SqlDateTimeUtils;
+import org.apache.flink.table.types.logical.DateType;
+
+/**
+ * Converter for {@link DateType} of {@link java.sql.Date} external type.
+ */
+@Internal
+class DateDateConverter implements DataStructureConverter<Integer, java.sql.Date> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public Integer toInternal(java.sql.Date external) {
+		return SqlDateTimeUtils.dateToInternal(external);
+	}
+
+	@Override
+	public java.sql.Date toExternal(Integer internal) {
+		return SqlDateTimeUtils.internalToDate(internal);
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DateLocalDateConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DateLocalDateConverter.java
new file mode 100644
index 0000000..6df885d
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DateLocalDateConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.runtime.functions.SqlDateTimeUtils;
+import org.apache.flink.table.types.logical.DateType;
+
+/**
+ * Converter for {@link DateType} of {@link java.time.LocalDate} external type.
+ */
+@Internal
+class DateLocalDateConverter implements DataStructureConverter<Integer, java.time.LocalDate> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public Integer toInternal(java.time.LocalDate external) {
+		return SqlDateTimeUtils.localDateToUnixDate(external);
+	}
+
+	@Override
+	public java.time.LocalDate toExternal(Integer internal) {
+		return SqlDateTimeUtils.unixDateToLocalDate(internal);
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DayTimeIntervalDurationConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DayTimeIntervalDurationConverter.java
new file mode 100644
index 0000000..1ea01c5
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DayTimeIntervalDurationConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+
+import java.time.Duration;
+
+/**
+ * Converter for {@link DayTimeIntervalType} of {@link java.time.Duration} external type.
+ */
+@Internal
+class DayTimeIntervalDurationConverter implements DataStructureConverter<Long, java.time.Duration> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public Long toInternal(java.time.Duration external) {
+		return external.toMillis();
+	}
+
+	@Override
+	public java.time.Duration toExternal(Long internal) {
+		return Duration.ofMillis(internal);
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DecimalBigDecimalConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DecimalBigDecimalConverter.java
new file mode 100644
index 0000000..850b6a0
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DecimalBigDecimalConverter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+
+import java.math.BigDecimal;
+
+/**
+ * Converter for {@link DecimalType} of {@link BigDecimal} external type.
+ */
+@Internal
+class DecimalBigDecimalConverter implements DataStructureConverter<DecimalData, BigDecimal> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final int precision;
+
+	private final int scale;
+
+	private DecimalBigDecimalConverter(int precision, int scale) {
+		this.precision = precision;
+		this.scale = scale;
+	}
+
+	@Override
+	public DecimalData toInternal(BigDecimal external) {
+		return DecimalData.fromBigDecimal(external, precision, scale);
+	}
+
+	@Override
+	public BigDecimal toExternal(DecimalData internal) {
+		return internal.toBigDecimal();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Factory method
+	// --------------------------------------------------------------------------------------------
+
+	static DecimalBigDecimalConverter create(DataType dataType) {
+		final DecimalType decimalType = (DecimalType) dataType.getLogicalType();
+		return new DecimalBigDecimalConverter(decimalType.getPrecision(), decimalType.getScale());
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/IdentityConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/IdentityConverter.java
new file mode 100644
index 0000000..8d9c874
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/IdentityConverter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * No-op converter that just forwards its input.
+ */
+@Internal
+class IdentityConverter<I> implements DataStructureConverter<I, I> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public I toInternal(I external) {
+		return external;
+	}
+
+	@Override
+	public I toExternal(I internal) {
+		return internal;
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampInstantConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampInstantConverter.java
new file mode 100644
index 0000000..fffefbe
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampInstantConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+
+/**
+ * Converter for {@link LocalZonedTimestampType} of {@link java.time.Instant} external type.
+ */
+@Internal
+class LocalZonedTimestampInstantConverter implements DataStructureConverter<TimestampData, java.time.Instant> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public TimestampData toInternal(java.time.Instant external) {
+		return TimestampData.fromInstant(external);
+	}
+
+	@Override
+	public java.time.Instant toExternal(TimestampData internal) {
+		return internal.toInstant();
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampIntConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampIntConverter.java
new file mode 100644
index 0000000..349d34e
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampIntConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+
+/**
+ * Converter for {@link LocalZonedTimestampType} of {@link Integer} external type.
+ */
+@Internal
+class LocalZonedTimestampIntConverter implements DataStructureConverter<TimestampData, Integer> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public TimestampData toInternal(Integer external) {
+		return TimestampData.fromEpochMillis(((long) external) * 1000);
+	}
+
+	@Override
+	public Integer toExternal(TimestampData internal) {
+		return ((int) internal.getMillisecond() / 1000);
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampLongConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampLongConverter.java
new file mode 100644
index 0000000..6ddb7eb
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampLongConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+
+/**
+ * Converter for {@link LocalZonedTimestampType} of {@link Long} external type.
+ */
+@Internal
+class LocalZonedTimestampLongConverter implements DataStructureConverter<TimestampData, Long> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public TimestampData toInternal(Long external) {
+		return TimestampData.fromEpochMillis(external);
+	}
+
+	@Override
+	public Long toExternal(TimestampData internal) {
+		return internal.getMillisecond();
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/MapMapConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/MapMapConverter.java
new file mode 100644
index 0000000..24131b0
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/MapMapConverter.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.binary.BinaryMapData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Converter for {@link MapType}/{@link MultisetType} of {@link Map} external type.
+ */
+@Internal
+class MapMapConverter<K, V> implements DataStructureConverter<MapData, Map<K, V>> {
+
+	private static final long serialVersionUID = 1L;
+
+	// Keys and values are converted through array converters so that the
+	// element machinery (element getter, binary writer) is shared with ArrayType.
+	private final ArrayObjectArrayConverter<K> keyConverter;
+
+	private final ArrayObjectArrayConverter<V> valueConverter;
+
+	// True if both key and value elements already use internal data structures;
+	// in that case toInternal can wrap the map without per-entry conversion.
+	private final boolean hasInternalEntries;
+
+	private MapMapConverter(
+			ArrayObjectArrayConverter<K> keyConverter,
+			ArrayObjectArrayConverter<V> valueConverter) {
+		this.keyConverter = keyConverter;
+		this.valueConverter = valueConverter;
+		this.hasInternalEntries = keyConverter.hasInternalElements && valueConverter.hasInternalElements;
+	}
+
+	@Override
+	public void open(ClassLoader classLoader) {
+		// Propagate open() so nested converters can initialize lazily created state.
+		keyConverter.open(classLoader);
+		valueConverter.open(classLoader);
+	}
+
+	@Override
+	public MapData toInternal(Map<K, V> external) {
+		if (hasInternalEntries) {
+			// Fast path: entries need no conversion, wrap the map directly.
+			return new GenericMapData(external);
+		}
+		return toBinaryMapData(external);
+	}
+
+	@Override
+	public Map<K, V> toExternal(MapData internal) {
+		final ArrayData keyArray = internal.keyArray();
+		final ArrayData valueArray = internal.valueArray();
+
+		final int length = internal.size();
+		final Map<K, V> map = new HashMap<>();
+		for (int pos = 0; pos < length; pos++) {
+			// Null-safe access and conversion: null keys/values stay null.
+			final Object keyValue = keyConverter.elementGetter.getElementOrNull(keyArray, pos);
+			final Object valueValue = valueConverter.elementGetter.getElementOrNull(valueArray, pos);
+			map.put(
+				keyConverter.elementConverter.toExternalOrNull(keyValue),
+				valueConverter.elementConverter.toExternalOrNull(valueValue));
+		}
+		return map;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Runtime helper methods
+	// --------------------------------------------------------------------------------------------
+
+	private MapData toBinaryMapData(Map<K, V> external) {
+		final int length = external.size();
+		// (Re)allocate the shared writers for this map's entry count.
+		keyConverter.allocateWriter(length);
+		valueConverter.allocateWriter(length);
+		int pos = 0;
+		for (Map.Entry<K, V> entry : external.entrySet()) {
+			keyConverter.writeElement(pos, entry.getKey());
+			valueConverter.writeElement(pos, entry.getValue());
+			pos++;
+		}
+		return BinaryMapData.valueOf(keyConverter.completeWriter(), valueConverter.completeWriter());
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Factory method
+	// --------------------------------------------------------------------------------------------
+
+	public static MapMapConverter<?, ?> createForMapType(DataType dataType) {
+		final DataType keyDataType = dataType.getChildren().get(0);
+		final DataType valueDataType = dataType.getChildren().get(1);
+		return new MapMapConverter<>(
+			ArrayObjectArrayConverter.createForElement(keyDataType),
+			ArrayObjectArrayConverter.createForElement(valueDataType)
+		);
+	}
+
+	public static MapMapConverter<?, ?> createForMultisetType(DataType dataType) {
+		final DataType keyDataType = dataType.getChildren().get(0);
+		// A multiset is modeled as a map from element to its multiplicity,
+		// hence the fixed non-null INT value type.
+		final DataType valueDataType = DataTypes.INT().notNull();
+		return new MapMapConverter<>(
+			ArrayObjectArrayConverter.createForElement(keyDataType),
+			ArrayObjectArrayConverter.createForElement(valueDataType)
+		);
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RawByteArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RawByteArrayConverter.java
new file mode 100644
index 0000000..db24407
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RawByteArrayConverter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RawType;
+
+/**
+ * Converter for {@link RawType} of {@code byte[]} external type.
+ */
+@Internal
+class RawByteArrayConverter<T> implements DataStructureConverter<RawValueData<T>, byte[]> {
+
+	private static final long serialVersionUID = 1L;
+
+	// Serializer of the raw type; needed only to materialize the internal
+	// value into bytes in toExternal.
+	private final TypeSerializer<T> serializer;
+
+	private RawByteArrayConverter(TypeSerializer<T> serializer) {
+		this.serializer = serializer;
+	}
+
+	@Override
+	public RawValueData<T> toInternal(byte[] external) {
+		// The bytes are taken over as-is; no serializer is required for this direction.
+		return RawValueData.fromBytes(external);
+	}
+
+	@Override
+	public byte[] toExternal(RawValueData<T> internal) {
+		return internal.toBytes(serializer);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Factory method
+	// --------------------------------------------------------------------------------------------
+
+	public static RawByteArrayConverter<?> create(DataType dataType) {
+		final TypeSerializer<?> serializer = ((RawType<?>) dataType.getLogicalType()).getTypeSerializer();
+		return new RawByteArrayConverter<>(serializer);
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RawObjectConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RawObjectConverter.java
new file mode 100644
index 0000000..d93756a
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RawObjectConverter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RawType;
+
+/**
+ * Converter for {@link RawType} of object external type.
+ */
+@Internal
+class RawObjectConverter<T> implements DataStructureConverter<RawValueData<T>, T> {
+
+	private static final long serialVersionUID = 1L;
+
+	// Serializer of the raw type; needed only to resolve the internal value
+	// back into an object in toExternal.
+	private final TypeSerializer<T> serializer;
+
+	private RawObjectConverter(TypeSerializer<T> serializer) {
+		this.serializer = serializer;
+	}
+
+	@Override
+	public RawValueData<T> toInternal(T external) {
+		// The object is wrapped directly; no serializer is required for this direction.
+		return RawValueData.fromObject(external);
+	}
+
+	@Override
+	public T toExternal(RawValueData<T> internal) {
+		return internal.toObject(serializer);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Factory method
+	// --------------------------------------------------------------------------------------------
+
+	public static RawObjectConverter<?> create(DataType dataType) {
+		final TypeSerializer<?> serializer = ((RawType<?>) dataType.getLogicalType()).getTypeSerializer();
+		return new RawObjectConverter<>(serializer);
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RowRowConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RowRowConverter.java
new file mode 100644
index 0000000..468efcd
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RowRowConverter.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.stream.IntStream;
+
+/**
+ * Converter for {@link RowType} of {@link Row} external type.
+ */
+@Internal
+class RowRowConverter implements DataStructureConverter<RowData, Row> {
+
+	private static final long serialVersionUID = 1L;
+
+	// One converter per field, positionally aligned with the row type's children.
+	private final DataStructureConverter<Object, Object>[] fieldConverters;
+
+	// Null-safe, type-specialized accessors for reading fields out of RowData.
+	private final RowData.FieldGetter[] fieldGetters;
+
+	private RowRowConverter(
+			DataStructureConverter<Object, Object>[] fieldConverters,
+			RowData.FieldGetter[] fieldGetters) {
+		this.fieldConverters = fieldConverters;
+		this.fieldGetters = fieldGetters;
+	}
+
+	@Override
+	public void open(ClassLoader classLoader) {
+		// Propagate open() so field converters can initialize lazily created
+		// state (e.g. generated code for nested structured types).
+		for (DataStructureConverter<Object, Object> fieldConverter : fieldConverters) {
+			fieldConverter.open(classLoader);
+		}
+	}
+
+	@Override
+	public RowData toInternal(Row external) {
+		final int length = fieldConverters.length;
+		final GenericRowData genericRow = new GenericRowData(length);
+		for (int pos = 0; pos < length; pos++) {
+			final Object value = external.getField(pos);
+			// toInternalOrNull keeps null fields null instead of failing.
+			genericRow.setField(pos, fieldConverters[pos].toInternalOrNull(value));
+		}
+		return genericRow;
+	}
+
+	@Override
+	public Row toExternal(RowData internal) {
+		final int length = fieldConverters.length;
+		// NOTE(review): the produced Row is positional only; the row type's
+		// field names are not attached here — confirm callers do not rely on
+		// name-based access.
+		final Row row = new Row(length);
+		for (int pos = 0; pos < length; pos++) {
+			final Object value = fieldGetters[pos].getFieldOrNull(internal);
+			row.setField(pos, fieldConverters[pos].toExternalOrNull(value));
+		}
+		return row;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Factory method
+	// --------------------------------------------------------------------------------------------
+
+	@SuppressWarnings({"unchecked", "Convert2MethodRef"})
+	public static RowRowConverter create(DataType dataType) {
+		final List<DataType> fields = dataType.getChildren();
+		final DataStructureConverter<Object, Object>[] fieldConverters = fields.stream()
+			.map(dt -> DataStructureConverters.getConverter(dt))
+			.toArray(DataStructureConverter[]::new);
+		final RowData.FieldGetter[] fieldGetters = IntStream
+			.range(0, fields.size())
+			.mapToObj(pos -> RowData.createFieldGetter(fields.get(pos).getLogicalType(), pos))
+			.toArray(RowData.FieldGetter[]::new);
+		return new RowRowConverter(fieldConverters, fieldGetters);
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StringByteArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StringByteArrayConverter.java
new file mode 100644
index 0000000..3a7736c
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StringByteArrayConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+/**
+ * Converter for {@link CharType}/{@link VarCharType} of {@code byte[]} external type.
+ */
+@Internal
+class StringByteArrayConverter implements DataStructureConverter<StringData, byte[]> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public StringData toInternal(byte[] external) {
+		// NOTE(review): the byte array is assumed to already be in StringData's
+		// internal encoding (presumably UTF-8) — it is wrapped, not transcoded;
+		// confirm against StringData.fromBytes.
+		return StringData.fromBytes(external);
+	}
+
+	@Override
+	public byte[] toExternal(StringData internal) {
+		return internal.toBytes();
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StringStringConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StringStringConverter.java
new file mode 100644
index 0000000..290758b
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StringStringConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+/**
+ * Converter for {@link CharType}/{@link VarCharType} of {@link String} external type.
+ */
+@Internal
+class StringStringConverter implements DataStructureConverter<StringData, String> {
+
+	private static final long serialVersionUID = 1L;
+
+	// Straightforward bridge between java.lang.String and the internal StringData.
+	@Override
+	public StringData toInternal(String external) {
+		return StringData.fromString(external);
+	}
+
+	@Override
+	public String toExternal(StringData internal) {
+		return internal.toString();
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StructuredObjectConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StructuredObjectConverter.java
new file mode 100644
index 0000000..2d21db8
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StructuredObjectConverter.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.CompileUtils;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.StructuredType;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.types.extraction.ExtractionUtils.getStructuredField;
+import static org.apache.flink.table.types.extraction.ExtractionUtils.getStructuredFieldGetter;
+import static org.apache.flink.table.types.extraction.ExtractionUtils.getStructuredFieldSetter;
+import static org.apache.flink.table.types.extraction.ExtractionUtils.hasInvokableConstructor;
+import static org.apache.flink.table.types.extraction.ExtractionUtils.isStructuredFieldDirectlyReadable;
+import static org.apache.flink.table.types.extraction.ExtractionUtils.isStructuredFieldDirectlyWritable;
+import static org.apache.flink.table.types.extraction.ExtractionUtils.primitiveToWrapper;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldNames;
+
+/**
+ * Converter for {@link StructuredType} of its implementation class.
+ */
+@Internal
+@SuppressWarnings("unchecked")
+class StructuredObjectConverter<T> implements DataStructureConverter<RowData, T> {
+
+	private static final long serialVersionUID = 1L;
+
+	// One converter per attribute, positionally aligned with the structured
+	// type's fields.
+	private final DataStructureConverter<Object, Object>[] fieldConverters;
+
+	private final RowData.FieldGetter[] fieldGetters;
+
+	// Name and source of the generated converter class; kept as plain strings
+	// so this wrapper stays serializable and compilation can happen in open().
+	private final String generatedName;
+
+	private final String generatedCode;
+
+	// Compiled delegate; transient because it must be (re)compiled in open()
+	// with the class loader of the target environment.
+	private transient DataStructureConverter<RowData, T> generatedConverter;
+
+	private StructuredObjectConverter(
+			DataStructureConverter<Object, Object>[] fieldConverters,
+			RowData.FieldGetter[] fieldGetters,
+			String generatedName,
+			String generatedCode) {
+		this.fieldConverters = fieldConverters;
+		this.fieldGetters = fieldGetters;
+		this.generatedName = generatedName;
+		this.generatedCode = generatedCode;
+	}
+
+	@Override
+	public void open(ClassLoader classLoader) {
+		for (DataStructureConverter<Object, Object> fieldConverter : fieldConverters) {
+			fieldConverter.open(classLoader);
+		}
+		try {
+			// Compile the generated source and instantiate it with the runtime
+			// helpers (field getters and per-field converters) it delegates to.
+			final Class<?> compiledConverter = CompileUtils.compile(classLoader, generatedName, generatedCode);
+			generatedConverter = (DataStructureConverter<RowData, T>) compiledConverter
+				.getConstructor(RowData.FieldGetter[].class, DataStructureConverter[].class)
+				.newInstance(fieldGetters, fieldConverters);
+		} catch (Throwable t) {
+			throw new TableException("Error while generating structured type converter.", t);
+		}
+		generatedConverter.open(classLoader);
+	}
+
+	@Override
+	public RowData toInternal(T external) {
+		// Requires open() to have been called first; delegates to generated code.
+		return generatedConverter.toInternal(external);
+	}
+
+	@Override
+	public T toExternal(RowData internal) {
+		return generatedConverter.toExternal(internal);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Factory method
+	// --------------------------------------------------------------------------------------------
+
+	public static StructuredObjectConverter<?> create(DataType dataType) {
+		try {
+			return createOrError(dataType);
+		} catch (Throwable t) {
+			// Wrap every failure with the offending data type for easier debugging.
+			throw new TableException(
+				String.format(
+					"Could not create converter for structured type '%s'.",
+					dataType),
+				t);
+		}
+	}
+
+	/**
+	 * Creates a {@link DataStructureConverter} for the given structured type.
+	 *
+	 * <p>Note: We do not perform validation if data type and structured type implementation match. This
+	 * must have been done earlier in the {@link DataTypeFactory}.
+	 */
+	@SuppressWarnings("RedundantCast")
+	private static StructuredObjectConverter<?> createOrError(DataType dataType) {
+		final List<DataType> fields = dataType.getChildren();
+
+		final DataStructureConverter<Object, Object>[] fieldConverters = fields.stream()
+			.map(dt -> (DataStructureConverter<Object, Object>) DataStructureConverters.getConverter(dt))
+			.toArray(DataStructureConverter[]::new);
+
+		final RowData.FieldGetter[] fieldGetters = IntStream
+			.range(0, fields.size())
+			.mapToObj(pos -> RowData.createFieldGetter(fields.get(pos).getLogicalType(), pos))
+			.toArray(RowData.FieldGetter[]::new);
+
+		final Class<?>[] fieldClasses = fields.stream()
+			.map(DataType::getConversionClass)
+			.toArray(Class[]::new);
+
+		final StructuredType structuredType = (StructuredType) dataType.getLogicalType();
+
+		final Class<?> implementationClass = structuredType.getImplementationClass()
+			.orElseThrow(IllegalStateException::new);
+
+		// Encode the implementation class' package path into the simple name of
+		// the generated (default-package) class to keep names unique.
+		final String converterName = implementationClass.getName().replace('.', '$') + "$Converter";
+		final String converterCode = generateCode(
+				converterName,
+				implementationClass,
+				getFieldNames(structuredType).toArray(new String[0]),
+				fieldClasses);
+
+		return new StructuredObjectConverter<>(
+			fieldConverters,
+			fieldGetters,
+			converterName,
+			converterCode
+		);
+	}
+
+	/**
+	 * Emits the source of a converter class with a hard-wired mapping between the
+	 * structured type's fields and the implementation class' members.
+	 */
+	private static String generateCode(
+			String converterName,
+			Class<?> clazz,
+			String[] fieldNames,
+			Class<?>[] fieldClasses) {
+		final int fieldCount = fieldClasses.length;
+		final StringBuilder sb = new StringBuilder();
+
+		// we ignore checkstyle here for readability and preserving indention
+
+		line(sb, "public class ", converterName, " implements ", DataStructureConverter.class, " {");
+		line(sb, "    private final ", RowData.FieldGetter.class, "[] fieldGetters;");
+		line(sb, "    private final ", DataStructureConverter.class, "[] fieldConverters;");
+
+		line(sb, "    public ", converterName, "(", RowData.FieldGetter.class, "[] fieldGetters, ", DataStructureConverter.class, "[] fieldConverters) {");
+		line(sb, "        this.fieldGetters = fieldGetters;");
+		line(sb, "        this.fieldConverters = fieldConverters;");
+		line(sb, "    }");
+
+		line(sb, "    public ", Object.class, " toInternal(", Object.class, " o) {");
+		line(sb, "        final ", clazz, " external = (", clazz, ") o;");
+		line(sb, "        final ", GenericRowData.class, " genericRow = new ", GenericRowData.class, "(", fieldCount, ");");
+		for (int pos = 0; pos < fieldCount; pos++) {
+		line(sb, "        ", getterExpr(clazz, pos, fieldNames[pos], fieldClasses[pos]), ";");
+		}
+		line(sb, "        return genericRow;");
+		line(sb, "    }");
+
+		line(sb, "    public ", Object.class, " toExternal(", Object.class, " o) {");
+		line(sb, "        final ", RowData.class, " internal = (", RowData.class, ") o;");
+		// Prefer a fully-assigning constructor (supports immutable classes);
+		// otherwise fall back to default constructor plus field/setter writes.
+		if (hasInvokableConstructor(clazz, fieldClasses)) {
+		line(sb, "        final ", clazz, " structured = new ", clazz, "(");
+		for (int pos = 0; pos < fieldCount; pos++) {
+		line(sb, "            ", parameterExpr(pos, fieldClasses[pos]), (pos < fieldCount - 1) ? ", " : "");
+		}
+		line(sb, "        );");
+		} else {
+		line(sb, "        final ", clazz, " structured = new ", clazz, "();");
+		for (int pos = 0; pos < fieldCount; pos++) {
+		line(sb, "        ", setterExpr(clazz, pos, fieldNames[pos]), ";");
+		}
+		}
+		line(sb, "        return structured;");
+		line(sb, "    }");
+		line(sb, "}");
+		return sb.toString();
+	}
+
+	// Expression that reads a field from the external object (directly or via
+	// getter), casts it, and stores its internal representation into genericRow.
+	private static String getterExpr(
+			Class<?> implementationClass,
+			int pos,
+			String fieldName,
+			Class<?> fieldClass) {
+		final Field field = getStructuredField(implementationClass, fieldName);
+		String accessExpr;
+		if (isStructuredFieldDirectlyReadable(field)) {
+			// field is accessible without getter
+			accessExpr = expr("external.", field.getName());
+		} else {
+			// field is accessible with a getter
+			final Method getter = getStructuredFieldGetter(implementationClass, field)
+				.orElseThrow(IllegalStateException::new);
+			accessExpr = expr("external.", getter.getName(), "()");
+		}
+		accessExpr = castExpr(accessExpr, fieldClass);
+		return expr("genericRow.setField(", pos, ", fieldConverters[", pos , "].toInternalOrNull(", accessExpr, "))");
+	}
+
+	// Expression for one constructor argument: convert the internal field to its
+	// external representation and cast it to the expected parameter class.
+	private static String parameterExpr(int pos, Class<?> fieldClass) {
+		final String conversionExpr = expr("fieldConverters[", pos , "].toExternalOrNull(fieldGetters[", pos, "].getFieldOrNull(internal))");
+		return castExpr(conversionExpr, fieldClass);
+	}
+
+	// Statement that writes the converted field into the structured object,
+	// either by direct field assignment or through a setter.
+	private static String setterExpr(
+			Class<?> implementationClass,
+			int pos,
+			String fieldName) {
+		final Field field = getStructuredField(implementationClass, fieldName);
+		final String conversionExpr = expr("fieldConverters[", pos , "].toExternalOrNull(fieldGetters[", pos, "].getFieldOrNull(internal))");
+		if (isStructuredFieldDirectlyWritable(field)) {
+			// field is accessible without setter
+			return expr("structured.", field.getName(), " = ", castExpr(conversionExpr, field.getType()));
+		} else {
+			// field is accessible with a setter
+			final Method setter = getStructuredFieldSetter(implementationClass, field)
+				.orElseThrow(IllegalStateException::new);
+			return expr("structured.", setter.getName(), "(", castExpr(conversionExpr, setter.getParameterTypes()[0]), ")");
+		}
+	}
+
+	private static String castExpr(String expr, Class<?> clazz) {
+		// help Janino to box primitive types and fix missing generics
+		return expr("((", primitiveToWrapper(clazz), ") ", expr, ")");
+	}
+
+	// NOTE(review): expr() and line() duplicate the Class-to-canonical-name
+	// rendering; line() could be simplified to sb.append(expr(parts)).append("\n").
+	private static String expr(Object... parts) {
+		final StringBuilder sb = new StringBuilder();
+		for (Object part : parts) {
+			if (part instanceof Class) {
+				sb.append(((Class<?>) part).getCanonicalName());
+			} else {
+				sb.append(part);
+			}
+		}
+		return sb.toString();
+	}
+
+	private static void line(StringBuilder sb, Object... parts) {
+		for (Object part : parts) {
+			if (part instanceof Class) {
+				sb.append(((Class<?>) part).getCanonicalName());
+			} else {
+				sb.append(part);
+			}
+		}
+		sb.append("\n");
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeLocalTimeConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeLocalTimeConverter.java
new file mode 100644
index 0000000..d418f25
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeLocalTimeConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.runtime.functions.SqlDateTimeUtils;
+import org.apache.flink.table.types.logical.TimeType;
+
+/**
+ * Converter for {@link TimeType} of {@link java.time.LocalTime} external type.
+ */
+@Internal
+class TimeLocalTimeConverter implements DataStructureConverter<Integer, java.time.LocalTime> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public Integer toInternal(java.time.LocalTime external) {
+		// NOTE(review): the helper's name says "unix date" but the argument is a
+		// time-of-day; the internal int is presumably millis-of-day — confirm
+		// against SqlDateTimeUtils.
+		return SqlDateTimeUtils.localTimeToUnixDate(external);
+	}
+
+	@Override
+	public java.time.LocalTime toExternal(Integer internal) {
+		return SqlDateTimeUtils.unixTimeToLocalTime(internal);
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeLongConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeLongConverter.java
new file mode 100644
index 0000000..6cc79fb
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeLongConverter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.types.logical.TimeType;
+
+/**
+ * Converter for {@link TimeType} of {@link Long} external type.
+ *
+ * <p>The external {@code Long} and the internal {@code Integer} differ by a fixed
+ * factor of 1,000,000 (presumably nanoseconds vs. milliseconds of the day — see
+ * {@link TimeType} for the unit contract); {@link #toInternal(Long)} truncates the
+ * sub-factor remainder via integer division.
+ */
+@Internal
+class TimeLongConverter implements DataStructureConverter<Integer, Long> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public Integer toInternal(Long external) {
+		// integer division intentionally drops sub-millisecond precision; the
+		// narrowing cast assumes the result fits into an int — TODO confirm that
+		// callers only pass time-of-day values (which always fit after division)
+		return (int) (external / 1000 / 1000);
+	}
+
+	@Override
+	public Long toExternal(Integer internal) {
+		return ((long) internal) * 1000 * 1000;
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeTimeConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeTimeConverter.java
new file mode 100644
index 0000000..1c8b34a
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeTimeConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.runtime.functions.SqlDateTimeUtils;
+import org.apache.flink.table.types.logical.TimeType;
+
+/**
+ * Converter for {@link TimeType} of {@link java.sql.Time} external type.
+ *
+ * <p>Delegates both directions to {@link SqlDateTimeUtils}; the internal
+ * representation of a time value is an {@link Integer}.
+ */
+@Internal
+class TimeTimeConverter implements DataStructureConverter<Integer, java.sql.Time> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public Integer toInternal(java.sql.Time external) {
+		return SqlDateTimeUtils.timeToInternal(external);
+	}
+
+	@Override
+	public java.sql.Time toExternal(Integer internal) {
+		return SqlDateTimeUtils.internalToTime(internal);
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimestampLocalDateTimeConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimestampLocalDateTimeConverter.java
new file mode 100644
index 0000000..c156715
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimestampLocalDateTimeConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.TimestampType;
+
+/**
+ * Converter for {@link TimestampType} of {@link java.time.LocalDateTime} external type.
+ *
+ * <p>Simply wraps/unwraps {@link TimestampData}, which provides native conversion
+ * methods for {@link java.time.LocalDateTime}.
+ */
+@Internal
+class TimestampLocalDateTimeConverter implements DataStructureConverter<TimestampData, java.time.LocalDateTime> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public TimestampData toInternal(java.time.LocalDateTime external) {
+		return TimestampData.fromLocalDateTime(external);
+	}
+
+	@Override
+	public java.time.LocalDateTime toExternal(TimestampData internal) {
+		return internal.toLocalDateTime();
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimestampTimestampConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimestampTimestampConverter.java
new file mode 100644
index 0000000..f9a72f0
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimestampTimestampConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.TimestampType;
+
+/**
+ * Converter for {@link TimestampType} of {@link java.sql.Timestamp} external type.
+ *
+ * <p>Simply wraps/unwraps {@link TimestampData}, which provides native conversion
+ * methods for {@link java.sql.Timestamp}.
+ */
+@Internal
+class TimestampTimestampConverter implements DataStructureConverter<TimestampData, java.sql.Timestamp> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public TimestampData toInternal(java.sql.Timestamp external) {
+		return TimestampData.fromTimestamp(external);
+	}
+
+	@Override
+	public java.sql.Timestamp toExternal(TimestampData internal) {
+		return internal.toTimestamp();
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/YearMonthIntervalPeriodConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/YearMonthIntervalPeriodConverter.java
new file mode 100644
index 0000000..50f8d01
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/YearMonthIntervalPeriodConverter.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution;
+
+import java.io.Serializable;
+import java.time.Period;
+
+/**
+ * Converter for {@link YearMonthIntervalType} of {@link java.time.Period} external type.
+ *
+ * <p>The internal representation is the total number of months as {@code Integer}.
+ * The external {@link java.time.Period} is reconstructed according to the interval's
+ * {@link YearMonthResolution}, which is fixed at construction time via {@link #create(DataType)}.
+ */
+@Internal
+class YearMonthIntervalPeriodConverter implements DataStructureConverter<Integer, java.time.Period> {
+
+	private static final long serialVersionUID = 1L;
+
+	// strategy that shapes the external Period according to the interval resolution
+	private final PeriodConstructor periodConstructor;
+
+	private YearMonthIntervalPeriodConverter(PeriodConstructor periodConstructor) {
+		this.periodConstructor = periodConstructor;
+	}
+
+	@Override
+	public Integer toInternal(java.time.Period external) {
+		// NOTE(review): narrowing cast silently overflows for periods longer than
+		// Integer.MAX_VALUE months — presumably out of range for SQL intervals
+		return (int) external.toTotalMonths();
+	}
+
+	@Override
+	public java.time.Period toExternal(Integer internal) {
+		return periodConstructor.construct(internal);
+	}
+
+	// serializable functional interface so that lambda-based constructors keep the
+	// converter itself serializable
+	private interface PeriodConstructor extends Serializable {
+		java.time.Period construct(Integer internal);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Factory method
+	// --------------------------------------------------------------------------------------------
+
+	public static YearMonthIntervalPeriodConverter create(DataType dataType) {
+		final YearMonthIntervalType intervalType = (YearMonthIntervalType) dataType.getLogicalType();
+		return new YearMonthIntervalPeriodConverter(createPeriodConstructor(intervalType.getResolution()));
+	}
+
+	private static PeriodConstructor createPeriodConstructor(YearMonthResolution resolution) {
+		switch (resolution) {
+			case YEAR:
+				// YEAR resolution: remaining months are dropped by integer division
+				// (assumes internal is a multiple of 12 here — TODO confirm)
+				return internal -> java.time.Period.ofYears(internal / 12);
+			case YEAR_TO_MONTH:
+				return internal -> java.time.Period.of(internal / 12, internal % 12, 0);
+			case MONTH:
+				return Period::ofMonths;
+			default:
+				// all known resolutions are handled above; reaching this branch
+				// indicates a newly added, unsupported resolution
+				throw new IllegalStateException();
+		}
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryWriter.java
index 24d903c..cae1232 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryWriter.java
@@ -153,6 +153,7 @@ public interface BinaryWriter {
 				writer.writeMap(pos, (MapData) o, (MapDataSerializer) serializer);
 				break;
 			case ROW:
+			case STRUCTURED_TYPE:
 				writer.writeRow(pos, (RowData) o, (RowDataSerializer) serializer);
 				break;
 			case RAW:
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/InternalSerializers.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/InternalSerializers.java
index 462a1ae..7779e98 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/InternalSerializers.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/InternalSerializers.java
@@ -36,17 +36,17 @@ import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.runtime.typeutils.StringDataSerializer;
 import org.apache.flink.table.runtime.typeutils.TimestampDataSerializer;
 import org.apache.flink.table.types.logical.ArrayType;
-import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DistinctType;
 import org.apache.flink.table.types.logical.IntType;
-import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.MultisetType;
 import org.apache.flink.table.types.logical.RawType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.TypeInformationRawType;
 
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale;
+
 /**
  * {@link TypeSerializer} of {@link LogicalType} for internal sql engine execution data formats.
  */
@@ -63,9 +63,18 @@ public class InternalSerializers {
 	 * Creates a {@link TypeSerializer} for internal data structures of the given {@link LogicalType}.
 	 */
 	public static TypeSerializer create(LogicalType type, ExecutionConfig config) {
+		// ordered by type root definition
 		switch (type.getTypeRoot()) {
+			case CHAR:
+			case VARCHAR:
+				return StringDataSerializer.INSTANCE;
 			case BOOLEAN:
 				return BooleanSerializer.INSTANCE;
+			case BINARY:
+			case VARBINARY:
+				return BytePrimitiveArraySerializer.INSTANCE;
+			case DECIMAL:
+				return new DecimalDataSerializer(getPrecision(type), getScale(type));
 			case TINYINT:
 				return ByteSerializer.INSTANCE;
 			case SMALLINT:
@@ -78,35 +87,27 @@ public class InternalSerializers {
 			case BIGINT:
 			case INTERVAL_DAY_TIME:
 				return LongSerializer.INSTANCE;
-			case TIMESTAMP_WITHOUT_TIME_ZONE:
-				TimestampType timestampType = (TimestampType) type;
-				return new TimestampDataSerializer(timestampType.getPrecision());
-			case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-				LocalZonedTimestampType lzTs = (LocalZonedTimestampType) type;
-				return new TimestampDataSerializer(lzTs.getPrecision());
 			case FLOAT:
 				return FloatSerializer.INSTANCE;
 			case DOUBLE:
 				return DoubleSerializer.INSTANCE;
-			case CHAR:
-			case VARCHAR:
-				return StringDataSerializer.INSTANCE;
-			case DECIMAL:
-				DecimalType decimalType = (DecimalType) type;
-				return new DecimalDataSerializer(decimalType.getPrecision(), decimalType.getScale());
+			case TIMESTAMP_WITHOUT_TIME_ZONE:
+			case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+				return new TimestampDataSerializer(getPrecision(type));
+			case TIMESTAMP_WITH_TIME_ZONE:
+				throw new UnsupportedOperationException();
 			case ARRAY:
 				return new ArrayDataSerializer(((ArrayType) type).getElementType(), config);
+			case MULTISET:
+				return new MapDataSerializer(((MultisetType) type).getElementType(), new IntType(false), config);
 			case MAP:
 				MapType mapType = (MapType) type;
 				return new MapDataSerializer(mapType.getKeyType(), mapType.getValueType(), config);
-			case MULTISET:
-				return new MapDataSerializer(((MultisetType) type).getElementType(), new IntType(), config);
 			case ROW:
-				RowType rowType = (RowType) type;
-				return new RowDataSerializer(config, rowType);
-			case BINARY:
-			case VARBINARY:
-				return BytePrimitiveArraySerializer.INSTANCE;
+			case STRUCTURED_TYPE:
+				return new RowDataSerializer(config, type.getChildren().toArray(new LogicalType[0]));
+			case DISTINCT_TYPE:
+				return create(((DistinctType) type).getSourceType(), config);
 			case RAW:
 				if (type instanceof RawType) {
 					final RawType<?> rawType = (RawType<?>) type;
@@ -114,6 +115,9 @@ public class InternalSerializers {
 				}
 				return new RawValueDataSerializer<>(
 					((TypeInformationRawType<?>) type).getTypeInformation().createSerializer(config));
+			case NULL:
+			case SYMBOL:
+			case UNRESOLVED:
 			default:
 				throw new UnsupportedOperationException(
 					"Unsupported type '" + type + "' to get internal serializer");
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
new file mode 100644
index 0000000..8af45a4
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
@@ -0,0 +1,730 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeFactoryMock;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+import javax.annotation.Nullable;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.Period;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+import static java.util.Arrays.asList;
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.CHAR;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DAY;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.INTERVAL;
+import static org.apache.flink.table.api.DataTypes.MAP;
+import static org.apache.flink.table.api.DataTypes.MONTH;
+import static org.apache.flink.table.api.DataTypes.MULTISET;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.SECOND;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_TIME_ZONE;
+import static org.apache.flink.table.api.DataTypes.VARBINARY;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
+import static org.apache.flink.table.api.DataTypes.YEAR;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * Tests for {@link DataStructureConverters}.
+ */
+@RunWith(Parameterized.class)
+public class DataStructureConvertersTest {
+
+	/**
+	 * Test specifications: for each data type, all supported external conversion
+	 * classes together with example values that must round-trip through the internal
+	 * representation. Specs with {@code expectErrorMessage} verify unsupported types.
+	 */
+	@Parameters(name = "{index}: {0}")
+	public static List<TestSpec> testData() {
+		// ordered by definition in DataStructureConverters
+		return asList(
+			TestSpec
+				.forDataType(CHAR(5))
+				.convertedTo(String.class, "12345")
+				.convertedTo(byte[].class, "12345".getBytes(StandardCharsets.UTF_8))
+				.convertedTo(StringData.class, StringData.fromString("12345")),
+
+			TestSpec
+				.forDataType(VARCHAR(100))
+				.convertedTo(String.class, "12345")
+				.convertedTo(byte[].class, "12345".getBytes(StandardCharsets.UTF_8))
+				.convertedTo(StringData.class, StringData.fromString("12345")),
+
+			TestSpec
+				.forDataType(BOOLEAN().notNull())
+				.convertedTo(Boolean.class, true)
+				.convertedTo(boolean.class, true),
+
+			TestSpec
+				.forDataType(BINARY(5))
+				.convertedTo(byte[].class, new byte[]{1, 2, 3, 4, 5}),
+
+			TestSpec
+				.forDataType(VARBINARY(100))
+				.convertedTo(byte[].class, new byte[]{1, 2, 3, 4, 5}),
+
+			TestSpec
+				.forDataType(DECIMAL(3, 2))
+				.convertedTo(BigDecimal.class, new BigDecimal("1.23"))
+				.convertedTo(DecimalData.class, DecimalData.fromUnscaledLong(123, 3, 2)),
+
+			// TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE are skipped for simplicity
+
+			TestSpec
+				.forDataType(DATE())
+				.convertedTo(Date.class, Date.valueOf("2010-11-12"))
+				.convertedTo(LocalDate.class, LocalDate.parse("2010-11-12"))
+				.convertedTo(Integer.class, 14_925),
+
+			TestSpec
+				.forDataType(TIME(0))
+				.convertedTo(java.sql.Time.class, java.sql.Time.valueOf("12:34:56"))
+				.convertedTo(LocalTime.class, LocalTime.parse("12:34:56"))
+				.convertedTo(Integer.class, 45_296_000)
+				.convertedTo(Long.class, 45_296_000_000_000L),
+
+			TestSpec
+				.forDataType(TIME(3)) // TODO support precision of 9
+				.convertedTo(LocalTime.class, LocalTime.parse("12:34:56.001"))
+				.convertedTo(Integer.class, 45_296_001),
+
+			TestSpec
+				.forDataType(TIMESTAMP(9))
+				.convertedTo(Timestamp.class, Timestamp.valueOf("2010-11-12 12:34:56.000000001"))
+				.convertedTo(LocalDateTime.class, LocalDateTime.parse("2010-11-12T12:34:56.000000001"))
+				.convertedTo(TimestampData.class, TimestampData.fromEpochMillis(1_289_565_296_000L, 1)),
+
+			TestSpec
+				.forDataType(TIMESTAMP_WITH_TIME_ZONE(0))
+				.convertedTo(
+					ZonedDateTime.class,
+					ZonedDateTime.ofInstant(Instant.EPOCH, ZoneId.of("UTC")))
+				.convertedTo(
+					java.time.OffsetDateTime.class,
+					ZonedDateTime.ofInstant(Instant.EPOCH, ZoneId.of("UTC")).toOffsetDateTime())
+				.expectErrorMessage("Unsupported data type: TIMESTAMP(0) WITH TIME ZONE"),
+
+			TestSpec
+				.forDataType(TIMESTAMP_WITH_LOCAL_TIME_ZONE(0))
+				.convertedTo(Instant.class, Instant.ofEpochSecond(12_345))
+				.convertedTo(Integer.class, 12_345)
+				.convertedTo(Long.class, 12_345_000L)
+				.convertedTo(TimestampData.class, TimestampData.fromEpochMillis(12_345_000)),
+
+			TestSpec
+				.forDataType(TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))
+				.convertedTo(Instant.class, Instant.ofEpochSecond(12_345, 1_000_000))
+				.convertedTo(Long.class, 12_345_001L)
+				.convertedTo(TimestampData.class, TimestampData.fromEpochMillis(12_345_001)),
+
+			TestSpec
+				.forDataType(TIMESTAMP_WITH_LOCAL_TIME_ZONE(9))
+				.convertedTo(Instant.class, Instant.ofEpochSecond(12_345, 1))
+				.convertedTo(TimestampData.class, TimestampData.fromEpochMillis(12_345_000, 1)),
+
+			TestSpec
+				.forDataType(INTERVAL(YEAR(2), MONTH()))
+				.convertedTo(Period.class, Period.of(2, 6, 0))
+				.convertedTo(Integer.class, 30),
+
+			TestSpec
+				.forDataType(INTERVAL(MONTH()))
+				.convertedTo(Period.class, Period.of(0, 30, 0))
+				.convertedTo(Integer.class, 30),
+
+			TestSpec
+				.forDataType(INTERVAL(DAY(), SECOND(3))) // TODO support precision of 9
+				.convertedTo(Duration.class, Duration.ofMillis(123))
+				.convertedTo(Long.class, 123L),
+
+			TestSpec
+				.forDataType(ARRAY(BOOLEAN()))
+				.convertedTo(boolean[].class, new boolean[]{true, false, true, true})
+				.convertedTo(ArrayData.class, new GenericArrayData(new boolean[]{true, false, true, true})),
+
+			// arrays of TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE are skipped for simplicity
+
+			TestSpec
+				.forDataType(ARRAY(DATE()))
+				.convertedTo(
+					LocalDate[].class,
+					new LocalDate[]{null, LocalDate.parse("2010-11-12"), null, LocalDate.parse("2010-11-12")}),
+
+			TestSpec
+				.forDataType(MAP(INT(), BOOLEAN()))
+				.convertedTo(Map.class, createIdentityMap())
+				.convertedTo(MapData.class, new GenericMapData(createIdentityMap())),
+
+			TestSpec
+				.forDataType(MAP(DATE(), BOOLEAN()))
+				.convertedTo(Map.class, createLocalDateMap()),
+
+			TestSpec
+				.forDataType(MULTISET(BOOLEAN()))
+				.convertedTo(Map.class, createIdentityMultiset())
+				.convertedTo(MapData.class, new GenericMapData(createIdentityMultiset())),
+
+			TestSpec
+				.forDataType(MULTISET(DATE()))
+				.convertedTo(Map.class, createLocalDateMultiset()),
+
+			TestSpec
+				.forDataType(
+					ROW(
+						FIELD("a", INT()),
+						FIELD("b",
+							ROW(
+								FIELD("b_1", DOUBLE()),
+								FIELD("b_2", BOOLEAN())))))
+				.convertedTo(Row.class, Row.of(12, Row.of(2.0, null)))
+				.convertedTo(RowData.class, GenericRowData.of(12, GenericRowData.of(2.0, null))),
+
+			TestSpec
+				.forDataType(
+					ROW(
+						FIELD("a", INT()),
+						FIELD("b",
+							ROW(
+								FIELD("b_1", DATE()),
+								FIELD("b_2", DATE())))))
+				.convertedTo(Row.class, Row.of(12, Row.of(LocalDate.ofEpochDay(1), null))),
+
+			TestSpec
+				.forClass(PojoWithMutableFields.class)
+				.convertedToSupplier(
+					PojoWithMutableFields.class,
+					() -> {
+						final PojoWithMutableFields pojo = new PojoWithMutableFields();
+						pojo.age = 42;
+						pojo.name = "Bob";
+						return pojo;
+					})
+				.convertedTo(Row.class, Row.of(42, "Bob"))
+				.convertedTo(RowData.class, GenericRowData.of(42, StringData.fromString("Bob"))),
+
+			TestSpec
+				.forClass(PojoWithImmutableFields.class)
+				.convertedTo(PojoWithImmutableFields.class, new PojoWithImmutableFields(42, "Bob"))
+				.convertedTo(Row.class, Row.of(42, "Bob"))
+				.convertedTo(RowData.class, GenericRowData.of(42, StringData.fromString("Bob"))),
+
+			TestSpec
+				.forClass(PojoWithGettersAndSetters.class)
+				.convertedToSupplier(
+					PojoWithGettersAndSetters.class,
+					() -> {
+						final PojoWithGettersAndSetters pojo = new PojoWithGettersAndSetters();
+						pojo.setAge(42);
+						pojo.setName("Bob");
+						return pojo;
+					})
+				.convertedTo(Row.class, Row.of(42, "Bob"))
+				.convertedTo(RowData.class, GenericRowData.of(42, StringData.fromString("Bob"))),
+
+			TestSpec
+				.forClass(ComplexPojo.class)
+				.convertedToSupplier(
+					ComplexPojo.class,
+					() -> {
+						final ComplexPojo pojo = new ComplexPojo();
+						pojo.setTimestamp(Timestamp.valueOf("2010-11-12 13:14:15.000000001"));
+						pojo.setPreferences(Row.of(42, "Bob", new Boolean[]{true, null, false}));
+						pojo.setBalance(new BigDecimal("1.23"));
+						return pojo;
+					})
+				.convertedTo(
+					Row.class,
+					Row.of(
+						Timestamp.valueOf("2010-11-12 13:14:15.000000001"),
+						Row.of(42, "Bob", new Boolean[]{true, null, false}),
+						new BigDecimal("1.23"))),
+
+			TestSpec
+				.forClass(PojoAsSuperclass.class)
+				.convertedToSupplier(
+					PojoWithMutableFields.class,
+					() -> {
+						final PojoWithMutableFields pojo = new PojoWithMutableFields();
+						pojo.age = 42;
+						pojo.name = "Bob";
+						return pojo;
+					})
+				.convertedTo(Row.class, Row.of(42)),
+
+			TestSpec
+				.forDataType(MAP(STRING(), DataTypes.of(PojoWithImmutableFields.class)))
+				.convertedTo(Map.class, createPojoWithImmutableFieldsMap()),
+
+			TestSpec
+				.forDataType(ARRAY(DataTypes.of(PojoWithNestedPojo.class)))
+				.convertedTo(PojoWithNestedPojo[].class, createPojoWithNestedPojoArray())
+				.convertedTo(
+					Row[].class,
+					new Row[]{
+						Row.of(
+							new PojoWithImmutableFields(42, "Bob"),
+							new PojoWithImmutableFields[]{new PojoWithImmutableFields(42, "Bob"), null}
+						),
+						null,
+						Row.of(
+							null,
+							new PojoWithImmutableFields[3]
+						),
+						Row.of(
+							null,
+							null
+						)
+					})
+		);
+	}
+
+	// current parameterized test specification (injected by the JUnit runner)
+	@Parameter
+	public TestSpec testSpec;
+
+	// used by specs that declare an expected error message instead of conversions
+	@Rule
+	public ExpectedException thrown = ExpectedException.none();
+
+	/**
+	 * Converts every declared external value to the internal format and back to every
+	 * other declared external class, asserting that all pairwise round trips produce
+	 * the expected values. Converters are serialized/deserialized first so their
+	 * serializability is verified as a side effect.
+	 */
+	@Test
+	public void testConversions() {
+		if (testSpec.expectedErrorMessage != null) {
+			thrown.expect(TableException.class);
+			thrown.expectMessage(equalTo(testSpec.expectedErrorMessage));
+		}
+		for (Map.Entry<Class<?>, Object> from : testSpec.conversions.entrySet()) {
+			final DataType fromDataType = testSpec.dataType.bridgedTo(from.getKey());
+
+			final DataStructureConverter<Object, Object> fromConverter =
+				simulateSerialization(DataStructureConverters.getConverter(fromDataType));
+			fromConverter.open(DataStructureConvertersTest.class.getClassLoader());
+
+			final Object internalValue = fromConverter.toInternalOrNull(from.getValue());
+
+			for (Map.Entry<Class<?>, Object> to : testSpec.conversions.entrySet()) {
+				final DataType toDataType = testSpec.dataType.bridgedTo(to.getKey());
+
+				final DataStructureConverter<Object, Object> toConverter =
+					simulateSerialization(DataStructureConverters.getConverter(toDataType));
+				toConverter.open(DataStructureConvertersTest.class.getClassLoader());
+
+				// wrapping in Object[] gives deep equality also for array-typed values
+				assertArrayEquals(
+					new Object[]{to.getValue()},
+					new Object[]{toConverter.toExternalOrNull(internalValue)});
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Test utilities
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Specification of a single data type together with all external classes and
+	 * example values it should convert from/to. Insertion order of conversions is
+	 * preserved so test output is deterministic.
+	 */
+	private static class TestSpec {
+
+		private final String description;
+
+		private final DataType dataType;
+
+		// external conversion class -> example value of that class
+		private final Map<Class<?>, Object> conversions;
+
+		// if set, conversion is expected to fail with exactly this message
+		private @Nullable String expectedErrorMessage;
+
+		private TestSpec(String description, DataType dataType) {
+			this.description = description;
+			this.dataType = dataType;
+			this.conversions = new LinkedHashMap<>();
+		}
+
+		static TestSpec forDataType(AbstractDataType<?> dataType) {
+			final DataTypeFactoryMock factoryMock = new DataTypeFactoryMock();
+			final DataType resolvedDataType = factoryMock.createDataType(dataType);
+			return new TestSpec(
+				resolvedDataType.toString(),
+				resolvedDataType);
+		}
+
+		static TestSpec forClass(Class<?> clazz) {
+			return forDataType(DataTypes.of(clazz));
+		}
+
+		<T> TestSpec convertedTo(Class<T> clazz, T value) {
+			conversions.put(clazz, value);
+			return this;
+		}
+
+		<T> TestSpec convertedToSupplier(Class<T> clazz, Supplier<T> supplier) {
+			conversions.put(clazz, supplier.get());
+			return this;
+		}
+
+		TestSpec expectErrorMessage(String expectedErrorMessage) {
+			this.expectedErrorMessage = expectedErrorMessage;
+			return this;
+		}
+
+		@Override
+		public String toString() {
+			return description;
+		}
+	}
+
+	/**
+	 * Round-trips the given converter through Java serialization to verify that
+	 * converters remain usable after being shipped to a task manager.
+	 */
+	private static DataStructureConverter<Object, Object> simulateSerialization(DataStructureConverter<Object, Object> converter) {
+		try {
+			final byte[] serialized = InstantiationUtil.serializeObject(converter);
+			return InstantiationUtil.deserializeObject(serialized, DataStructureConverter.class.getClassLoader());
+		} catch (Exception e) {
+			throw new AssertionError("Serialization failed.", e);
+		}
+	}
+
+	/** Builds a map fixture containing a null key and a null value on purpose. */
+	private static Map<Integer, Boolean> createIdentityMap() {
+		final Map<Integer, Boolean> map = new HashMap<>();
+		map.put(null, true);
+		map.put(1, true);
+		map.put(2, false);
+		map.put(3, null);
+		return map;
+	}
+
+	/** Builds a {@link LocalDate}-keyed map fixture including null key and null value. */
+	private static Map<LocalDate, Boolean> createLocalDateMap() {
+		final Map<LocalDate, Boolean> map = new HashMap<>();
+		map.put(null, true);
+		map.put(LocalDate.ofEpochDay(0), true);
+		map.put(LocalDate.ofEpochDay(1), false);
+		map.put(LocalDate.ofEpochDay(3), null);
+		return map;
+	}
+
+	/** Builds a map fixture with structured-type values, including a null value. */
+	private static Map<String, PojoWithImmutableFields> createPojoWithImmutableFieldsMap() {
+		final Map<String, PojoWithImmutableFields> map = new HashMap<>();
+		map.put("Unknown", null);
+		map.put("Alice", new PojoWithImmutableFields(12, "Alice"));
+		map.put("Bob", new PojoWithImmutableFields(42, "Bob"));
+		return map;
+	}
+
+	/**
+	 * Builds an array fixture of nested POJOs covering: fully populated element,
+	 * null element, element with null-filled inner array, and element with all-null fields.
+	 */
+	private static PojoWithNestedPojo[] createPojoWithNestedPojoArray() {
+		final PojoWithNestedPojo populated = new PojoWithNestedPojo(
+			new PojoWithImmutableFields(42, "Bob"),
+			new PojoWithImmutableFields[]{new PojoWithImmutableFields(42, "Bob"), null});
+
+		final PojoWithNestedPojo nullInner = new PojoWithNestedPojo(
+			null,
+			new PojoWithImmutableFields[3]); // array of three null entries
+
+		final PojoWithNestedPojo empty = new PojoWithNestedPojo();
+
+		return new PojoWithNestedPojo[]{
+			populated,
+			null,
+			nullInner,
+			empty
+		};
+	}
+
+	/** Builds a multiset fixture encoded as element-to-count map, including a null element. */
+	private static Map<Boolean, Integer> createIdentityMultiset() {
+		final Map<Boolean, Integer> multiset = new HashMap<>();
+		multiset.put(null, 3);
+		multiset.put(true, 1);
+		multiset.put(false, 2);
+		return multiset;
+	}
+
+	/** Builds a {@link LocalDate} multiset fixture encoded as element-to-count map. */
+	private static Map<LocalDate, Integer> createLocalDateMultiset() {
+		final Map<LocalDate, Integer> multiset = new HashMap<>();
+		multiset.put(null, 3);
+		multiset.put(LocalDate.ofEpochDay(0), 1);
+		multiset.put(LocalDate.ofEpochDay(1), 2);
+		return multiset;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Structured types
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * POJO used as superclass. The {@code instanceof} check in {@link #equals(Object)}
+	 * intentionally lets subclass instances compare equal to a bare superclass instance.
+	 */
+	public static class PojoAsSuperclass {
+		public int age;
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (!(o instanceof PojoAsSuperclass)) {
+				return false;
+			}
+			final PojoAsSuperclass other = (PojoAsSuperclass) o;
+			return age == other.age;
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(age);
+		}
+	}
+
+	/**
+	 * POJO with public mutable fields. Its {@link #equals(Object)} is intentionally
+	 * asymmetric: an instance compares equal to a bare {@link PojoAsSuperclass} with the
+	 * same age, so that conversions that only see the superclass fields can be verified.
+	 */
+	public static class PojoWithMutableFields extends PojoAsSuperclass {
+		public String name;
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null) {
+				return false;
+			}
+			// superclass check covers the instanceof test and the age comparison
+			if (!super.equals(o)) {
+				return false;
+			}
+			// modified to support PojoAsSuperclass: a plain superclass instance with the
+			// same age is considered equal, ignoring the name field
+			if (o.getClass() == PojoAsSuperclass.class) {
+				return true;
+			}
+			// NOTE(review): the cast would throw ClassCastException for an unrelated
+			// subclass of PojoAsSuperclass — acceptable here, as only these test POJOs occur
+			PojoWithMutableFields that = (PojoWithMutableFields) o;
+			return Objects.equals(name, that.name);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(super.hashCode(), name);
+		}
+	}
+
+	/**
+	 * POJO with immutable (final) fields; can only be created through its constructor,
+	 * exercising the converter path for immutable structured types.
+	 */
+	public static class PojoWithImmutableFields {
+		public final int age;
+		public final String name;
+
+		public PojoWithImmutableFields(int age, String name) {
+			this.age = age;
+			this.name = name;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			// strict class comparison: subclasses are never equal
+			if (o == null || o.getClass() != getClass()) {
+				return false;
+			}
+			final PojoWithImmutableFields other = (PojoWithImmutableFields) o;
+			return age == other.age && Objects.equals(name, other.name);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(age, name);
+		}
+	}
+
+	/**
+	 * POJO with default constructor and private fields, accessed exclusively through
+	 * getters and setters (the classic JavaBean shape).
+	 */
+	public static class PojoWithGettersAndSetters {
+		private int age;
+		private String name;
+
+		public int getAge() {
+			return age;
+		}
+
+		public void setAge(int age) {
+			this.age = age;
+		}
+
+		public String getName() {
+			return name;
+		}
+
+		public void setName(String name) {
+			this.name = name;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			// strict class comparison: subclasses are never equal
+			if (o == null || o.getClass() != getClass()) {
+				return false;
+			}
+			final PojoWithGettersAndSetters other = (PojoWithGettersAndSetters) o;
+			return age == other.age && Objects.equals(name, other.name);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(age, name);
+		}
+	}
+
+	/**
+	 * POJO with {@link DataTypeHint} annotations, nested types, and a custom field order
+	 * determined by the fully assigning constructor.
+	 */
+	public static class ComplexPojo {
+		private Timestamp timestamp;
+		private @DataTypeHint("ROW<age INT, name STRING, mask ARRAY<BOOLEAN>>") Row preferences;
+		private @DataTypeHint("DECIMAL(3, 2)") BigDecimal balance;
+
+		public ComplexPojo() {
+			// default constructor required for mutable structured types
+		}
+
+		// the fully assigning constructor determines the order of the fields
+		public ComplexPojo(Timestamp timestamp, Row preferences, BigDecimal balance) {
+			this.timestamp = timestamp;
+			this.preferences = preferences;
+			this.balance = balance;
+		}
+
+		public Timestamp getTimestamp() {
+			return timestamp;
+		}
+
+		public void setTimestamp(Timestamp timestamp) {
+			this.timestamp = timestamp;
+		}
+
+		public Row getPreferences() {
+			return preferences;
+		}
+
+		public void setPreferences(Row preferences) {
+			this.preferences = preferences;
+		}
+
+		public BigDecimal getBalance() {
+			return balance;
+		}
+
+		public void setBalance(BigDecimal balance) {
+			this.balance = balance;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || o.getClass() != getClass()) {
+				return false;
+			}
+			final ComplexPojo other = (ComplexPojo) o;
+			return Objects.equals(timestamp, other.timestamp) &&
+				Objects.equals(preferences, other.preferences) &&
+				Objects.equals(balance, other.balance);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(timestamp, preferences, balance);
+		}
+	}
+
+	/**
+	 * POJO with nested structured fields: a single nested POJO and an array of nested POJOs.
+	 */
+	public static class PojoWithNestedPojo {
+		public PojoWithImmutableFields inner;
+
+		public PojoWithImmutableFields[] innerArray;
+
+		public PojoWithNestedPojo() {
+			// default constructor
+		}
+
+		public PojoWithNestedPojo(
+				PojoWithImmutableFields inner,
+				PojoWithImmutableFields[] innerArray) {
+			this.inner = inner;
+			this.innerArray = innerArray;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || o.getClass() != getClass()) {
+				return false;
+			}
+			final PojoWithNestedPojo other = (PojoWithNestedPojo) o;
+			// deep comparison for the array field
+			return Objects.equals(inner, other.inner) &&
+				Arrays.equals(innerArray, other.innerArray);
+		}
+
+		@Override
+		public int hashCode() {
+			// same value as the accumulator form: 31 * Objects.hash(inner) + Arrays.hashCode(innerArray)
+			return 31 * Objects.hash(inner) + Arrays.hashCode(innerArray);
+		}
+	}
+}