You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2020/05/16 11:00:34 UTC
[flink] 05/05: [FLINK-16999][table-runtime-blink] Add converters
for all data types and conversion classes
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8bc744285531bd0b650c799be0a591c3492124f2
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed May 13 18:59:11 2020 +0200
[FLINK-16999][table-runtime-blink] Add converters for all data types and conversion classes
This adds converters for all data types and all conversion classes. It refactors the
DataFormatConverter for cleaner readability and adds a lot of tests.
Among other conversions, it adds complete support for structured types
(incl. immutable POJOs).
The only types that are not yet 100% supported are TimeType and DayTimeIntervalType.
This closes #12135.
---
.../flink/table/catalog/DataTypeFactoryImpl.java | 1 +
.../org/apache/flink/table/data/ArrayData.java | 3 +
.../java/org/apache/flink/table/data/RowData.java | 3 +
.../table/types/extraction/ExtractionUtils.java | 34 +
flink-table/flink-table-runtime-blink/pom.xml | 8 +
.../conversion/ArrayBooleanArrayConverter.java | 43 ++
.../data/conversion/ArrayByteArrayConverter.java | 43 ++
.../data/conversion/ArrayDoubleArrayConverter.java | 43 ++
.../data/conversion/ArrayFloatArrayConverter.java | 43 ++
.../data/conversion/ArrayIntArrayConverter.java | 43 ++
.../data/conversion/ArrayLongArrayConverter.java | 43 ++
.../data/conversion/ArrayObjectArrayConverter.java | 206 ++++++
.../data/conversion/ArrayShortArrayConverter.java | 43 ++
.../data/conversion/DataStructureConverter.java | 80 +++
.../data/conversion/DataStructureConverters.java | 224 +++++++
.../table/data/conversion/DateDateConverter.java | 42 ++
.../data/conversion/DateLocalDateConverter.java | 42 ++
.../DayTimeIntervalDurationConverter.java | 43 ++
.../conversion/DecimalBigDecimalConverter.java | 63 ++
.../table/data/conversion/IdentityConverter.java | 40 ++
.../LocalZonedTimestampInstantConverter.java | 42 ++
.../LocalZonedTimestampIntConverter.java | 42 ++
.../LocalZonedTimestampLongConverter.java | 42 ++
.../table/data/conversion/MapMapConverter.java | 125 ++++
.../data/conversion/RawByteArrayConverter.java | 59 ++
.../table/data/conversion/RawObjectConverter.java | 59 ++
.../table/data/conversion/RowRowConverter.java | 95 +++
.../data/conversion/StringByteArrayConverter.java | 43 ++
.../data/conversion/StringStringConverter.java | 43 ++
.../data/conversion/StructuredObjectConverter.java | 276 ++++++++
.../data/conversion/TimeLocalTimeConverter.java | 42 ++
.../table/data/conversion/TimeLongConverter.java | 41 ++
.../table/data/conversion/TimeTimeConverter.java | 42 ++
.../TimestampLocalDateTimeConverter.java | 42 ++
.../conversion/TimestampTimestampConverter.java | 42 ++
.../YearMonthIntervalPeriodConverter.java | 78 +++
.../flink/table/data/writer/BinaryWriter.java | 1 +
.../table/runtime/types/InternalSerializers.java | 50 +-
.../table/data/DataStructureConvertersTest.java | 730 +++++++++++++++++++++
39 files changed, 2961 insertions(+), 23 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java
index 9b0e76e..6ceb3f6 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java
@@ -151,6 +151,7 @@ final class DataTypeFactoryImpl implements DataTypeFactory {
private LogicalType resolveType(UnresolvedIdentifier identifier) {
assert identifier != null;
+ // TODO validate implementation class of structured types when converting from LogicalType to DataType
throw new TableException("User-defined types are not supported yet.");
}
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/ArrayData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/ArrayData.java
index 59c6b2f..5132458 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/ArrayData.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/ArrayData.java
@@ -217,6 +217,9 @@ public interface ArrayData {
return array.getMap(pos);
case ROW:
return array.getRow(pos, ((RowType) elementType).getFieldCount());
+ case STRUCTURED_TYPE:
+ // not the most efficient code but ok for a deprecated method
+ return array.getRow(pos, getFieldCount(elementType));
case BINARY:
case VARBINARY:
return array.getBinary(pos);
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RowData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RowData.java
index e4f2c5a..e8c5afb 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RowData.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RowData.java
@@ -281,6 +281,9 @@ public interface RowData {
return row.getMap(pos);
case ROW:
return row.getRow(pos, ((RowType) fieldType).getFieldCount());
+ case STRUCTURED_TYPE:
+ // not the most efficient code but ok for a deprecated method
+ return row.getRow(pos, getFieldCount(fieldType));
case BINARY:
case VARBINARY:
return row.getBinary(pos);
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
index b3ed7a3..56e5266 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
@@ -146,6 +146,25 @@ public final class ExtractionUtils {
}
/**
+ * Returns the field of a structured type. The logic is as broad as possible to support
+ * both Java and Scala in different flavors.
+ */
+ public static Field getStructuredField(Class<?> clazz, String fieldName) {
+ // match names case-insensitively; Locale.ROOT keeps the result independent of the JVM default locale (e.g. Turkish dotted/dotless i)
+ final String normalizedFieldName = fieldName.toUpperCase(java.util.Locale.ROOT);
+ final List<Field> fields = collectStructuredFields(clazz);
+ for (Field field : fields) {
+ if (field.getName().toUpperCase(java.util.Locale.ROOT).equals(normalizedFieldName)) {
+ return field;
+ }
+ }
+ throw extractionError(
+ "Could not find a field named '%s' in class '%s' for structured type.",
+ fieldName,
+ clazz.getName());
+ }
+
+ /**
* Checks for a field getter of a structured type. The logic is as broad as possible to support
* both Java and Scala in different flavors.
*/
@@ -260,6 +279,21 @@ public final class ExtractionUtils {
return Modifier.isPublic(m);
}
+ /**
+ * Checks whether a field is directly writable without a setter or constructor.
+ */
+ public static boolean isStructuredFieldDirectlyWritable(Field field) {
+ final int m = field.getModifiers();
+
+ // field is immutable
+ if (Modifier.isFinal(m)) {
+ return false;
+ }
+
+ // field is directly writable
+ return Modifier.isPublic(m);
+ }
+
// --------------------------------------------------------------------------------------------
// Methods intended for this package
// --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-runtime-blink/pom.xml b/flink-table/flink-table-runtime-blink/pom.xml
index 62115b9..53d745c 100644
--- a/flink-table/flink-table-runtime-blink/pom.xml
+++ b/flink-table/flink-table-runtime-blink/pom.xml
@@ -153,6 +153,14 @@ under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayBooleanArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayBooleanArrayConverter.java
new file mode 100644
index 0000000..3939413
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayBooleanArrayConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.types.logical.ArrayType;
+
+/**
+ * Converter for {@link ArrayType} of {@code boolean[]} external type.
+ */
+@Internal
+class ArrayBooleanArrayConverter implements DataStructureConverter<ArrayData, boolean[]> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ArrayData toInternal(boolean[] external) {
+ return new GenericArrayData(external);
+ }
+
+ @Override
+ public boolean[] toExternal(ArrayData internal) {
+ return internal.toBooleanArray();
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayByteArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayByteArrayConverter.java
new file mode 100644
index 0000000..6c8b665
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayByteArrayConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.types.logical.ArrayType;
+
+/**
+ * Converter for {@link ArrayType} of {@code byte[]} external type.
+ */
+@Internal
+class ArrayByteArrayConverter implements DataStructureConverter<ArrayData, byte[]> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ArrayData toInternal(byte[] external) {
+ return new GenericArrayData(external);
+ }
+
+ @Override
+ public byte[] toExternal(ArrayData internal) {
+ return internal.toByteArray();
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayDoubleArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayDoubleArrayConverter.java
new file mode 100644
index 0000000..b442ff9
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayDoubleArrayConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.types.logical.ArrayType;
+
+/**
+ * Converter for {@link ArrayType} of {@code double[]} external type.
+ */
+@Internal
+class ArrayDoubleArrayConverter implements DataStructureConverter<ArrayData, double[]> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ArrayData toInternal(double[] external) {
+ return new GenericArrayData(external);
+ }
+
+ @Override
+ public double[] toExternal(ArrayData internal) {
+ return internal.toDoubleArray();
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayFloatArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayFloatArrayConverter.java
new file mode 100644
index 0000000..3b8bf15
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayFloatArrayConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.types.logical.ArrayType;
+
+/**
+ * Converter for {@link ArrayType} of {@code float[]} external type.
+ */
+@Internal
+class ArrayFloatArrayConverter implements DataStructureConverter<ArrayData, float[]> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ArrayData toInternal(float[] external) {
+ return new GenericArrayData(external);
+ }
+
+ @Override
+ public float[] toExternal(ArrayData internal) {
+ return internal.toFloatArray();
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayIntArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayIntArrayConverter.java
new file mode 100644
index 0000000..fe8880a
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayIntArrayConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.types.logical.ArrayType;
+
+/**
+ * Converter for {@link ArrayType} of {@code int[]} external type.
+ */
+@Internal
+class ArrayIntArrayConverter implements DataStructureConverter<ArrayData, int[]> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ArrayData toInternal(int[] external) {
+ return new GenericArrayData(external);
+ }
+
+ @Override
+ public int[] toExternal(ArrayData internal) {
+ return internal.toIntArray();
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayLongArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayLongArrayConverter.java
new file mode 100644
index 0000000..963d146
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayLongArrayConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.types.logical.ArrayType;
+
+/**
+ * Converter for {@link ArrayType} of {@code long[]} external type.
+ */
+@Internal
+class ArrayLongArrayConverter implements DataStructureConverter<ArrayData, long[]> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ArrayData toInternal(long[] external) {
+ return new GenericArrayData(external);
+ }
+
+ @Override
+ public long[] toExternal(ArrayData internal) {
+ return internal.toLongArray();
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
new file mode 100644
index 0000000..761d758
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.binary.BinaryArrayData;
+import org.apache.flink.table.data.writer.BinaryArrayWriter;
+import org.apache.flink.table.data.writer.BinaryWriter;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.Serializable;
+import java.lang.reflect.Array;
+
+/**
+ * Converter for {@link ArrayType} of nested primitive or object arrays external types.
+ */
+@Internal
+@SuppressWarnings("unchecked")
+class ArrayObjectArrayConverter<E> implements DataStructureConverter<ArrayData, E[]> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Class<E> elementClass;
+
+ private final int elementSize;
+
+ private final BinaryArrayWriter.NullSetter writerNullSetter;
+
+ private final BinaryWriter.ValueSetter writerValueSetter;
+
+ private final GenericToJavaArrayConverter<E> genericToJavaArrayConverter;
+
+ private transient BinaryArrayData reuseArray;
+
+ private transient BinaryArrayWriter reuseWriter;
+
+ final boolean hasInternalElements;
+
+ final ArrayData.ElementGetter elementGetter;
+
+ final DataStructureConverter<Object, E> elementConverter;
+
+ private ArrayObjectArrayConverter(
+ Class<E> elementClass,
+ int elementSize,
+ BinaryArrayWriter.NullSetter writerNullSetter,
+ BinaryWriter.ValueSetter writerValueSetter,
+ GenericToJavaArrayConverter<E> genericToJavaArrayConverter,
+ ArrayData.ElementGetter elementGetter,
+ DataStructureConverter<Object, E> elementConverter) {
+ this.elementClass = elementClass;
+ this.elementSize = elementSize;
+ this.writerNullSetter = writerNullSetter;
+ this.writerValueSetter = writerValueSetter;
+ this.genericToJavaArrayConverter = genericToJavaArrayConverter;
+ this.hasInternalElements = elementConverter instanceof IdentityConverter;
+ this.elementGetter = elementGetter;
+ this.elementConverter = elementConverter;
+ }
+
+ @Override
+ public void open(ClassLoader classLoader) {
+ reuseArray = new BinaryArrayData();
+ reuseWriter = new BinaryArrayWriter(reuseArray, 0, elementSize);
+ elementConverter.open(classLoader);
+ }
+
+ @Override
+ public ArrayData toInternal(E[] external) {
+ return hasInternalElements ? new GenericArrayData(external) : toBinaryArrayData(external);
+ }
+
+ @Override
+ public E[] toExternal(ArrayData internal) {
+ if (hasInternalElements && internal instanceof GenericArrayData) {
+ final GenericArrayData genericArray = (GenericArrayData) internal;
+ if (genericArray.isPrimitiveArray()) {
+ return genericToJavaArrayConverter.convert((GenericArrayData) internal);
+ }
+ return (E[]) genericArray.toObjectArray();
+ }
+ return toJavaArray(internal);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Runtime helper methods
+ // --------------------------------------------------------------------------------------------
+
+ private ArrayData toBinaryArrayData(E[] external) {
+ final int length = external.length;
+ allocateWriter(length);
+ for (int pos = 0; pos < length; pos++) {
+ writeElement(pos, external[pos]);
+ }
+ return completeWriter();
+ }
+
+ private E[] toJavaArray(ArrayData internal) {
+ final int size = internal.size();
+ final E[] values = (E[]) Array.newInstance(elementClass, size);
+ for (int pos = 0; pos < size; pos++) {
+ final Object value = elementGetter.getElementOrNull(internal, pos);
+ values[pos] = elementConverter.toExternalOrNull(value);
+ }
+ return values;
+ }
+
+ interface GenericToJavaArrayConverter<E> extends Serializable {
+ E[] convert(GenericArrayData internal);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Shared code
+ // --------------------------------------------------------------------------------------------
+
+ void allocateWriter(int length) {
+ if (reuseWriter.getNumElements() != length) {
+ reuseWriter = new BinaryArrayWriter(reuseArray, length, elementSize);
+ } else {
+ reuseWriter.reset();
+ }
+ }
+
+ void writeElement(int pos, E element) {
+ if (element == null) {
+ writerNullSetter.setNull(reuseWriter, pos);
+ } else {
+ writerValueSetter.setValue(reuseWriter, pos, elementConverter.toInternal(element));
+ }
+ }
+
+ BinaryArrayData completeWriter() {
+ reuseWriter.complete();
+ return reuseArray;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Factory method
+ // --------------------------------------------------------------------------------------------
+
+ public static ArrayObjectArrayConverter<?> create(DataType dataType) {
+ return createForElement(dataType.getChildren().get(0));
+ }
+
+ public static <E> ArrayObjectArrayConverter<E> createForElement(DataType elementDataType) {
+ final LogicalType elementType = elementDataType.getLogicalType();
+ return new ArrayObjectArrayConverter<>(
+ (Class<E>) elementDataType.getConversionClass(),
+ BinaryArrayData.calculateFixLengthPartSize(elementType),
+ BinaryArrayWriter.createNullSetter(elementType),
+ BinaryWriter.createValueSetter(elementType),
+ createGenericToJavaArrayConverter(elementType),
+ ArrayData.createElementGetter(elementType),
+ (DataStructureConverter<Object, E>) DataStructureConverters.getConverter(elementDataType)
+ );
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <E> GenericToJavaArrayConverter<E> createGenericToJavaArrayConverter(LogicalType elementType) {
+ switch (elementType.getTypeRoot()) {
+ case BOOLEAN:
+ return internal -> (E[]) ArrayUtils.toObject(internal.toBooleanArray());
+ case TINYINT:
+ return internal -> (E[]) ArrayUtils.toObject(internal.toByteArray());
+ case SMALLINT:
+ return internal -> (E[]) ArrayUtils.toObject(internal.toShortArray());
+ case INTEGER:
+ return internal -> (E[]) ArrayUtils.toObject(internal.toIntArray());
+ case BIGINT:
+ return internal -> (E[]) ArrayUtils.toObject(internal.toLongArray());
+ case FLOAT:
+ return internal -> (E[]) ArrayUtils.toObject(internal.toFloatArray());
+ case DOUBLE:
+ return internal -> (E[]) ArrayUtils.toObject(internal.toDoubleArray());
+ case DISTINCT_TYPE:
+ return createGenericToJavaArrayConverter(((DistinctType) elementType).getSourceType());
+ default:
+ return internal -> {
+ throw new IllegalStateException();
+ };
+ }
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayShortArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayShortArrayConverter.java
new file mode 100644
index 0000000..3b48ea4
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayShortArrayConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.types.logical.ArrayType;
+
+/**
+ * Converter for {@link ArrayType} of {@code short[]} external type.
+ */
+@Internal
+class ArrayShortArrayConverter implements DataStructureConverter<ArrayData, short[]> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ArrayData toInternal(short[] external) {
+ return new GenericArrayData(external);
+ }
+
+ @Override
+ public short[] toExternal(ArrayData internal) {
+ return internal.toShortArray();
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverter.java
new file mode 100644
index 0000000..d2df1e5
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverter.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import java.io.Serializable;
+
+/**
+ * Converter between internal and external data structure.
+ *
+ * <p>Converters are serializable and can be passed to runtime operators.
+ *
+ * @param <I> internal data structure (see {@link RowData})
+ * @param <E> external data structure (see {@link DataType#getConversionClass()})
+ */
+@Internal
+public interface DataStructureConverter<I, E> extends Serializable {
+
+ default void open(ClassLoader classLoader) {
+ assert classLoader != null;
+ // nothing to do
+ }
+
+ /**
+ * Converts to internal data structure.
+ *
+ * <p>Note: Parameter must not be null. Output must not be null.
+ */
+ I toInternal(E external);
+
+ /**
+ * Converts to internal data structure or {@code null}.
+ *
+ * <p>The nullability could be derived from the data type. However, this method reduces null checks.
+ */
+ default I toInternalOrNull(E external) {
+ if (external == null) {
+ return null;
+ }
+ return toInternal(external);
+ }
+
+ /**
+ * Converts to external data structure.
+ *
+ * <p>Note: Parameter must not be null. Output must not be null.
+ */
+ E toExternal(I internal);
+
+ /**
+ * Converts to external data structure or {@code null}.
+ *
+ * <p>The nullability could be derived from the data type. However, this method reduces null checks.
+ */
+ default E toExternalOrNull(I internal) {
+ if (internal == null) {
+ return null;
+ }
+ return toExternal(internal);
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverters.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverters.java
new file mode 100644
index 0000000..1d07d24
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverters.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.types.Row;
+
+import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+/**
+ * Registry of available data structure converters.
+ *
+ * <p>Data structure converters are used at the edges of the API for converting between internal
+ * structures (see {@link RowData}) and external structures (see {@link DataType#getConversionClass()}).
+ *
+ * <p>This is useful for UDFs, sources, sinks, or exposing data in the API (e.g. via a {@code collect()}).
+ *
+ * <p>Note: It is NOT the responsibility of a converter to normalize the data. Thus, a converter
+ * neither changes the precision of a timestamp nor prunes/expands strings to their defined length. This
+ * might be the responsibility of data classes that are called transitively.
+ */
+@Internal
+public final class DataStructureConverters {
+
+ private static final Map<ConverterIdentifier<?>, DataStructureConverterFactory> converters = new HashMap<>(); // keyed by (type root, conversion class); filled only by the static initializer below
+ static {
+ // ordered by type root and conversion class definition
+ putConverter(LogicalTypeRoot.CHAR, String.class, constructor(StringStringConverter::new));
+ putConverter(LogicalTypeRoot.CHAR, byte[].class, constructor(StringByteArrayConverter::new));
+ putConverter(LogicalTypeRoot.CHAR, StringData.class, identity());
+ putConverter(LogicalTypeRoot.VARCHAR, String.class, constructor(StringStringConverter::new));
+ putConverter(LogicalTypeRoot.VARCHAR, byte[].class, constructor(StringByteArrayConverter::new));
+ putConverter(LogicalTypeRoot.VARCHAR, StringData.class, identity());
+ putConverter(LogicalTypeRoot.BOOLEAN, Boolean.class, identity());
+ putConverter(LogicalTypeRoot.BOOLEAN, boolean.class, identity());
+ putConverter(LogicalTypeRoot.BINARY, byte[].class, identity());
+ putConverter(LogicalTypeRoot.VARBINARY, byte[].class, identity());
+ putConverter(LogicalTypeRoot.DECIMAL, BigDecimal.class, DecimalBigDecimalConverter::create);
+ putConverter(LogicalTypeRoot.DECIMAL, DecimalData.class, identity());
+ putConverter(LogicalTypeRoot.TINYINT, Byte.class, identity());
+ putConverter(LogicalTypeRoot.TINYINT, byte.class, identity());
+ putConverter(LogicalTypeRoot.SMALLINT, Short.class, identity());
+ putConverter(LogicalTypeRoot.SMALLINT, short.class, identity());
+ putConverter(LogicalTypeRoot.INTEGER, Integer.class, identity());
+ putConverter(LogicalTypeRoot.INTEGER, int.class, identity());
+ putConverter(LogicalTypeRoot.BIGINT, Long.class, identity());
+ putConverter(LogicalTypeRoot.BIGINT, long.class, identity());
+ putConverter(LogicalTypeRoot.FLOAT, Float.class, identity());
+ putConverter(LogicalTypeRoot.FLOAT, float.class, identity());
+ putConverter(LogicalTypeRoot.DOUBLE, Double.class, identity());
+ putConverter(LogicalTypeRoot.DOUBLE, double.class, identity());
+ putConverter(LogicalTypeRoot.DATE, java.sql.Date.class, constructor(DateDateConverter::new));
+ putConverter(LogicalTypeRoot.DATE, java.time.LocalDate.class, constructor(DateLocalDateConverter::new));
+ putConverter(LogicalTypeRoot.DATE, Integer.class, identity());
+ putConverter(LogicalTypeRoot.DATE, int.class, identity());
+ putConverter(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, java.sql.Time.class, constructor(TimeTimeConverter::new));
+ putConverter(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, java.time.LocalTime.class, constructor(TimeLocalTimeConverter::new));
+ putConverter(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, Integer.class, identity());
+ putConverter(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, int.class, identity());
+ putConverter(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, Long.class, constructor(TimeLongConverter::new));
+ putConverter(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, long.class, constructor(TimeLongConverter::new));
+ putConverter(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, java.sql.Timestamp.class, constructor(TimestampTimestampConverter::new));
+ putConverter(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, java.time.LocalDateTime.class, constructor(TimestampLocalDateTimeConverter::new));
+ putConverter(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, TimestampData.class, identity());
+ putConverter(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE, java.time.ZonedDateTime.class, unsupported()); // registered so the lookup fails with "Unsupported data type" instead of "Could not find converter"
+ putConverter(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE, java.time.OffsetDateTime.class, unsupported()); // same as above
+ putConverter(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, java.time.Instant.class, constructor(LocalZonedTimestampInstantConverter::new));
+ putConverter(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, Integer.class, constructor(LocalZonedTimestampIntConverter::new));
+ putConverter(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, int.class, constructor(LocalZonedTimestampIntConverter::new));
+ putConverter(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, Long.class, constructor(LocalZonedTimestampLongConverter::new));
+ putConverter(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, long.class, constructor(LocalZonedTimestampLongConverter::new));
+ putConverter(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, TimestampData.class, identity());
+ putConverter(LogicalTypeRoot.INTERVAL_YEAR_MONTH, java.time.Period.class, YearMonthIntervalPeriodConverter::create);
+ putConverter(LogicalTypeRoot.INTERVAL_YEAR_MONTH, Integer.class, identity());
+ putConverter(LogicalTypeRoot.INTERVAL_YEAR_MONTH, int.class, identity());
+ putConverter(LogicalTypeRoot.INTERVAL_DAY_TIME, java.time.Duration.class, constructor(DayTimeIntervalDurationConverter::new));
+ putConverter(LogicalTypeRoot.INTERVAL_DAY_TIME, Long.class, identity());
+ putConverter(LogicalTypeRoot.INTERVAL_DAY_TIME, long.class, identity());
+ putConverter(LogicalTypeRoot.ARRAY, ArrayData.class, identity());
+ putConverter(LogicalTypeRoot.ARRAY, boolean[].class, constructor(ArrayBooleanArrayConverter::new));
+ putConverter(LogicalTypeRoot.ARRAY, byte[].class, constructor(ArrayByteArrayConverter::new));
+ putConverter(LogicalTypeRoot.ARRAY, short[].class, constructor(ArrayShortArrayConverter::new));
+ putConverter(LogicalTypeRoot.ARRAY, int[].class, constructor(ArrayIntArrayConverter::new));
+ putConverter(LogicalTypeRoot.ARRAY, long[].class, constructor(ArrayLongArrayConverter::new));
+ putConverter(LogicalTypeRoot.ARRAY, float[].class, constructor(ArrayFloatArrayConverter::new));
+ putConverter(LogicalTypeRoot.ARRAY, double[].class, constructor(ArrayDoubleArrayConverter::new));
+ putConverter(LogicalTypeRoot.MAP, Map.class, MapMapConverter::createForMapType);
+ putConverter(LogicalTypeRoot.MAP, MapData.class, identity());
+ putConverter(LogicalTypeRoot.MULTISET, Map.class, MapMapConverter::createForMultisetType);
+ putConverter(LogicalTypeRoot.MULTISET, MapData.class, identity());
+ putConverter(LogicalTypeRoot.ROW, Row.class, RowRowConverter::create);
+ putConverter(LogicalTypeRoot.ROW, RowData.class, identity());
+ putConverter(LogicalTypeRoot.STRUCTURED_TYPE, Row.class, RowRowConverter::create);
+ putConverter(LogicalTypeRoot.STRUCTURED_TYPE, RowData.class, identity());
+ putConverter(LogicalTypeRoot.RAW, byte[].class, RawByteArrayConverter::create);
+ putConverter(LogicalTypeRoot.RAW, RawValueData.class, identity());
+ }
+
+ /**
+ * Returns a converter for the given {@link DataType}; throws a {@link TableException} if none is available.
+ */
+ @SuppressWarnings("unchecked")
+ public static DataStructureConverter<Object, Object> getConverter(DataType dataType) {
+ // cast to Object for ease of use
+ return (DataStructureConverter<Object, Object>) getConverterInternal(dataType);
+ }
+
+ private static DataStructureConverter<?, ?> getConverterInternal(DataType dataType) { // lookup: exact registry match first, then per-root fallbacks
+ final LogicalType logicalType = dataType.getLogicalType();
+ final DataStructureConverterFactory factory = converters.get(
+ new ConverterIdentifier<>(
+ logicalType.getTypeRoot(),
+ dataType.getConversionClass()));
+ if (factory != null) { // exact (type root, conversion class) pair was registered
+ return factory.createConverter(dataType);
+ }
+ // special cases: no exact match, so the converter is chosen by type root alone
+ switch (logicalType.getTypeRoot()) {
+ case ARRAY:
+ return ArrayObjectArrayConverter.create(dataType);
+ case DISTINCT_TYPE:
+ return getConverterInternal(dataType.getChildren().get(0)); // recurse on the first child data type
+ case STRUCTURED_TYPE:
+ return StructuredObjectConverter.create(dataType);
+ case RAW:
+ return RawObjectConverter.create(dataType);
+ default:
+ throw new TableException("Could not find converter for data type: " + dataType);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Helper methods
+ // --------------------------------------------------------------------------------------------
+
+ private static <E> void putConverter( // registers a factory under the (root, conversionClass) key; a later call with the same key replaces the earlier one
+ LogicalTypeRoot root,
+ Class<E> conversionClass,
+ DataStructureConverterFactory factory) {
+ converters.put(new ConverterIdentifier<>(root, conversionClass), factory);
+ }
+
+ private static DataStructureConverterFactory identity() { // factory producing a new IdentityConverter per request
+ return constructor(IdentityConverter::new);
+ }
+
+ private static DataStructureConverterFactory constructor(Supplier<DataStructureConverter<?, ?>> supplier) { // adapts a no-arg supplier to a factory
+ return dataType -> supplier.get(); // the data type is ignored; a converter is obtained from the supplier on every call
+ }
+
+ private static DataStructureConverterFactory unsupported() { // factory that always fails for the requested data type
+ return dataType -> {
+ throw new TableException("Unsupported data type: " + dataType);
+ };
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Helper classes
+ // --------------------------------------------------------------------------------------------
+
+ private static class ConverterIdentifier<E> { // registry key: logical type root plus conversion class
+
+ final LogicalTypeRoot root;
+
+ final Class<E> conversionClass;
+
+ ConverterIdentifier(LogicalTypeRoot root, Class<E> conversionClass) {
+ this.root = root;
+ this.conversionClass = conversionClass;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ConverterIdentifier<?> that = (ConverterIdentifier<?>) o;
+ return root == that.root && conversionClass.equals(that.conversionClass); // root compared by reference, class by equals
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(root, conversionClass); // uses the same two fields as equals
+ }
+ }
+
+ private interface DataStructureConverterFactory { // creates a converter instance for a concrete data type
+ DataStructureConverter<?, ?> createConverter(DataType dt);
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DateDateConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DateDateConverter.java
new file mode 100644
index 0000000..e980891
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DateDateConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.runtime.functions.SqlDateTimeUtils;
+import org.apache.flink.table.types.logical.DateType;
+
+/**
+ * Converter for {@link DateType} of {@link java.sql.Date} external type; the internal form is an {@link Integer}.
+ */
+@Internal
+class DateDateConverter implements DataStructureConverter<Integer, java.sql.Date> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer toInternal(java.sql.Date external) { // delegates entirely to SqlDateTimeUtils
+ return SqlDateTimeUtils.dateToInternal(external); // NOTE(review): presumably yields days since epoch with local time zone handling -- confirm in SqlDateTimeUtils
+ }
+
+ @Override
+ public java.sql.Date toExternal(Integer internal) { // inverse direction, also delegated
+ return SqlDateTimeUtils.internalToDate(internal); // Integer is auto-unboxed if the helper takes a primitive; callers must not pass null
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DateLocalDateConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DateLocalDateConverter.java
new file mode 100644
index 0000000..6df885d
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DateLocalDateConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.runtime.functions.SqlDateTimeUtils;
+import org.apache.flink.table.types.logical.DateType;
+
+/**
+ * Converter for {@link DateType} of {@link java.time.LocalDate} external type; the internal form is an {@link Integer}.
+ */
+@Internal
+class DateLocalDateConverter implements DataStructureConverter<Integer, java.time.LocalDate> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer toInternal(java.time.LocalDate external) { // delegates entirely to SqlDateTimeUtils
+ return SqlDateTimeUtils.localDateToUnixDate(external); // NOTE(review): "unix date" presumably means days since 1970-01-01 -- confirm in SqlDateTimeUtils
+ }
+
+ @Override
+ public java.time.LocalDate toExternal(Integer internal) { // inverse direction, also delegated
+ return SqlDateTimeUtils.unixDateToLocalDate(internal);
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DayTimeIntervalDurationConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DayTimeIntervalDurationConverter.java
new file mode 100644
index 0000000..1ea01c5
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DayTimeIntervalDurationConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+
+import java.time.Duration;
+
+/**
+ * Converter for {@link DayTimeIntervalType} of {@link java.time.Duration} external type; the internal form is a {@link Long} of milliseconds.
+ */
+@Internal
+class DayTimeIntervalDurationConverter implements DataStructureConverter<Long, java.time.Duration> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Long toInternal(java.time.Duration external) {
+ return external.toMillis(); // sub-millisecond precision is discarded; Duration.toMillis throws ArithmeticException on overflow
+ }
+
+ @Override
+ public java.time.Duration toExternal(Long internal) {
+ return Duration.ofMillis(internal); // Long is auto-unboxed here, so a null argument would fail with a NullPointerException
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DecimalBigDecimalConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DecimalBigDecimalConverter.java
new file mode 100644
index 0000000..850b6a0
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DecimalBigDecimalConverter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+
+import java.math.BigDecimal;
+
+/**
+ * Converter for {@link DecimalType} of {@link BigDecimal} external type, bound to one precision and scale.
+ */
+@Internal
+class DecimalBigDecimalConverter implements DataStructureConverter<DecimalData, BigDecimal> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final int precision; // taken from the DecimalType in create(...)
+
+ private final int scale; // taken from the DecimalType in create(...)
+
+ private DecimalBigDecimalConverter(int precision, int scale) { // instances are obtained through create(...)
+ this.precision = precision;
+ this.scale = scale;
+ }
+
+ @Override
+ public DecimalData toInternal(BigDecimal external) {
+ return DecimalData.fromBigDecimal(external, precision, scale); // NOTE(review): behavior for values exceeding the precision is decided by DecimalData -- confirm there
+ }
+
+ @Override
+ public BigDecimal toExternal(DecimalData internal) {
+ return internal.toBigDecimal(); // the configured precision/scale are not applied in this direction
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Factory method
+ // --------------------------------------------------------------------------------------------
+
+ static DecimalBigDecimalConverter create(DataType dataType) { // expects a data type whose logical type is a DecimalType
+ final DecimalType decimalType = (DecimalType) dataType.getLogicalType(); // any other logical type fails with ClassCastException
+ return new DecimalBigDecimalConverter(decimalType.getPrecision(), decimalType.getScale());
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/IdentityConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/IdentityConverter.java
new file mode 100644
index 0000000..8d9c874
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/IdentityConverter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * No-op converter that just forwards its input; used when internal and external class are the same.
+ */
+@Internal
+class IdentityConverter<I> implements DataStructureConverter<I, I> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public I toInternal(I external) {
+ return external; // same reference is returned, no copy is made
+ }
+
+ @Override
+ public I toExternal(I internal) {
+ return internal; // same reference is returned, no copy is made
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampInstantConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampInstantConverter.java
new file mode 100644
index 0000000..fffefbe
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampInstantConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+
+/**
+ * Converter for {@link LocalZonedTimestampType} of {@link java.time.Instant} external type; the internal form is {@link TimestampData}.
+ */
+@Internal
+class LocalZonedTimestampInstantConverter implements DataStructureConverter<TimestampData, java.time.Instant> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public TimestampData toInternal(java.time.Instant external) {
+ return TimestampData.fromInstant(external); // conversion logic lives in TimestampData
+ }
+
+ @Override
+ public java.time.Instant toExternal(TimestampData internal) {
+ return internal.toInstant(); // conversion logic lives in TimestampData
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampIntConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampIntConverter.java
new file mode 100644
index 0000000..349d34e
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampIntConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+
+/**
+ * Converter for {@link LocalZonedTimestampType} of {@link Integer} external type; the integer holds epoch seconds.
+ */
+@Internal
+class LocalZonedTimestampIntConverter implements DataStructureConverter<TimestampData, Integer> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public TimestampData toInternal(Integer external) {
+ return TimestampData.fromEpochMillis(((long) external) * 1000); // widen to long before multiplying so seconds -> millis cannot overflow int
+ }
+
+ @Override
+ public Integer toExternal(TimestampData internal) {
+ return (int) (internal.getMillisecond() / 1000); // divide in long arithmetic first, then narrow; a cast binds tighter than '/', so casting first would truncate the millis
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampLongConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampLongConverter.java
new file mode 100644
index 0000000..6ddb7eb
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampLongConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+
+/**
+ * Converter for {@link LocalZonedTimestampType} of {@link Long} external type; the long holds epoch milliseconds.
+ */
+@Internal
+class LocalZonedTimestampLongConverter implements DataStructureConverter<TimestampData, Long> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public TimestampData toInternal(Long external) {
+ return TimestampData.fromEpochMillis(external); // Long is auto-unboxed if the factory takes a primitive; callers must not pass null
+ }
+
+ @Override
+ public Long toExternal(TimestampData internal) {
+ return internal.getMillisecond(); // NOTE(review): any sub-millisecond part held by TimestampData is presumably dropped -- confirm
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/MapMapConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/MapMapConverter.java
new file mode 100644
index 0000000..24131b0
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/MapMapConverter.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.binary.BinaryMapData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Converter for {@link MapType}/{@link MultisetType} of {@link Map} external type; keys and values are handled by array converters.
+ */
+@Internal
+class MapMapConverter<K, V> implements DataStructureConverter<MapData, Map<K, V>> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final ArrayObjectArrayConverter<K> keyConverter; // reads/writes the key array and converts single keys
+
+ private final ArrayObjectArrayConverter<V> valueConverter; // reads/writes the value array and converts single values
+
+ private final boolean hasInternalEntries; // true only if both the key and the value converter report hasInternalElements
+
+ private MapMapConverter(
+ ArrayObjectArrayConverter<K> keyConverter,
+ ArrayObjectArrayConverter<V> valueConverter) {
+ this.keyConverter = keyConverter;
+ this.valueConverter = valueConverter;
+ this.hasInternalEntries = keyConverter.hasInternalElements && valueConverter.hasInternalElements;
+ }
+
+ @Override
+ public void open(ClassLoader classLoader) { // forwards the class loader to both nested converters
+ keyConverter.open(classLoader);
+ valueConverter.open(classLoader);
+ }
+
+ @Override
+ public MapData toInternal(Map<K, V> external) {
+ if (hasInternalEntries) { // no per-entry conversion: the external map is wrapped as is, not copied here
+ return new GenericMapData(external);
+ }
+ return toBinaryMapData(external); // otherwise every key and value is written through the converters
+ }
+
+ @Override
+ public Map<K, V> toExternal(MapData internal) {
+ final ArrayData keyArray = internal.keyArray();
+ final ArrayData valueArray = internal.valueArray();
+
+ final int length = internal.size();
+ final Map<K, V> map = new HashMap<>(); // result is a HashMap, so the entry order of the internal map is not retained
+ for (int pos = 0; pos < length; pos++) { // key and value at the same position form one entry
+ final Object keyValue = keyConverter.elementGetter.getElementOrNull(keyArray, pos);
+ final Object valueValue = valueConverter.elementGetter.getElementOrNull(valueArray, pos);
+ map.put(
+ keyConverter.elementConverter.toExternalOrNull(keyValue),
+ valueConverter.elementConverter.toExternalOrNull(valueValue));
+ }
+ return map;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Runtime helper methods
+ // --------------------------------------------------------------------------------------------
+
+ private MapData toBinaryMapData(Map<K, V> external) { // builds a BinaryMapData from two arrays written in entry iteration order
+ final int length = external.size();
+ keyConverter.allocateWriter(length);
+ valueConverter.allocateWriter(length);
+ int pos = 0;
+ for (Map.Entry<K, V> entry : external.entrySet()) {
+ keyConverter.writeElement(pos, entry.getKey()); // key and value share the same position
+ valueConverter.writeElement(pos, entry.getValue());
+ pos++;
+ }
+ return BinaryMapData.valueOf(keyConverter.completeWriter(), valueConverter.completeWriter());
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Factory method
+ // --------------------------------------------------------------------------------------------
+
+ public static MapMapConverter<?, ?> createForMapType(DataType dataType) { // key and value types come from the first and second child
+ final DataType keyDataType = dataType.getChildren().get(0);
+ final DataType valueDataType = dataType.getChildren().get(1);
+ return new MapMapConverter<>(
+ ArrayObjectArrayConverter.createForElement(keyDataType),
+ ArrayObjectArrayConverter.createForElement(valueDataType)
+ );
+ }
+
+ public static MapMapConverter<?, ?> createForMultisetType(DataType dataType) { // only the element type is read from the data type
+ final DataType keyDataType = dataType.getChildren().get(0);
+ final DataType valueDataType = DataTypes.INT().notNull(); // the value type is fixed to a non-null INT for multisets
+ return new MapMapConverter<>(
+ ArrayObjectArrayConverter.createForElement(keyDataType),
+ ArrayObjectArrayConverter.createForElement(valueDataType)
+ );
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RawByteArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RawByteArrayConverter.java
new file mode 100644
index 0000000..db24407
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RawByteArrayConverter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RawType;
+
+/**
+ * Converter for {@link RawType} of {@code byte[]} external type.
+ */
+@Internal
+class RawByteArrayConverter<T> implements DataStructureConverter<RawValueData<T>, byte[]> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final TypeSerializer<T> serializer;
+
+ private RawByteArrayConverter(TypeSerializer<T> serializer) {
+ this.serializer = serializer;
+ }
+
+ @Override
+ public RawValueData<T> toInternal(byte[] external) {
+ return RawValueData.fromBytes(external);
+ }
+
+ @Override
+ public byte[] toExternal(RawValueData<T> internal) {
+ return internal.toBytes(serializer);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Factory method
+ // --------------------------------------------------------------------------------------------
+
+ public static RawByteArrayConverter<?> create(DataType dataType) {
+ final TypeSerializer<?> serializer = ((RawType<?>) dataType.getLogicalType()).getTypeSerializer();
+ return new RawByteArrayConverter<>(serializer);
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RawObjectConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RawObjectConverter.java
new file mode 100644
index 0000000..d93756a
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RawObjectConverter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RawType;
+
+/**
+ * Converter for {@link RawType} of object external type.
+ */
+@Internal
+class RawObjectConverter<T> implements DataStructureConverter<RawValueData<T>, T> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final TypeSerializer<T> serializer;
+
+ private RawObjectConverter(TypeSerializer<T> serializer) {
+ this.serializer = serializer;
+ }
+
+ @Override
+ public RawValueData<T> toInternal(T external) {
+ return RawValueData.fromObject(external);
+ }
+
+ @Override
+ public T toExternal(RawValueData<T> internal) {
+ return internal.toObject(serializer);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Factory method
+ // --------------------------------------------------------------------------------------------
+
+ public static RawObjectConverter<?> create(DataType dataType) {
+ final TypeSerializer<?> serializer = ((RawType<?>) dataType.getLogicalType()).getTypeSerializer();
+ return new RawObjectConverter<>(serializer);
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RowRowConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RowRowConverter.java
new file mode 100644
index 0000000..468efcd
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RowRowConverter.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.stream.IntStream;
+
+/**
+ * Converter for {@link RowType} of {@link Row} external type.
+ */
+@Internal
+class RowRowConverter implements DataStructureConverter<RowData, Row> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final DataStructureConverter<Object, Object>[] fieldConverters;
+
+ private final RowData.FieldGetter[] fieldGetters;
+
+ private RowRowConverter(
+ DataStructureConverter<Object, Object>[] fieldConverters,
+ RowData.FieldGetter[] fieldGetters) {
+ this.fieldConverters = fieldConverters;
+ this.fieldGetters = fieldGetters;
+ }
+
+ @Override
+ public void open(ClassLoader classLoader) {
+ for (DataStructureConverter<Object, Object> fieldConverter : fieldConverters) {
+ fieldConverter.open(classLoader);
+ }
+ }
+
+ @Override
+ public RowData toInternal(Row external) {
+ final int length = fieldConverters.length;
+ final GenericRowData genericRow = new GenericRowData(length);
+ for (int pos = 0; pos < length; pos++) {
+ final Object value = external.getField(pos);
+ genericRow.setField(pos, fieldConverters[pos].toInternalOrNull(value));
+ }
+ return genericRow;
+ }
+
+ @Override
+ public Row toExternal(RowData internal) {
+ final int length = fieldConverters.length;
+ final Row row = new Row(length);
+ for (int pos = 0; pos < length; pos++) {
+ final Object value = fieldGetters[pos].getFieldOrNull(internal);
+ row.setField(pos, fieldConverters[pos].toExternalOrNull(value));
+ }
+ return row;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Factory method
+ // --------------------------------------------------------------------------------------------
+
+ @SuppressWarnings({"unchecked", "Convert2MethodRef"})
+ public static RowRowConverter create(DataType dataType) {
+ final List<DataType> fields = dataType.getChildren();
+ final DataStructureConverter<Object, Object>[] fieldConverters = fields.stream()
+ .map(dt -> DataStructureConverters.getConverter(dt))
+ .toArray(DataStructureConverter[]::new);
+ final RowData.FieldGetter[] fieldGetters = IntStream
+ .range(0, fields.size())
+ .mapToObj(pos -> RowData.createFieldGetter(fields.get(pos).getLogicalType(), pos))
+ .toArray(RowData.FieldGetter[]::new);
+ return new RowRowConverter(fieldConverters, fieldGetters);
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StringByteArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StringByteArrayConverter.java
new file mode 100644
index 0000000..3a7736c
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StringByteArrayConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+/**
+ * Converter for {@link CharType}/{@link VarCharType} of {@code byte[]} external type.
+ */
+@Internal
+class StringByteArrayConverter implements DataStructureConverter<StringData, byte[]> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public StringData toInternal(byte[] external) {
+ return StringData.fromBytes(external);
+ }
+
+ @Override
+ public byte[] toExternal(StringData internal) {
+ return internal.toBytes();
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StringStringConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StringStringConverter.java
new file mode 100644
index 0000000..290758b
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StringStringConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+/**
+ * Converter for {@link CharType}/{@link VarCharType} of {@link String} external type.
+ */
+@Internal
+class StringStringConverter implements DataStructureConverter<StringData, String> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public StringData toInternal(String external) {
+ return StringData.fromString(external);
+ }
+
+ @Override
+ public String toExternal(StringData internal) {
+ return internal.toString();
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StructuredObjectConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StructuredObjectConverter.java
new file mode 100644
index 0000000..2d21db8
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StructuredObjectConverter.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.CompileUtils;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.StructuredType;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.types.extraction.ExtractionUtils.getStructuredField;
+import static org.apache.flink.table.types.extraction.ExtractionUtils.getStructuredFieldGetter;
+import static org.apache.flink.table.types.extraction.ExtractionUtils.getStructuredFieldSetter;
+import static org.apache.flink.table.types.extraction.ExtractionUtils.hasInvokableConstructor;
+import static org.apache.flink.table.types.extraction.ExtractionUtils.isStructuredFieldDirectlyReadable;
+import static org.apache.flink.table.types.extraction.ExtractionUtils.isStructuredFieldDirectlyWritable;
+import static org.apache.flink.table.types.extraction.ExtractionUtils.primitiveToWrapper;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldNames;
+
+/**
+ * Converter for {@link StructuredType} of its implementation class.
+ */
+@Internal
+@SuppressWarnings("unchecked")
+class StructuredObjectConverter<T> implements DataStructureConverter<RowData, T> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final DataStructureConverter<Object, Object>[] fieldConverters;
+
+ private final RowData.FieldGetter[] fieldGetters;
+
+ private final String generatedName;
+
+ private final String generatedCode;
+
+ private transient DataStructureConverter<RowData, T> generatedConverter;
+
+ private StructuredObjectConverter(
+ DataStructureConverter<Object, Object>[] fieldConverters,
+ RowData.FieldGetter[] fieldGetters,
+ String generatedName,
+ String generatedCode) {
+ this.fieldConverters = fieldConverters;
+ this.fieldGetters = fieldGetters;
+ this.generatedName = generatedName;
+ this.generatedCode = generatedCode;
+ }
+
+ @Override
+ public void open(ClassLoader classLoader) {
+ for (DataStructureConverter<Object, Object> fieldConverter : fieldConverters) {
+ fieldConverter.open(classLoader);
+ }
+ try {
+ final Class<?> compiledConverter = CompileUtils.compile(classLoader, generatedName, generatedCode);
+ generatedConverter = (DataStructureConverter<RowData, T>) compiledConverter
+ .getConstructor(RowData.FieldGetter[].class, DataStructureConverter[].class)
+ .newInstance(fieldGetters, fieldConverters);
+ } catch (Throwable t) {
+ throw new TableException("Error while generating structured type converter.", t);
+ }
+ generatedConverter.open(classLoader);
+ }
+
+ @Override
+ public RowData toInternal(T external) {
+ return generatedConverter.toInternal(external);
+ }
+
+ @Override
+ public T toExternal(RowData internal) {
+ return generatedConverter.toExternal(internal);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Factory method
+ // --------------------------------------------------------------------------------------------
+
+ public static StructuredObjectConverter<?> create(DataType dataType) {
+ try {
+ return createOrError(dataType);
+ } catch (Throwable t) {
+ throw new TableException(
+ String.format(
+ "Could not create converter for structured type '%s'.",
+ dataType),
+ t);
+ }
+ }
+
+ /**
+ * Creates a {@link DataStructureConverter} for the given structured type.
+ *
+ * <p>Note: We do not perform validation if data type and structured type implementation match. This
+ * must have been done earlier in the {@link DataTypeFactory}.
+ */
+ @SuppressWarnings("RedundantCast")
+ private static StructuredObjectConverter<?> createOrError(DataType dataType) {
+ final List<DataType> fields = dataType.getChildren();
+
+ final DataStructureConverter<Object, Object>[] fieldConverters = fields.stream()
+ .map(dt -> (DataStructureConverter<Object, Object>) DataStructureConverters.getConverter(dt))
+ .toArray(DataStructureConverter[]::new);
+
+ final RowData.FieldGetter[] fieldGetters = IntStream
+ .range(0, fields.size())
+ .mapToObj(pos -> RowData.createFieldGetter(fields.get(pos).getLogicalType(), pos))
+ .toArray(RowData.FieldGetter[]::new);
+
+ final Class<?>[] fieldClasses = fields.stream()
+ .map(DataType::getConversionClass)
+ .toArray(Class[]::new);
+
+ final StructuredType structuredType = (StructuredType) dataType.getLogicalType();
+
+ final Class<?> implementationClass = structuredType.getImplementationClass()
+ .orElseThrow(IllegalStateException::new);
+
+ final String converterName = implementationClass.getName().replace('.', '$') + "$Converter";
+ final String converterCode = generateCode(
+ converterName,
+ implementationClass,
+ getFieldNames(structuredType).toArray(new String[0]),
+ fieldClasses);
+
+ return new StructuredObjectConverter<>(
+ fieldConverters,
+ fieldGetters,
+ converterName,
+ converterCode
+ );
+ }
+
+ private static String generateCode(
+ String converterName,
+ Class<?> clazz,
+ String[] fieldNames,
+ Class<?>[] fieldClasses) {
+ final int fieldCount = fieldClasses.length;
+ final StringBuilder sb = new StringBuilder();
+
+ // we ignore checkstyle here for readability and preserving indention
+
+ line(sb, "public class ", converterName, " implements ", DataStructureConverter.class, " {");
+ line(sb, " private final ", RowData.FieldGetter.class, "[] fieldGetters;");
+ line(sb, " private final ", DataStructureConverter.class, "[] fieldConverters;");
+
+ line(sb, " public ", converterName, "(", RowData.FieldGetter.class, "[] fieldGetters, ", DataStructureConverter.class, "[] fieldConverters) {");
+ line(sb, " this.fieldGetters = fieldGetters;");
+ line(sb, " this.fieldConverters = fieldConverters;");
+ line(sb, " }");
+
+ line(sb, " public ", Object.class, " toInternal(", Object.class, " o) {");
+ line(sb, " final ", clazz, " external = (", clazz, ") o;");
+ line(sb, " final ", GenericRowData.class, " genericRow = new ", GenericRowData.class, "(", fieldCount, ");");
+ for (int pos = 0; pos < fieldCount; pos++) {
+ line(sb, " ", getterExpr(clazz, pos, fieldNames[pos], fieldClasses[pos]), ";");
+ }
+ line(sb, " return genericRow;");
+ line(sb, " }");
+
+ line(sb, " public ", Object.class, " toExternal(", Object.class, " o) {");
+ line(sb, " final ", RowData.class, " internal = (", RowData.class, ") o;");
+ if (hasInvokableConstructor(clazz, fieldClasses)) {
+ line(sb, " final ", clazz, " structured = new ", clazz, "(");
+ for (int pos = 0; pos < fieldCount; pos++) {
+ line(sb, " ", parameterExpr(pos, fieldClasses[pos]), (pos < fieldCount - 1) ? ", " : "");
+ }
+ line(sb, " );");
+ } else {
+ line(sb, " final ", clazz, " structured = new ", clazz, "();");
+ for (int pos = 0; pos < fieldCount; pos++) {
+ line(sb, " ", setterExpr(clazz, pos, fieldNames[pos]), ";");
+ }
+ }
+ line(sb, " return structured;");
+ line(sb, " }");
+ line(sb, "}");
+ return sb.toString();
+ }
+
+ private static String getterExpr(
+ Class<?> implementationClass,
+ int pos,
+ String fieldName,
+ Class<?> fieldClass) {
+ final Field field = getStructuredField(implementationClass, fieldName);
+ String accessExpr;
+ if (isStructuredFieldDirectlyReadable(field)) {
+ // field is accessible without getter
+ accessExpr = expr("external.", field.getName());
+ } else {
+ // field is accessible with a getter
+ final Method getter = getStructuredFieldGetter(implementationClass, field)
+ .orElseThrow(IllegalStateException::new);
+ accessExpr = expr("external.", getter.getName(), "()");
+ }
+ accessExpr = castExpr(accessExpr, fieldClass);
+ return expr("genericRow.setField(", pos, ", fieldConverters[", pos , "].toInternalOrNull(", accessExpr, "))");
+ }
+
+ private static String parameterExpr(int pos, Class<?> fieldClass) {
+ final String conversionExpr = expr("fieldConverters[", pos , "].toExternalOrNull(fieldGetters[", pos, "].getFieldOrNull(internal))");
+ return castExpr(conversionExpr, fieldClass);
+ }
+
+ private static String setterExpr(
+ Class<?> implementationClass,
+ int pos,
+ String fieldName) {
+ final Field field = getStructuredField(implementationClass, fieldName);
+ final String conversionExpr = expr("fieldConverters[", pos , "].toExternalOrNull(fieldGetters[", pos, "].getFieldOrNull(internal))");
+ if (isStructuredFieldDirectlyWritable(field)) {
+ // field is accessible without setter
+ return expr("structured.", field.getName(), " = ", castExpr(conversionExpr, field.getType()));
+ } else {
+ // field is accessible with a setter
+ final Method setter = getStructuredFieldSetter(implementationClass, field)
+ .orElseThrow(IllegalStateException::new);
+ return expr("structured.", setter.getName(), "(", castExpr(conversionExpr, setter.getParameterTypes()[0]), ")");
+ }
+ }
+
+ private static String castExpr(String expr, Class<?> clazz) {
+ // help Janino to box primitive types and fix missing generics
+ return expr("((", primitiveToWrapper(clazz), ") ", expr, ")");
+ }
+
+ private static String expr(Object... parts) {
+ final StringBuilder sb = new StringBuilder();
+ for (Object part : parts) {
+ if (part instanceof Class) {
+ sb.append(((Class<?>) part).getCanonicalName());
+ } else {
+ sb.append(part);
+ }
+ }
+ return sb.toString();
+ }
+
+ private static void line(StringBuilder sb, Object... parts) {
+ for (Object part : parts) {
+ if (part instanceof Class) {
+ sb.append(((Class<?>) part).getCanonicalName());
+ } else {
+ sb.append(part);
+ }
+ }
+ sb.append("\n");
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeLocalTimeConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeLocalTimeConverter.java
new file mode 100644
index 0000000..d418f25
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeLocalTimeConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.runtime.functions.SqlDateTimeUtils;
+import org.apache.flink.table.types.logical.TimeType;
+
+/**
+ * Converter for {@link TimeType} of {@link java.time.LocalTime} external type.
+ */
+@Internal
+class TimeLocalTimeConverter implements DataStructureConverter<Integer, java.time.LocalTime> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer toInternal(java.time.LocalTime external) {
+ return SqlDateTimeUtils.localTimeToUnixDate(external);
+ }
+
+ @Override
+ public java.time.LocalTime toExternal(Integer internal) {
+ return SqlDateTimeUtils.unixTimeToLocalTime(internal);
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeLongConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeLongConverter.java
new file mode 100644
index 0000000..6cc79fb
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeLongConverter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.types.logical.TimeType;
+
+/**
+ * Converter for {@link TimeType} of {@link Long} external type.
+ */
+@Internal
+class TimeLongConverter implements DataStructureConverter<Integer, Long> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer toInternal(Long external) {
+ return (int) (external / 1000 / 1000);
+ }
+
+ @Override
+ public Long toExternal(Integer internal) {
+ return ((long) internal) * 1000 * 1000;
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeTimeConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeTimeConverter.java
new file mode 100644
index 0000000..1c8b34a
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeTimeConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.runtime.functions.SqlDateTimeUtils;
+import org.apache.flink.table.types.logical.TimeType;
+
+/**
+ * Converter for {@link TimeType} of {@link java.sql.Time} external type.
+ */
+@Internal
+class TimeTimeConverter implements DataStructureConverter<Integer, java.sql.Time> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer toInternal(java.sql.Time external) {
+ return SqlDateTimeUtils.timeToInternal(external);
+ }
+
+ @Override
+ public java.sql.Time toExternal(Integer internal) {
+ return SqlDateTimeUtils.internalToTime(internal);
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimestampLocalDateTimeConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimestampLocalDateTimeConverter.java
new file mode 100644
index 0000000..c156715
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimestampLocalDateTimeConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.TimestampType;
+
+/**
+ * Converter for {@link TimestampType} of {@link java.time.LocalDateTime} external type.
+ */
+@Internal
+class TimestampLocalDateTimeConverter implements DataStructureConverter<TimestampData, java.time.LocalDateTime> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public TimestampData toInternal(java.time.LocalDateTime external) {
+ return TimestampData.fromLocalDateTime(external);
+ }
+
+ @Override
+ public java.time.LocalDateTime toExternal(TimestampData internal) {
+ return internal.toLocalDateTime();
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimestampTimestampConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimestampTimestampConverter.java
new file mode 100644
index 0000000..f9a72f0
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimestampTimestampConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.TimestampType;
+
+/**
+ * Bridges {@link TimestampType} values between the internal {@link TimestampData} and the external {@link java.sql.Timestamp}.
+ */
+@Internal
+class TimestampTimestampConverter implements DataStructureConverter<TimestampData, java.sql.Timestamp> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public java.sql.Timestamp toExternal(final TimestampData internal) {
+ return internal.toTimestamp();
+ }
+
+ @Override
+ public TimestampData toInternal(final java.sql.Timestamp external) {
+ return TimestampData.fromTimestamp(external);
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/YearMonthIntervalPeriodConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/YearMonthIntervalPeriodConverter.java
new file mode 100644
index 0000000..50f8d01
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/YearMonthIntervalPeriodConverter.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution;
+
+import java.io.Serializable;
+import java.time.Period;
+
+/**
+ * Converter for {@link YearMonthIntervalType} of {@link java.time.Period} external type.
+ * <p>The internal value is the total number of months; {@link Period#toTotalMonths()} ignores the day component.
+ */
+@Internal
+class YearMonthIntervalPeriodConverter implements DataStructureConverter<Integer, Period> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final PeriodConstructor periodConstructor;
+
+ private YearMonthIntervalPeriodConverter(PeriodConstructor periodConstructor) {
+ this.periodConstructor = periodConstructor;
+ }
+
+ @Override
+ public Integer toInternal(Period external) {
+ // throws ArithmeticException instead of silently wrapping when the total months exceed the int range
+ return Math.toIntExact(external.toTotalMonths());
+ }
+
+ @Override
+ public Period toExternal(Integer internal) {
+ return periodConstructor.construct(internal);
+ }
+
+ /** Serializable strategy that builds a {@link Period} from a total number of months. */
+ private interface PeriodConstructor extends Serializable {
+ Period construct(Integer internal);
+ }
+
+ /** Creates a converter whose external representation follows the resolution of the given interval type. */
+ public static YearMonthIntervalPeriodConverter create(DataType dataType) {
+ final YearMonthIntervalType intervalType = (YearMonthIntervalType) dataType.getLogicalType();
+ return new YearMonthIntervalPeriodConverter(createPeriodConstructor(intervalType.getResolution()));
+ }
+
+ private static PeriodConstructor createPeriodConstructor(YearMonthResolution resolution) {
+ switch (resolution) {
+ case YEAR:
+ return internal -> Period.ofYears(internal / 12);
+ case YEAR_TO_MONTH:
+ return internal -> Period.of(internal / 12, internal % 12, 0);
+ case MONTH:
+ return Period::ofMonths;
+ default:
+ throw new IllegalStateException("Unsupported year-month resolution: " + resolution);
+ }
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryWriter.java
index 24d903c..cae1232 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryWriter.java
@@ -153,6 +153,7 @@ public interface BinaryWriter {
writer.writeMap(pos, (MapData) o, (MapDataSerializer) serializer);
break;
case ROW:
+ case STRUCTURED_TYPE:
writer.writeRow(pos, (RowData) o, (RowDataSerializer) serializer);
break;
case RAW:
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/InternalSerializers.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/InternalSerializers.java
index 462a1ae..7779e98 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/InternalSerializers.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/InternalSerializers.java
@@ -36,17 +36,17 @@ import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.typeutils.StringDataSerializer;
import org.apache.flink.table.runtime.typeutils.TimestampDataSerializer;
import org.apache.flink.table.types.logical.ArrayType;
-import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DistinctType;
import org.apache.flink.table.types.logical.IntType;
-import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.MultisetType;
import org.apache.flink.table.types.logical.RawType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TypeInformationRawType;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale;
+
/**
* {@link TypeSerializer} of {@link LogicalType} for internal sql engine execution data formats.
*/
@@ -63,9 +63,18 @@ public class InternalSerializers {
* Creates a {@link TypeSerializer} for internal data structures of the given {@link LogicalType}.
*/
public static TypeSerializer create(LogicalType type, ExecutionConfig config) {
+ // ordered by type root definition
switch (type.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ return StringDataSerializer.INSTANCE;
case BOOLEAN:
return BooleanSerializer.INSTANCE;
+ case BINARY:
+ case VARBINARY:
+ return BytePrimitiveArraySerializer.INSTANCE;
+ case DECIMAL:
+ return new DecimalDataSerializer(getPrecision(type), getScale(type));
case TINYINT:
return ByteSerializer.INSTANCE;
case SMALLINT:
@@ -78,35 +87,27 @@ public class InternalSerializers {
case BIGINT:
case INTERVAL_DAY_TIME:
return LongSerializer.INSTANCE;
- case TIMESTAMP_WITHOUT_TIME_ZONE:
- TimestampType timestampType = (TimestampType) type;
- return new TimestampDataSerializer(timestampType.getPrecision());
- case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- LocalZonedTimestampType lzTs = (LocalZonedTimestampType) type;
- return new TimestampDataSerializer(lzTs.getPrecision());
case FLOAT:
return FloatSerializer.INSTANCE;
case DOUBLE:
return DoubleSerializer.INSTANCE;
- case CHAR:
- case VARCHAR:
- return StringDataSerializer.INSTANCE;
- case DECIMAL:
- DecimalType decimalType = (DecimalType) type;
- return new DecimalDataSerializer(decimalType.getPrecision(), decimalType.getScale());
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return new TimestampDataSerializer(getPrecision(type));
+ case TIMESTAMP_WITH_TIME_ZONE:
+ throw new UnsupportedOperationException();
case ARRAY:
return new ArrayDataSerializer(((ArrayType) type).getElementType(), config);
+ case MULTISET:
+ return new MapDataSerializer(((MultisetType) type).getElementType(), new IntType(false), config);
case MAP:
MapType mapType = (MapType) type;
return new MapDataSerializer(mapType.getKeyType(), mapType.getValueType(), config);
- case MULTISET:
- return new MapDataSerializer(((MultisetType) type).getElementType(), new IntType(), config);
case ROW:
- RowType rowType = (RowType) type;
- return new RowDataSerializer(config, rowType);
- case BINARY:
- case VARBINARY:
- return BytePrimitiveArraySerializer.INSTANCE;
+ case STRUCTURED_TYPE:
+ return new RowDataSerializer(config, type.getChildren().toArray(new LogicalType[0]));
+ case DISTINCT_TYPE:
+ return create(((DistinctType) type).getSourceType(), config);
case RAW:
if (type instanceof RawType) {
final RawType<?> rawType = (RawType<?>) type;
@@ -114,6 +115,9 @@ public class InternalSerializers {
}
return new RawValueDataSerializer<>(
((TypeInformationRawType<?>) type).getTypeInformation().createSerializer(config));
+ case NULL:
+ case SYMBOL:
+ case UNRESOLVED:
default:
throw new UnsupportedOperationException(
"Unsupported type '" + type + "' to get internal serializer");
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
new file mode 100644
index 0000000..8af45a4
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
@@ -0,0 +1,730 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeFactoryMock;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+import javax.annotation.Nullable;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.Period;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+import static java.util.Arrays.asList;
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.CHAR;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DAY;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.INTERVAL;
+import static org.apache.flink.table.api.DataTypes.MAP;
+import static org.apache.flink.table.api.DataTypes.MONTH;
+import static org.apache.flink.table.api.DataTypes.MULTISET;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.SECOND;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_TIME_ZONE;
+import static org.apache.flink.table.api.DataTypes.VARBINARY;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
+import static org.apache.flink.table.api.DataTypes.YEAR;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * Tests for {@link DataStructureConverters}.
+ */
+@RunWith(Parameterized.class)
+public class DataStructureConvertersTest {
+
+ @Parameters(name = "{index}: {0}")
+ public static List<TestSpec> testData() {
+ // ordered by definition in DataStructureConverters; within one spec, testConversions() converts every listed class into every other one
+ return asList(
+ TestSpec
+ .forDataType(CHAR(5))
+ .convertedTo(String.class, "12345")
+ .convertedTo(byte[].class, "12345".getBytes(StandardCharsets.UTF_8))
+ .convertedTo(StringData.class, StringData.fromString("12345")),
+
+ TestSpec
+ .forDataType(VARCHAR(100))
+ .convertedTo(String.class, "12345")
+ .convertedTo(byte[].class, "12345".getBytes(StandardCharsets.UTF_8))
+ .convertedTo(StringData.class, StringData.fromString("12345")),
+
+ TestSpec
+ .forDataType(BOOLEAN().notNull())
+ .convertedTo(Boolean.class, true)
+ .convertedTo(boolean.class, true),
+
+ TestSpec
+ .forDataType(BINARY(5))
+ .convertedTo(byte[].class, new byte[]{1, 2, 3, 4, 5}),
+
+ TestSpec
+ .forDataType(VARBINARY(100))
+ .convertedTo(byte[].class, new byte[]{1, 2, 3, 4, 5}),
+
+ TestSpec
+ .forDataType(DECIMAL(3, 2))
+ .convertedTo(BigDecimal.class, new BigDecimal("1.23"))
+ .convertedTo(DecimalData.class, DecimalData.fromUnscaledLong(123, 3, 2)),
+
+ // TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE are skipped for simplicity
+
+ TestSpec
+ .forDataType(DATE())
+ .convertedTo(Date.class, Date.valueOf("2010-11-12"))
+ .convertedTo(LocalDate.class, LocalDate.parse("2010-11-12"))
+ .convertedTo(Integer.class, 14_925), // days since epoch
+
+ TestSpec
+ .forDataType(TIME(0))
+ .convertedTo(java.sql.Time.class, java.sql.Time.valueOf("12:34:56"))
+ .convertedTo(LocalTime.class, LocalTime.parse("12:34:56"))
+ .convertedTo(Integer.class, 45_296_000) // milliseconds of the day
+ .convertedTo(Long.class, 45_296_000_000_000L), // nanoseconds of the day
+
+ TestSpec
+ .forDataType(TIME(3)) // TODO support precision of 9
+ .convertedTo(LocalTime.class, LocalTime.parse("12:34:56.001"))
+ .convertedTo(Integer.class, 45_296_001), // milliseconds of the day
+
+ TestSpec
+ .forDataType(TIMESTAMP(9))
+ .convertedTo(Timestamp.class, Timestamp.valueOf("2010-11-12 12:34:56.000000001"))
+ .convertedTo(LocalDateTime.class, LocalDateTime.parse("2010-11-12T12:34:56.000000001"))
+ .convertedTo(TimestampData.class, TimestampData.fromEpochMillis(1_289_565_296_000L, 1)),
+
+ TestSpec
+ .forDataType(TIMESTAMP_WITH_TIME_ZONE(0))
+ .convertedTo(
+ ZonedDateTime.class,
+ ZonedDateTime.ofInstant(Instant.EPOCH, ZoneId.of("UTC")))
+ .convertedTo(
+ java.time.OffsetDateTime.class,
+ ZonedDateTime.ofInstant(Instant.EPOCH, ZoneId.of("UTC")).toOffsetDateTime())
+ .expectErrorMessage("Unsupported data type: TIMESTAMP(0) WITH TIME ZONE"),
+
+ TestSpec
+ .forDataType(TIMESTAMP_WITH_LOCAL_TIME_ZONE(0))
+ .convertedTo(Instant.class, Instant.ofEpochSecond(12_345))
+ .convertedTo(Integer.class, 12_345) // seconds since epoch
+ .convertedTo(Long.class, 12_345_000L) // milliseconds since epoch
+ .convertedTo(TimestampData.class, TimestampData.fromEpochMillis(12_345_000)),
+
+ TestSpec
+ .forDataType(TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))
+ .convertedTo(Instant.class, Instant.ofEpochSecond(12_345, 1_000_000))
+ .convertedTo(Long.class, 12_345_001L) // milliseconds since epoch
+ .convertedTo(TimestampData.class, TimestampData.fromEpochMillis(12_345_001)),
+
+ TestSpec
+ .forDataType(TIMESTAMP_WITH_LOCAL_TIME_ZONE(9))
+ .convertedTo(Instant.class, Instant.ofEpochSecond(12_345, 1))
+ .convertedTo(TimestampData.class, TimestampData.fromEpochMillis(12_345_000, 1)),
+
+ TestSpec
+ .forDataType(INTERVAL(YEAR(2), MONTH()))
+ .convertedTo(Period.class, Period.of(2, 6, 0))
+ .convertedTo(Integer.class, 30), // total months (2 years + 6 months)
+
+ TestSpec
+ .forDataType(INTERVAL(MONTH()))
+ .convertedTo(Period.class, Period.of(0, 30, 0))
+ .convertedTo(Integer.class, 30), // total months
+
+ TestSpec
+ .forDataType(INTERVAL(DAY(), SECOND(3))) // TODO support precision of 9
+ .convertedTo(Duration.class, Duration.ofMillis(123))
+ .convertedTo(Long.class, 123L), // milliseconds
+
+ TestSpec
+ .forDataType(ARRAY(BOOLEAN()))
+ .convertedTo(boolean[].class, new boolean[]{true, false, true, true})
+ .convertedTo(ArrayData.class, new GenericArrayData(new boolean[]{true, false, true, true})),
+
+ // arrays of TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE are skipped for simplicity
+
+ TestSpec
+ .forDataType(ARRAY(DATE()))
+ .convertedTo(
+ LocalDate[].class,
+ new LocalDate[]{null, LocalDate.parse("2010-11-12"), null, LocalDate.parse("2010-11-12")}),
+
+ TestSpec
+ .forDataType(MAP(INT(), BOOLEAN()))
+ .convertedTo(Map.class, createIdentityMap())
+ .convertedTo(MapData.class, new GenericMapData(createIdentityMap())),
+
+ TestSpec
+ .forDataType(MAP(DATE(), BOOLEAN()))
+ .convertedTo(Map.class, createLocalDateMap()),
+
+ TestSpec
+ .forDataType(MULTISET(BOOLEAN()))
+ .convertedTo(Map.class, createIdentityMultiset())
+ .convertedTo(MapData.class, new GenericMapData(createIdentityMultiset())),
+
+ TestSpec
+ .forDataType(MULTISET(DATE()))
+ .convertedTo(Map.class, createLocalDateMultiset()),
+
+ TestSpec
+ .forDataType(
+ ROW(
+ FIELD("a", INT()),
+ FIELD("b",
+ ROW(
+ FIELD("b_1", DOUBLE()),
+ FIELD("b_2", BOOLEAN())))))
+ .convertedTo(Row.class, Row.of(12, Row.of(2.0, null)))
+ .convertedTo(RowData.class, GenericRowData.of(12, GenericRowData.of(2.0, null))),
+
+ TestSpec
+ .forDataType(
+ ROW(
+ FIELD("a", INT()),
+ FIELD("b",
+ ROW(
+ FIELD("b_1", DATE()),
+ FIELD("b_2", DATE())))))
+ .convertedTo(Row.class, Row.of(12, Row.of(LocalDate.ofEpochDay(1), null))),
+
+ TestSpec
+ .forClass(PojoWithMutableFields.class)
+ .convertedToSupplier(
+ PojoWithMutableFields.class,
+ () -> {
+ final PojoWithMutableFields pojo = new PojoWithMutableFields();
+ pojo.age = 42;
+ pojo.name = "Bob";
+ return pojo;
+ })
+ .convertedTo(Row.class, Row.of(42, "Bob"))
+ .convertedTo(RowData.class, GenericRowData.of(42, StringData.fromString("Bob"))),
+
+ TestSpec
+ .forClass(PojoWithImmutableFields.class)
+ .convertedTo(PojoWithImmutableFields.class, new PojoWithImmutableFields(42, "Bob"))
+ .convertedTo(Row.class, Row.of(42, "Bob"))
+ .convertedTo(RowData.class, GenericRowData.of(42, StringData.fromString("Bob"))),
+
+ TestSpec
+ .forClass(PojoWithGettersAndSetters.class)
+ .convertedToSupplier(
+ PojoWithGettersAndSetters.class,
+ () -> {
+ final PojoWithGettersAndSetters pojo = new PojoWithGettersAndSetters();
+ pojo.setAge(42);
+ pojo.setName("Bob");
+ return pojo;
+ })
+ .convertedTo(Row.class, Row.of(42, "Bob"))
+ .convertedTo(RowData.class, GenericRowData.of(42, StringData.fromString("Bob"))),
+
+ TestSpec
+ .forClass(ComplexPojo.class)
+ .convertedToSupplier(
+ ComplexPojo.class,
+ () -> {
+ final ComplexPojo pojo = new ComplexPojo();
+ pojo.setTimestamp(Timestamp.valueOf("2010-11-12 13:14:15.000000001"));
+ pojo.setPreferences(Row.of(42, "Bob", new Boolean[]{true, null, false}));
+ pojo.setBalance(new BigDecimal("1.23"));
+ return pojo;
+ })
+ .convertedTo(
+ Row.class,
+ Row.of(
+ Timestamp.valueOf("2010-11-12 13:14:15.000000001"),
+ Row.of(42, "Bob", new Boolean[]{true, null, false}),
+ new BigDecimal("1.23"))),
+
+ TestSpec
+ .forClass(PojoAsSuperclass.class)
+ .convertedToSupplier(
+ PojoWithMutableFields.class,
+ () -> {
+ final PojoWithMutableFields pojo = new PojoWithMutableFields();
+ pojo.age = 42;
+ pojo.name = "Bob";
+ return pojo;
+ })
+ .convertedTo(Row.class, Row.of(42)),
+
+ TestSpec
+ .forDataType(MAP(STRING(), DataTypes.of(PojoWithImmutableFields.class)))
+ .convertedTo(Map.class, createPojoWithImmutableFieldsMap()),
+
+ TestSpec
+ .forDataType(ARRAY(DataTypes.of(PojoWithNestedPojo.class)))
+ .convertedTo(PojoWithNestedPojo[].class, createPojoWithNestedPojoArray())
+ .convertedTo(
+ Row[].class,
+ new Row[]{
+ Row.of(
+ new PojoWithImmutableFields(42, "Bob"),
+ new PojoWithImmutableFields[]{new PojoWithImmutableFields(42, "Bob"), null}
+ ),
+ null,
+ Row.of(
+ null,
+ new PojoWithImmutableFields[3]
+ ),
+ Row.of(
+ null,
+ null
+ )
+ })
+ );
+ }
+
+ @Parameter
+ public TestSpec testSpec;
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ /** Converts each registered value to the internal format and from there into every registered class. */
+ @Test
+ public void testConversions() {
+ final String expectedError = testSpec.expectedErrorMessage;
+ if (expectedError != null) {
+ thrown.expect(TableException.class);
+ thrown.expectMessage(equalTo(expectedError));
+ }
+ final ClassLoader classLoader = DataStructureConvertersTest.class.getClassLoader();
+ for (Map.Entry<Class<?>, Object> source : testSpec.conversions.entrySet()) {
+ // external value of the source class -> internal data structure
+ final DataStructureConverter<Object, Object> sourceConverter = simulateSerialization(
+ DataStructureConverters.getConverter(testSpec.dataType.bridgedTo(source.getKey())));
+ sourceConverter.open(classLoader);
+ final Object internalValue = sourceConverter.toInternalOrNull(source.getValue());
+
+ for (Map.Entry<Class<?>, Object> target : testSpec.conversions.entrySet()) {
+ // internal data structure -> external value of the target class
+ final DataStructureConverter<Object, Object> targetConverter = simulateSerialization(
+ DataStructureConverters.getConverter(testSpec.dataType.bridgedTo(target.getKey())));
+ targetConverter.open(classLoader);
+ // wrapping both sides in arrays lets assertArrayEquals compare array-typed values by content
+ final Object[] expected = {target.getValue()};
+ final Object[] actual = {targetConverter.toExternalOrNull(internalValue)};
+ assertArrayEquals(expected, actual);
+ }
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Test utilities
+ // --------------------------------------------------------------------------------------------
+
+ // Fluent builder that pairs one data type with equivalent values for several conversion classes.
+ private static class TestSpec {
+
+ private final String description;
+
+ private final DataType dataType;
+
+ // insertion-ordered: conversions are exercised in the order in which they were declared
+ private final Map<Class<?>, Object> conversions = new LinkedHashMap<>();
+
+ private @Nullable String expectedErrorMessage;
+
+ private TestSpec(String description, DataType dataType) {
+ this.description = description;
+ this.dataType = dataType;
+ }
+
+ // resolves the given type via DataTypeFactoryMock and uses its string form as the description
+ static TestSpec forDataType(AbstractDataType<?> dataType) {
+ final DataType resolved = new DataTypeFactoryMock().createDataType(dataType);
+ return new TestSpec(resolved.toString(), resolved);
+ }
+
+ static TestSpec forClass(Class<?> clazz) {
+ return forDataType(DataTypes.of(clazz));
+ }
+
+ // registers the value that represents the spec's data type in the given conversion class
+ <T> TestSpec convertedTo(Class<T> clazz, T value) {
+ conversions.put(clazz, value);
+ return this;
+ }
+
+ // variant for values that need several statements to build; the supplier is invoked immediately
+ <T> TestSpec convertedToSupplier(Class<T> clazz, Supplier<T> supplier) {
+ return convertedTo(clazz, supplier.get());
+ }
+
+ TestSpec expectErrorMessage(String expectedErrorMessage) {
+ this.expectedErrorMessage = expectedErrorMessage;
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return description;
+ }
+ }
+
+ // returns a copy of the converter that went through InstantiationUtil's serialize/deserialize round trip
+ private static DataStructureConverter<Object, Object> simulateSerialization(DataStructureConverter<Object, Object> converter) {
+ try {
+ return InstantiationUtil.deserializeObject(InstantiationUtil.serializeObject(converter), DataStructureConverter.class.getClassLoader());
+ } catch (Exception cause) {
+ throw new AssertionError("Serialization failed.", cause);
+ }
+ }
+
+ // Map fixture with regular entries, a null value, and a null key.
+ private static Map<Integer, Boolean> createIdentityMap() {
+ final Map<Integer, Boolean> entries = new HashMap<>();
+ entries.put(1, true);
+ entries.put(2, false);
+ entries.put(3, null);
+ entries.put(null, true);
+ return entries;
+ }
+
+ // Same layout as createIdentityMap() but keyed by LocalDate.
+ private static Map<LocalDate, Boolean> createLocalDateMap() {
+ final Map<LocalDate, Boolean> entries = new HashMap<>();
+ entries.put(LocalDate.ofEpochDay(0), true);
+ entries.put(LocalDate.ofEpochDay(1), false);
+ entries.put(LocalDate.ofEpochDay(3), null);
+ entries.put(null, true);
+ return entries;
+ }
+
+ // Map fixture whose values are structured types, including a null value.
+ private static Map<String, PojoWithImmutableFields> createPojoWithImmutableFieldsMap() {
+ final Map<String, PojoWithImmutableFields> entries = new HashMap<>();
+ entries.put("Alice", new PojoWithImmutableFields(12, "Alice"));
+ entries.put("Bob", new PojoWithImmutableFields(42, "Bob"));
+ entries.put("Unknown", null);
+ return entries;
+ }
+
+ // Array fixture: a filled POJO, a null element, a POJO with an all-null inner array, and an empty POJO.
+ private static PojoWithNestedPojo[] createPojoWithNestedPojoArray() {
+ final PojoWithNestedPojo filled = new PojoWithNestedPojo(
+ new PojoWithImmutableFields(42, "Bob"), new PojoWithImmutableFields[]{new PojoWithImmutableFields(42, "Bob"), null});
+ final PojoWithNestedPojo withNullsOnly = new PojoWithNestedPojo(null, new PojoWithImmutableFields[3]);
+ final PojoWithNestedPojo empty = new PojoWithNestedPojo();
+ return new PojoWithNestedPojo[]{
+ filled,
+ null,
+ withNullsOnly,
+ empty
+ };
+ }
+
+ // Multiset fixture (element mapped to its count) that includes a null element.
+ private static Map<Boolean, Integer> createIdentityMultiset() {
+ final Map<Boolean, Integer> counts = new HashMap<>();
+ counts.put(true, 1);
+ counts.put(false, 2);
+ counts.put(null, 3);
+ return counts;
+ }
+
+ // Same layout as createIdentityMultiset() but with LocalDate elements.
+ private static Map<LocalDate, Integer> createLocalDateMultiset() {
+ final Map<LocalDate, Integer> counts = new HashMap<>();
+ counts.put(LocalDate.ofEpochDay(0), 1);
+ counts.put(LocalDate.ofEpochDay(1), 2);
+ counts.put(null, 3);
+ return counts;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Structured types
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Structured type with a single public field; also the parent class of {@link PojoWithMutableFields}.
+ */
+ public static class PojoAsSuperclass {
+ public int age;
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(age);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ // an instanceof check (rather than a class comparison) is used here,
+ // so instances of subclasses are compared by age as well
+ if (o == this) {
+ return true;
+ } else if (o instanceof PojoAsSuperclass) {
+ return ((PojoAsSuperclass) o).age == age;
+ }
+ return false;
+ }
+ }
+
+ /**
+ * POJO with public mutable fields.
+ * <p>Equality is intentionally relaxed: a plain {@link PojoAsSuperclass} with the same age is considered equal
+ * (see the PojoAsSuperclass spec in testData()). Note that hashCode() still includes the name.
+ */
+ public static class PojoWithMutableFields extends PojoAsSuperclass {
+ public String name;
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ // super.equals also rejects null and everything that is not a PojoAsSuperclass
+ if (!super.equals(o)) {
+ return false;
+ }
+ // modified to support PojoAsSuperclass
+ if (o.getClass() == PojoAsSuperclass.class) {
+ return true;
+ }
+ // any other subclass of PojoAsSuperclass is unequal instead of failing with a ClassCastException
+ return o instanceof PojoWithMutableFields && Objects.equals(name, ((PojoWithMutableFields) o).name);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), name);
+ }
+ }
+
+ /**
+ * Structured type whose final fields can only be populated through the constructor.
+ */
+ public static class PojoWithImmutableFields {
+ public final int age;
+ public final String name;
+
+ public PojoWithImmutableFields(int age, String name) {
+ this.age = age;
+ this.name = name;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(age, name);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (o == null || o.getClass() != getClass()) {
+ return false;
+ }
+ final PojoWithImmutableFields other = (PojoWithImmutableFields) o;
+ return other.age == age && Objects.equals(name, other.name);
+ }
+ }
+
+ /**
+ * Structured type with private fields that are reachable only through getters and setters.
+ */
+ public static class PojoWithGettersAndSetters {
+ private int age;
+ private String name;
+
+ public int getAge() {
+ return age;
+ }
+
+ public void setAge(int age) {
+ this.age = age;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(age, name);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (o == null || o.getClass() != getClass()) {
+ return false;
+ }
+ final PojoWithGettersAndSetters other = (PojoWithGettersAndSetters) o;
+ return other.age == age && Objects.equals(name, other.name);
+ }
+ }
+
+ /**
+ * Structured type combining type-hint annotations, a nested row, and a constructor that fixes the field order.
+ */
+ public static class ComplexPojo {
+ private Timestamp timestamp;
+ private @DataTypeHint("ROW<age INT, name STRING, mask ARRAY<BOOLEAN>>") Row preferences;
+ private @DataTypeHint("DECIMAL(3, 2)") BigDecimal balance;
+
+ public ComplexPojo() {
+ // default constructor
+ }
+
+ // determines the order of the fields
+ public ComplexPojo(Timestamp timestamp, Row preferences, BigDecimal balance) {
+ this.timestamp = timestamp;
+ this.preferences = preferences;
+ this.balance = balance;
+ }
+
+ public Timestamp getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(Timestamp timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public Row getPreferences() {
+ return preferences;
+ }
+
+ public void setPreferences(Row preferences) {
+ this.preferences = preferences;
+ }
+
+ public BigDecimal getBalance() {
+ return balance;
+ }
+
+ public void setBalance(BigDecimal balance) {
+ this.balance = balance;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(timestamp, preferences, balance);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (o == null || o.getClass() != getClass()) {
+ return false;
+ }
+ final ComplexPojo other = (ComplexPojo) o;
+ return Objects.equals(timestamp, other.timestamp)
+ && Objects.equals(preferences, other.preferences)
+ && Objects.equals(balance, other.balance);
+ }
+ }
+
+ /**
+ * Structured type that nests another structured type both directly and as array elements.
+ */
+ public static class PojoWithNestedPojo {
+ public PojoWithImmutableFields inner;
+
+ public PojoWithImmutableFields[] innerArray;
+
+ public PojoWithNestedPojo() {
+ // default constructor
+ }
+
+ public PojoWithNestedPojo(
+ PojoWithImmutableFields inner,
+ PojoWithImmutableFields[] innerArray) {
+ this.inner = inner;
+ this.innerArray = innerArray;
+ }
+
+ @Override
+ public int hashCode() {
+ // element-wise array hash, in line with the element-wise comparison in equals
+ return 31 * Objects.hash(inner) + Arrays.hashCode(innerArray);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (o == null || o.getClass() != getClass()) {
+ return false;
+ }
+ final PojoWithNestedPojo other = (PojoWithNestedPojo) o;
+ // the array is compared by content, not by reference
+ return Objects.equals(inner, other.inner)
+ && Arrays.equals(innerArray, other.innerArray);
+ }
+ }
+}