You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/05/03 12:40:44 UTC

[flink] branch master updated: [FLINK-12253][table-common] Introduce a logical type skeleton

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d11607  [FLINK-12253][table-common] Introduce a logical type skeleton
1d11607 is described below

commit 1d11607c732b5cc6cdf40b76e28e9812bec0fab6
Author: Timo Walther <tw...@apache.org>
AuthorDate: Tue Apr 23 13:15:34 2019 +0200

    [FLINK-12253][table-common] Introduce a logical type skeleton
---
 .../flink/table/types/logical/LogicalType.java     | 212 +++++++++++++++++++++
 .../table/types/logical/LogicalTypeFamily.java     |  60 ++++++
 .../flink/table/types/logical/LogicalTypeRoot.java | 167 ++++++++++++++++
 .../table/types/logical/LogicalTypeVisitor.java    |  33 ++++
 .../apache/flink/table/types/LogicalTypesTest.java | 125 ++++++++++++
 5 files changed, 597 insertions(+)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
new file mode 100644
index 0000000..6ea827c
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * A logical type that describes the data type of a value. It does not imply a concrete physical
+ * representation for transmission or storage but defines the boundaries between JVM-based languages
+ * and the table ecosystem.
+ *
+ * <p>The definition of a logical type is similar to the SQL standard's "data type" terminology but
+ * also contains information about the nullability of a value for efficient handling of scalar
+ * expressions.
+ *
+ * <p>Subclasses of this class define characteristics of built-in or user-defined types.
+ *
+ * <p>Instances of this class describe the fully parameterized, immutable type with additional
+ * information such as numeric precision or expected length.
+ *
+ * <p>NOTE: A logical type is just a description of a type, a planner or runtime might not support
+ * every type in every logical precision yet!
+ */
+@PublicEvolving
+public abstract class LogicalType implements Serializable {
+
+	private final boolean isNullable;
+
+	private final LogicalTypeRoot typeRoot;
+
+	public LogicalType(boolean isNullable, LogicalTypeRoot typeRoot) {
+		this.isNullable = isNullable;
+		this.typeRoot = Preconditions.checkNotNull(typeRoot);
+	}
+
+	/**
+	 * Returns whether a value of this type can be {@code null}.
+	 */
+	public boolean isNullable() {
+		return isNullable;
+	}
+
+	/**
+	 * Returns the root of this type. It is an essential description without additional parameters.
+	 */
+	public LogicalTypeRoot getTypeRoot() {
+		return typeRoot;
+	}
+
+	/**
+	 * Returns a deep copy of this type with possibly different nullability.
+	 *
+	 * @param isNullable the intended nullability of the copied type
+	 * @return a deep copy
+	 */
+	public abstract LogicalType copy(boolean isNullable);
+
+	/**
+	 * Returns a deep copy of this type. It requires an implementation of {@link #copy(boolean)}.
+	 *
+	 * @return a deep copy
+	 */
+	public final LogicalType copy() {
+		return copy(isNullable);
+	}
+
+	/**
+	 * Returns a string that fully serializes this instance. The serialized string can be used for
+	 * transmitting or persisting a type.
+	 *
+	 * @return detailed string for transmission or persistence
+	 */
+	public abstract String asSerializableString();
+
+	/**
+	 * Returns a string that summarizes this type for printing to a console. An implementation might
+	 * shorten long names or skips very specific properties.
+	 *
+	 * <p>Use {@link #asSerializableString()} for a type string that fully serializes
+	 * this instance.
+	 *
+	 * @return summary string of this type for debugging purposes
+	 */
+	public String asSummaryString() {
+		return asSerializableString();
+	}
+
+	/**
+	 * Returns whether an instance of the given class can be represented as a value of this logical
+	 * type when entering the table ecosystem. This method helps for the interoperability between
+	 * JVM-based languages and the relational type system.
+	 *
+	 * <p>A supported conversion directly maps an input class to a logical type without loss of
+	 * precision or type widening.
+	 *
+	 * <p>For example, {@code java.lang.Long} or {@code long} can be used as input for {@code BIGINT}
+	 * independent of the set nullability.
+	 *
+	 * @param clazz input class to be converted into this logical type
+	 * @return flag that indicates if instances of this class can be used as input into the table
+	 * ecosystem
+	 * @see #getDefaultConversion()
+	 */
+	public abstract boolean supportsInputConversion(Class<?> clazz);
+
+	/**
+	 * Returns whether a value of this logical type can be represented as an instance of the given
+	 * class when leaving the table ecosystem. This method helps for the interoperability between
+	 * JVM-based languages and the relational type system.
+	 *
+	 * <p>A supported conversion directly maps a logical type to an output class without loss of
+	 * precision or type widening.
+	 *
+	 * <p>For example, {@code java.lang.Long} or {@code long} can be used as output for {@code BIGINT}
+	 * if the type is not nullable. If the type is nullable, only {@code java.lang.Long} can represent
+	 * this.
+	 *
+	 * @param clazz output class to be converted from this logical type
+	 * @return flag that indicates if instances of this class can be used as output from the table
+	 * ecosystem
+	 * @see #getDefaultConversion()
+	 */
+	public abstract boolean supportsOutputConversion(Class<?> clazz);
+
+	/**
+	 * Returns the default conversion class. A value of this logical type is expected to be an instance
+	 * of the given class when entering or is represented as an instance of the given class when
+	 * leaving the table ecosystem if no other conversion has been specified.
+	 *
+	 * <p>For example, {@code java.lang.Long} is the default input and output for {@code BIGINT}.
+	 *
+	 * @return default class to represent values of this logical type
+	 * @see #supportsInputConversion(Class)
+	 * @see #supportsOutputConversion(Class)
+	 */
+	public abstract Class<?> getDefaultConversion();
+
+	public abstract List<LogicalType> getChildren();
+
+	public abstract <R> R accept(LogicalTypeVisitor<R> visitor);
+
+	@Override
+	public String toString() {
+		return asSummaryString();
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		LogicalType that = (LogicalType) o;
+		return isNullable == that.isNullable && typeRoot == that.typeRoot;
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(isNullable, typeRoot);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	protected String withNullability(String format, Object... params) {
+		if (!isNullable) {
+			return String.format(format + " NOT NULL", params);
+		}
+		return String.format(format, params);
+	}
+
+	protected static Set<String> conversionSet(String... elements) {
+		return new HashSet<>(Arrays.asList(elements));
+	}
+
+	protected static String escapeBackticks(String s) {
+		return s.replace("`", "``");
+	}
+
+	protected static String escapeSingleQuotes(String s) {
+		return s.replace("'", "''");
+	}
+
+	protected static String escapeIdentifier(String s) {
+		return "`" + escapeBackticks(s) + "`";
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeFamily.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeFamily.java
new file mode 100644
index 0000000..0e817b4
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeFamily.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * An enumeration of logical type families for clustering {@link LogicalTypeRoot}s into categories.
+ *
+ * <p>The enumeration is very close to the SQL standard in terms of naming and completeness. However,
+ * it reflects just a subset of the evolving standard and contains some extensions (indicated
+ * by {@code EXTENSION}).
+ */
+@PublicEvolving
+public enum LogicalTypeFamily {
+
+	PREDEFINED,
+
+	CONSTRUCTED,
+
+	USER_DEFINED,
+
+	CHARACTER_STRING,
+
+	BINARY_STRING,
+
+	NUMERIC,
+
+	EXACT_NUMERIC,
+
+	APPROXIMATE_NUMERIC,
+
+	DATETIME,
+
+	TIME,
+
+	TIMESTAMP,
+
+	INTERVAL,
+
+	COLLECTION,
+
+	EXTENSION
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java
new file mode 100644
index 0000000..d17443b
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Set;
+
+/**
+ * An enumeration of logical type roots containing static information about logical data types.
+ *
+ * <p>A root is an essential description of a {@link LogicalType} without additional parameters. For
+ * example, a parameterized logical type {@code DECIMAL(12,3)} possesses all characteristics of its
+ * root {@code DECIMAL}. Additionally, a logical type root enables efficient comparision during the
+ * evaluation of types.
+ *
+ * <p>The enumeration is very close to the SQL standard in terms of naming and completeness. However,
+ * it reflects just a subset of the evolving standard and contains some extensions (such as {@code NULL}
+ * or {@code ANY}).
+ *
+ * <p>See the type-implementing classes for a more detailed description of each type.
+ */
+@PublicEvolving
+public enum LogicalTypeRoot {
+
+	CHAR(
+		LogicalTypeFamily.PREDEFINED,
+		LogicalTypeFamily.CHARACTER_STRING),
+
+	VARCHAR(
+		LogicalTypeFamily.PREDEFINED,
+		LogicalTypeFamily.CHARACTER_STRING),
+
+	BOOLEAN(
+		LogicalTypeFamily.PREDEFINED),
+
+	BINARY(
+		LogicalTypeFamily.PREDEFINED,
+		LogicalTypeFamily.BINARY_STRING),
+
+	VARBINARY(
+		LogicalTypeFamily.PREDEFINED,
+		LogicalTypeFamily.BINARY_STRING),
+
+	DECIMAL(
+		LogicalTypeFamily.PREDEFINED,
+		LogicalTypeFamily.NUMERIC,
+		LogicalTypeFamily.EXACT_NUMERIC),
+
+	TINYINT(
+		LogicalTypeFamily.PREDEFINED,
+		LogicalTypeFamily.NUMERIC,
+		LogicalTypeFamily.EXACT_NUMERIC),
+
+	SMALLINT(
+		LogicalTypeFamily.PREDEFINED,
+		LogicalTypeFamily.NUMERIC,
+		LogicalTypeFamily.EXACT_NUMERIC),
+
+	INTEGER(
+		LogicalTypeFamily.PREDEFINED,
+		LogicalTypeFamily.NUMERIC,
+		LogicalTypeFamily.EXACT_NUMERIC),
+
+	BIGINT(
+		LogicalTypeFamily.PREDEFINED,
+		LogicalTypeFamily.NUMERIC,
+		LogicalTypeFamily.EXACT_NUMERIC),
+
+	FLOAT(
+		LogicalTypeFamily.PREDEFINED,
+		LogicalTypeFamily.NUMERIC,
+		LogicalTypeFamily.APPROXIMATE_NUMERIC),
+
+	DOUBLE(
+		LogicalTypeFamily.PREDEFINED,
+		LogicalTypeFamily.NUMERIC,
+		LogicalTypeFamily.APPROXIMATE_NUMERIC),
+
+	DATE(
+		LogicalTypeFamily.PREDEFINED,
+		LogicalTypeFamily.DATETIME),
+
+	TIME_WITHOUT_TIME_ZONE(
+		LogicalTypeFamily.PREDEFINED,
+		LogicalTypeFamily.DATETIME),
+
+	TIMESTAMP_WITHOUT_TIME_ZONE(
+		LogicalTypeFamily.PREDEFINED,
+		LogicalTypeFamily.DATETIME,
+		LogicalTypeFamily.TIMESTAMP),
+
+	TIMESTAMP_WITH_TIME_ZONE(
+		LogicalTypeFamily.PREDEFINED,
+		LogicalTypeFamily.DATETIME,
+		LogicalTypeFamily.TIMESTAMP),
+
+	TIMESTAMP_WITH_LOCAL_TIME_ZONE(
+		LogicalTypeFamily.PREDEFINED,
+		LogicalTypeFamily.DATETIME,
+		LogicalTypeFamily.TIMESTAMP,
+		LogicalTypeFamily.EXTENSION),
+
+	INTERVAL_YEAR_MONTH(
+		LogicalTypeFamily.PREDEFINED,
+		LogicalTypeFamily.INTERVAL),
+
+	INTERVAL_DAY_TIME(
+		LogicalTypeFamily.PREDEFINED,
+		LogicalTypeFamily.INTERVAL),
+
+	ARRAY(
+		LogicalTypeFamily.CONSTRUCTED,
+		LogicalTypeFamily.COLLECTION),
+
+	MULTISET(
+		LogicalTypeFamily.CONSTRUCTED,
+		LogicalTypeFamily.COLLECTION),
+
+	MAP(
+		LogicalTypeFamily.CONSTRUCTED,
+		LogicalTypeFamily.COLLECTION,
+		LogicalTypeFamily.EXTENSION),
+
+	ROW(
+		LogicalTypeFamily.CONSTRUCTED),
+
+	DISTINCT_TYPE(
+		LogicalTypeFamily.USER_DEFINED),
+
+	STRUCTURED_TYPE(
+		LogicalTypeFamily.USER_DEFINED),
+
+	NULL(
+		LogicalTypeFamily.EXTENSION),
+
+	ANY(
+		LogicalTypeFamily.EXTENSION);
+
+	private final Set<LogicalTypeFamily> families;
+
+	LogicalTypeRoot(LogicalTypeFamily firstFamily, LogicalTypeFamily... otherFamilies) {
+		this.families = Collections.unmodifiableSet(EnumSet.of(firstFamily, otherFamilies));
+	}
+
+	public Set<LogicalTypeFamily> getFamilies() {
+		return families;
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
new file mode 100644
index 0000000..272ea91
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * The visitor definition of {@link LogicalType}. The visitor transforms a logical type into
+ * instances of {@code R}.
+ *
+ * @param <R> result type
+ */
+@PublicEvolving
+public interface LogicalTypeVisitor<R> {
+
+	R visit(LogicalType other);
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
new file mode 100644
index 0000000..83a515f
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.junit.Assert;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for subclasses of {@link org.apache.flink.table.types.logical.LogicalType}.
+ */
+public class LogicalTypesTest {
+
+	// --------------------------------------------------------------------------------------------
+
+	private static void testAll(
+		LogicalType nullableType,
+		String serializableString,
+		String summaryString,
+		Class[] supportedInputClasses,
+		Class[] supportedOutputClasses,
+		LogicalType[] children,
+		LogicalType otherType) {
+
+		testEquality(nullableType, otherType);
+
+		testNullability(nullableType);
+
+		testJavaSerializability(nullableType);
+
+		testStringSerializability(nullableType, serializableString);
+
+		testStringSummary(nullableType, summaryString);
+
+		testConversions(nullableType, supportedInputClasses, supportedOutputClasses);
+
+		testChildren(nullableType, children);
+	}
+
+	private static void testEquality(LogicalType nullableType, LogicalType otherType) {
+		assertTrue(nullableType.isNullable());
+
+		assertEquals(nullableType, nullableType);
+		assertEquals(nullableType.hashCode(), nullableType.hashCode());
+
+		assertEquals(nullableType, nullableType.copy());
+
+		assertNotEquals(nullableType, otherType);
+		assertNotEquals(nullableType.hashCode(), otherType.hashCode());
+	}
+
+	private static void testNullability(LogicalType nullableType) {
+		final LogicalType notNullInstance = nullableType.copy(false);
+
+		assertNotEquals(nullableType, notNullInstance);
+
+		assertFalse(notNullInstance.isNullable());
+	}
+
+	private static void testJavaSerializability(LogicalType serializableType) {
+		try {
+			final LogicalType deserializedInstance = InstantiationUtil.deserializeObject(
+				InstantiationUtil.serializeObject(serializableType),
+				LogicalTypesTest.class.getClassLoader());
+
+			assertEquals(serializableType, deserializedInstance);
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	private static void testStringSerializability(LogicalType serializableType, String serializableString) {
+		Assert.assertEquals(serializableString, serializableType.asSerializableString());
+	}
+
+	private static void testStringSummary(LogicalType type, String summaryString) {
+		Assert.assertEquals(summaryString, type.asSummaryString());
+	}
+
+	private static void testConversions(LogicalType type, Class[] inputs, Class[] outputs) {
+		for (Class<?> clazz : inputs) {
+			assertTrue(type.supportsInputConversion(clazz));
+		}
+
+		for (Class<?> clazz : outputs) {
+			assertTrue(type.supportsOutputConversion(clazz));
+		}
+
+		assertTrue(type.supportsInputConversion(type.getDefaultConversion()));
+
+		assertTrue(type.supportsOutputConversion(type.getDefaultConversion()));
+
+		assertFalse(type.supportsOutputConversion(LogicalTypesTest.class));
+
+		assertFalse(type.supportsInputConversion(LogicalTypesTest.class));
+	}
+
+	private static void testChildren(LogicalType type, LogicalType[] children) {
+		assertEquals(Arrays.asList(children), type.getChildren());
+	}
+}