You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/05/06 09:53:30 UTC
[flink] branch master updated: [FLINK-12253][table-common] Add an
ANY type
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c384aea [FLINK-12253][table-common] Add an ANY type
c384aea is described below
commit c384aea8116aa9ad514bbb59fd8578bb285a4e6f
Author: Timo Walther <tw...@apache.org>
AuthorDate: Thu May 2 14:51:54 2019 +0200
[FLINK-12253][table-common] Add an ANY type
---
.../apache/flink/table/types/logical/AnyType.java | 157 +++++++++++++++++++++
.../table/types/logical/LogicalTypeVisitor.java | 5 +
.../apache/flink/table/types/logical/NullType.java | 4 +-
.../types/logical/TypeInformationAnyType.java | 148 +++++++++++++++++++
.../apache/flink/table/utils/EncodingUtils.java | 6 +-
.../apache/flink/table/types/LogicalTypesTest.java | 44 ++++++
6 files changed, 361 insertions(+), 3 deletions(-)
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/AnyType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/AnyType.java
new file mode 100644
index 0000000..4f17539
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/AnyType.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Logical type of an arbitrary serialized type. This type is a black box within the table ecosystem
+ * and is only deserialized at the edges. The any type is an extension to the SQL standard.
+ *
+ * <p>The serialized string representation is {@code ANY(c, s)} where {@code c} is the originating
+ * class and {@code s} is the serialized {@link TypeSerializerSnapshot} in Base64 encoding.
+ *
+ * @param <T> originating class for this type
+ */
+@PublicEvolving
+public final class AnyType<T> extends LogicalType {
+
+ private static final String FORMAT = "ANY(%s, %s)";
+
+ private static final Set<String> INPUT_OUTPUT_CONVERSION = conversionSet(
+ byte[].class.getName(),
+ "org.apache.flink.table.dataformat.BinaryGeneric");
+
+ private final Class<T> clazz;
+
+ private final TypeSerializer<T> serializer;
+
+ private transient String serializerString;
+
+ public AnyType(boolean isNullable, Class<T> clazz, TypeSerializer<T> serializer) {
+ super(isNullable, LogicalTypeRoot.ANY);
+ this.clazz = Preconditions.checkNotNull(clazz, "Class must not be null.");
+ this.serializer = Preconditions.checkNotNull(serializer, "Serializer must not be null.");
+ }
+
+ public AnyType(Class<T> clazz, TypeSerializer<T> serializer) {
+ this(true, clazz, serializer);
+ }
+
+ public Class<T> getOriginatingClass() {
+ return clazz;
+ }
+
+ public TypeSerializer<T> getTypeSerializer() {
+ return serializer;
+ }
+
+ @Override
+ public LogicalType copy(boolean isNullable) {
+ // The copy receives its own serializer via duplicate() instead of sharing this instance;
+ // NOTE(review): presumably because serializers can be stateful -- confirm against TypeSerializer.
+ return new AnyType<>(isNullable, clazz, serializer.duplicate());
+ }
+
+ @Override
+ public String asSummaryString() {
+ return withNullability(FORMAT, clazz.getName(), "...");
+ }
+
+ @Override
+ public String asSerializableString() {
+ return withNullability(FORMAT, clazz.getName(), getOrCreateSerializerString());
+ }
+
+ @Override
+ public boolean supportsInputConversion(Class<?> clazz) {
+ return this.clazz.isAssignableFrom(clazz) ||
+ INPUT_OUTPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public boolean supportsOutputConversion(Class<?> clazz) {
+ return clazz.isAssignableFrom(this.clazz) ||
+ INPUT_OUTPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public Class<?> getDefaultConversion() {
+ return clazz;
+ }
+
+ @Override
+ public List<LogicalType> getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public <R> R accept(LogicalTypeVisitor<R> visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ AnyType<?> anyType = (AnyType<?>) o;
+ return clazz.equals(anyType.clazz) && serializer.equals(anyType.serializer);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), clazz, serializer);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Returns the Base64-encoded snapshot of the serializer. The string is computed on first use
+ * and cached in the transient {@code serializerString} field afterwards.
+ *
+ * @return Base64 string of the bytes written by the serializer's configuration snapshot
+ * @throws TableException if the snapshot cannot be written; the original failure is attached
+ * as the cause
+ */
+ private String getOrCreateSerializerString() {
+ if (serializerString == null) {
+ final DataOutputSerializer outputSerializer = new DataOutputSerializer(128);
+ try {
+ serializer.snapshotConfiguration().writeSnapshot(outputSerializer);
+ serializerString = EncodingUtils.encodeBytesToBase64(outputSerializer.getCopyOfBuffer());
+ } catch (Exception e) {
+ // keep the original exception as the cause, otherwise the reason for the failure is lost
+ throw new TableException(
+ String.format(
+ "Unable to generate a string representation of the serializer snapshot of '%s' " +
+ "describing the class '%s' for the ANY type.",
+ serializer.getClass().getName(),
+ clazz.toString()),
+ e);
+ }
+ }
+ return serializerString;
+ }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
index bab6a96..9a3190f 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
@@ -24,6 +24,9 @@ import org.apache.flink.annotation.PublicEvolving;
* The visitor definition of {@link LogicalType}. The visitor transforms a logical type into
* instances of {@code R}.
*
+ * <p>Incomplete types such as the {@link TypeInformationAnyType} are visited through the generic
+ * {@link #visit(LogicalType)}.
+ *
* @param <R> result type
*/
@PublicEvolving
@@ -81,5 +84,7 @@ public interface LogicalTypeVisitor<R> {
R visit(NullType nullType);
+ // wildcard instead of the raw type, so implementations may declare AnyType<?> or the raw AnyType
+ R visit(AnyType<?> anyType);
+
R visit(LogicalType other);
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/NullType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/NullType.java
index 827fc28..17cbc69 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/NullType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/NullType.java
@@ -37,7 +37,7 @@ import java.util.List;
@PublicEvolving
public final class NullType extends LogicalType {
- private static final String DEFAULT_FORMAT = "NULL";
+ private static final String FORMAT = "NULL";
private static final Class<?> INPUT_CONVERSION = Object.class;
@@ -59,7 +59,7 @@ public final class NullType extends LogicalType {
@Override
public String asSerializableString() {
- return DEFAULT_FORMAT;
+ return FORMAT;
}
@Override
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TypeInformationAnyType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TypeInformationAnyType.java
new file mode 100644
index 0000000..2c1c0dc
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TypeInformationAnyType.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Placeholder type of an arbitrary serialized type backed by {@link TypeInformation}. This type is
+ * a black box within the table ecosystem and is only deserialized at the edges. The any type is an
+ * extension to the SQL standard.
+ *
+ * <p>Compared to an {@link AnyType}, this type does not contain a {@link TypeSerializer} yet. The
+ * serializer will be generated from the enclosed {@link TypeInformation} but needs access to the
+ * {@link ExecutionConfig} of the current execution environment. Thus, this type is just a placeholder
+ * for the fully resolved {@link AnyType} returned by {@link #resolve(ExecutionConfig)}.
+ *
+ * <p>This type has no serializable string representation.
+ *
+ * <p>If no type information is supplied, generic type serialization for {@link Object} is used.
+ *
+ * @param <T> class described by the enclosed type information
+ */
+@PublicEvolving
+public final class TypeInformationAnyType<T> extends LogicalType {
+
+ private static final String FORMAT = "ANY(%s, ?)";
+
+ private static final Set<String> INPUT_OUTPUT_CONVERSION = conversionSet(
+ byte[].class.getName(),
+ "org.apache.flink.table.dataformat.BinaryGeneric");
+
+ private static final TypeInformation<?> DEFAULT_TYPE_INFO = Types.GENERIC(Object.class);
+
+ private final TypeInformation<T> typeInfo;
+
+ public TypeInformationAnyType(boolean isNullable, TypeInformation<T> typeInfo) {
+ super(isNullable, LogicalTypeRoot.ANY);
+ this.typeInfo = Preconditions.checkNotNull(typeInfo, "Type information must not be null.");
+ }
+
+ public TypeInformationAnyType(TypeInformation<T> typeInfo) {
+ this(true, typeInfo);
+ }
+
+ @SuppressWarnings("unchecked")
+ public TypeInformationAnyType() {
+ this(true, (TypeInformation<T>) DEFAULT_TYPE_INFO);
+ }
+
+ public TypeInformation<T> getTypeInformation() {
+ return typeInfo;
+ }
+
+ /**
+ * Creates the fully resolved {@link AnyType} for this placeholder. The result keeps the
+ * nullability of this type and uses the type class of the enclosed {@link TypeInformation}
+ * together with a serializer created from it.
+ *
+ * @param config execution config handed to {@link TypeInformation#createSerializer(ExecutionConfig)}
+ * @return any type with the same nullability as this type
+ */
+ @Internal
+ public AnyType<T> resolve(ExecutionConfig config) {
+ return new AnyType<>(isNullable(), typeInfo.getTypeClass(), typeInfo.createSerializer(config));
+ }
+
+ @Override
+ public LogicalType copy(boolean isNullable) {
+ return new TypeInformationAnyType<>(isNullable, typeInfo); // we must assume immutability here
+ }
+
+ @Override
+ public String asSummaryString() {
+ return withNullability(FORMAT, typeInfo.getTypeClass().getName());
+ }
+
+ @Override
+ public String asSerializableString() {
+ throw new TableException(
+ "An any type backed by type information has no serializable string representation. It " +
+ "needs to be resolved into a proper any type.");
+ }
+
+ @Override
+ public boolean supportsInputConversion(Class<?> clazz) {
+ return typeInfo.getTypeClass().isAssignableFrom(clazz) ||
+ INPUT_OUTPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public boolean supportsOutputConversion(Class<?> clazz) {
+ return clazz.isAssignableFrom(typeInfo.getTypeClass()) ||
+ INPUT_OUTPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public Class<?> getDefaultConversion() {
+ return typeInfo.getTypeClass();
+ }
+
+ @Override
+ public List<LogicalType> getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public <R> R accept(LogicalTypeVisitor<R> visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ TypeInformationAnyType<?> that = (TypeInformationAnyType<?>) o;
+ return typeInfo.equals(that.typeInfo);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), typeInfo);
+ }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
index 5531082..35e57ae 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
@@ -93,8 +93,12 @@ public abstract class EncodingUtils {
return loadClass(qualifiedName, Thread.currentThread().getContextClassLoader());
}
+ /**
+ * Encodes the given bytes with the encoder returned by {@link java.util.Base64#getEncoder()}.
+ *
+ * @param bytes bytes to encode
+ * @return Base64 representation of {@code bytes} as a string
+ */
+ public static String encodeBytesToBase64(byte[] bytes) {
+ return new String(java.util.Base64.getEncoder().encode(bytes), UTF_8);
+ }
+
public static String encodeStringToBase64(String string) {
- return new String(java.util.Base64.getEncoder().encode(string.getBytes(UTF_8)), UTF_8);
+ return encodeBytesToBase64(string.getBytes(UTF_8));
}
public static String decodeBase64ToString(String base64) {
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
index ec452a8..e03e15d 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
@@ -18,6 +18,12 @@
package org.apache.flink.table.types;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.table.types.logical.AnyType;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BinaryType;
@@ -41,6 +47,7 @@ import org.apache.flink.table.types.logical.StructuredType;
import org.apache.flink.table.types.logical.TimeType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.TypeInformationAnyType;
import org.apache.flink.table.types.logical.UserDefinedType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
@@ -454,6 +461,43 @@ public class LogicalTypesTest {
assertFalse(nullType.supportsOutputConversion(int.class));
}
+ @Test
+ public void testTypeInformationAnyType() {
+ final TypeInformationAnyType<?> anyType = new TypeInformationAnyType<>(Types.TUPLE(Types.STRING, Types.INT));
+
+ testEquality(anyType, new TypeInformationAnyType<>(Types.TUPLE(Types.STRING, Types.LONG)));
+
+ testStringSummary(anyType, "ANY(org.apache.flink.api.java.tuple.Tuple2, ?)");
+
+ testNullability(anyType);
+
+ testJavaSerializability(anyType);
+
+ testConversions(anyType, new Class[]{Tuple2.class}, new Class[]{Tuple.class});
+ }
+
+ @Test
+ public void testAnyType() {
+ testAll(
+ new AnyType<>(Human.class, new KryoSerializer<>(Human.class, new ExecutionConfig())),
+ "ANY(org.apache.flink.table.types.LogicalTypesTest$Human, " +
+ "ADNvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLkxvZ2ljYWxUeXBlc1Rlc3QkSHVtYW4AAATyxpo9cAA" +
+ "AAAIAM29yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuTG9naWNhbFR5cGVzVGVzdCRIdW1hbgEAAAA1AD" +
+ "NvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLkxvZ2ljYWxUeXBlc1Rlc3QkSHVtYW4BAAAAOQAzb3JnL" +
+ "mFwYWNoZS5mbGluay50YWJsZS50eXBlcy5Mb2dpY2FsVHlwZXNUZXN0JEh1bWFuAAAAAAApb3JnLmFwYWNo" +
+ "ZS5hdnJvLmdlbmVyaWMuR2VuZXJpY0RhdGEkQXJyYXkBAAAAKwApb3JnLmFwYWNoZS5hdnJvLmdlbmVyaWM" +
+ "uR2VuZXJpY0RhdGEkQXJyYXkBAAAAtgBVb3JnLmFwYWNoZS5mbGluay5hcGkuamF2YS50eXBldXRpbHMucn" +
+ "VudGltZS5rcnlvLlNlcmlhbGl6ZXJzJER1bW15QXZyb1JlZ2lzdGVyZWRDbGFzcwAAAAEAWW9yZy5hcGFja" +
+ "GUuZmxpbmsuYXBpLmphdmEudHlwZXV0aWxzLnJ1bnRpbWUua3J5by5TZXJpYWxpemVycyREdW1teUF2cm9L" +
+ "cnlvU2VyaWFsaXplckNsYXNzAAAE8saaPXAAAAAAAAAE8saaPXAAAAAA)",
+ "ANY(org.apache.flink.table.types.LogicalTypesTest$Human, ...)",
+ new Class[]{Human.class, User.class}, // every User is Human
+ new Class[]{Human.class},
+ new LogicalType[]{},
+ new AnyType<>(User.class, new KryoSerializer<>(User.class, new ExecutionConfig()))
+ );
+ }
+
// --------------------------------------------------------------------------------------------
private static void testAll(