You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/08/05 08:58:33 UTC

[flink] 03/04: [hotfix][table-common] Add utilities for modifying (nested) types

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e4a225513bd30919f4824c7298b564f0d9dd7b9f
Author: Timo Walther <tw...@apache.org>
AuthorDate: Thu Aug 1 16:41:41 2019 +0200

    [hotfix][table-common] Add utilities for modifying (nested) types
---
 .../table/client/gateway/local/LocalExecutor.java  |  17 +--
 .../types/logical/utils/LogicalTypeDuplicator.java | 133 +++++++++++++++++
 .../types/logical/utils/LogicalTypeUtils.java      |  68 +++++++++
 .../flink/table/types/utils/DataTypeUtils.java     |  42 ++++++
 .../table/types/LogicalTypeDuplicatorTest.java     | 161 +++++++++++++++++++++
 5 files changed, 411 insertions(+), 10 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 5d91964..a7fd6b1 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -19,8 +19,6 @@
 package org.apache.flink.table.client.gateway.local;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.client.cli.CliFrontendParser;
@@ -41,7 +39,6 @@ import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
-import org.apache.flink.table.calcite.FlinkTypeFactory;
 import org.apache.flink.table.client.SqlClientException;
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.client.gateway.Executor;
@@ -54,6 +51,9 @@ import org.apache.flink.table.client.gateway.local.result.BasicResult;
 import org.apache.flink.table.client.gateway.local.result.ChangelogResult;
 import org.apache.flink.table.client.gateway.local.result.DynamicResult;
 import org.apache.flink.table.client.gateway.local.result.MaterializedResult;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+import org.apache.flink.table.types.utils.DataTypeUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.StringUtils;
 
