You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/05/06 09:53:30 UTC

[flink] branch master updated: [FLINK-12253][table-common] Add an ANY type

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c384aea  [FLINK-12253][table-common] Add an ANY type
c384aea is described below

commit c384aea8116aa9ad514bbb59fd8578bb285a4e6f
Author: Timo Walther <tw...@apache.org>
AuthorDate: Thu May 2 14:51:54 2019 +0200

    [FLINK-12253][table-common] Add an ANY type
---
 .../apache/flink/table/types/logical/AnyType.java  | 157 +++++++++++++++++++++
 .../table/types/logical/LogicalTypeVisitor.java    |   5 +
 .../apache/flink/table/types/logical/NullType.java |   4 +-
 .../types/logical/TypeInformationAnyType.java      | 148 +++++++++++++++++++
 .../apache/flink/table/utils/EncodingUtils.java    |   6 +-
 .../apache/flink/table/types/LogicalTypesTest.java |  44 ++++++
 6 files changed, 361 insertions(+), 3 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/AnyType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/AnyType.java
new file mode 100644
index 0000000..4f17539
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/AnyType.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Logical type of an arbitrary serialized type. This type is a black box within the table ecosystem
+ * and is only deserialized at the edges. The any type is an extension to the SQL standard.
+ *
+ * <p>The serialized string representation is {@code ANY(c, s)} where {@code c} is the originating
+ * class and {@code s} is the serialized {@link TypeSerializerSnapshot} in Base64 encoding.
+ *
+ * @param <T> originating class for this type
+ */
+@PublicEvolving
+public final class AnyType<T> extends LogicalType {
+
+	private static final String FORMAT = "ANY(%s, %s)";
+
+	private static final Set<String> INPUT_OUTPUT_CONVERSION = conversionSet(
+		byte[].class.getName(),
+		"org.apache.flink.table.dataformat.BinaryGeneric");
+
+	private final Class<T> clazz;
+
+	private final TypeSerializer<T> serializer;
+
+	private transient String serializerString;
+
+	public AnyType(boolean isNullable, Class<T> clazz, TypeSerializer<T> serializer) {
+		super(isNullable, LogicalTypeRoot.ANY);
+		this.clazz = Preconditions.checkNotNull(clazz, "Class must not be null.");
+		this.serializer = Preconditions.checkNotNull(serializer, "Serializer must not be null.");
+	}
+
+	public AnyType(Class<T> clazz, TypeSerializer<T> serializer) {
+		this(true, clazz, serializer);
+	}
+
+	public Class<T> getOriginatingClass() {
+		return clazz;
+	}
+
+	public TypeSerializer<T> getTypeSerializer() {
+		return serializer;
+	}
+
+	@Override
+	public LogicalType copy(boolean isNullable) {
+		return new AnyType<>(isNullable, clazz, serializer.duplicate());
+	}
+
+	@Override
+	public String asSummaryString() {
+		return withNullability(FORMAT, clazz.getName(), "...");
+	}
+
+	@Override
+	public String asSerializableString() {
+		return withNullability(FORMAT, clazz.getName(), getOrCreateSerializerString());
+	}
+
+	@Override
+	public boolean supportsInputConversion(Class<?> clazz) {
+		return this.clazz.isAssignableFrom(clazz) ||
+			INPUT_OUTPUT_CONVERSION.contains(clazz.getName());
+	}
+
+	@Override
+	public boolean supportsOutputConversion(Class<?> clazz) {
+		return clazz.isAssignableFrom(this.clazz) ||
+			INPUT_OUTPUT_CONVERSION.contains(clazz.getName());
+	}
+
+	@Override
+	public Class<?> getDefaultConversion() {
+		return clazz;
+	}
+
+	@Override
+	public List<LogicalType> getChildren() {
+		return Collections.emptyList();
+	}
+
+	@Override
+	public <R> R accept(LogicalTypeVisitor<R> visitor) {
+		return visitor.visit(this);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass() || !super.equals(o)) {
+			return false;
+		}
+		final AnyType<?> that = (AnyType<?>) o;
+		if (!clazz.equals(that.clazz)) {
+			return false;
+		}
+		return serializer.equals(that.serializer);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(super.hashCode(), clazz, serializer);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/** Lazily builds and caches the Base64 string of the serializer's snapshot. */
+	private String getOrCreateSerializerString() {
+		if (serializerString == null) {
+			final DataOutputSerializer outputSerializer = new DataOutputSerializer(128);
+			try {
+				serializer.snapshotConfiguration().writeSnapshot(outputSerializer);
+				serializerString = EncodingUtils.encodeBytesToBase64(outputSerializer.getCopyOfBuffer());
+			} catch (Exception e) {
+				throw new TableException(String.format(
+					"Unable to generate a string representation of the serializer snapshot of '%s' " +
+						"describing the class '%s' for the ANY type.",
+					serializer.getClass().getName(),
+					clazz.toString()), e); // chain the cause so the original failure is not lost
+			}
+		}
+		return serializerString;
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
index bab6a96..9a3190f 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
@@ -24,6 +24,9 @@ import org.apache.flink.annotation.PublicEvolving;
  * The visitor definition of {@link LogicalType}. The visitor transforms a logical type into
  * instances of {@code R}.
  *
+ * <p>Incomplete types such as the {@link TypeInformationAnyType} are visited through the generic
+ * {@link #visit(LogicalType)}.
+ *
  * @param <R> result type
  */
 @PublicEvolving
@@ -81,5 +84,7 @@ public interface LogicalTypeVisitor<R> {
 
 	R visit(NullType nullType);
 
+	R visit(AnyType<?> anyType);
+
 	R visit(LogicalType other);
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/NullType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/NullType.java
index 827fc28..17cbc69 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/NullType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/NullType.java
@@ -37,7 +37,7 @@ import java.util.List;
 @PublicEvolving
 public final class NullType extends LogicalType {
 
-	private static final String DEFAULT_FORMAT = "NULL";
+	private static final String FORMAT = "NULL";
 
 	private static final Class<?> INPUT_CONVERSION = Object.class;
 
@@ -59,7 +59,7 @@ public final class NullType extends LogicalType {
 
 	@Override
 	public String asSerializableString() {
-		return DEFAULT_FORMAT;
+		return FORMAT;
 	}
 
 	@Override
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TypeInformationAnyType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TypeInformationAnyType.java
new file mode 100644
index 0000000..2c1c0dc
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TypeInformationAnyType.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Placeholder type of an arbitrary serialized type backed by {@link TypeInformation}. This type is
+ * a black box within the table ecosystem and is only deserialized at the edges. The any type is an
+ * extension to the SQL standard.
+ *
+ * <p>Compared to an {@link AnyType}, this type does not contain a {@link TypeSerializer} yet. The
+ * serializer will be generated from the enclosed {@link TypeInformation} but needs access to the
+ * {@link ExecutionConfig} of the current execution environment. Thus, this type is just a placeholder
+ * for the fully resolved {@link AnyType} returned by {@link #resolve(ExecutionConfig)}.
+ *
+ * <p>This type has no serializable string representation.
+ *
+ * <p>If no type information is supplied, generic type serialization for {@link Object} is used.
+ */
+@PublicEvolving
+public final class TypeInformationAnyType<T> extends LogicalType {
+
+	private static final String FORMAT = "ANY(%s, ?)";
+
+	private static final Set<String> INPUT_OUTPUT_CONVERSION = conversionSet(
+		byte[].class.getName(),
+		"org.apache.flink.table.dataformat.BinaryGeneric");
+
+	private static final TypeInformation<?> DEFAULT_TYPE_INFO = Types.GENERIC(Object.class);
+
+	private final TypeInformation<T> typeInfo;
+
+	public TypeInformationAnyType(boolean isNullable, TypeInformation<T> typeInfo) {
+		super(isNullable, LogicalTypeRoot.ANY);
+		this.typeInfo = Preconditions.checkNotNull(typeInfo, "Type information must not be null.");
+	}
+
+	public TypeInformationAnyType(TypeInformation<T> typeInfo) {
+		this(true, typeInfo);
+	}
+
+	@SuppressWarnings("unchecked")
+	public TypeInformationAnyType() {
+		this(true, (TypeInformation<T>) DEFAULT_TYPE_INFO);
+	}
+
+	public TypeInformation<T> getTypeInformation() {
+		return typeInfo;
+	}
+
+	@Internal
+	public AnyType<T> resolve(ExecutionConfig config) {
+		return new AnyType<>(isNullable(), typeInfo.getTypeClass(), typeInfo.createSerializer(config));
+	}
+
+	@Override
+	public LogicalType copy(boolean isNullable) {
+		return new TypeInformationAnyType<>(isNullable, typeInfo); // we must assume immutability here
+	}
+
+	@Override
+	public String asSummaryString() {
+		return withNullability(FORMAT, typeInfo.getTypeClass().getName());
+	}
+
+	@Override
+	public String asSerializableString() {
+		throw new TableException(
+			"An any type backed by type information has no serializable string representation. It " +
+				"needs to be resolved into a proper any type.");
+	}
+
+	@Override
+	public boolean supportsInputConversion(Class<?> clazz) {
+		return typeInfo.getTypeClass().isAssignableFrom(clazz) ||
+			INPUT_OUTPUT_CONVERSION.contains(clazz.getName());
+	}
+
+	@Override
+	public boolean supportsOutputConversion(Class<?> clazz) {
+		return clazz.isAssignableFrom(typeInfo.getTypeClass()) ||
+			INPUT_OUTPUT_CONVERSION.contains(clazz.getName());
+	}
+
+	@Override
+	public Class<?> getDefaultConversion() {
+		return typeInfo.getTypeClass();
+	}
+
+	@Override
+	public List<LogicalType> getChildren() {
+		return Collections.emptyList();
+	}
+
+	@Override
+	public <R> R accept(LogicalTypeVisitor<R> visitor) {
+		return visitor.visit(this);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		// identical instances are trivially equal
+		if (this == o) {
+			return true;
+		}
+		// require the same runtime class and equality as decided by the super class
+		if (o == null || getClass() != o.getClass() || !super.equals(o)) {
+			return false;
+		}
+		// the class check above makes this cast safe
+		final TypeInformationAnyType<?> other = (TypeInformationAnyType<?>) o;
+		return typeInfo.equals(other.typeInfo);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(super.hashCode(), typeInfo);
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
index 5531082..35e57ae 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
@@ -93,8 +93,12 @@ public abstract class EncodingUtils {
 		return loadClass(qualifiedName, Thread.currentThread().getContextClassLoader());
 	}
 
+	public static String encodeBytesToBase64(byte[] bytes) {
+		return java.util.Base64.getEncoder().encodeToString(bytes); // Base64 output is ASCII-only
+	}
+
 	public static String encodeStringToBase64(String string) {
-		return new String(java.util.Base64.getEncoder().encode(string.getBytes(UTF_8)), UTF_8);
+		return encodeBytesToBase64(string.getBytes(UTF_8));
 	}
 
 	public static String decodeBase64ToString(String base64) {
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
index ec452a8..e03e15d 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
@@ -18,6 +18,12 @@
 
 package org.apache.flink.table.types;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.table.types.logical.AnyType;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.BinaryType;
@@ -41,6 +47,7 @@ import org.apache.flink.table.types.logical.StructuredType;
 import org.apache.flink.table.types.logical.TimeType;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.TypeInformationAnyType;
 import org.apache.flink.table.types.logical.UserDefinedType;
 import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
@@ -454,6 +461,43 @@ public class LogicalTypesTest {
 		assertFalse(nullType.supportsOutputConversion(int.class));
 	}
 
+	@Test
+	public void testTypeInformationAnyType() {
+		final TypeInformationAnyType<?> anyType = new TypeInformationAnyType<>(Types.TUPLE(Types.STRING, Types.INT));
+
+		testEquality(anyType, new TypeInformationAnyType<>(Types.TUPLE(Types.STRING, Types.LONG)));
+
+		testStringSummary(anyType, "ANY(org.apache.flink.api.java.tuple.Tuple2, ?)");
+
+		testNullability(anyType);
+
+		testJavaSerializability(anyType);
+
+		testConversions(anyType, new Class[]{Tuple2.class}, new Class[]{Tuple.class});
+	}
+
+	@Test
+	public void testAnyType() {
+		testAll(
+			new AnyType<>(Human.class, new KryoSerializer<>(Human.class, new ExecutionConfig())),
+				"ANY(org.apache.flink.table.types.LogicalTypesTest$Human, " +
+					"ADNvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLkxvZ2ljYWxUeXBlc1Rlc3QkSHVtYW4AAATyxpo9cAA" +
+					"AAAIAM29yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuTG9naWNhbFR5cGVzVGVzdCRIdW1hbgEAAAA1AD" +
+					"NvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLkxvZ2ljYWxUeXBlc1Rlc3QkSHVtYW4BAAAAOQAzb3JnL" +
+					"mFwYWNoZS5mbGluay50YWJsZS50eXBlcy5Mb2dpY2FsVHlwZXNUZXN0JEh1bWFuAAAAAAApb3JnLmFwYWNo" +
+					"ZS5hdnJvLmdlbmVyaWMuR2VuZXJpY0RhdGEkQXJyYXkBAAAAKwApb3JnLmFwYWNoZS5hdnJvLmdlbmVyaWM" +
+					"uR2VuZXJpY0RhdGEkQXJyYXkBAAAAtgBVb3JnLmFwYWNoZS5mbGluay5hcGkuamF2YS50eXBldXRpbHMucn" +
+					"VudGltZS5rcnlvLlNlcmlhbGl6ZXJzJER1bW15QXZyb1JlZ2lzdGVyZWRDbGFzcwAAAAEAWW9yZy5hcGFja" +
+					"GUuZmxpbmsuYXBpLmphdmEudHlwZXV0aWxzLnJ1bnRpbWUua3J5by5TZXJpYWxpemVycyREdW1teUF2cm9L" +
+					"cnlvU2VyaWFsaXplckNsYXNzAAAE8saaPXAAAAAAAAAE8saaPXAAAAAA)",
+			"ANY(org.apache.flink.table.types.LogicalTypesTest$Human, ...)",
+			new Class[]{Human.class, User.class}, // every User is Human
+			new Class[]{Human.class},
+			new LogicalType[]{},
+			new AnyType<>(User.class, new KryoSerializer<>(User.class, new ExecutionConfig()))
+		);
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	private static void testAll(