You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/05/03 12:40:44 UTC
[flink] branch master updated: [FLINK-12253][table-common]
Introduce a logical type skeleton
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1d11607 [FLINK-12253][table-common] Introduce a logical type skeleton
1d11607 is described below
commit 1d11607c732b5cc6cdf40b76e28e9812bec0fab6
Author: Timo Walther <tw...@apache.org>
AuthorDate: Tue Apr 23 13:15:34 2019 +0200
[FLINK-12253][table-common] Introduce a logical type skeleton
---
.../flink/table/types/logical/LogicalType.java | 212 +++++++++++++++++++++
.../table/types/logical/LogicalTypeFamily.java | 60 ++++++
.../flink/table/types/logical/LogicalTypeRoot.java | 167 ++++++++++++++++
.../table/types/logical/LogicalTypeVisitor.java | 33 ++++
.../apache/flink/table/types/LogicalTypesTest.java | 125 ++++++++++++
5 files changed, 597 insertions(+)
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
new file mode 100644
index 0000000..6ea827c
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * A logical type that describes the data type of a value. It does not imply a concrete physical
+ * representation for transmission or storage but defines the boundaries between JVM-based languages
+ * and the table ecosystem.
+ *
+ * <p>The definition of a logical type is similar to the SQL standard's "data type" terminology but
+ * also contains information about the nullability of a value for efficient handling of scalar
+ * expressions.
+ *
+ * <p>Subclasses of this class define characteristics of built-in or user-defined types.
+ *
+ * <p>Instances of this class describe the fully parameterized, immutable type with additional
+ * information such as numeric precision or expected length.
+ *
+ * <p>NOTE: A logical type is just a description of a type, a planner or runtime might not support
+ * every type in every logical precision yet!
+ */
+@PublicEvolving
+public abstract class LogicalType implements Serializable {
+
+ private final boolean isNullable;
+
+ private final LogicalTypeRoot typeRoot;
+
+ public LogicalType(boolean isNullable, LogicalTypeRoot typeRoot) {
+ this.isNullable = isNullable;
+ this.typeRoot = Preconditions.checkNotNull(typeRoot);
+ }
+
+ /**
+ * Returns whether a value of this type can be {@code null}.
+ */
+ public boolean isNullable() {
+ return isNullable;
+ }
+
+ /**
+ * Returns the root of this type. It is an essential description without additional parameters.
+ */
+ public LogicalTypeRoot getTypeRoot() {
+ return typeRoot;
+ }
+
+ /**
+ * Returns a deep copy of this type with possibly different nullability.
+ *
+ * @param isNullable the intended nullability of the copied type
+ * @return a deep copy
+ */
+ public abstract LogicalType copy(boolean isNullable);
+
+ /**
+ * Returns a deep copy of this type. It requires an implementation of {@link #copy(boolean)}.
+ *
+ * @return a deep copy
+ */
+ public final LogicalType copy() {
+ return copy(isNullable);
+ }
+
+ /**
+ * Returns a string that fully serializes this instance. The serialized string can be used for
+ * transmitting or persisting a type.
+ *
+ * @return detailed string for transmission or persistence
+ */
+ public abstract String asSerializableString();
+
+ /**
+ * Returns a string that summarizes this type for printing to a console. An implementation might
+ * shorten long names or skips very specific properties.
+ *
+ * <p>Use {@link #asSerializableString()} for a type string that fully serializes
+ * this instance.
+ *
+ * @return summary string of this type for debugging purposes
+ */
+ public String asSummaryString() {
+ return asSerializableString();
+ }
+
+ /**
+ * Returns whether an instance of the given class can be represented as a value of this logical
+ * type when entering the table ecosystem. This method helps for the interoperability between
+ * JVM-based languages and the relational type system.
+ *
+ * <p>A supported conversion directly maps an input class to a logical type without loss of
+ * precision or type widening.
+ *
+ * <p>For example, {@code java.lang.Long} or {@code long} can be used as input for {@code BIGINT}
+ * independent of the set nullability.
+ *
+ * @param clazz input class to be converted into this logical type
+ * @return flag that indicates if instances of this class can be used as input into the table
+ * ecosystem
+ * @see #getDefaultConversion()
+ */
+ public abstract boolean supportsInputConversion(Class<?> clazz);
+
+ /**
+ * Returns whether a value of this logical type can be represented as an instance of the given
+ * class when leaving the table ecosystem. This method helps for the interoperability between
+ * JVM-based languages and the relational type system.
+ *
+ * <p>A supported conversion directly maps a logical type to an output class without loss of
+ * precision or type widening.
+ *
+ * <p>For example, {@code java.lang.Long} or {@code long} can be used as output for {@code BIGINT}
+ * if the type is not nullable. If the type is nullable, only {@code java.lang.Long} can represent
+ * this.
+ *
+ * @param clazz output class to be converted from this logical type
+ * @return flag that indicates if instances of this class can be used as output from the table
+ * ecosystem
+ * @see #getDefaultConversion()
+ */
+ public abstract boolean supportsOutputConversion(Class<?> clazz);
+
+ /**
+ * Returns the default conversion class. A value of this logical type is expected to be an instance
+ * of the given class when entering or is represented as an instance of the given class when
+ * leaving the table ecosystem if no other conversion has been specified.
+ *
+ * <p>For example, {@code java.lang.Long} is the default input and output for {@code BIGINT}.
+ *
+ * @return default class to represent values of this logical type
+ * @see #supportsInputConversion(Class)
+ * @see #supportsOutputConversion(Class)
+ */
+ public abstract Class<?> getDefaultConversion();
+
+ public abstract List<LogicalType> getChildren();
+
+ public abstract <R> R accept(LogicalTypeVisitor<R> visitor);
+
+ @Override
+ public String toString() {
+ return asSummaryString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ LogicalType that = (LogicalType) o;
+ return isNullable == that.isNullable && typeRoot == that.typeRoot;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(isNullable, typeRoot);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ protected String withNullability(String format, Object... params) {
+ if (!isNullable) {
+ return String.format(format + " NOT NULL", params);
+ }
+ return String.format(format, params);
+ }
+
+ protected static Set<String> conversionSet(String... elements) {
+ return new HashSet<>(Arrays.asList(elements));
+ }
+
+ protected static String escapeBackticks(String s) {
+ return s.replace("`", "``");
+ }
+
+ protected static String escapeSingleQuotes(String s) {
+ return s.replace("'", "''");
+ }
+
+ protected static String escapeIdentifier(String s) {
+ return "`" + escapeBackticks(s) + "`";
+ }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeFamily.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeFamily.java
new file mode 100644
index 0000000..0e817b4
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeFamily.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * An enumeration of logical type families for clustering {@link LogicalTypeRoot}s into categories.
+ *
+ * <p>The enumeration is very close to the SQL standard in terms of naming and completeness. However,
+ * it reflects just a subset of the evolving standard and contains some extensions (indicated
+ * by {@code EXTENSION}).
+ */
+@PublicEvolving
+public enum LogicalTypeFamily {
+
+ PREDEFINED,
+
+ CONSTRUCTED,
+
+ USER_DEFINED,
+
+ CHARACTER_STRING,
+
+ BINARY_STRING,
+
+ NUMERIC,
+
+ EXACT_NUMERIC,
+
+ APPROXIMATE_NUMERIC,
+
+ DATETIME,
+
+ TIME,
+
+ TIMESTAMP,
+
+ INTERVAL,
+
+ COLLECTION,
+
+ EXTENSION
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java
new file mode 100644
index 0000000..d17443b
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Set;
+
+/**
+ * An enumeration of logical type roots containing static information about logical data types.
+ *
+ * <p>A root is an essential description of a {@link LogicalType} without additional parameters. For
+ * example, a parameterized logical type {@code DECIMAL(12,3)} possesses all characteristics of its
+ * root {@code DECIMAL}. Additionally, a logical type root enables efficient comparision during the
+ * evaluation of types.
+ *
+ * <p>The enumeration is very close to the SQL standard in terms of naming and completeness. However,
+ * it reflects just a subset of the evolving standard and contains some extensions (such as {@code NULL}
+ * or {@code ANY}).
+ *
+ * <p>See the type-implementing classes for a more detailed description of each type.
+ */
+@PublicEvolving
+public enum LogicalTypeRoot {
+
+ CHAR(
+ LogicalTypeFamily.PREDEFINED,
+ LogicalTypeFamily.CHARACTER_STRING),
+
+ VARCHAR(
+ LogicalTypeFamily.PREDEFINED,
+ LogicalTypeFamily.CHARACTER_STRING),
+
+ BOOLEAN(
+ LogicalTypeFamily.PREDEFINED),
+
+ BINARY(
+ LogicalTypeFamily.PREDEFINED,
+ LogicalTypeFamily.BINARY_STRING),
+
+ VARBINARY(
+ LogicalTypeFamily.PREDEFINED,
+ LogicalTypeFamily.BINARY_STRING),
+
+ DECIMAL(
+ LogicalTypeFamily.PREDEFINED,
+ LogicalTypeFamily.NUMERIC,
+ LogicalTypeFamily.EXACT_NUMERIC),
+
+ TINYINT(
+ LogicalTypeFamily.PREDEFINED,
+ LogicalTypeFamily.NUMERIC,
+ LogicalTypeFamily.EXACT_NUMERIC),
+
+ SMALLINT(
+ LogicalTypeFamily.PREDEFINED,
+ LogicalTypeFamily.NUMERIC,
+ LogicalTypeFamily.EXACT_NUMERIC),
+
+ INTEGER(
+ LogicalTypeFamily.PREDEFINED,
+ LogicalTypeFamily.NUMERIC,
+ LogicalTypeFamily.EXACT_NUMERIC),
+
+ BIGINT(
+ LogicalTypeFamily.PREDEFINED,
+ LogicalTypeFamily.NUMERIC,
+ LogicalTypeFamily.EXACT_NUMERIC),
+
+ FLOAT(
+ LogicalTypeFamily.PREDEFINED,
+ LogicalTypeFamily.NUMERIC,
+ LogicalTypeFamily.APPROXIMATE_NUMERIC),
+
+ DOUBLE(
+ LogicalTypeFamily.PREDEFINED,
+ LogicalTypeFamily.NUMERIC,
+ LogicalTypeFamily.APPROXIMATE_NUMERIC),
+
+ DATE(
+ LogicalTypeFamily.PREDEFINED,
+ LogicalTypeFamily.DATETIME),
+
+ TIME_WITHOUT_TIME_ZONE(
+ LogicalTypeFamily.PREDEFINED,
+ LogicalTypeFamily.DATETIME),
+
+ TIMESTAMP_WITHOUT_TIME_ZONE(
+ LogicalTypeFamily.PREDEFINED,
+ LogicalTypeFamily.DATETIME,
+ LogicalTypeFamily.TIMESTAMP),
+
+ TIMESTAMP_WITH_TIME_ZONE(
+ LogicalTypeFamily.PREDEFINED,
+ LogicalTypeFamily.DATETIME,
+ LogicalTypeFamily.TIMESTAMP),
+
+ TIMESTAMP_WITH_LOCAL_TIME_ZONE(
+ LogicalTypeFamily.PREDEFINED,
+ LogicalTypeFamily.DATETIME,
+ LogicalTypeFamily.TIMESTAMP,
+ LogicalTypeFamily.EXTENSION),
+
+ INTERVAL_YEAR_MONTH(
+ LogicalTypeFamily.PREDEFINED,
+ LogicalTypeFamily.INTERVAL),
+
+ INTERVAL_DAY_TIME(
+ LogicalTypeFamily.PREDEFINED,
+ LogicalTypeFamily.INTERVAL),
+
+ ARRAY(
+ LogicalTypeFamily.CONSTRUCTED,
+ LogicalTypeFamily.COLLECTION),
+
+ MULTISET(
+ LogicalTypeFamily.CONSTRUCTED,
+ LogicalTypeFamily.COLLECTION),
+
+ MAP(
+ LogicalTypeFamily.CONSTRUCTED,
+ LogicalTypeFamily.COLLECTION,
+ LogicalTypeFamily.EXTENSION),
+
+ ROW(
+ LogicalTypeFamily.CONSTRUCTED),
+
+ DISTINCT_TYPE(
+ LogicalTypeFamily.USER_DEFINED),
+
+ STRUCTURED_TYPE(
+ LogicalTypeFamily.USER_DEFINED),
+
+ NULL(
+ LogicalTypeFamily.EXTENSION),
+
+ ANY(
+ LogicalTypeFamily.EXTENSION);
+
+ private final Set<LogicalTypeFamily> families;
+
+ LogicalTypeRoot(LogicalTypeFamily firstFamily, LogicalTypeFamily... otherFamilies) {
+ this.families = Collections.unmodifiableSet(EnumSet.of(firstFamily, otherFamilies));
+ }
+
+ public Set<LogicalTypeFamily> getFamilies() {
+ return families;
+ }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
new file mode 100644
index 0000000..272ea91
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * The visitor definition of {@link LogicalType}. The visitor transforms a logical type into
+ * instances of {@code R}.
+ *
+ * @param <R> result type
+ */
+@PublicEvolving
+public interface LogicalTypeVisitor<R> {
+
+ R visit(LogicalType other);
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
new file mode 100644
index 0000000..83a515f
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.junit.Assert;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for subclasses of {@link org.apache.flink.table.types.logical.LogicalType}.
+ */
+public class LogicalTypesTest {
+
+ // --------------------------------------------------------------------------------------------
+
+ private static void testAll(
+ LogicalType nullableType,
+ String serializableString,
+ String summaryString,
+ Class[] supportedInputClasses,
+ Class[] supportedOutputClasses,
+ LogicalType[] children,
+ LogicalType otherType) {
+
+ testEquality(nullableType, otherType);
+
+ testNullability(nullableType);
+
+ testJavaSerializability(nullableType);
+
+ testStringSerializability(nullableType, serializableString);
+
+ testStringSummary(nullableType, summaryString);
+
+ testConversions(nullableType, supportedInputClasses, supportedOutputClasses);
+
+ testChildren(nullableType, children);
+ }
+
+ private static void testEquality(LogicalType nullableType, LogicalType otherType) {
+ assertTrue(nullableType.isNullable());
+
+ assertEquals(nullableType, nullableType);
+ assertEquals(nullableType.hashCode(), nullableType.hashCode());
+
+ assertEquals(nullableType, nullableType.copy());
+
+ assertNotEquals(nullableType, otherType);
+ assertNotEquals(nullableType.hashCode(), otherType.hashCode());
+ }
+
+ private static void testNullability(LogicalType nullableType) {
+ final LogicalType notNullInstance = nullableType.copy(false);
+
+ assertNotEquals(nullableType, notNullInstance);
+
+ assertFalse(notNullInstance.isNullable());
+ }
+
+ private static void testJavaSerializability(LogicalType serializableType) {
+ try {
+ final LogicalType deserializedInstance = InstantiationUtil.deserializeObject(
+ InstantiationUtil.serializeObject(serializableType),
+ LogicalTypesTest.class.getClassLoader());
+
+ assertEquals(serializableType, deserializedInstance);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static void testStringSerializability(LogicalType serializableType, String serializableString) {
+ Assert.assertEquals(serializableString, serializableType.asSerializableString());
+ }
+
+ private static void testStringSummary(LogicalType type, String summaryString) {
+ Assert.assertEquals(summaryString, type.asSummaryString());
+ }
+
+ private static void testConversions(LogicalType type, Class[] inputs, Class[] outputs) {
+ for (Class<?> clazz : inputs) {
+ assertTrue(type.supportsInputConversion(clazz));
+ }
+
+ for (Class<?> clazz : outputs) {
+ assertTrue(type.supportsOutputConversion(clazz));
+ }
+
+ assertTrue(type.supportsInputConversion(type.getDefaultConversion()));
+
+ assertTrue(type.supportsOutputConversion(type.getDefaultConversion()));
+
+ assertFalse(type.supportsOutputConversion(LogicalTypesTest.class));
+
+ assertFalse(type.supportsInputConversion(LogicalTypesTest.class));
+ }
+
+ private static void testChildren(LogicalType type, LogicalType[] children) {
+ assertEquals(Arrays.asList(children), type.getChildren());
+ }
+}