Posted to commits@flink.apache.org by tw...@apache.org on 2019/08/05 09:02:34 UTC

[flink] branch release-1.9 updated (8186f18 -> ae0dba5)

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 8186f18  [FLINK-13532][docs] Fix broken links of zh docs
     new 889292b  [FLINK-10257][table-common] Generalize variable strings for varying lengths
     new 2f4e5ea  [FLINK-13463][table-planner-blink] Add test case for VALUES with char literal
     new cecbe3c  [hotfix][table-common] Add utilities for modifying (nested) types
     new ae0dba5  [FLINK-13463][table-common] Relax legacy type info conversion for VARCHAR literals

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../table/client/gateway/local/LocalExecutor.java  |  17 +--
 .../types/logical/utils/LogicalTypeDuplicator.java | 133 +++++++++++++++++
 .../logical/utils/LogicalTypeGeneralization.java   |  15 +-
 .../types/logical/utils/LogicalTypeUtils.java      |  68 +++++++++
 .../DataTypeUtils.java}                            |  24 +--
 .../utils/LegacyTypeInfoDataTypeConverter.java     |   6 +-
 .../table/types/LogicalTypeDuplicatorTest.java     | 161 +++++++++++++++++++++
 .../table/types/LogicalTypeGeneralizationTest.java |  20 ++-
 .../planner/runtime/stream/sql/ValuesITCase.scala  |  12 +-
 9 files changed, 426 insertions(+), 30 deletions(-)
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeDuplicator.java
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeUtils.java
 copy flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/{inference/strategies/MissingTypeStrategy.java => utils/DataTypeUtils.java} (63%)
 create mode 100644 flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeDuplicatorTest.java


[flink] 02/04: [FLINK-13463][table-planner-blink] Add test case for VALUES with char literal

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2f4e5eab3ee983f76952a4867403eced7bdd32de
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed Jul 31 17:36:38 2019 +0200

    [FLINK-13463][table-planner-blink] Add test case for VALUES with char literal
---
 .../table/planner/runtime/stream/sql/ValuesITCase.scala      | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/ValuesITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/ValuesITCase.scala
index e43c6d2..4a9fd45 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/ValuesITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/ValuesITCase.scala
@@ -23,8 +23,7 @@ import org.apache.flink.table.api.scala._
 import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.planner.runtime.utils.{StreamingTestBase, TestingAppendBaseRowSink}
 import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
-import org.apache.flink.table.types.logical.IntType
-
+import org.apache.flink.table.types.logical.{IntType, VarCharType}
 import org.junit.Assert._
 import org.junit.Test
 
@@ -33,17 +32,18 @@ class ValuesITCase extends StreamingTestBase {
   @Test
   def testValues(): Unit = {
 
-    val sqlQuery = "SELECT * FROM (VALUES (1, 2, 3)) T(a, b, c)"
+    val sqlQuery = "SELECT * FROM (VALUES (1, 'Bob'), (1, 'Alice')) T(a, b)"
 
-    val outputType = new BaseRowTypeInfo(new IntType(), new IntType(), new IntType())
+    val outputType = new BaseRowTypeInfo(
+      new IntType(),
+      new VarCharType(5))
 
     val result = tEnv.sqlQuery(sqlQuery).toAppendStream[BaseRow]
     val sink = new TestingAppendBaseRowSink(outputType)
     result.addSink(sink).setParallelism(1)
     env.execute()
 
-    val expected = List("0|1,2,3")
+    val expected = List("0|1,Alice", "0|1,Bob")
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
-
 }
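
For context, the updated assertions rely on the CHAR-to-VARCHAR generalization
introduced in commit 01/04 of this series: the literals 'Bob' and 'Alice' are
CHAR(3) and CHAR(5), and their common type widens to VARCHAR(5), which is why
the output type now uses VarCharType(5). A minimal, hypothetical sketch of that
derivation (assuming the findCommonType entry point of LogicalTypeGeneralization;
not part of the commit):

import java.util.Arrays;
import java.util.Optional;

import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.utils.LogicalTypeGeneralization;

public class CharLiteralGeneralizationSketch {
    public static void main(String[] args) {
        // 'Bob' is a CHAR(3) literal, 'Alice' is a CHAR(5) literal.
        Optional<LogicalType> common = LogicalTypeGeneralization.findCommonType(
            Arrays.asList(new CharType(3), new CharType(5)));
        // With the generalized rules, the common type is VARCHAR(5).
        System.out.println(common.orElseThrow(IllegalStateException::new));
    }
}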


[flink] 04/04: [FLINK-13463][table-common] Relax legacy type info conversion for VARCHAR literals

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ae0dba54746e7bedf72ed71c36c8a9f48b593744
Author: Timo Walther <tw...@apache.org>
AuthorDate: Thu Aug 1 16:44:57 2019 +0200

    [FLINK-13463][table-common] Relax legacy type info conversion for VARCHAR literals
    
    This closes #9334.
---
 .../flink/table/types/utils/LegacyTypeInfoDataTypeConverter.java    | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LegacyTypeInfoDataTypeConverter.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LegacyTypeInfoDataTypeConverter.java
index 2990a67..20b6db8 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LegacyTypeInfoDataTypeConverter.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LegacyTypeInfoDataTypeConverter.java
@@ -197,7 +197,7 @@ public final class LegacyTypeInfoDataTypeConverter {
 			return foundTypeInfo;
 		}
 
-		// we are relaxing the constraint for DECIMAL, CHAR, TIMESTAMP_WITHOUT_TIME_ZONE to
+		// we are relaxing the constraint for DECIMAL, CHAR, VARCHAR, TIMESTAMP_WITHOUT_TIME_ZONE to
 		// support value literals in legacy planner
 		LogicalType logicalType = dataType.getLogicalType();
 		if (hasRoot(logicalType, LogicalTypeRoot.DECIMAL)) {
@@ -208,6 +208,10 @@ public final class LegacyTypeInfoDataTypeConverter {
 			return Types.STRING;
 		}
 
+		else if (hasRoot(logicalType, LogicalTypeRoot.VARCHAR)) {
+			return Types.STRING;
+		}
+
 		else if (canConvertToTimestampTypeInfoLenient(dataType)) {
 			return Types.SQL_TIMESTAMP;
 		}
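
To illustrate the relaxation, a VARCHAR type of fixed length (as produced for
string value literals) now maps to the legacy Types.STRING instead of being
rejected, mirroring the existing CHAR handling. A small, hypothetical usage
sketch, assuming the converter's toLegacyTypeInfo entry point:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;

public class VarCharLegacyConversionSketch {
    public static void main(String[] args) {
        // A VARCHAR(5) type, e.g. derived for the string literal 'Alice'.
        TypeInformation<?> legacy =
            LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(DataTypes.VARCHAR(5));
        // With the relaxed conversion this now yields Types.STRING.
        System.out.println(legacy);
    }
}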


[flink] 03/04: [hotfix][table-common] Add utilities for modifying (nested) types

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cecbe3ce295d44e8c712e68385f75cda339fff79
Author: Timo Walther <tw...@apache.org>
AuthorDate: Thu Aug 1 16:41:41 2019 +0200

    [hotfix][table-common] Add utilities for modifying (nested) types
---
 .../table/client/gateway/local/LocalExecutor.java  |  17 +--
 .../types/logical/utils/LogicalTypeDuplicator.java | 133 +++++++++++++++++
 .../types/logical/utils/LogicalTypeUtils.java      |  68 +++++++++
 .../flink/table/types/utils/DataTypeUtils.java     |  42 ++++++
 .../table/types/LogicalTypeDuplicatorTest.java     | 161 +++++++++++++++++++++
 5 files changed, 411 insertions(+), 10 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 5d91964..a7fd6b1 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -19,8 +19,6 @@
 package org.apache.flink.table.client.gateway.local;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.client.cli.CliFrontendParser;
@@ -41,7 +39,6 @@ import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
-import org.apache.flink.table.calcite.FlinkTypeFactory;
 import org.apache.flink.table.client.SqlClientException;
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.client.gateway.Executor;
@@ -54,6 +51,9 @@ import org.apache.flink.table.client.gateway.local.result.BasicResult;
 import org.apache.flink.table.client.gateway.local.result.ChangelogResult;
 import org.apache.flink.table.client.gateway.local.result.DynamicResult;
 import org.apache.flink.table.client.gateway.local.result.MaterializedResult;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+import org.apache.flink.table.types.utils.DataTypeUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.StringUtils;
 
@@ -611,13 +611,10 @@ public class LocalExecutor implements Executor {
 	private static TableSchema removeTimeAttributes(TableSchema schema) {
 		final TableSchema.Builder builder = TableSchema.builder();
 		for (int i = 0; i < schema.getFieldCount(); i++) {
-			final TypeInformation<?> type = schema.getFieldTypes()[i];
-			final TypeInformation<?> convertedType;
-			if (FlinkTypeFactory.isTimeIndicatorType(type)) {
-				convertedType = Types.SQL_TIMESTAMP;
-			} else {
-				convertedType = type;
-			}
+			final DataType dataType = schema.getFieldDataTypes()[i];
+			final DataType convertedType = DataTypeUtils.replaceLogicalType(
+				dataType,
+				LogicalTypeUtils.removeTimeAttributes(dataType.getLogicalType()));
 			builder.field(schema.getFieldNames()[i], convertedType);
 		}
 		return builder.build();
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeDuplicator.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeDuplicator.java
new file mode 100644
index 0000000..e417530
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeDuplicator.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Returns a deep copy of a {@link LogicalType}.
+ *
+ * <p>It also enables replacing children of possibly nested structures by overwriting corresponding
+ * {@code visit()} methods.
+ */
+@Internal
+public class LogicalTypeDuplicator extends LogicalTypeDefaultVisitor<LogicalType> {
+
+	@Override
+	public LogicalType visit(ArrayType arrayType) {
+		return new ArrayType(
+			arrayType.isNullable(),
+			arrayType.getElementType().accept(this));
+	}
+
+	@Override
+	public LogicalType visit(MultisetType multisetType) {
+		return new MultisetType(
+			multisetType.isNullable(),
+			multisetType.getElementType().accept(this));
+	}
+
+	@Override
+	public LogicalType visit(MapType mapType) {
+		return new MapType(
+			mapType.isNullable(),
+			mapType.getKeyType().accept(this),
+			mapType.getValueType().accept(this));
+	}
+
+	@Override
+	public LogicalType visit(RowType rowType) {
+		final List<RowField> fields = rowType.getFields().stream()
+			.map(f -> {
+				if (f.getDescription().isPresent()) {
+					return new RowField(
+						f.getName(),
+						f.getType().accept(this),
+						f.getDescription().get());
+				}
+				return new RowField(f.getName(), f.getType().accept(this));
+			})
+			.collect(Collectors.toList());
+
+		return new RowType(
+			rowType.isNullable(),
+			fields);
+	}
+
+	@Override
+	public LogicalType visit(DistinctType distinctType) {
+		final DistinctType.Builder builder = new DistinctType.Builder(
+			distinctType.getObjectIdentifier(),
+			distinctType.getSourceType().accept(this));
+		distinctType.getDescription().ifPresent(builder::setDescription);
+		return builder.build();
+	}
+
+	@Override
+	public LogicalType visit(StructuredType structuredType) {
+		final List<StructuredAttribute> attributes = structuredType.getAttributes().stream()
+			.map(a -> {
+				if (a.getDescription().isPresent()) {
+					return new StructuredAttribute(
+						a.getName(),
+						a.getType().accept(this),
+						a.getDescription().get());
+				}
+				return new StructuredAttribute(
+					a.getName(),
+					a.getType().accept(this));
+			})
+			.collect(Collectors.toList());
+		final StructuredType.Builder builder = new StructuredType.Builder(
+			structuredType.getObjectIdentifier(),
+			attributes);
+		builder.setNullable(structuredType.isNullable());
+		builder.setFinal(structuredType.isFinal());
+		builder.setInstantiable(structuredType.isInstantiable());
+		builder.setComparision(structuredType.getComparision());
+		structuredType.getSuperType().ifPresent(st -> {
+			final LogicalType visited = st.accept(this);
+			if (!(visited instanceof StructuredType)) {
+				throw new TableException("Unexpected super type. Structured type expected but was: " + visited);
+			}
+			builder.setSuperType((StructuredType) visited);
+		});
+		structuredType.getDescription().ifPresent(builder::setDescription);
+		structuredType.getImplementationClass().ifPresent(builder::setImplementationClass);
+		return builder.build();
+	}
+
+	@Override
+	protected LogicalType defaultMethod(LogicalType logicalType) {
+		return logicalType.copy();
+	}
+}
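
As the class Javadoc above describes, the duplicator produces a deep copy by
default, and subclasses can rewrite selected children of nested types by
overriding the corresponding visit() method. A brief, hypothetical usage sketch
(the LogicalTypeDuplicatorTest added later in this commit exercises the same
pattern):

import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.utils.LogicalTypeDuplicator;

public class DuplicatorUsageSketch {
    public static void main(String[] args) {
        LogicalType rowType = RowType.of(new IntType(), new VarCharType(10));

        // Plain duplication: a deep copy that equals the input.
        LogicalType copy = rowType.accept(new LogicalTypeDuplicator());

        // Replacement: overriding visit() rewrites matching children anywhere
        // in the nested structure.
        LogicalType widened = rowType.accept(new LogicalTypeDuplicator() {
            @Override
            public LogicalType visit(IntType intType) {
                return new BigIntType();
            }
        });

        System.out.println(copy);    // equal to rowType
        System.out.println(widened); // the INT field is now BIGINT
    }
}
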
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeUtils.java
new file mode 100644
index 0000000..cde6068
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeUtils.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+
+/**
+ * Utilities for handling {@link LogicalType}s.
+ */
+@Internal
+public final class LogicalTypeUtils {
+
+	private static final TimeAttributeRemover TIME_ATTRIBUTE_REMOVER = new TimeAttributeRemover();
+
+	public static LogicalType removeTimeAttributes(LogicalType logicalType) {
+		return logicalType.accept(TIME_ATTRIBUTE_REMOVER);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private static class TimeAttributeRemover extends LogicalTypeDuplicator {
+
+		@Override
+		public LogicalType visit(TimestampType timestampType) {
+			return new TimestampType(
+				timestampType.isNullable(),
+				timestampType.getPrecision());
+		}
+
+		@Override
+		public LogicalType visit(ZonedTimestampType zonedTimestampType) {
+			return new ZonedTimestampType(
+				zonedTimestampType.isNullable(),
+				zonedTimestampType.getPrecision());
+		}
+
+		@Override
+		public LogicalType visit(LocalZonedTimestampType localZonedTimestampType) {
+			return new LocalZonedTimestampType(
+				localZonedTimestampType.isNullable(),
+				localZonedTimestampType.getPrecision());
+		}
+	}
+
+	private LogicalTypeUtils() {
+		// no instantiation
+	}
+}
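
Because TimeAttributeRemover extends LogicalTypeDuplicator, the rewrite also
reaches time attributes nested inside rows, arrays, and maps. A small,
hypothetical sketch (assuming the kind-aware TimestampType constructor):

import java.util.Arrays;

import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampKind;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;

public class RemoveTimeAttributesSketch {
    public static void main(String[] args) {
        // A row with a rowtime attribute, as it may appear in a streaming result schema.
        LogicalType schema = new RowType(Arrays.asList(
            new RowType.RowField("name", new VarCharType(10)),
            new RowType.RowField("rowtime", new TimestampType(true, TimestampKind.ROWTIME, 3))));

        // The rowtime TIMESTAMP(3) becomes a plain TIMESTAMP(3); other fields are copied.
        LogicalType converted = LogicalTypeUtils.removeTimeAttributes(schema);
        System.out.println(converted);
    }
}
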
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
new file mode 100644
index 0000000..43ca734
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+/**
+ * Utilities for handling {@link DataType}s.
+ */
+@Internal
+public final class DataTypeUtils {
+
+	/**
+	 * Replaces the {@link LogicalType} of a {@link DataType}, i.e., it keeps the bridging class.
+	 */
+	public static DataType replaceLogicalType(DataType dataType, LogicalType replacement) {
+		return LogicalTypeDataTypeConverter.toDataType(replacement)
+			.bridgedTo(dataType.getConversionClass());
+	}
+
+	private DataTypeUtils() {
+		// no instantiation
+	}
+}
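
A brief, hypothetical usage sketch of the new utility: the logical type is
swapped while the bridging conversion class of the original DataType is kept.

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.utils.DataTypeUtils;

public class ReplaceLogicalTypeSketch {
    public static void main(String[] args) {
        // A STRING data type bridged to java.lang.String by default.
        DataType original = DataTypes.STRING();

        // Swap in VARCHAR(5); the conversion class (java.lang.String) is preserved.
        DataType replaced = DataTypeUtils.replaceLogicalType(original, new VarCharType(5));

        System.out.println(replaced);
        System.out.println(replaced.getConversionClass());
    }
}
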
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeDuplicatorTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeDuplicatorTest.java
new file mode 100644
index 0000000..ef7b3a8
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeDuplicatorTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeDuplicator;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link LogicalTypeDuplicator}.
+ */
+@RunWith(Parameterized.class)
+public class LogicalTypeDuplicatorTest {
+
+	private static final LogicalTypeDuplicator DUPLICATOR = new LogicalTypeDuplicator();
+
+	private static final LogicalTypeDuplicator INT_REPLACER = new IntReplacer();
+
+	@Parameters(name = "{index}: {0}")
+	public static List<Object[]> testData() {
+		return Arrays.asList(
+			new Object[][]{
+				{new CharType(2), new CharType(2)},
+				{createMultisetType(new IntType()), createMultisetType(new BigIntType())},
+				{createArrayType(new IntType()), createArrayType(new BigIntType())},
+				{createMapType(new IntType()), createMapType(new BigIntType())},
+				{createRowType(new IntType()), createRowType(new BigIntType())},
+				{createDistinctType(new IntType()), createDistinctType(new BigIntType())},
+				{createUserType(new IntType()), createUserType(new BigIntType())},
+				{createHumanType(), createHumanType()}
+			}
+		);
+	}
+
+	@Parameter
+	public LogicalType logicalType;
+
+	@Parameter(1)
+	public LogicalType replacedLogicalType;
+
+	@Test
+	public void testDuplication() {
+		assertThat(logicalType.accept(DUPLICATOR), equalTo(logicalType));
+	}
+
+	@Test
+	public void testReplacement() {
+		assertThat(logicalType.accept(INT_REPLACER), equalTo(replacedLogicalType));
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private static class IntReplacer extends LogicalTypeDuplicator {
+		@Override
+		public LogicalType visit(IntType intType) {
+			return new BigIntType();
+		}
+	}
+
+	private static MultisetType createMultisetType(LogicalType replacedType) {
+		return new MultisetType(new MultisetType(replacedType));
+	}
+
+	private static ArrayType createArrayType(LogicalType replacedType) {
+		return new ArrayType(new ArrayType(replacedType));
+	}
+
+	private static MapType createMapType(LogicalType replacedType) {
+		return new MapType(replacedType, new SmallIntType());
+	}
+
+	private static DistinctType createDistinctType(LogicalType replacedType) {
+		return new DistinctType.Builder(
+				ObjectIdentifier.of("cat", "db", "Money"),
+				replacedType)
+			.setDescription("Money type desc.")
+			.build();
+	}
+
+	private static RowType createRowType(LogicalType replacedType) {
+		return new RowType(
+			Arrays.asList(
+				new RowType.RowField("field1", new CharType(2)),
+				new RowType.RowField("field2", new BooleanType()),
+				new RowType.RowField("field3", replacedType)));
+	}
+
+	private static StructuredType createHumanType() {
+		return new StructuredType.Builder(
+				ObjectIdentifier.of("cat", "db", "Human"),
+				Collections.singletonList(
+					new StructuredType.StructuredAttribute("name", new VarCharType(), "Description.")))
+			.setDescription("Human type desc.")
+			.setFinal(false)
+			.setInstantiable(false)
+			.setImplementationClass(Human.class)
+			.build();
+	}
+
+	private static StructuredType createUserType(LogicalType replacedType) {
+		return new StructuredType.Builder(
+				ObjectIdentifier.of("cat", "db", "User"),
+				Collections.singletonList(
+					new StructuredType.StructuredAttribute("setting", replacedType)))
+			.setDescription("User type desc.")
+			.setFinal(false)
+			.setInstantiable(true)
+			.setImplementationClass(User.class)
+			.setSuperType(createHumanType())
+			.build();
+	}
+
+	private abstract static class Human {
+		public String name;
+	}
+
+	private static final class User extends Human {
+		public int setting;
+	}
+}


[flink] 01/04: [FLINK-10257][table-common] Generalize variable strings for varying lengths

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 889292b1d80434ff27cbe63b917288cd5941eb65
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed Jul 31 17:32:26 2019 +0200

    [FLINK-10257][table-common] Generalize variable strings for varying lengths
    
    The Blink planner generalizes multiple CHAR literals into VARCHAR. It seems
    likely that we will adopt this behavior in the future to prevent unwanted
    side effects for users. This PR updates the existing type generalization classes.
---
 .../logical/utils/LogicalTypeGeneralization.java     | 15 +++++++++++++--
 .../table/types/LogicalTypeGeneralizationTest.java   | 20 +++++++++++++++++++-
 2 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeGeneralization.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeGeneralization.java
index 18ffeb3..c9d6e25 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeGeneralization.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeGeneralization.java
@@ -76,6 +76,8 @@ import static org.apache.flink.table.types.logical.LogicalTypeFamily.TIME;
 import static org.apache.flink.table.types.logical.LogicalTypeFamily.TIMESTAMP;
 import static org.apache.flink.table.types.logical.LogicalTypeRoot.ANY;
 import static org.apache.flink.table.types.logical.LogicalTypeRoot.ARRAY;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.BINARY;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.CHAR;
 import static org.apache.flink.table.types.logical.LogicalTypeRoot.DATE;
 import static org.apache.flink.table.types.logical.LogicalTypeRoot.DECIMAL;
 import static org.apache.flink.table.types.logical.LogicalTypeRoot.DOUBLE;
@@ -266,10 +268,19 @@ public final class LogicalTypeGeneralization {
 				final int length = combineLength(resultType, type);
 
 				if (hasRoot(resultType, VARCHAR) || hasRoot(resultType, VARBINARY)) {
-					// for variable length type we are done here
+					// variable length types remain variable length types
 					resultType = createStringType(resultType.getTypeRoot(), length);
+				} else if (getLength(resultType) != getLength(type)) {
+					// for different fixed lengths
+					// this is different from the SQL standard but prevents whitespace
+					// padding/modification of strings
+					if (hasRoot(resultType, CHAR)) {
+						resultType = createStringType(VARCHAR, length);
+					} else if (hasRoot(resultType, BINARY)) {
+						resultType = createStringType(VARBINARY, length);
+					}
 				} else {
-					// for mixed fixed/variable or fixed/fixed lengths
+					// for same type with same length
 					resultType = createStringType(typeRoot, length);
 				}
 			}
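
The effect on common-type derivation can be summarized with a short,
hypothetical sketch that mirrors the test data added below: fixed-length types
of equal length keep their root, while different fixed lengths now widen to
their variable-length counterpart to avoid padding.

import java.util.Arrays;
import java.util.Optional;

import org.apache.flink.table.types.logical.BinaryType;
import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.utils.LogicalTypeGeneralization;

public class FixedLengthGeneralizationSketch {
    public static void main(String[] args) {
        // Same fixed length: the result stays a fixed-length type, e.g. CHAR(2).
        Optional<LogicalType> sameLength = LogicalTypeGeneralization.findCommonType(
            Arrays.asList(new CharType(2), new CharType(2)));
        System.out.println(sameLength.orElseThrow(IllegalStateException::new));

        // Different fixed lengths: the result is variable-length, e.g. VARBINARY(4).
        Optional<LogicalType> differentLength = LogicalTypeGeneralization.findCommonType(
            Arrays.asList(new BinaryType(2), new BinaryType(4)));
        System.out.println(differentLength.orElseThrow(IllegalStateException::new));
    }
}
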
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeGeneralizationTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeGeneralizationTest.java
index 44209e7..d860aed 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeGeneralizationTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeGeneralizationTest.java
@@ -86,6 +86,12 @@ public class LogicalTypeGeneralizationTest {
 					null
 				},
 
+				// incompatible types
+				{
+					Arrays.asList(new BinaryType(), new VarCharType(23)),
+					null
+				},
+
 				// NOT NULL types
 				{
 					Arrays.asList(new IntType(false), new IntType(false)),
@@ -144,10 +150,16 @@ public class LogicalTypeGeneralizationTest {
 					RowType.of(new BigIntType(), new IntType(), new BigIntType())
 				},
 
+				// CHAR types of same length
+				{
+					Arrays.asList(new CharType(2), new CharType(2)),
+					new CharType(2)
+				},
+
 				// CHAR types of different length
 				{
 					Arrays.asList(new CharType(2), new CharType(4)),
-					new CharType(4)
+					new VarCharType(4)
 				},
 
 				// VARCHAR types of different length
@@ -168,6 +180,12 @@ public class LogicalTypeGeneralizationTest {
 					new VarCharType(7)
 				},
 
+				// BINARY types of different length
+				{
+					Arrays.asList(new BinaryType(2), new BinaryType(4)),
+					new VarBinaryType(4)
+				},
+
 				// mixed BINARY and VARBINARY types
 				{
 					Arrays.asList(new BinaryType(5), new VarBinaryType(2), new VarBinaryType(7)),