You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/05/06 09:15:34 UTC

[flink] branch master updated: [FLINK-12253][table-common] Add a ROW type

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d22018  [FLINK-12253][table-common] Add a ROW type
1d22018 is described below

commit 1d22018dc4886a2842fffd9fac1224f862085239
Author: Timo Walther <tw...@apache.org>
AuthorDate: Thu May 2 12:07:44 2019 +0200

    [FLINK-12253][table-common] Add a ROW type
---
 .../table/types/logical/LogicalTypeVisitor.java    |   2 +
 .../apache/flink/table/types/logical/RowType.java  | 234 +++++++++++++++++++++
 .../apache/flink/table/types/LogicalTypesTest.java |  21 ++
 3 files changed, 257 insertions(+)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
index 79dc207..2e1d3de 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
@@ -73,5 +73,7 @@ public interface LogicalTypeVisitor<R> {
 
 	R visit(MapType mapType);
 
+	R visit(RowType rowType);
+
 	R visit(LogicalType other);
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/RowType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/RowType.java
new file mode 100644
index 0000000..24630f0
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/RowType.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Logical type of a sequence of fields. A field consists of a field name, field type, and an optional
+ * description. The most specific type of a row of a table is a row type. In this case, each column
+ * of the row corresponds to the field of the row type that has the same ordinal position as the
+ * column. Compared to the SQL standard, an optional field description simplifies the handling of
+ * complex structures.
+ *
+ * <p>The serialized string representation is {@code ROW<n0 t0 'd0', n1 t1 'd1', ...>} where
+ * {@code n} is the name of a field, {@code t} is the logical type of a field, and {@code d} is the
+ * optional description of a field.
+ */
+@PublicEvolving
+public final class RowType extends LogicalType {
+
+	private static final String FORMAT = "ROW<%s>";
+
+	private static final Set<String> INPUT_OUTPUT_CONVERSION = conversionSet(
+		Row.class.getName(),
+		"org.apache.flink.table.dataformat.BaseRow");
+
+	private static final Class<?> DEFAULT_CONVERSION = Row.class;
+
+	/**
+	 * Describes a field of a {@link RowType}.
+	 */
+	public static final class RowField implements Serializable {
+
+		private static final String FIELD_FORMAT_WITH_DESCRIPTION = "%s %s '%s'";
+
+		private static final String FIELD_FORMAT_NO_DESCRIPTION = "%s %s";
+
+		private final String name;
+
+		private final LogicalType type;
+
+		private final @Nullable String description;
+
+		public RowField(String name, LogicalType type, @Nullable String description) {
+			this.name = Preconditions.checkNotNull(name, "Field name must not be null.");
+			this.type = Preconditions.checkNotNull(type, "Field type must not be null.");
+			this.description = description;
+		}
+
+		public RowField(String name, LogicalType type) {
+			this(name, type, null);
+		}
+
+		public String getName() {
+			return name;
+		}
+
+		public LogicalType getType() {
+			return type;
+		}
+
+		public Optional<String> getDescription() {
+			return Optional.ofNullable(description);
+		}
+
+		public RowField copy() {
+			return new RowField(name, type.copy(), description);
+		}
+
+		public String asSummaryString() {
+			return formatString(type.asSummaryString(), true);
+		}
+
+		public String asSerializableString() {
+			return formatString(type.asSerializableString(), false);
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			RowField rowField = (RowField) o;
+			return name.equals(rowField.name) &&
+				type.equals(rowField.type) &&
+				Objects.equals(description, rowField.description);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(name, type, description);
+		}
+
+		private String formatString(String typeString, boolean excludeDescription) {
+			if (description == null) {
+				return String.format(FIELD_FORMAT_NO_DESCRIPTION,
+					escapeIdentifier(name),
+					typeString);
+			} else if (excludeDescription) {
+				return String.format(FIELD_FORMAT_WITH_DESCRIPTION,
+					escapeIdentifier(name),
+					typeString,
+					"...");
+			} else {
+				return String.format(FIELD_FORMAT_WITH_DESCRIPTION,
+					escapeIdentifier(name),
+					typeString,
+					escapeSingleQuotes(description));
+			}
+		}
+	}
+
+	private final List<RowField> fields;
+
+	public RowType(boolean isNullable, List<RowField> fields) {
+		super(isNullable, LogicalTypeRoot.ROW);
+		this.fields = Collections.unmodifiableList(
+			new ArrayList<>(
+				Preconditions.checkNotNull(fields, "Fields must not be null.")));
+	}
+
+	public RowType(List<RowField> fields) {
+		this(true, fields);
+	}
+
+	public List<RowField> getFields() {
+		return fields;
+	}
+
+	@Override
+	public LogicalType copy(boolean isNullable) {
+		return new RowType(
+			isNullable,
+			fields.stream().map(RowField::copy).collect(Collectors.toList()));
+	}
+
+	@Override
+	public String asSummaryString() {
+		return withNullability(
+			FORMAT,
+			fields.stream()
+				.map(RowField::asSummaryString)
+				.collect(Collectors.joining(", ")));
+	}
+
+	@Override
+	public String asSerializableString() {
+		return withNullability(
+			FORMAT,
+			fields.stream()
+				.map(RowField::asSerializableString)
+				.collect(Collectors.joining(", ")));
+	}
+
+	@Override
+	public boolean supportsInputConversion(Class<?> clazz) {
+		return INPUT_OUTPUT_CONVERSION.contains(clazz.getName());
+	}
+
+	@Override
+	public boolean supportsOutputConversion(Class<?> clazz) {
+		return INPUT_OUTPUT_CONVERSION.contains(clazz.getName());
+	}
+
+	@Override
+	public Class<?> getDefaultConversion() {
+		return DEFAULT_CONVERSION;
+	}
+
+	@Override
+	public List<LogicalType> getChildren() {
+		return Collections.unmodifiableList(
+			fields.stream()
+				.map(RowField::getType)
+				.collect(Collectors.toList()));
+	}
+
+	@Override
+	public <R> R accept(LogicalTypeVisitor<R> visitor) {
+		return visitor.visit(this);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		if (!super.equals(o)) {
+			return false;
+		}
+		RowType rowType = (RowType) o;
+		return fields.equals(rowType.fields);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(super.hashCode(), fields);
+	}
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
index 2fe082d..fc339af 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.SmallIntType;
 import org.apache.flink.table.types.logical.TimeType;
 import org.apache.flink.table.types.logical.TimestampType;
@@ -41,6 +42,7 @@ import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.table.types.logical.YearMonthIntervalType;
 import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.types.Row;
 import org.apache.flink.util.InstantiationUtil;
 
 import org.junit.Assert;
@@ -370,6 +372,25 @@ public class LogicalTypesTest {
 		);
 	}
 
+	@Test
+	public void testRowType() {
+		testAll(
+			new RowType(
+				Arrays.asList(
+					new RowType.RowField("a", new VarCharType(), "Someone's desc."),
+					new RowType.RowField("b`", new TimestampType()))),
+			"ROW<`a` VARCHAR(1) 'Someone''s desc.', `b``` TIMESTAMP(6)>",
+			"ROW<`a` VARCHAR(1) '...', `b``` TIMESTAMP(6)>",
+			new Class[]{Row.class},
+			new Class[]{Row.class},
+			new LogicalType[]{new VarCharType(), new TimestampType()},
+			new RowType(
+				Arrays.asList(
+					new RowType.RowField("a", new VarCharType(), "Different desc."),
+					new RowType.RowField("b`", new TimestampType())))
+		);
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	private static void testAll(