You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/05/23 14:19:21 UTC

[flink] 01/02: [FLINK-12254][table] Update TableSchema to new type system

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 98f3046fae76405b3fcf2b5ef82835a81624498e
Author: Timo Walther <tw...@apache.org>
AuthorDate: Tue May 21 16:45:27 2019 +0200

    [FLINK-12254][table] Update TableSchema to new type system
---
 .../flink/table/client/cli/CliResultView.java      |   2 +-
 .../apache/flink/table/client/cli/CliUtils.java    |   4 +-
 .../org/apache/flink/table/api/TableSchema.java    | 140 +++++++++++++++------
 .../apache/flink/table/api/TableSchemaTest.scala   |   8 +-
 .../api/validation/TableSchemaValidationTest.scala |   6 +-
 5 files changed, 113 insertions(+), 47 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java
index 67ce5eb..407a04b 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java
@@ -145,7 +145,7 @@ public abstract class CliResultView<O extends Enum<O>> extends CliView<O, Void>
 		final CliRowView view = new CliRowView(
 			client,
 			resultDescriptor.getResultSchema().getFieldNames(),
-			CliUtils.typesToString(resultDescriptor.getResultSchema().getFieldTypes()),
+			CliUtils.typesToString(resultDescriptor.getResultSchema().getFieldDataTypes()),
 			getRow(results.get(selectedRow)));
 		view.open(); // enter view
 	}
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliUtils.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliUtils.java
index 77894e8..471d168 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliUtils.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliUtils.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.table.client.cli;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
 
 import org.jline.utils.AttributedString;
@@ -103,7 +103,7 @@ public final class CliUtils {
 		return fields;
 	}
 
