Posted to commits@flink.apache.org by dw...@apache.org on 2019/05/31 11:45:05 UTC
[flink] 01/02: [FLINK-12431][table-api-java] Port utility methods for extracting fields information from TypeInformation
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 44c2d7632577b7eeb70ceae73d122be00a18de44
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue May 7 12:38:58 2019 +0200
[FLINK-12431][table-api-java] Port utility methods for extracting fields information from TypeInformation
---
.../flink/table/typeutils/FieldInfoUtils.java | 496 +++++++++++++++++++++
.../expressions/BuiltInFunctionDefinitions.java | 4 +
.../table/operations/CalculatedTableFactory.java | 6 +-
.../apache/flink/table/api/BatchTableEnvImpl.scala | 23 +-
.../flink/table/api/StreamTableEnvImpl.scala | 19 +-
.../org/apache/flink/table/api/TableEnvImpl.scala | 220 +--------
.../flink/table/api/java/StreamTableEnvImpl.scala | 21 +-
.../functions/utils/UserDefinedFunctionUtils.scala | 9 +-
8 files changed, 543 insertions(+), 255 deletions(-)
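
For orientation, a minimal sketch of what the ported helpers return (not part of the patch; the class name and the field names "user"/"amount" are invented for illustration, and Types.ROW_NAMED is assumed to be available in this Flink version):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.typeutils.FieldInfoUtils;

public class FieldInfoUtilsSketch {
    public static void main(String[] args) {
        // A named row type with two fields (names are illustrative).
        TypeInformation<?> rowType = Types.ROW_NAMED(
            new String[]{"user", "amount"}, Types.STRING, Types.LONG);

        // Composite types expose their own field names and positions.
        String[] names = FieldInfoUtils.getFieldNames(rowType);           // ["user", "amount"]
        int[] indices = FieldInfoUtils.getFieldIndices(rowType);          // [0, 1]
        TypeInformation<?>[] types = FieldInfoUtils.getFieldTypes(rowType);

        // Atomic types are exposed as a single field "f0" at index 0.
        String[] atomic = FieldInfoUtils.getFieldNames(Types.LONG);       // ["f0"]
    }
}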
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java
new file mode 100644
index 0000000..a0a3cc6
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java
@@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionUtils;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.types.Row;
+
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static java.lang.String.format;
+import static org.apache.flink.table.expressions.BuiltInFunctionDefinitions.TIME_ATTRIBUTES;
+
+/**
+ * Utility methods for extracting names and indices of fields from different {@link TypeInformation}s.
+ */
+public class FieldInfoUtils {
+
+ private static final String ATOMIC_FIELD_NAME = "f0";
+
+ /**
+ * Describes extracted fields and corresponding indices from a {@link TypeInformation}.
+ */
+ public static class FieldsInfo {
+ private final String[] fieldNames;
+ private final int[] indices;
+
+ FieldsInfo(String[] fieldNames, int[] indices) {
+ this.fieldNames = fieldNames;
+ this.indices = indices;
+ }
+
+ public String[] getFieldNames() {
+ return fieldNames;
+ }
+
+ public int[] getIndices() {
+ return indices;
+ }
+ }
+
+ /**
+ * Reference input fields by name:
+ * All fields in the schema definition are referenced by name
+ * (and possibly renamed using an alias (as)). In this mode, fields can be reordered and
+ * projected out. Moreover, we can define proctime and rowtime attributes at arbitrary
+ * positions using arbitrary names (except those that exist in the result schema). This mode
+ * can be used for any input type, including POJOs.
+ *
+ * <p>Reference input fields by position:
+ * In this mode, fields are simply renamed. Event-time attributes can
+ * replace the field on their position in the input data (if it is of correct type) or be
+ * appended at the end. Proctime attributes must be appended at the end. This mode can only be
+ * used if the input type has a defined field order (tuple, case class, Row) and none of the
+ * fields references a field of the input type.
+ */
+ public static boolean isReferenceByPosition(CompositeType<?> ct, Expression[] fields) {
+ if (!(ct instanceof TupleTypeInfoBase)) {
+ return false;
+ }
+
+ List<String> inputNames = Arrays.asList(ct.getFieldNames());
+
+ // Use the by-position mode if none of the fields exists in the input.
+ // This prevents confusing cases like ('f2, 'f0, 'myName) for a Tuple3 where fields are renamed
+ // by position but the user might assume reordering instead of renaming.
+ return Arrays.stream(fields).allMatch(f -> {
+ if (f instanceof UnresolvedReferenceExpression) {
+ return !inputNames.contains(((UnresolvedReferenceExpression) f).getName());
+ }
+
+ return true;
+ });
+ }
+
+ /**
+ * Returns field names and field positions for a given {@link TypeInformation}.
+ *
+ * @param inputType The TypeInformation to extract the field names and positions from.
+ * @param <A> The type of the TypeInformation.
+ * @return A {@link FieldsInfo} holding the field names and corresponding field positions.
+ */
+ public static <A> FieldsInfo getFieldsInfo(TypeInformation<A> inputType) {
+
+ if (inputType instanceof GenericTypeInfo && inputType.getTypeClass() == Row.class) {
+ throw new TableException(
+ "An input of GenericTypeInfo<Row> cannot be converted to Table. " +
+ "Please specify the type of the input with a RowTypeInfo.");
+ } else {
+ return new FieldsInfo(getFieldNames(inputType), getFieldIndices(inputType));
+ }
+ }
+
+ /**
+ * Returns field names and field positions for a given {@link TypeInformation} and array of
+ * {@link Expression}. It does not handle time attributes but considers them in indices.
+ *
+ * @param inputType The {@link TypeInformation} against which the {@link Expression}s are evaluated.
+ * @param exprs The expressions that define the field names.
+ * @param <A> The type of the TypeInformation.
+ * @return A {@link FieldsInfo} holding the field names and corresponding field positions.
+ */
+ public static <A> FieldsInfo getFieldsInfo(TypeInformation<A> inputType, Expression[] exprs) {
+ validateInputTypeInfo(inputType);
+
+ final Set<FieldInfo> fieldInfos;
+ if (inputType instanceof GenericTypeInfo && inputType.getTypeClass() == Row.class) {
+ throw new TableException(
+ "An input of GenericTypeInfo<Row> cannot be converted to Table. " +
+ "Please specify the type of the input with a RowTypeInfo.");
+ } else if (inputType instanceof TupleTypeInfoBase) {
+ fieldInfos = extractFieldInfosFromTupleType((CompositeType) inputType, exprs);
+ } else if (inputType instanceof PojoTypeInfo) {
+ fieldInfos = extractFieldInfosByNameReference((CompositeType) inputType, exprs);
+ } else {
+ fieldInfos = extractFieldInfoFromAtomicType(exprs);
+ }
+
+ if (fieldInfos.stream().anyMatch(info -> info.getFieldName().equals("*"))) {
+ throw new TableException("Field name can not be '*'.");
+ }
+
+ String[] fieldNames = fieldInfos.stream().map(FieldInfo::getFieldName).toArray(String[]::new);
+ int[] fieldIndices = fieldInfos.stream().mapToInt(FieldInfo::getIndex).toArray();
+ return new FieldsInfo(fieldNames, fieldIndices);
+ }
+
+ /**
+ * Returns field names for a given {@link TypeInformation}.
+ *
+ * @param inputType The TypeInformation to extract the field names from.
+ * @param <A> The type of the TypeInformation.
+ * @return An array holding the field names.
+ */
+ public static <A> String[] getFieldNames(TypeInformation<A> inputType) {
+ validateInputTypeInfo(inputType);
+
+ final String[] fieldNames;
+ if (inputType instanceof CompositeType) {
+ fieldNames = ((CompositeType<A>) inputType).getFieldNames();
+ } else {
+ fieldNames = new String[]{ATOMIC_FIELD_NAME};
+ }
+
+ if (Arrays.asList(fieldNames).contains("*")) {
+ throw new TableException("Field name can not be '*'.");
+ }
+
+ return fieldNames;
+ }
+
+ /**
+ * Validates that the class represented by the given typeInfo is static and globally accessible.
+ *
+ * @param typeInfo type to check
+ * @throws TableException if type does not meet these criteria
+ */
+ public static <A> void validateInputTypeInfo(TypeInformation<A> typeInfo) {
+ Class<A> clazz = typeInfo.getTypeClass();
+ if ((clazz.isMemberClass() && !Modifier.isStatic(clazz.getModifiers())) ||
+ !Modifier.isPublic(clazz.getModifiers()) ||
+ clazz.getCanonicalName() == null) {
+ throw new TableException(format(
+ "Class '%s' described in type information '%s' must be " +
+ "static and globally accessible.", clazz, typeInfo));
+ }
+ }
+
+ /**
+ * Returns field indexes for a given {@link TypeInformation}.
+ *
+ * @param inputType The TypeInformation to extract the field positions from.
+ * @return An array holding the field positions.
+ */
+ public static int[] getFieldIndices(TypeInformation<?> inputType) {
+ return IntStream.range(0, getFieldNames(inputType).length).toArray();
+ }
+
+ /**
+ * Returns field types for a given {@link TypeInformation}.
+ *
+ * @param inputType The TypeInformation to extract field types from.
+ * @return An array holding the field types.
+ */
+ public static TypeInformation<?>[] getFieldTypes(TypeInformation<?> inputType) {
+ validateInputTypeInfo(inputType);
+
+ final TypeInformation<?>[] fieldTypes;
+ if (inputType instanceof CompositeType) {
+ int arity = inputType.getArity();
+ CompositeType ct = (CompositeType) inputType;
+ fieldTypes = IntStream.range(0, arity).mapToObj(ct::getTypeAt).toArray(TypeInformation[]::new);
+ } else {
+ fieldTypes = new TypeInformation[]{inputType};
+ }
+
+ return fieldTypes;
+ }
+
+ public static <T> TableSchema calculateTableSchema(
+ TypeInformation<T> typeInfo,
+ int[] fieldIndexes,
+ String[] fieldNames) {
+
+ if (fieldIndexes.length != fieldNames.length) {
+ throw new TableException(String.format(
+ "Number of field names and field indexes must be equal.\n" +
+ "Number of names is %s, number of indexes is %s.\n" +
+ "List of column names: %s.\n" +
+ "List of column indexes: %s.",
+ fieldNames.length,
+ fieldIndexes.length,
+ String.join(", ", fieldNames),
+ Arrays.stream(fieldIndexes).mapToObj(Integer::toString).collect(Collectors.joining(", "))));
+ }
+
+ // check uniqueness of field names
+ Set<String> duplicatedNames = findDuplicates(fieldNames);
+ if (duplicatedNames.size() != 0) {
+
+ throw new TableException(String.format(
+ "Field names must be unique.\n" +
+ "List of duplicate fields: [%s].\n" +
+ "List of all fields: [%s].",
+ String.join(", ", duplicatedNames),
+ String.join(", ", fieldNames)));
+ }
+
+ final TypeInformation[] types;
+ long fieldIndicesCount = Arrays.stream(fieldIndexes).filter(i -> i >= 0).count();
+ if (typeInfo instanceof CompositeType) {
+ CompositeType ct = (CompositeType) typeInfo;
+ // it is ok to leave out fields
+ if (fieldIndicesCount > ct.getArity()) {
+ throw new TableException(String.format(
+ "Arity of type (%s) must not be greater than number of field names %s.",
+ Arrays.toString(ct.getFieldNames()),
+ Arrays.toString(fieldNames)));
+ }
+
+ types = Arrays.stream(fieldIndexes)
+ .mapToObj(idx -> extractTimeMarkerType(idx).orElseGet(() -> ct.getTypeAt(idx)))
+ .toArray(TypeInformation[]::new);
+ } else {
+ if (fieldIndicesCount > 1) {
+ throw new TableException(
+ "Non-composite input type may have only a single field and its index must be 0.");
+ }
+
+ types = Arrays.stream(fieldIndexes)
+ .mapToObj(idx -> extractTimeMarkerType(idx).orElse(typeInfo))
+ .toArray(TypeInformation[]::new);
+ }
+
+ return new TableSchema(fieldNames, types);
+ }
+
+ private static Optional<TypeInformation<?>> extractTimeMarkerType(int idx) {
+ switch (idx) {
+ case TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER:
+ return Optional.of(TimeIndicatorTypeInfo.ROWTIME_INDICATOR);
+ case TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER:
+ return Optional.of(TimeIndicatorTypeInfo.PROCTIME_INDICATOR);
+ case TimeIndicatorTypeInfo.ROWTIME_BATCH_MARKER:
+ case TimeIndicatorTypeInfo.PROCTIME_BATCH_MARKER:
+ return Optional.of(Types.SQL_TIMESTAMP);
+ default:
+ return Optional.empty();
+ }
+ }
+
+
+
+ /* Utility methods */
+
+ private static Set<FieldInfo> extractFieldInfoFromAtomicType(Expression[] exprs) {
+ boolean referenced = false;
+ FieldInfo fieldInfo = null;
+ for (Expression expr : exprs) {
+ if (expr instanceof UnresolvedReferenceExpression) {
+ if (referenced) {
+ throw new TableException("Only the first field can reference an atomic type.");
+ } else {
+ referenced = true;
+ fieldInfo = new FieldInfo(((UnresolvedReferenceExpression) expr).getName(), 0);
+ }
+ } else if (!isTimeAttribute(expr)) { // IGNORE Time attributes
+ throw new TableException("Field reference expression expected.");
+ }
+ }
+
+ if (fieldInfo != null) {
+ return Collections.singleton(fieldInfo);
+ }
+
+ return Collections.emptySet();
+ }
+
+ private static <A> Set<FieldInfo> extractFieldInfosByNameReference(CompositeType inputType, Expression[] exprs) {
+ ExprToFieldInfo exprToFieldInfo = new ExprToFieldInfo(inputType);
+ return Arrays.stream(exprs)
+ .map(expr -> expr.accept(exprToFieldInfo))
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .collect(Collectors.toCollection(LinkedHashSet::new));
+ }
+
+ private static <A> Set<FieldInfo> extractFieldInfosFromTupleType(CompositeType inputType, Expression[] exprs) {
+ boolean isRefByPos = isReferenceByPosition((CompositeType<?>) inputType, exprs);
+
+ if (isRefByPos) {
+ return IntStream.range(0, exprs.length)
+ .mapToObj(idx -> exprs[idx].accept(new IndexedExprToFieldInfo(idx)))
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .collect(Collectors.toCollection(LinkedHashSet::new));
+ } else {
+ return extractFieldInfosByNameReference(inputType, exprs);
+ }
+ }
+
+ private static class FieldInfo {
+ private final String fieldName;
+ private final int index;
+
+ FieldInfo(String fieldName, int index) {
+ this.fieldName = fieldName;
+ this.index = index;
+ }
+
+ public String getFieldName() {
+ return fieldName;
+ }
+
+ public int getIndex() {
+ return index;
+ }
+ }
+
+ private static class IndexedExprToFieldInfo extends ApiExpressionDefaultVisitor<Optional<FieldInfo>> {
+
+ private final int index;
+
+ private IndexedExprToFieldInfo(int index) {
+ this.index = index;
+ }
+
+ @Override
+ public Optional<FieldInfo> visitUnresolvedReference(UnresolvedReferenceExpression unresolvedReference) {
+ String fieldName = unresolvedReference.getName();
+ return Optional.of(new FieldInfo(fieldName, index));
+ }
+
+ @Override
+ public Optional<FieldInfo> visitCall(CallExpression call) {
+ if (call.getFunctionDefinition() == BuiltInFunctionDefinitions.AS) {
+ List<Expression> children = call.getChildren();
+ Expression origExpr = children.get(0);
+ String newName = ExpressionUtils.extractValue(children.get(1), Types.STRING)
+ .orElseThrow(() ->
+ new TableException("Alias expects string literal as new name. Got: " + children.get(1)));
+
+ if (origExpr instanceof UnresolvedReferenceExpression) {
+ throw new TableException(
+ format("Alias '%s' is not allowed if other fields are referenced by position.", newName));
+ } else if (isTimeAttribute(origExpr)) {
+ return Optional.empty();
+ }
+ } else if (isTimeAttribute(call)) {
+ return Optional.empty();
+ }
+
+ return defaultMethod(call);
+ }
+
+ @Override
+ protected Optional<FieldInfo> defaultMethod(Expression expression) {
+ throw new TableException("Field reference expression or alias on field expression expected.");
+ }
+ }
+
+ private static class ExprToFieldInfo extends ApiExpressionDefaultVisitor<Optional<FieldInfo>> {
+
+ private final CompositeType ct;
+
+ private ExprToFieldInfo(CompositeType ct) {
+ this.ct = ct;
+ }
+
+ @Override
+ public Optional<FieldInfo> visitUnresolvedReference(UnresolvedReferenceExpression unresolvedReference) {
+ String fieldName = unresolvedReference.getName();
+ return referenceByName(fieldName, ct).map(idx -> new FieldInfo(fieldName, idx));
+ }
+
+ @Override
+ public Optional<FieldInfo> visitCall(CallExpression call) {
+ if (call.getFunctionDefinition() == BuiltInFunctionDefinitions.AS) {
+ List<Expression> children = call.getChildren();
+ Expression origExpr = children.get(0);
+ String newName = ExpressionUtils.extractValue(children.get(1), Types.STRING)
+ .orElseThrow(() ->
+ new TableException("Alias expects string literal as new name. Got: " + children.get(1)));
+
+ if (origExpr instanceof UnresolvedReferenceExpression) {
+ return referenceByName(((UnresolvedReferenceExpression) origExpr).getName(), ct)
+ .map(idx -> new FieldInfo(newName, idx));
+ } else if (isTimeAttribute(origExpr)) {
+ return Optional.empty();
+ }
+ } else if (isTimeAttribute(call)) {
+ return Optional.empty();
+ }
+
+ return defaultMethod(call);
+ }
+
+ @Override
+ protected Optional<FieldInfo> defaultMethod(Expression expression) {
+ throw new TableException("Field reference expression or alias on field expression expected.");
+ }
+ }
+
+ private static boolean isTimeAttribute(Expression origExpr) {
+ return origExpr instanceof CallExpression &&
+ TIME_ATTRIBUTES.contains(((CallExpression) origExpr).getFunctionDefinition());
+ }
+
+ private static Optional<Integer> referenceByName(String name, CompositeType<?> ct) {
+ int inputIdx = ct.getFieldIndex(name);
+ if (inputIdx < 0) {
+ throw new TableException(format(
+ "%s is not a field of type %s. Expected: %s}",
+ name,
+ ct,
+ String.join(", ", ct.getFieldNames())));
+ } else {
+ return Optional.of(inputIdx);
+ }
+ }
+
+ private static <T> Set<T> findDuplicates(T[] array) {
+ Set<T> duplicates = new HashSet<>();
+ Set<T> seenElements = new HashSet<>();
+
+ for (T t : array) {
+ if (seenElements.contains(t)) {
+ duplicates.add(t);
+ } else {
+ seenElements.add(t);
+ }
+ }
+
+ return duplicates;
+ }
+
+ private FieldInfoUtils() {
+ }
+}
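
As a quick illustration of how calculateTableSchema maps the negative time-indicator markers versus regular field indices (a sketch, not part of the patch; the class name and field names are invented):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.typeutils.FieldInfoUtils;
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
import org.apache.flink.types.Row;

public class TableSchemaSketch {
    public static void main(String[] args) {
        TypeInformation<Row> rowType = Types.ROW(Types.STRING, Types.LONG);

        // Indices >= 0 are looked up in the composite type; the stream marker
        // is mapped to the proctime indicator type instead of a real field.
        TableSchema schema = FieldInfoUtils.calculateTableSchema(
            rowType,
            new int[]{0, 1, TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER},
            new String[]{"user", "amount", "proctime"});

        // Expected: user STRING, amount LONG, proctime as a proctime indicator.
        System.out.println(schema);
    }
}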
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/BuiltInFunctionDefinitions.java
index a326be8..e74e129 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/BuiltInFunctionDefinitions.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/BuiltInFunctionDefinitions.java
@@ -343,6 +343,10 @@ public final class BuiltInFunctionDefinitions {
WINDOW_START, WINDOW_END, PROCTIME, ROWTIME
));
+ public static final Set<FunctionDefinition> TIME_ATTRIBUTES = new HashSet<>(Arrays.asList(
+ PROCTIME, ROWTIME
+ ));
+
public static final List<FunctionDefinition> ORDERING = Arrays.asList(ORDER_ASC, ORDER_DESC);
public static List<FunctionDefinition> getDefinitions() {
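
The new TIME_ATTRIBUTES set is what FieldInfoUtils and the batch environment below use to recognize .rowtime/.proctime calls; a condensed sketch of that check (the helper class name is made up, the logic mirrors isTimeAttribute above):

import org.apache.flink.table.expressions.BuiltInFunctionDefinitions;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.Expression;

final class TimeAttributeCheck {
    // True if the expression is a call to the PROCTIME or ROWTIME built-in.
    static boolean isTimeAttribute(Expression expr) {
        return expr instanceof CallExpression &&
            BuiltInFunctionDefinitions.TIME_ATTRIBUTES.contains(
                ((CallExpression) expr).getFunctionDefinition());
    }

    private TimeAttributeCheck() {
    }
}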
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/CalculatedTableFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/CalculatedTableFactory.java
index 3f2953b..860b551 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/CalculatedTableFactory.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/CalculatedTableFactory.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.operations;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.table.api.TableEnvImpl$;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
@@ -30,6 +29,7 @@ import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionUtils;
import org.apache.flink.table.expressions.FunctionDefinition;
import org.apache.flink.table.expressions.TableFunctionDefinition;
+import org.apache.flink.table.typeutils.FieldInfoUtils;
import java.util.Collections;
import java.util.List;
@@ -103,7 +103,7 @@ public class CalculatedTableFactory {
String[] fieldNames;
if (aliasesSize == 0) {
- fieldNames = TableEnvImpl$.MODULE$.getFieldNames(resultType);
+ fieldNames = FieldInfoUtils.getFieldNames(resultType);
} else if (aliasesSize != callArity) {
throw new ValidationException(String.format(
"List of column aliases must have same degree as table; " +
@@ -116,7 +116,7 @@ public class CalculatedTableFactory {
fieldNames = aliases.toArray(new String[aliasesSize]);
}
- TypeInformation<?>[] fieldTypes = TableEnvImpl$.MODULE$.getFieldTypes(resultType);
+ TypeInformation<?>[] fieldTypes = FieldInfoUtils.getFieldTypes(resultType);
return new CalculatedTableOperation(
tableFunctionDefinition.getTableFunction(),
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala
index 312aed9..d0ee440 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala
@@ -33,7 +33,8 @@ import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.table.catalog.CatalogManager
import org.apache.flink.table.descriptors.{BatchTableDescriptor, ConnectorDescriptor}
import org.apache.flink.table.explain.PlanJsonParser
-import org.apache.flink.table.expressions.{Expression, TimeAttribute}
+import org.apache.flink.table.expressions.BuiltInFunctionDefinitions.TIME_ATTRIBUTES
+import org.apache.flink.table.expressions.{CallExpression, Expression}
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.dataset.DataSetRel
import org.apache.flink.table.plan.rules.FlinkRuleSets
@@ -41,6 +42,7 @@ import org.apache.flink.table.plan.schema._
import org.apache.flink.table.runtime.MapRunner
import org.apache.flink.table.sinks._
import org.apache.flink.table.sources.{BatchTableSource, TableSource}
+import org.apache.flink.table.typeutils.FieldInfoUtils.{getFieldsInfo, validateInputTypeInfo}
import org.apache.flink.types.Row
/**
@@ -319,11 +321,11 @@ abstract class BatchTableEnvImpl(
*/
protected def registerDataSetInternal[T](name: String, dataSet: DataSet[T]): Unit = {
- val (fieldNames, fieldIndexes) = getFieldInfo[T](dataSet.getType)
+ val fieldInfo = getFieldsInfo[T](dataSet.getType)
val dataSetTable = new DataSetTable[T](
dataSet,
- fieldIndexes,
- fieldNames
+ fieldInfo.getIndices,
+ fieldInfo.getFieldNames
)
registerTableInternal(name, dataSetTable)
}
@@ -341,18 +343,19 @@ abstract class BatchTableEnvImpl(
name: String, dataSet: DataSet[T], fields: Array[Expression]): Unit = {
val inputType = dataSet.getType
- val bridgedFields = fields.map(expressionBridge.bridge).toArray[Expression]
- val (fieldNames, fieldIndexes) = getFieldInfo[T](
+ val fieldsInfo = getFieldsInfo[T](
inputType,
- bridgedFields)
+ fields)
- if (bridgedFields.exists(_.isInstanceOf[TimeAttribute])) {
+ if (fields.exists(f =>
+ f.isInstanceOf[CallExpression] &&
+ TIME_ATTRIBUTES.contains(f.asInstanceOf[CallExpression].getFunctionDefinition))) {
throw new ValidationException(
".rowtime and .proctime time indicators are not allowed in a batch environment.")
}
- val dataSetTable = new DataSetTable[T](dataSet, fieldIndexes, fieldNames)
+ val dataSetTable = new DataSetTable[T](dataSet, fieldsInfo.getIndices, fieldsInfo.getFieldNames)
registerTableInternal(name, dataSetTable)
}
@@ -416,7 +419,7 @@ abstract class BatchTableEnvImpl(
logicalPlan: RelNode,
logicalType: RelDataType,
queryConfig: BatchQueryConfig)(implicit tpe: TypeInformation[A]): DataSet[A] = {
- TableEnvImpl.validateType(tpe)
+ validateInputTypeInfo(tpe)
logicalPlan match {
case node: DataSetRel =>
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala
index 62d44a2..40ecada 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala
@@ -52,6 +52,7 @@ import org.apache.flink.table.runtime.{CRowMapRunner, OutputRowtimeProcessFuncti
import org.apache.flink.table.sinks._
import org.apache.flink.table.sources.{StreamTableSource, TableSource, TableSourceUtil}
import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, TypeCheckUtils}
+import org.apache.flink.table.typeutils.FieldInfoUtils.{getFieldsInfo, isReferenceByPosition}
import _root_.scala.collection.JavaConverters._
@@ -443,11 +444,11 @@ abstract class StreamTableEnvImpl(
name: String,
dataStream: DataStream[T]): Unit = {
- val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType)
+ val fieldInfo = getFieldsInfo[T](dataStream.getType)
val dataStreamTable = new DataStreamTable[T](
dataStream,
- fieldIndexes,
- fieldNames
+ fieldInfo.getIndices,
+ fieldInfo.getFieldNames
)
registerTableInternal(name, dataStreamTable)
}
@@ -468,13 +469,12 @@ abstract class StreamTableEnvImpl(
: Unit = {
val streamType = dataStream.getType
- val bridgedFields = fields.map(expressionBridge.bridge).toArray[Expression]
// get field names and types for all non-replaced fields
- val (fieldNames, fieldIndexes) = getFieldInfo[T](streamType, bridgedFields)
+ val fieldsInfo = getFieldsInfo[T](streamType, fields)
// validate and extract time attributes
- val (rowtime, proctime) = validateAndExtractTimeAttributes(streamType, bridgedFields)
+ val (rowtime, proctime) = validateAndExtractTimeAttributes(streamType, fields)
// check if event-time is enabled
if (rowtime.isDefined && execEnv.getStreamTimeCharacteristic != TimeCharacteristic.EventTime) {
@@ -484,8 +484,8 @@ abstract class StreamTableEnvImpl(
}
// adjust field indexes and field names
- val indexesWithIndicatorFields = adjustFieldIndexes(fieldIndexes, rowtime, proctime)
- val namesWithIndicatorFields = adjustFieldNames(fieldNames, rowtime, proctime)
+ val indexesWithIndicatorFields = adjustFieldIndexes(fieldsInfo.getIndices, rowtime, proctime)
+ val namesWithIndicatorFields = adjustFieldNames(fieldsInfo.getFieldNames, rowtime, proctime)
val dataStreamTable = new DataStreamTable[T](
dataStream,
@@ -593,7 +593,8 @@ abstract class StreamTableEnvImpl(
}
}
- exprs.zipWithIndex.foreach {
+ val bridgedFields = exprs.map(expressionBridge.bridge).toArray[Expression]
+ bridgedFields.zipWithIndex.foreach {
case (RowtimeAttribute(UnresolvedFieldReference(name)), idx) =>
extractRowtime(idx, name, None)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/TableEnvImpl.scala
index 5ada652..5dd46cc 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/TableEnvImpl.scala
@@ -37,9 +37,7 @@ import org.apache.calcite.tools._
import org.apache.flink.annotation.VisibleForTesting
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.typeutils.{RowTypeInfo, _}
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, PojoTypeInfo, TupleTypeInfoBase}
import org.apache.flink.table.calcite._
import org.apache.flink.table.catalog._
import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
@@ -720,153 +718,6 @@ abstract class TableEnvImpl(
planningConfigurationBuilder.createFlinkPlanner(currentCatalogName, currentDatabase)
}
- /**
- * Reference input fields by name:
- * All fields in the schema definition are referenced by name
- * (and possibly renamed using an alias (as). In this mode, fields can be reordered and
- * projected out. Moreover, we can define proctime and rowtime attributes at arbitrary
- * positions using arbitrary names (except those that exist in the result schema). This mode
- * can be used for any input type, including POJOs.
- *
- * Reference input fields by position:
- * In this mode, fields are simply renamed. Event-time attributes can
- * replace the field on their position in the input data (if it is of correct type) or be
- * appended at the end. Proctime attributes must be appended at the end. This mode can only be
- * used if the input type has a defined field order (tuple, case class, Row) and no of fields
- * references a field of the input type.
- */
- protected def isReferenceByPosition(ct: CompositeType[_], fields: Array[Expression]): Boolean = {
- if (!ct.isInstanceOf[TupleTypeInfoBase[_]]) {
- return false
- }
-
- val inputNames = ct.getFieldNames
-
- // Use the by-position mode if no of the fields exists in the input.
- // This prevents confusing cases like ('f2, 'f0, 'myName) for a Tuple3 where fields are renamed
- // by position but the user might assume reordering instead of renaming.
- fields.forall {
- case UnresolvedFieldReference(name) => !inputNames.contains(name)
- case _ => true
- }
- }
-
- /**
- * Returns field names and field positions for a given [[TypeInformation]].
- *
- * @param inputType The TypeInformation extract the field names and positions from.
- * @tparam A The type of the TypeInformation.
- * @return A tuple of two arrays holding the field names and corresponding field positions.
- */
- protected[flink] def getFieldInfo[A](inputType: TypeInformation[A]):
- (Array[String], Array[Int]) = {
-
- if (inputType.isInstanceOf[GenericTypeInfo[A]] && inputType.getTypeClass == classOf[Row]) {
- throw new TableException(
- "An input of GenericTypeInfo<Row> cannot be converted to Table. " +
- "Please specify the type of the input with a RowTypeInfo.")
- } else {
- (TableEnvImpl.getFieldNames(inputType), TableEnvImpl.getFieldIndices(inputType))
- }
- }
-
- /**
- * Returns field names and field positions for a given [[TypeInformation]] and [[Array]] of
- * [[Expression]]. It does not handle time attributes but considers them in indices.
- *
- * @param inputType The [[TypeInformation]] against which the [[Expression]]s are evaluated.
- * @param exprs The expressions that define the field names.
- * @tparam A The type of the TypeInformation.
- * @return A tuple of two arrays holding the field names and corresponding field positions.
- */
- protected def getFieldInfo[A](
- inputType: TypeInformation[A],
- exprs: Array[Expression])
- : (Array[String], Array[Int]) = {
-
- TableEnvImpl.validateType(inputType)
-
- def referenceByName(name: String, ct: CompositeType[_]): Option[Int] = {
- val inputIdx = ct.getFieldIndex(name)
- if (inputIdx < 0) {
- throw new TableException(s"$name is not a field of type $ct. " +
- s"Expected: ${ct.getFieldNames.mkString(", ")}")
- } else {
- Some(inputIdx)
- }
- }
-
- val indexedNames: Array[(Int, String)] = inputType match {
-
- case g: GenericTypeInfo[A] if g.getTypeClass == classOf[Row] =>
- throw new TableException(
- "An input of GenericTypeInfo<Row> cannot be converted to Table. " +
- "Please specify the type of the input with a RowTypeInfo.")
-
- case t: TupleTypeInfoBase[A] if t.isInstanceOf[TupleTypeInfo[A]] ||
- t.isInstanceOf[CaseClassTypeInfo[A]] || t.isInstanceOf[RowTypeInfo] =>
-
- // determine schema definition mode (by position or by name)
- val isRefByPos = isReferenceByPosition(t, exprs)
-
- exprs.zipWithIndex flatMap {
- case (UnresolvedFieldReference(name: String), idx) =>
- if (isRefByPos) {
- Some((idx, name))
- } else {
- referenceByName(name, t).map((_, name))
- }
- case (Alias(UnresolvedFieldReference(origName), name: String, _), _) =>
- if (isRefByPos) {
- throw new TableException(
- s"Alias '$name' is not allowed if other fields are referenced by position.")
- } else {
- referenceByName(origName, t).map((_, name))
- }
- case (_: TimeAttribute, _) | (Alias(_: TimeAttribute, _, _), _) =>
- None
- case _ => throw new TableException(
- "Field reference expression or alias on field expression expected.")
- }
-
- case p: PojoTypeInfo[A] =>
- exprs flatMap {
- case UnresolvedFieldReference(name: String) =>
- referenceByName(name, p).map((_, name))
- case Alias(UnresolvedFieldReference(origName), name: String, _) =>
- referenceByName(origName, p).map((_, name))
- case _: TimeAttribute | Alias(_: TimeAttribute, _, _) =>
- None
- case _ => throw new TableException(
- "Field reference expression or alias on field expression expected.")
- }
-
- case _: TypeInformation[_] => // atomic or other custom type information
- var referenced = false
- exprs flatMap {
- case _: TimeAttribute =>
- None
- case UnresolvedFieldReference(_) if referenced =>
- // only accept the first field for an atomic type
- throw new TableException("Only the first field can reference an atomic type.")
- case UnresolvedFieldReference(name: String) =>
- referenced = true
- // first field reference is mapped to atomic type
- Some((0, name))
- case _ => throw new TableException(
- "Field reference expression expected.")
- }
- }
-
- val (fieldIndexes, fieldNames) = indexedNames.unzip
-
- if (fieldNames.contains("*")) {
- throw new TableException("Field name can not be '*'.")
- }
-
- (fieldNames, fieldIndexes)
- }
-
protected def generateRowConverterFunction[OUT](
inputTypeInfo: TypeInformation[Row],
schema: RowSchema,
@@ -980,72 +831,3 @@ abstract class TableEnvImpl(
Some(generated)
}
}
-
-/**
- * Object to instantiate a [[TableEnvImpl]] depending on the batch or stream execution environment.
- */
-object TableEnvImpl {
-
- /**
- * Returns field names for a given [[TypeInformation]].
- *
- * @param inputType The TypeInformation extract the field names.
- * @tparam A The type of the TypeInformation.
- * @return An array holding the field names
- */
- def getFieldNames[A](inputType: TypeInformation[A]): Array[String] = {
- validateType(inputType)
-
- val fieldNames: Array[String] = inputType match {
- case t: CompositeType[_] => t.getFieldNames
- case _: TypeInformation[_] => Array("f0")
- }
-
- if (fieldNames.contains("*")) {
- throw new TableException("Field name can not be '*'.")
- }
-
- fieldNames
- }
-
- /**
- * Validate if class represented by the typeInfo is static and globally accessible
- * @param typeInfo type to check
- * @throws TableException if type does not meet these criteria
- */
- def validateType(typeInfo: TypeInformation[_]): Unit = {
- val clazz = typeInfo.getTypeClass
- if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
- !Modifier.isPublic(clazz.getModifiers) ||
- clazz.getCanonicalName == null) {
- throw new TableException(
- s"Class '$clazz' described in type information '$typeInfo' must be " +
- s"static and globally accessible.")
- }
- }
-
- /**
- * Returns field indexes for a given [[TypeInformation]].
- *
- * @param inputType The TypeInformation extract the field positions from.
- * @return An array holding the field positions
- */
- def getFieldIndices(inputType: TypeInformation[_]): Array[Int] = {
- getFieldNames(inputType).indices.toArray
- }
-
- /**
- * Returns field types for a given [[TypeInformation]].
- *
- * @param inputType The TypeInformation to extract field types from.
- * @return An array holding the field types.
- */
- def getFieldTypes(inputType: TypeInformation[_]): Array[TypeInformation[_]] = {
- validateType(inputType)
-
- inputType match {
- case ct: CompositeType[_] => 0.until(ct.getArity).map(i => ct.getTypeAt(i)).toArray
- case t: TypeInformation[_] => Array(t.asInstanceOf[TypeInformation[_]])
- }
- }
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvImpl.scala
index d876195..235370e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvImpl.scala
@@ -17,17 +17,18 @@
*/
package org.apache.flink.table.api.java
+import _root_.java.lang.{Boolean => JBool}
+
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
-import org.apache.flink.table.api._
-import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, TableFunction, UserDefinedAggregateFunction}
-import org.apache.flink.table.expressions.ExpressionParser
+import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import _root_.java.lang.{Boolean => JBool}
-
+import org.apache.flink.table.api._
import org.apache.flink.table.catalog.CatalogManager
+import org.apache.flink.table.expressions.ExpressionParser
+import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, TableFunction, UserDefinedAggregateFunction}
+import org.apache.flink.table.typeutils.FieldInfoUtils
import _root_.scala.collection.JavaConverters._
@@ -95,7 +96,7 @@ class StreamTableEnvImpl(
clazz: Class[T],
queryConfig: StreamQueryConfig): DataStream[T] = {
val typeInfo = TypeExtractor.createTypeInfo(clazz)
- TableEnvImpl.validateType(typeInfo)
+ FieldInfoUtils.validateInputTypeInfo(typeInfo)
translate[T](table, queryConfig, updatesAsRetraction = false, withChangeFlag = false)(typeInfo)
}
@@ -103,7 +104,7 @@ class StreamTableEnvImpl(
table: Table,
typeInfo: TypeInformation[T],
queryConfig: StreamQueryConfig): DataStream[T] = {
- TableEnvImpl.validateType(typeInfo)
+ FieldInfoUtils.validateInputTypeInfo(typeInfo)
translate[T](table, queryConfig, updatesAsRetraction = false, withChangeFlag = false)(typeInfo)
}
@@ -127,7 +128,7 @@ class StreamTableEnvImpl(
queryConfig: StreamQueryConfig): DataStream[JTuple2[JBool, T]] = {
val typeInfo = TypeExtractor.createTypeInfo(clazz)
- TableEnvImpl.validateType(typeInfo)
+ FieldInfoUtils.validateInputTypeInfo(typeInfo)
val resultType = new TupleTypeInfo[JTuple2[JBool, T]](Types.BOOLEAN, typeInfo)
translate[JTuple2[JBool, T]](
table,
@@ -141,7 +142,7 @@ class StreamTableEnvImpl(
typeInfo: TypeInformation[T],
queryConfig: StreamQueryConfig): DataStream[JTuple2[JBool, T]] = {
- TableEnvImpl.validateType(typeInfo)
+ FieldInfoUtils.validateInputTypeInfo(typeInfo)
val resultTypeInfo = new TupleTypeInfo[JTuple2[JBool, T]](
Types.BOOLEAN,
typeInfo
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
index f8443b2..777aa8e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
@@ -34,11 +34,12 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo, TypeExtractor}
import org.apache.flink.table.api.dataview._
-import org.apache.flink.table.api.{TableEnvImpl, TableException, ValidationException}
+import org.apache.flink.table.api.{TableException, ValidationException}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.dataview._
import org.apache.flink.table.functions._
import org.apache.flink.table.plan.schema.FlinkTableFunctionImpl
+import org.apache.flink.table.typeutils.FieldInfoUtils
import org.apache.flink.util.InstantiationUtil
import scala.collection.mutable
@@ -699,9 +700,9 @@ object UserDefinedFunctionUtils {
def getFieldInfo(inputType: TypeInformation[_])
: (Array[String], Array[Int], Array[TypeInformation[_]]) = {
- (TableEnvImpl.getFieldNames(inputType),
- TableEnvImpl.getFieldIndices(inputType),
- TableEnvImpl.getFieldTypes(inputType))
+ (FieldInfoUtils.getFieldNames(inputType),
+ FieldInfoUtils.getFieldIndices(inputType),
+ FieldInfoUtils.getFieldTypes(inputType))
}
/**