You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/05/23 14:19:21 UTC
[flink] 01/02: [FLINK-12254][table] Update TableSchema to new type
system
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 98f3046fae76405b3fcf2b5ef82835a81624498e
Author: Timo Walther <tw...@apache.org>
AuthorDate: Tue May 21 16:45:27 2019 +0200
[FLINK-12254][table] Update TableSchema to new type system
---
.../flink/table/client/cli/CliResultView.java | 2 +-
.../apache/flink/table/client/cli/CliUtils.java | 4 +-
.../org/apache/flink/table/api/TableSchema.java | 140 +++++++++++++++------
.../apache/flink/table/api/TableSchemaTest.scala | 8 +-
.../api/validation/TableSchemaValidationTest.scala | 6 +-
5 files changed, 113 insertions(+), 47 deletions(-)
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java
index 67ce5eb..407a04b 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java
@@ -145,7 +145,7 @@ public abstract class CliResultView<O extends Enum<O>> extends CliView<O, Void>
final CliRowView view = new CliRowView(
client,
resultDescriptor.getResultSchema().getFieldNames(),
- CliUtils.typesToString(resultDescriptor.getResultSchema().getFieldTypes()),
+ CliUtils.typesToString(resultDescriptor.getResultSchema().getFieldDataTypes()),
getRow(results.get(selectedRow)));
view.open(); // enter view
}
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliUtils.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliUtils.java
index 77894e8..471d168 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliUtils.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliUtils.java
@@ -18,8 +18,8 @@
package org.apache.flink.table.client.cli;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.jline.utils.AttributedString;
@@ -103,7 +103,7 @@ public final class CliUtils {
return fields;
}
- public static String[] typesToString(TypeInformation<?>[] types) {
+ public static String[] typesToString(DataType[] types) {
final String[] typesAsString = new String[types.length];
for (int i = 0; i < types.length; i++) {
typesAsString[i] = types[i].toString();
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
index 385f9a0..c6804f9 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
@@ -20,8 +20,9 @@ package org.apache.flink.table.api;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
@@ -33,10 +34,17 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.Field;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
/**
- * A table schema that represents a table's structure with field names and types.
- */
+ * A table schema that represents a table's structure with field names and data types.
+ */
@PublicEvolving
public class TableSchema {
@@ -44,20 +52,20 @@ public class TableSchema {
private final String[] fieldNames;
- private final TypeInformation<?>[] fieldTypes;
+ private final DataType[] fieldDataTypes;
private final Map<String, Integer> fieldNameToIndex;
- public TableSchema(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+ private TableSchema(String[] fieldNames, DataType[] fieldDataTypes) {
this.fieldNames = Preconditions.checkNotNull(fieldNames);
- this.fieldTypes = Preconditions.checkNotNull(fieldTypes);
+ this.fieldDataTypes = Preconditions.checkNotNull(fieldDataTypes);
- if (fieldNames.length != fieldTypes.length) {
+ if (fieldNames.length != fieldDataTypes.length) {
throw new TableException(
- "Number of field names and field types must be equal.\n" +
- "Number of names is " + fieldNames.length + ", number of types is " + fieldTypes.length + ".\n" +
- "List of field names: " + Arrays.toString(fieldNames) + "\n" +
- "List of field types: " + Arrays.toString(fieldTypes));
+ "Number of field names and field data types must be equal.\n" +
+ "Number of names is " + fieldNames.length + ", number of data types is " + fieldDataTypes.length + ".\n" +
+ "List of field names: " + Arrays.toString(fieldNames) + "\n" +
+ "List of field data types: " + Arrays.toString(fieldDataTypes));
}
// validate and create name to index mapping
@@ -66,7 +74,7 @@ public class TableSchema {
final Set<String> uniqueNames = new HashSet<>();
for (int i = 0; i < fieldNames.length; i++) {
// check for null
- Preconditions.checkNotNull(fieldTypes[i]);
+ Preconditions.checkNotNull(fieldDataTypes[i]);
final String fieldName = Preconditions.checkNotNull(fieldNames[i]);
// collect indices
@@ -82,50 +90,84 @@ public class TableSchema {
if (!duplicateNames.isEmpty()) {
throw new TableException(
"Field names must be unique.\n" +
- "List of duplicate fields: " + duplicateNames.toString() + "\n" +
- "List of all fields: " + Arrays.toString(fieldNames));
+ "List of duplicate fields: " + duplicateNames.toString() + "\n" +
+ "List of all fields: " + Arrays.toString(fieldNames));
}
}
/**
+ * @deprecated Use the {@link Builder} instead.
+ */
+ @Deprecated
+ public TableSchema(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+ this(fieldNames, fromLegacyInfoToDataType(fieldTypes));
+ }
+
+ /**
* Returns a deep copy of the table schema.
*/
public TableSchema copy() {
- return new TableSchema(fieldNames.clone(), fieldTypes.clone());
+ return new TableSchema(fieldNames.clone(), fieldDataTypes.clone());
}
/**
- * Returns all field type information as an array.
+ * Returns all field data types as an array.
*/
+ public DataType[] getFieldDataTypes() {
+ return fieldDataTypes;
+ }
+
+ /**
+ * @deprecated Use {@link #getFieldDataTypes()} instead.
+ */
+ @Deprecated
public TypeInformation<?>[] getFieldTypes() {
- return fieldTypes;
+ return fromDataTypeToLegacyInfo(fieldDataTypes);
}
/**
- * Returns the specified type information for the given field index.
+ * Returns the specified data type for the given field index.
*
* @param fieldIndex the index of the field
*/
- public Optional<TypeInformation<?>> getFieldType(int fieldIndex) {
- if (fieldIndex < 0 || fieldIndex >= fieldTypes.length) {
+ public Optional<DataType> getFieldDataType(int fieldIndex) {
+ if (fieldIndex < 0 || fieldIndex >= fieldDataTypes.length) {
return Optional.empty();
}
- return Optional.of(fieldTypes[fieldIndex]);
+ return Optional.of(fieldDataTypes[fieldIndex]);
}
/**
- * Returns the specified type information for the given field name.
+ * @deprecated Use {@link #getFieldDataType(int)}} instead.
+ */
+ @Deprecated
+ public Optional<TypeInformation<?>> getFieldType(int fieldIndex) {
+ return getFieldDataType(fieldIndex)
+ .map(TypeConversions::fromDataTypeToLegacyInfo);
+ }
+
+ /**
+ * Returns the specified data type for the given field name.
*
* @param fieldName the name of the field
*/
- public Optional<TypeInformation<?>> getFieldType(String fieldName) {
+ public Optional<DataType> getFieldDataType(String fieldName) {
if (fieldNameToIndex.containsKey(fieldName)) {
- return Optional.of(fieldTypes[fieldNameToIndex.get(fieldName)]);
+ return Optional.of(fieldDataTypes[fieldNameToIndex.get(fieldName)]);
}
return Optional.empty();
}
/**
+ * @deprecated Use {@link #getFieldDataType(String)} instead.
+ */
+ @Deprecated
+ public Optional<TypeInformation<?>> getFieldType(String fieldName) {
+ return getFieldDataType(fieldName)
+ .map(TypeConversions::fromDataTypeToLegacyInfo);
+ }
+
+ /**
* Returns the number of fields.
*/
public int getFieldCount() {
@@ -152,10 +194,22 @@ public class TableSchema {
}
/**
- * Converts a table schema into a (nested) type information describing a {@link Row}.
+ * Converts a table schema into a (nested) data type describing a {@link DataTypes#ROW(Field...)}.
+ */
+ public DataType toRowDataType() {
+ final Field[] fields = IntStream.range(0, fieldDataTypes.length)
+ .mapToObj(i -> FIELD(fieldNames[i], fieldDataTypes[i]))
+ .toArray(Field[]::new);
+ return ROW(fields);
+ }
+
+ /**
+ * @deprecated Use {@link #toRowDataType()} instead.
*/
+ @Deprecated
+ @SuppressWarnings("unchecked")
public TypeInformation<Row> toRowType() {
- return Types.ROW_NAMED(fieldNames, fieldTypes);
+ return (TypeInformation<Row>) fromDataTypeToLegacyInfo(toRowDataType());
}
@Override
@@ -163,7 +217,7 @@ public class TableSchema {
final StringBuilder sb = new StringBuilder();
sb.append("root\n");
for (int i = 0; i < fieldNames.length; i++) {
- sb.append(" |-- ").append(fieldNames[i]).append(": ").append(fieldTypes[i]).append('\n');
+ sb.append(" |-- ").append(fieldNames[i]).append(": ").append(fieldDataTypes[i]).append('\n');
}
return sb.toString();
}
@@ -178,13 +232,13 @@ public class TableSchema {
}
TableSchema schema = (TableSchema) o;
return Arrays.equals(fieldNames, schema.fieldNames) &&
- Arrays.equals(fieldTypes, schema.fieldTypes);
+ Arrays.equals(fieldDataTypes, schema.fieldDataTypes);
}
@Override
public int hashCode() {
int result = Arrays.hashCode(fieldNames);
- result = 31 * result + Arrays.hashCode(fieldTypes);
+ result = 31 * result + Arrays.hashCode(fieldDataTypes);
return result;
}
@@ -196,7 +250,10 @@ public class TableSchema {
*
* @param typeInfo The {@link TypeInformation} from which the table schema is generated.
* @return The table schema that was generated from the given {@link TypeInformation}.
+ *
+ * @deprecated This method will be removed soon. Use {@link DataTypes} to declare types.
*/
+ @Deprecated
public static TableSchema fromTypeInfo(TypeInformation<?> typeInfo) {
if (typeInfo instanceof CompositeType<?>) {
final CompositeType<?> compositeType = (CompositeType<?>) typeInfo;
@@ -228,32 +285,41 @@ public class TableSchema {
private List<String> fieldNames;
- private List<TypeInformation<?>> fieldTypes;
+ private List<DataType> fieldDataTypes;
public Builder() {
fieldNames = new ArrayList<>();
- fieldTypes = new ArrayList<>();
+ fieldDataTypes = new ArrayList<>();
}
/**
- * Add a field with name and type. The call order of this method determines the order
- * of fields in the schema.
+ * Add a field with name and data type.
+ *
+ * <p>The call order of this method determines the order of fields in the schema.
*/
- public Builder field(String name, TypeInformation<?> type) {
+ public Builder field(String name, DataType dataType) {
Preconditions.checkNotNull(name);
- Preconditions.checkNotNull(type);
+ Preconditions.checkNotNull(dataType);
fieldNames.add(name);
- fieldTypes.add(type);
+ fieldDataTypes.add(dataType);
return this;
}
/**
+ * @deprecated Use {@link #field(String, DataType)} instead.
+ */
+ @Deprecated
+ public Builder field(String name, TypeInformation<?> typeInfo) {
+ return field(name, fromLegacyInfoToDataType(typeInfo));
+ }
+
+ /**
* Returns a {@link TableSchema} instance.
*/
public TableSchema build() {
return new TableSchema(
fieldNames.toArray(new String[0]),
- fieldTypes.toArray(new TypeInformation<?>[0]));
+ fieldDataTypes.toArray(new DataType[0]));
}
}
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableSchemaTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableSchemaTest.scala
index eaa852f..a5b23a6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableSchemaTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableSchemaTest.scala
@@ -39,8 +39,8 @@ class TableSchemaTest extends TableTestBase {
assertEquals(Types.STRING, schema.getFieldTypes.apply(1))
val expectedString = "root\n" +
- " |-- a: Integer\n" +
- " |-- b: String\n"
+ " |-- a: INT\n" +
+ " |-- b: STRING\n"
assertEquals(expectedString, schema.toString)
assertTrue(!schema.getFieldName(3).isPresent)
@@ -61,8 +61,8 @@ class TableSchemaTest extends TableTestBase {
assertEquals(Types.STRING, schema.getFieldTypes.apply(1))
val expectedString = "root\n" +
- " |-- a: Integer\n" +
- " |-- b: String\n"
+ " |-- a: INT\n" +
+ " |-- b: STRING\n"
assertEquals(expectedString, schema.toString)
assertTrue(!schema.getFieldName(3).isPresent)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/TableSchemaValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/TableSchemaValidationTest.scala
index 44b49f9..ad5b3a1 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/TableSchemaValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/TableSchemaValidationTest.scala
@@ -28,10 +28,10 @@ class TableSchemaValidationTest extends TableTestBase {
def testColumnNameAndColumnTypeNotEqual() {
thrown.expect(classOf[TableException])
thrown.expectMessage(
- "Number of field names and field types must be equal.\n" +
- "Number of names is 3, number of types is 2.\n" +
+ "Number of field names and field data types must be equal.\n" +
+ "Number of names is 3, number of data types is 2.\n" +
"List of field names: [a, b, c]\n" +
- "List of field types: [Integer, String]")
+ "List of field data types: [INT, STRING]")
val fieldNames = Array("a", "b", "c")
val typeInfos: Array[TypeInformation[_]] = Array(