You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2020/05/16 11:00:31 UTC
[flink] 02/05: [hotfix][table-common] Make the usage of structured
types easier
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 96a02ab0d016f915cd6cc5c3952948bfc1387990
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed May 13 17:07:10 2020 +0200
[hotfix][table-common] Make the usage of structured types easier
---
.../org/apache/flink/table/types/logical/DistinctType.java | 8 +-------
.../org/apache/flink/table/types/logical/StructuredType.java | 10 ++++++----
.../org/apache/flink/table/types/logical/UserDefinedType.java | 2 +-
.../flink/table/types/logical/utils/LogicalTypeDuplicator.java | 4 ++--
.../java/org/apache/flink/table/types/LogicalTypesTest.java | 2 +-
5 files changed, 11 insertions(+), 15 deletions(-)
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DistinctType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DistinctType.java
index 3c6f261..54c4b93 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DistinctType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DistinctType.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.types.logical;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.util.Preconditions;
@@ -104,15 +103,10 @@ public final class DistinctType extends UserDefinedType {
return sourceType;
}
- public ObjectIdentifier getObjectIdentifier() {
- return getOptionalObjectIdentifier()
- .orElseThrow(() -> new TableException("Object identifier expected."));
- }
-
@Override
public LogicalType copy(boolean isNullable) {
return new DistinctType(
- getObjectIdentifier(),
+ getObjectIdentifier().orElseThrow(IllegalStateException::new),
sourceType.copy(isNullable),
getDescription().orElse(null));
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/StructuredType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/StructuredType.java
index cf8cfda..add2c62 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/StructuredType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/StructuredType.java
@@ -319,8 +319,8 @@ public final class StructuredType extends UserDefinedType {
public LogicalType copy(boolean isNullable) {
return new StructuredType(
isNullable,
- getOptionalObjectIdentifier().orElse(null),
- attributes.stream().map(StructuredAttribute::copy).collect(Collectors.toList()),
+ getObjectIdentifier().orElse(null),
+ attributes,
isFinal(),
isInstantiable,
comparision,
@@ -331,11 +331,13 @@ public final class StructuredType extends UserDefinedType {
@Override
public String asSummaryString() {
- if (getOptionalObjectIdentifier().isPresent()) {
+ if (getObjectIdentifier().isPresent()) {
return asSerializableString();
}
assert implementationClass != null;
- return implementationClass.getName();
+ // we use *class* to make it visible that this type is unregistered and not confuse it
+ // with catalog types
+ return "*" + implementationClass.getName() + "*";
}
@Override
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/UserDefinedType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/UserDefinedType.java
index f727cbd..cb1babd 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/UserDefinedType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/UserDefinedType.java
@@ -62,7 +62,7 @@ public abstract class UserDefinedType extends LogicalType {
this.description = description;
}
- public Optional<ObjectIdentifier> getOptionalObjectIdentifier() {
+ public Optional<ObjectIdentifier> getObjectIdentifier() {
return Optional.ofNullable(objectIdentifier);
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeDuplicator.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeDuplicator.java
index 341669d..f6088d8 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeDuplicator.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeDuplicator.java
@@ -88,7 +88,7 @@ public class LogicalTypeDuplicator extends LogicalTypeDefaultVisitor<LogicalType
@Override
public LogicalType visit(DistinctType distinctType) {
final DistinctType.Builder builder = DistinctType.newBuilder(
- distinctType.getObjectIdentifier(),
+ distinctType.getObjectIdentifier().orElseThrow(IllegalStateException::new),
distinctType.getSourceType().accept(this));
distinctType.getDescription().ifPresent(builder::description);
return builder.build();
@@ -121,7 +121,7 @@ public class LogicalTypeDuplicator extends LogicalTypeDefaultVisitor<LogicalType
// --------------------------------------------------------------------------------------------
private StructuredType.Builder instantiateStructuredBuilder(StructuredType structuredType) {
- final Optional<ObjectIdentifier> identifier = structuredType.getOptionalObjectIdentifier();
+ final Optional<ObjectIdentifier> identifier = structuredType.getObjectIdentifier();
final Optional<Class<?>> implementationClass = structuredType.getImplementationClass();
if (identifier.isPresent() && implementationClass.isPresent()) {
return StructuredType.newBuilder(identifier.get(), implementationClass.get());
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
index d60059e..b79e2ae 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
@@ -674,7 +674,7 @@ public class LogicalTypesTest {
testInvalidStringSerializability(structuredType);
- testStringSummary(structuredType, User.class.getName());
+ testStringSummary(structuredType, "*" + User.class.getName() + "*");
testConversions(
structuredType,