@@ -611,13 +611,10 @@ public class LocalExecutor implements Executor {
 	private static TableSchema removeTimeAttributes(TableSchema schema) {
 		final TableSchema.Builder builder = TableSchema.builder();
 		for (int i = 0; i < schema.getFieldCount(); i++) {
-			final TypeInformation<?> type = schema.getFieldTypes()[i];
-			final TypeInformation<?> convertedType;
-			if (FlinkTypeFactory.isTimeIndicatorType(type)) {
-				convertedType = Types.SQL_TIMESTAMP;
-			} else {
-				convertedType = type;
-			}
+			final DataType dataType = schema.getFieldDataTypes()[i];
+			final DataType convertedType = DataTypeUtils.replaceLogicalType(
+				dataType,
+				LogicalTypeUtils.removeTimeAttributes(dataType.getLogicalType()));
 			builder.field(schema.getFieldNames()[i], convertedType);
 		}
 		return builder.build();
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeDuplicator.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeDuplicator.java
new file mode 100644
index 0000000..e417530
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeDuplicator.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Returns a deep copy of a {@link LogicalType}.
+ *
+ * <p>It also enables replacing children of possibly nested structures by overriding the
+ * corresponding {@code visit()} methods.
+ */
+@Internal
+public class LogicalTypeDuplicator extends LogicalTypeDefaultVisitor<LogicalType> {
+
+	@Override
+	public LogicalType visit(ArrayType arrayType) {
+		return new ArrayType(
+			arrayType.isNullable(),
+			arrayType.getElementType().accept(this));
+	}
+
+	@Override
+	public LogicalType visit(MultisetType multisetType) {
+		return new MultisetType(
+			multisetType.isNullable(),
+			multisetType.getElementType().accept(this));
+	}
+
+	@Override
+	public LogicalType visit(MapType mapType) {
+		return new MapType(
+			mapType.isNullable(),
+			mapType.getKeyType().accept(this),
+			mapType.getValueType().accept(this));
+	}
+
+	@Override
+	public LogicalType visit(RowType rowType) {
+		final List<RowField> fields = rowType.getFields().stream()
+			.map(f -> {
+				if (f.getDescription().isPresent()) {
+					return new RowField(
+						f.getName(),
+						f.getType().accept(this),
+						f.getDescription().get());
+				}
+				return new RowField(f.getName(), f.getType().accept(this));
+			})
+			.collect(Collectors.toList());
+
+		return new RowType(
+			rowType.isNullable(),
+			fields);
+	}
+
+	@Override
+	public LogicalType visit(DistinctType distinctType) {
+		final DistinctType.Builder builder = new DistinctType.Builder(
+			distinctType.getObjectIdentifier(),
+			distinctType.getSourceType().accept(this));
+		distinctType.getDescription().ifPresent(builder::setDescription);
+		return builder.build();
+	}
+
+	@Override
+	public LogicalType visit(StructuredType structuredType) {
+		final List<StructuredAttribute> attributes = structuredType.getAttributes().stream()
+			.map(a -> {
+				if (a.getDescription().isPresent()) {
+					return new StructuredAttribute(
+						a.getName(),
+						a.getType().accept(this),
+						a.getDescription().get());
+				}
+				return new StructuredAttribute(
+					a.getName(),
+					a.getType().accept(this));
+			})
+			.collect(Collectors.toList());
+		final StructuredType.Builder builder = new StructuredType.Builder(
+			structuredType.getObjectIdentifier(),
+			attributes);
+		builder.setNullable(structuredType.isNullable());
+		builder.setFinal(structuredType.isFinal());
+		builder.setInstantiable(structuredType.isInstantiable());
+		builder.setComparision(structuredType.getComparision());
+		structuredType.getSuperType().ifPresent(st -> {
+			final LogicalType visited = st.accept(this);
+			if (!(visited instanceof StructuredType)) {
+				throw new TableException("Unexpected super type. Structured type expected but was: " + visited);
+			}
+			builder.setSuperType((StructuredType) visited);
+		});
+		structuredType.getDescription().ifPresent(builder::setDescription);
+		structuredType.getImplementationClass().ifPresent(builder::setImplementationClass);
+		return builder.build();
+	}
+
+	@Override
+	protected LogicalType defaultMethod(LogicalType logicalType) {
+		return logicalType.copy();
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeUtils.java
new file mode 100644
index 0000000..cde6068
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeUtils.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+
+/**
+ * Utilities for handling {@link LogicalType}s.
+ */
+@Internal
+public final class LogicalTypeUtils {
+
+	private static final TimeAttributeRemover TIME_ATTRIBUTE_REMOVER = new TimeAttributeRemover();
+
+	public static LogicalType removeTimeAttributes(LogicalType logicalType) {
+		return logicalType.accept(TIME_ATTRIBUTE_REMOVER);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private static class TimeAttributeRemover extends LogicalTypeDuplicator {
+
+		@Override
+		public LogicalType visit(TimestampType timestampType) {
+			return new TimestampType(
+				timestampType.isNullable(),
+				timestampType.getPrecision());
+		}
+
+		@Override
+		public LogicalType visit(ZonedTimestampType zonedTimestampType) {
+			return new ZonedTimestampType(
+				zonedTimestampType.isNullable(),
+				zonedTimestampType.getPrecision());
+		}
+
+		@Override
+		public LogicalType visit(LocalZonedTimestampType localZonedTimestampType) {
+			return new LocalZonedTimestampType(
+				localZonedTimestampType.isNullable(),
+				localZonedTimestampType.getPrecision());
+		}
+	}
+
+	private LogicalTypeUtils() {
+		// no instantiation
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
new file mode 100644
index 0000000..43ca734
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+/**
+ * Utilities for handling {@link DataType}s.
+ */
+@Internal
+public final class DataTypeUtils {
+
+	/**
+	 * Replaces the {@link LogicalType} of a {@link DataType} while keeping its bridging class.
+	 */
+	public static DataType replaceLogicalType(DataType dataType, LogicalType replacement) {
+		return LogicalTypeDataTypeConverter.toDataType(replacement)
+			.bridgedTo(dataType.getConversionClass());
+	}
+
+	private DataTypeUtils() {
+		// no instantiation
+	}
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeDuplicatorTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeDuplicatorTest.java
new file mode 100644
index 0000000..ef7b3a8
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeDuplicatorTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeDuplicator;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link LogicalTypeDuplicator}.
+ */
+@RunWith(Parameterized.class)
+public class LogicalTypeDuplicatorTest {
+
+	private static final LogicalTypeDuplicator DUPLICATOR = new LogicalTypeDuplicator();
+
+	private static final LogicalTypeDuplicator INT_REPLACER = new IntReplacer();
+
+	@Parameters(name = "{index}: {0}")
+	public static List<Object[]> testData() {
+		return Arrays.asList(
+			new Object[][]{
+				{new CharType(2), new CharType(2)},
+				{createMultisetType(new IntType()), createMultisetType(new BigIntType())},
+				{createArrayType(new IntType()), createArrayType(new BigIntType())},
+				{createMapType(new IntType()), createMapType(new BigIntType())},
+				{createRowType(new IntType()), createRowType(new BigIntType())},
+				{createDistinctType(new IntType()), createDistinctType(new BigIntType())},
+				{createUserType(new IntType()), createUserType(new BigIntType())},
+				{createHumanType(), createHumanType()}
+			}
+		);
+	}
+
+	@Parameter
+	public LogicalType logicalType;
+
+	@Parameter(1)
+	public LogicalType replacedLogicalType;
+
+	@Test
+	public void testDuplication() {
+		assertThat(logicalType.accept(DUPLICATOR), equalTo(logicalType));
+	}
+
+	@Test
+	public void testReplacement() {
+		assertThat(logicalType.accept(INT_REPLACER), equalTo(replacedLogicalType));
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private static class IntReplacer extends LogicalTypeDuplicator {
+		@Override
+		public LogicalType visit(IntType intType) {
+			return new BigIntType();
+		}
+	}
+
+	private static MultisetType createMultisetType(LogicalType replacedType) {
+		return new MultisetType(new MultisetType(replacedType));
+	}
+
+	private static ArrayType createArrayType(LogicalType replacedType) {
+		return new ArrayType(new ArrayType(replacedType));
+	}
+
+	private static MapType createMapType(LogicalType replacedType) {
+		return new MapType(replacedType, new SmallIntType());
+	}
+
+	private static DistinctType createDistinctType(LogicalType replacedType) {
+		return new DistinctType.Builder(
+				ObjectIdentifier.of("cat", "db", "Money"),
+				replacedType)
+			.setDescription("Money type desc.")
+			.build();
+	}
+
+	private static RowType createRowType(LogicalType replacedType) {
+		return new RowType(
+			Arrays.asList(
+				new RowType.RowField("field1", new CharType(2)),
+				new RowType.RowField("field2", new BooleanType()),
+				new RowType.RowField("field3", replacedType)));
+	}
+
+	private static StructuredType createHumanType() {
+		return new StructuredType.Builder(
+				ObjectIdentifier.of("cat", "db", "Human"),
+				Collections.singletonList(
+					new StructuredType.StructuredAttribute("name", new VarCharType(), "Description.")))
+			.setDescription("Human type desc.")
+			.setFinal(false)
+			.setInstantiable(false)
+			.setImplementationClass(Human.class)
+			.build();
+	}
+
+	private static StructuredType createUserType(LogicalType replacedType) {
+		return new StructuredType.Builder(
+				ObjectIdentifier.of("cat", "db", "User"),
+				Collections.singletonList(
+					new StructuredType.StructuredAttribute("setting", replacedType)))
+			.setDescription("User type desc.")
+			.setFinal(false)
+			.setInstantiable(true)
+			.setImplementationClass(User.class)
+			.setSuperType(createHumanType())
+			.build();
+	}
+
+	private abstract static class Human {
+		public String name;
+	}
+
+	private static final class User extends Human {
+		public int setting;
+	}
+}