You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/18 15:32:03 UTC

[flink] 05/06: [FLINK-13078][table-common] Add an unresolved user-defined logical type

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 29fcd0c65900256222e954b7d91743053c379694
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed Jul 10 08:51:22 2019 +0200

    [FLINK-13078][table-common] Add an unresolved user-defined logical type
---
 .../flink/table/types/logical/LogicalTypeRoot.java |   3 +
 .../table/types/logical/LogicalTypeVisitor.java    |   4 +-
 .../types/logical/UnresolvedUserDefinedType.java   | 153 +++++++++++++++++++++
 .../apache/flink/table/types/LogicalTypesTest.java |  11 ++
 4 files changed, 169 insertions(+), 2 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java
index 4725876..6ff360a 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java
@@ -156,6 +156,9 @@ public enum LogicalTypeRoot {
 		LogicalTypeFamily.EXTENSION),
 
 	SYMBOL(
+		LogicalTypeFamily.EXTENSION),
+
+	UNRESOLVED(
 		LogicalTypeFamily.EXTENSION);
 
 	private final Set<LogicalTypeFamily> families;
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
index 5bf5858..810174f 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
@@ -24,8 +24,8 @@ import org.apache.flink.annotation.PublicEvolving;
  * The visitor definition of {@link LogicalType}. The visitor transforms a logical type into
  * instances of {@code R}.
  *
- * <p>Incomplete types such as the {@link TypeInformationAnyType} are visited through the generic
- * {@link #visit(LogicalType)}.
+ * <p>Incomplete types such as the {@link TypeInformationAnyType} or {@link UnresolvedUserDefinedType} are visited
+ * through the generic {@link #visit(LogicalType)}.
  *
  * @param <R> result type
  */
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/UnresolvedUserDefinedType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/UnresolvedUserDefinedType.java
new file mode 100644
index 0000000..2168c8e
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/UnresolvedUserDefinedType.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Placeholder type of an unresolved user-defined type that is identified by a partially or fully
+ * qualified path ({@code [catalog].[database].[type]}).
+ *
+ * <p>It assumes that a type has been registered in a catalog and just needs to be resolved to a
+ * {@link DistinctType} or {@link StructuredType}.
+ *
+ * <p>Two unresolved types are considered equal if they share the same path in a stable session context.
+ *
+ * @see UserDefinedType
+ */
+@PublicEvolving
+public final class UnresolvedUserDefinedType extends LogicalType {
+
+	private final @Nullable String catalog;
+
+	private final @Nullable String database;
+
+	private final String typeIdentifier;
+
+	public UnresolvedUserDefinedType(
+			boolean isNullable,
+			@Nullable String catalog,
+			@Nullable String database,
+			String typeIdentifier) {
+		super(isNullable, LogicalTypeRoot.UNRESOLVED);
+		this.catalog = catalog;
+		this.database = database;
+		this.typeIdentifier = Preconditions.checkNotNull(
+			typeIdentifier,
+			"Type identifier must not be null.");
+	}
+
+	public UnresolvedUserDefinedType(
+			@Nullable String catalog,
+			@Nullable String database,
+			String typeIdentifier) {
+		this(true, catalog, database, typeIdentifier);
+	}
+
+	public Optional<String> getCatalog() {
+		return Optional.ofNullable(catalog);
+	}
+
+	public Optional<String> getDatabase() {
+		return Optional.ofNullable(database);
+	}
+
+	public String getTypeIdentifier() {
+		return typeIdentifier;
+	}
+
+	@Override
+	public LogicalType copy(boolean isNullable) {
+		return new UnresolvedUserDefinedType(isNullable, catalog, database, typeIdentifier);
+	}
+
+	@Override
+	public String asSummaryString() {
+		final String path = Stream.of(catalog, database, typeIdentifier)
+			.filter(Objects::nonNull)
+			.map(EncodingUtils::escapeIdentifier)
+			.collect(Collectors.joining("."));
+		return withNullability(path);
+	}
+
+	@Override
+	public String asSerializableString() {
+		throw new TableException(
+			"An unresolved user-defined type has no serializable string representation. It " +
+				"needs to be resolved into a proper user-defined type.");
+	}
+
+	@Override
+	public boolean supportsInputConversion(Class<?> clazz) {
+		throw new TableException("An unresolved user-defined type does not support any input conversion.");
+	}
+
+	@Override
+	public boolean supportsOutputConversion(Class<?> clazz) {
+		throw new TableException("An unresolved user-defined type does not support any output conversion.");
+	}
+
+	@Override
+	public Class<?> getDefaultConversion() {
+		throw new TableException("An unresolved user-defined type has no default conversion.");
+	}
+
+	@Override
+	public List<LogicalType> getChildren() {
+		throw new TableException("An unresolved user-defined type cannot return children.");
+	}
+
+	@Override
+	public <R> R accept(LogicalTypeVisitor<R> visitor) {
+		return visitor.visit(this);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		if (!super.equals(o)) {
+			return false;
+		}
+		UnresolvedUserDefinedType that = (UnresolvedUserDefinedType) o;
+		return Objects.equals(catalog, that.catalog) &&
+			Objects.equals(database, that.database) &&
+			typeIdentifier.equals(that.typeIdentifier);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(super.hashCode(), catalog, database, typeIdentifier);
+	}
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
index 49feb47..6bbbc80 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
@@ -55,6 +55,7 @@ import org.apache.flink.table.types.logical.TimestampKind;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.TinyIntType;
 import org.apache.flink.table.types.logical.TypeInformationAnyType;
+import org.apache.flink.table.types.logical.UnresolvedUserDefinedType;
 import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.table.types.logical.YearMonthIntervalType;
@@ -610,6 +611,16 @@ public class LogicalTypesTest {
 	}
 
 	@Test
+	public void testUnresolvedUserDefinedType() {
+		final UnresolvedUserDefinedType unresolvedType =
+			new UnresolvedUserDefinedType("catalog", "database", "Type");
+
+		testEquality(unresolvedType, new UnresolvedUserDefinedType("different", "database", "Type"));
+
+		testStringSummary(unresolvedType, "`catalog`.`database`.`Type`");
+	}
+
+	@Test
 	public void testEmptyStringLiterals() {
 		final CharType charType = CharType.ofEmptyLiteral();
 		final VarCharType varcharType = VarCharType.ofEmptyLiteral();