-	public static String[] typesToString(TypeInformation<?>[] types) {
+	public static String[] typesToString(DataType[] types) {
 		final String[] typesAsString = new String[types.length];
 		for (int i = 0; i < types.length; i++) {
 			typesAsString[i] = types[i].toString();
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
index 385f9a0..c6804f9 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
@@ -20,8 +20,9 @@ package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
@@ -33,10 +34,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.Field;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
 
 /**
-  * A table schema that represents a table's structure with field names and types.
-  */
+ * A table schema that represents a table's structure with field names and data types.
+ */
 @PublicEvolving
 public class TableSchema {
 
@@ -44,20 +52,20 @@ public class TableSchema {
 
 	private final String[] fieldNames;
 
-	private final TypeInformation<?>[] fieldTypes;
+	private final DataType[] fieldDataTypes;
 
 	private final Map<String, Integer> fieldNameToIndex;
 
-	public TableSchema(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+	private TableSchema(String[] fieldNames, DataType[] fieldDataTypes) {
 		this.fieldNames = Preconditions.checkNotNull(fieldNames);
-		this.fieldTypes = Preconditions.checkNotNull(fieldTypes);
+		this.fieldDataTypes = Preconditions.checkNotNull(fieldDataTypes);
 
-		if (fieldNames.length != fieldTypes.length) {
+		if (fieldNames.length != fieldDataTypes.length) {
 			throw new TableException(
-				"Number of field names and field types must be equal.\n" +
-				"Number of names is " + fieldNames.length + ", number of types is " + fieldTypes.length + ".\n" +
-				"List of field names: " + Arrays.toString(fieldNames) + "\n" +
-				"List of field types: " + Arrays.toString(fieldTypes));
+				"Number of field names and field data types must be equal.\n" +
+					"Number of names is " + fieldNames.length + ", number of data types is " + fieldDataTypes.length + ".\n" +
+					"List of field names: " + Arrays.toString(fieldNames) + "\n" +
+					"List of field data types: " + Arrays.toString(fieldDataTypes));
 		}
 
 		// validate and create name to index mapping
@@ -66,7 +74,7 @@ public class TableSchema {
 		final Set<String> uniqueNames = new HashSet<>();
 		for (int i = 0; i < fieldNames.length; i++) {
 			// check for null
-			Preconditions.checkNotNull(fieldTypes[i]);
+			Preconditions.checkNotNull(fieldDataTypes[i]);
 			final String fieldName = Preconditions.checkNotNull(fieldNames[i]);
 
 			// collect indices
@@ -82,50 +90,84 @@ public class TableSchema {
 		if (!duplicateNames.isEmpty()) {
 			throw new TableException(
 				"Field names must be unique.\n" +
-				"List of duplicate fields: " + duplicateNames.toString() + "\n" +
-				"List of all fields: " + Arrays.toString(fieldNames));
+					"List of duplicate fields: " + duplicateNames.toString() + "\n" +
+					"List of all fields: " + Arrays.toString(fieldNames));
 		}
 	}
 
 	/**
+	 * @deprecated Use the {@link Builder} instead.
+	 */
+	@Deprecated
+	public TableSchema(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+		this(fieldNames, fromLegacyInfoToDataType(fieldTypes));
+	}
+
+	/**
 	 * Returns a deep copy of the table schema.
 	 */
 	public TableSchema copy() {
-		return new TableSchema(fieldNames.clone(), fieldTypes.clone());
+		return new TableSchema(fieldNames.clone(), fieldDataTypes.clone());
 	}
 
 	/**
-	 * Returns all field type information as an array.
+	 * Returns all field data types as an array.
 	 */
+	public DataType[] getFieldDataTypes() {
+		return fieldDataTypes;
+	}
+
+	/**
+	 * @deprecated Use {@link #getFieldDataTypes()} instead.
+	 */
+	@Deprecated
 	public TypeInformation<?>[] getFieldTypes() {
-		return fieldTypes;
+		return fromDataTypeToLegacyInfo(fieldDataTypes);
 	}
 
 	/**
-	 * Returns the specified type information for the given field index.
+	 * Returns the specified data type for the given field index.
 	 *
 	 * @param fieldIndex the index of the field
 	 */
-	public Optional<TypeInformation<?>> getFieldType(int fieldIndex) {
-		if (fieldIndex < 0 || fieldIndex >= fieldTypes.length) {
+	public Optional<DataType> getFieldDataType(int fieldIndex) {
+		if (fieldIndex < 0 || fieldIndex >= fieldDataTypes.length) {
 			return Optional.empty();
 		}
-		return Optional.of(fieldTypes[fieldIndex]);
+		return Optional.of(fieldDataTypes[fieldIndex]);
 	}
 
 	/**
-	 * Returns the specified type information for the given field name.
+	 * @deprecated Use {@link #getFieldDataType(int)}} instead.
+	 */
+	@Deprecated
+	public Optional<TypeInformation<?>> getFieldType(int fieldIndex) {
+		return getFieldDataType(fieldIndex)
+			.map(TypeConversions::fromDataTypeToLegacyInfo);
+	}
+
+	/**
+	 * Returns the specified data type for the given field name.
 	 *
 	 * @param fieldName the name of the field
 	 */
-	public Optional<TypeInformation<?>> getFieldType(String fieldName) {
+	public Optional<DataType> getFieldDataType(String fieldName) {
 		if (fieldNameToIndex.containsKey(fieldName)) {
-			return Optional.of(fieldTypes[fieldNameToIndex.get(fieldName)]);
+			return Optional.of(fieldDataTypes[fieldNameToIndex.get(fieldName)]);
 		}
 		return Optional.empty();
 	}
 
 	/**
+	 * @deprecated Use {@link #getFieldDataType(String)} instead.
+	 */
+	@Deprecated
+	public Optional<TypeInformation<?>> getFieldType(String fieldName) {
+		return getFieldDataType(fieldName)
+			.map(TypeConversions::fromDataTypeToLegacyInfo);
+	}
+
+	/**
 	 * Returns the number of fields.
 	 */
 	public int getFieldCount() {
@@ -152,10 +194,22 @@ public class TableSchema {
 	}
 
 	/**
-	 * Converts a table schema into a (nested) type information describing a {@link Row}.
+	 * Converts a table schema into a (nested) data type describing a {@link DataTypes#ROW(Field...)}.
+	 */
+	public DataType toRowDataType() {
+		final Field[] fields = IntStream.range(0, fieldDataTypes.length)
+			.mapToObj(i -> FIELD(fieldNames[i], fieldDataTypes[i]))
+			.toArray(Field[]::new);
+		return ROW(fields);
+	}
+
+	/**
+	 * @deprecated Use {@link #toRowDataType()} instead.
 	 */
+	@Deprecated
+	@SuppressWarnings("unchecked")
 	public TypeInformation<Row> toRowType() {
-		return Types.ROW_NAMED(fieldNames, fieldTypes);
+		return (TypeInformation<Row>) fromDataTypeToLegacyInfo(toRowDataType());
 	}
 
 	@Override
@@ -163,7 +217,7 @@ public class TableSchema {
 		final StringBuilder sb = new StringBuilder();
 		sb.append("root\n");
 		for (int i = 0; i < fieldNames.length; i++) {
-			sb.append(" |-- ").append(fieldNames[i]).append(": ").append(fieldTypes[i]).append('\n');
+			sb.append(" |-- ").append(fieldNames[i]).append(": ").append(fieldDataTypes[i]).append('\n');
 		}
 		return sb.toString();
 	}
@@ -178,13 +232,13 @@ public class TableSchema {
 		}
 		TableSchema schema = (TableSchema) o;
 		return Arrays.equals(fieldNames, schema.fieldNames) &&
-			Arrays.equals(fieldTypes, schema.fieldTypes);
+			Arrays.equals(fieldDataTypes, schema.fieldDataTypes);
 	}
 
 	@Override
 	public int hashCode() {
 		int result = Arrays.hashCode(fieldNames);
-		result = 31 * result + Arrays.hashCode(fieldTypes);
+		result = 31 * result + Arrays.hashCode(fieldDataTypes);
 		return result;
 	}
 
@@ -196,7 +250,10 @@ public class TableSchema {
 	 *
 	 * @param typeInfo The {@link TypeInformation} from which the table schema is generated.
 	 * @return The table schema that was generated from the given {@link TypeInformation}.
+	 *
+	 * @deprecated This method will be removed soon. Use {@link DataTypes} to declare types.
 	 */
+	@Deprecated
 	public static TableSchema fromTypeInfo(TypeInformation<?> typeInfo) {
 		if (typeInfo instanceof CompositeType<?>) {
 			final CompositeType<?> compositeType = (CompositeType<?>) typeInfo;
@@ -228,32 +285,41 @@ public class TableSchema {
 
 		private List<String> fieldNames;
 
-		private List<TypeInformation<?>> fieldTypes;
+		private List<DataType> fieldDataTypes;
 
 		public Builder() {
 			fieldNames = new ArrayList<>();
-			fieldTypes = new ArrayList<>();
+			fieldDataTypes = new ArrayList<>();
 		}
 
 		/**
-		 * Add a field with name and type. The call order of this method determines the order
-		 * of fields in the schema.
+		 * Add a field with name and data type.
+		 *
+		 * <p>The call order of this method determines the order of fields in the schema.
 		 */
-		public Builder field(String name, TypeInformation<?> type) {
+		public Builder field(String name, DataType dataType) {
 			Preconditions.checkNotNull(name);
-			Preconditions.checkNotNull(type);
+			Preconditions.checkNotNull(dataType);
 			fieldNames.add(name);
-			fieldTypes.add(type);
+			fieldDataTypes.add(dataType);
 			return this;
 		}
 
 		/**
+		 * @deprecated Use {@link #field(String, DataType)} instead.
+		 */
+		@Deprecated
+		public Builder field(String name, TypeInformation<?> typeInfo) {
+			return field(name, fromLegacyInfoToDataType(typeInfo));
+		}
+
+		/**
 		 * Returns a {@link TableSchema} instance.
 		 */
 		public TableSchema build() {
 			return new TableSchema(
 				fieldNames.toArray(new String[0]),
-				fieldTypes.toArray(new TypeInformation<?>[0]));
+				fieldDataTypes.toArray(new DataType[0]));
 		}
 	}
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableSchemaTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableSchemaTest.scala
index eaa852f..a5b23a6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableSchemaTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableSchemaTest.scala
@@ -39,8 +39,8 @@ class TableSchemaTest extends TableTestBase {
     assertEquals(Types.STRING, schema.getFieldTypes.apply(1))
 
     val expectedString = "root\n" +
-      " |-- a: Integer\n" +
-      " |-- b: String\n"
+      " |-- a: INT\n" +
+      " |-- b: STRING\n"
     assertEquals(expectedString, schema.toString)
 
     assertTrue(!schema.getFieldName(3).isPresent)
@@ -61,8 +61,8 @@ class TableSchemaTest extends TableTestBase {
     assertEquals(Types.STRING, schema.getFieldTypes.apply(1))
 
     val expectedString = "root\n" +
-      " |-- a: Integer\n" +
-      " |-- b: String\n"
+      " |-- a: INT\n" +
+      " |-- b: STRING\n"
     assertEquals(expectedString, schema.toString)
 
     assertTrue(!schema.getFieldName(3).isPresent)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/TableSchemaValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/TableSchemaValidationTest.scala
index 44b49f9..ad5b3a1 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/TableSchemaValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/TableSchemaValidationTest.scala
@@ -28,10 +28,10 @@ class TableSchemaValidationTest extends TableTestBase {
   def testColumnNameAndColumnTypeNotEqual() {
     thrown.expect(classOf[TableException])
     thrown.expectMessage(
-      "Number of field names and field types must be equal.\n" +
-        "Number of names is 3, number of types is 2.\n" +
+      "Number of field names and field data types must be equal.\n" +
+        "Number of names is 3, number of data types is 2.\n" +
         "List of field names: [a, b, c]\n" +
-        "List of field types: [Integer, String]")
+        "List of field data types: [INT, STRING]")
 
     val fieldNames = Array("a", "b", "c")
     val typeInfos: Array[TypeInformation[_]] = Array(