You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/18 15:32:03 UTC
[flink] 05/06: [FLINK-13078][table-common] Add an unresolved
user-defined logical type
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 29fcd0c65900256222e954b7d91743053c379694
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed Jul 10 08:51:22 2019 +0200
[FLINK-13078][table-common] Add an unresolved user-defined logical type
---
.../flink/table/types/logical/LogicalTypeRoot.java | 3 +
.../table/types/logical/LogicalTypeVisitor.java | 4 +-
.../types/logical/UnresolvedUserDefinedType.java | 153 +++++++++++++++++++++
.../apache/flink/table/types/LogicalTypesTest.java | 11 ++
4 files changed, 169 insertions(+), 2 deletions(-)
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java
index 4725876..6ff360a 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java
@@ -156,6 +156,9 @@ public enum LogicalTypeRoot {
LogicalTypeFamily.EXTENSION),
SYMBOL(
+ LogicalTypeFamily.EXTENSION),
+
+ UNRESOLVED(
LogicalTypeFamily.EXTENSION);
private final Set<LogicalTypeFamily> families;
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
index 5bf5858..810174f 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
@@ -24,8 +24,8 @@ import org.apache.flink.annotation.PublicEvolving;
* The visitor definition of {@link LogicalType}. The visitor transforms a logical type into
* instances of {@code R}.
*
- * <p>Incomplete types such as the {@link TypeInformationAnyType} are visited through the generic
- * {@link #visit(LogicalType)}.
+ * <p>Incomplete types such as the {@link TypeInformationAnyType} or {@link UnresolvedUserDefinedType} are visited
+ * through the generic {@link #visit(LogicalType)}.
*
* @param <R> result type
*/
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/UnresolvedUserDefinedType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/UnresolvedUserDefinedType.java
new file mode 100644
index 0000000..2168c8e
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/UnresolvedUserDefinedType.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Placeholder type of an unresolved user-defined type that is identified by a partially or fully
+ * qualified path ({@code [catalog].[database].[type]}).
+ *
+ * <p>It assumes that a type has been registered in a catalog and just needs to be resolved to a
+ * {@link DistinctType} or {@link StructuredType}.
+ *
+ * <p>Two unresolved types are considered equal if they share the same path in a stable session context.
+ *
+ * @see UserDefinedType
+ */
+@PublicEvolving
+public final class UnresolvedUserDefinedType extends LogicalType {
+
+ private final @Nullable String catalog;
+
+ private final @Nullable String database;
+
+ private final String typeIdentifier;
+
+ public UnresolvedUserDefinedType(
+ boolean isNullable,
+ @Nullable String catalog,
+ @Nullable String database,
+ String typeIdentifier) {
+ super(isNullable, LogicalTypeRoot.UNRESOLVED);
+ this.catalog = catalog;
+ this.database = database;
+ this.typeIdentifier = Preconditions.checkNotNull(
+ typeIdentifier,
+ "Type identifier must not be null.");
+ }
+
+ public UnresolvedUserDefinedType(
+ @Nullable String catalog,
+ @Nullable String database,
+ String typeIdentifier) {
+ this(true, catalog, database, typeIdentifier);
+ }
+
+ public Optional<String> getCatalog() {
+ return Optional.ofNullable(catalog);
+ }
+
+ public Optional<String> getDatabase() {
+ return Optional.ofNullable(database);
+ }
+
+ public String getTypeIdentifier() {
+ return typeIdentifier;
+ }
+
+ @Override
+ public LogicalType copy(boolean isNullable) {
+ return new UnresolvedUserDefinedType(isNullable, catalog, database, typeIdentifier);
+ }
+
+ @Override
+ public String asSummaryString() {
+ final String path = Stream.of(catalog, database, typeIdentifier)
+ .filter(Objects::nonNull)
+ .map(EncodingUtils::escapeIdentifier)
+ .collect(Collectors.joining("."));
+ return withNullability(path);
+ }
+
+ @Override
+ public String asSerializableString() {
+ throw new TableException(
+ "An unresolved user-defined type has no serializable string representation. It " +
+ "needs to be resolved into a proper user-defined type.");
+ }
+
+ @Override
+ public boolean supportsInputConversion(Class<?> clazz) {
+ throw new TableException("An unresolved user-defined type does not support any input conversion.");
+ }
+
+ @Override
+ public boolean supportsOutputConversion(Class<?> clazz) {
+ throw new TableException("An unresolved user-defined type does not support any output conversion.");
+ }
+
+ @Override
+ public Class<?> getDefaultConversion() {
+ throw new TableException("An unresolved user-defined type has no default conversion.");
+ }
+
+ @Override
+ public List<LogicalType> getChildren() {
+ throw new TableException("An unresolved user-defined type cannot return children.");
+ }
+
+ @Override
+ public <R> R accept(LogicalTypeVisitor<R> visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ UnresolvedUserDefinedType that = (UnresolvedUserDefinedType) o;
+ return Objects.equals(catalog, that.catalog) &&
+ Objects.equals(database, that.database) &&
+ typeIdentifier.equals(that.typeIdentifier);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), catalog, database, typeIdentifier);
+ }
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
index 49feb47..6bbbc80 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
@@ -55,6 +55,7 @@ import org.apache.flink.table.types.logical.TimestampKind;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.TypeInformationAnyType;
+import org.apache.flink.table.types.logical.UnresolvedUserDefinedType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.YearMonthIntervalType;
@@ -610,6 +611,16 @@ public class LogicalTypesTest {
}
@Test
+ public void testUnresolvedUserDefinedType() {
+ final UnresolvedUserDefinedType unresolvedType =
+ new UnresolvedUserDefinedType("catalog", "database", "Type");
+
+ testEquality(unresolvedType, new UnresolvedUserDefinedType("different", "database", "Type"));
+
+ testStringSummary(unresolvedType, "`catalog`.`database`.`Type`");
+ }
+
+ @Test
public void testEmptyStringLiterals() {
final CharType charType = CharType.ofEmptyLiteral();
final VarCharType varcharType = VarCharType.ofEmptyLiteral();