Posted to commits@flink.apache.org by dw...@apache.org on 2019/05/31 11:45:05 UTC

[flink] 01/02: [FLINK-12431][table-api-java] Port utility methods for extracting fields information from TypeInformation

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 44c2d7632577b7eeb70ceae73d122be00a18de44
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue May 7 12:38:58 2019 +0200

    [FLINK-12431][table-api-java] Port utility methods for extracting fields information from TypeInformation
---
 .../flink/table/typeutils/FieldInfoUtils.java      | 496 +++++++++++++++++++++
 .../expressions/BuiltInFunctionDefinitions.java    |   4 +
 .../table/operations/CalculatedTableFactory.java   |   6 +-
 .../apache/flink/table/api/BatchTableEnvImpl.scala |  23 +-
 .../flink/table/api/StreamTableEnvImpl.scala       |  19 +-
 .../org/apache/flink/table/api/TableEnvImpl.scala  | 220 +--------
 .../flink/table/api/java/StreamTableEnvImpl.scala  |  21 +-
 .../functions/utils/UserDefinedFunctionUtils.scala |   9 +-
 8 files changed, 543 insertions(+), 255 deletions(-)
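
As a quick orientation, a minimal sketch of how the ported utilities are meant to be called (the RowTypeInfo input is an illustrative assumption, not part of this commit):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.table.typeutils.FieldInfoUtils;

    // a composite input type with two named fields (hypothetical example)
    RowTypeInfo rowType = new RowTypeInfo(
        new TypeInformation<?>[]{Types.LONG, Types.STRING},
        new String[]{"id", "name"});

    // extract names and positions without renaming expressions
    FieldInfoUtils.FieldsInfo info = FieldInfoUtils.getFieldsInfo(rowType);
    String[] names = info.getFieldNames();  // ["id", "name"]
    int[] indices = info.getIndices();      // [0, 1]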

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java
new file mode 100644
index 0000000..a0a3cc6
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java
@@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionUtils;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.types.Row;
+
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static java.lang.String.format;
+import static org.apache.flink.table.expressions.BuiltInFunctionDefinitions.TIME_ATTRIBUTES;
+
+/**
+ * Utility methods for extracting names and indices of fields from different {@link TypeInformation}s.
+ */
+public class FieldInfoUtils {
+
+	private static final String ATOMIC_FIELD_NAME = "f0";
+
+	/**
+	 * Describes the fields and corresponding indices extracted from a {@link TypeInformation}.
+	 */
+	public static class FieldsInfo {
+		private final String[] fieldNames;
+		private final int[] indices;
+
+		FieldsInfo(String[] fieldNames, int[] indices) {
+			this.fieldNames = fieldNames;
+			this.indices = indices;
+		}
+
+		public String[] getFieldNames() {
+			return fieldNames;
+		}
+
+		public int[] getIndices() {
+			return indices;
+		}
+	}
+
+	/**
+	 * Reference input fields by name:
+	 * All fields in the schema definition are referenced by name
+	 * (and possibly renamed using an alias (as)). In this mode, fields can be reordered and
+	 * projected out. Moreover, we can define proctime and rowtime attributes at arbitrary
+	 * positions using arbitrary names (except those that exist in the result schema). This mode
+	 * can be used for any input type, including POJOs.
+	 *
+	 * <p>Reference input fields by position:
+	 * In this mode, fields are simply renamed. Event-time attributes can
+	 * replace the field on their position in the input data (if it is of correct type) or be
+	 * appended at the end. Proctime attributes must be appended at the end. This mode can only be
+	 * used if the input type has a defined field order (tuple, case class, Row) and none of the
+	 * fields references a field of the input type.
+	 */
+	public static boolean isReferenceByPosition(CompositeType<?> ct, Expression[] fields) {
+		if (!(ct instanceof TupleTypeInfoBase)) {
+			return false;
+		}
+
+		List<String> inputNames = Arrays.asList(ct.getFieldNames());
+
+		// Use the by-position mode if none of the fields exists in the input.
+		// This prevents confusing cases like ('f2, 'f0, 'myName) for a Tuple3 where fields are renamed
+		// by position but the user might assume reordering instead of renaming.
+		return Arrays.stream(fields).allMatch(f -> {
+			if (f instanceof UnresolvedReferenceExpression) {
+				return !inputNames.contains(((UnresolvedReferenceExpression) f).getName());
+			}
+
+			return true;
+		});
+	}
+
+	/**
+	 * Returns field names and field positions for a given {@link TypeInformation}.
+	 *
+	 * @param inputType The TypeInformation to extract the field names and positions from.
+	 * @param <A> The type of the TypeInformation.
+	 * @return A {@link FieldsInfo} holding the field names and corresponding field positions.
+	 */
+	public static <A> FieldsInfo getFieldsInfo(TypeInformation<A> inputType) {
+
+		if (inputType instanceof GenericTypeInfo && inputType.getTypeClass() == Row.class) {
+			throw new TableException(
+				"An input of GenericTypeInfo<Row> cannot be converted to Table. " +
+					"Please specify the type of the input with a RowTypeInfo.");
+		} else {
+			return new FieldsInfo(getFieldNames(inputType), getFieldIndices(inputType));
+		}
+	}
+
+	/**
+	 * Returns field names and field positions for a given {@link TypeInformation} and array of
+	 * {@link Expression}s. It does not handle time attributes, but considers them in the indices.
+	 *
+	 * @param inputType The {@link TypeInformation} against which the {@link Expression}s are evaluated.
+	 * @param exprs     The expressions that define the field names.
+	 * @param <A> The type of the TypeInformation.
+	 * @return A {@link FieldsInfo} holding the field names and corresponding field positions.
+	 */
+	public static <A> FieldsInfo getFieldsInfo(TypeInformation<A> inputType, Expression[] exprs) {
+		validateInputTypeInfo(inputType);
+
+		final Set<FieldInfo> fieldInfos;
+		if (inputType instanceof GenericTypeInfo && inputType.getTypeClass() == Row.class) {
+			throw new TableException(
+				"An input of GenericTypeInfo<Row> cannot be converted to Table. " +
+					"Please specify the type of the input with a RowTypeInfo.");
+		} else if (inputType instanceof TupleTypeInfoBase) {
+			fieldInfos = extractFieldInfosFromTupleType((CompositeType) inputType, exprs);
+		} else if (inputType instanceof PojoTypeInfo) {
+			fieldInfos = extractFieldInfosByNameReference((CompositeType) inputType, exprs);
+		} else {
+			fieldInfos = extractFieldInfoFromAtomicType(exprs);
+		}
+
+		if (fieldInfos.stream().anyMatch(info -> info.getFieldName().equals("*"))) {
+			throw new TableException("Field name cannot be '*'.");
+		}
+
+		String[] fieldNames = fieldInfos.stream().map(FieldInfo::getFieldName).toArray(String[]::new);
+		int[] fieldIndices = fieldInfos.stream().mapToInt(FieldInfo::getIndex).toArray();
+		return new FieldsInfo(fieldNames, fieldIndices);
+	}
+
+	/**
+	 * Returns field names for a given {@link TypeInformation}.
+	 *
+	 * @param inputType The TypeInformation to extract the field names from.
+	 * @param <A> The type of the TypeInformation.
+	 * @return An array holding the field names.
+	 */
+	public static <A> String[] getFieldNames(TypeInformation<A> inputType) {
+		validateInputTypeInfo(inputType);
+
+		final String[] fieldNames;
+		if (inputType instanceof CompositeType) {
+			fieldNames = ((CompositeType<A>) inputType).getFieldNames();
+		} else {
+			fieldNames = new String[]{ATOMIC_FIELD_NAME};
+		}
+
+		if (Arrays.asList(fieldNames).contains("*")) {
+			throw new TableException("Field name cannot be '*'.");
+		}
+
+		return fieldNames;
+	}
+
+	/**
+	 * Validates that the class represented by the typeInfo is static and globally accessible.
+	 *
+	 * @param typeInfo type to check
+	 * @throws TableException if the type does not meet these criteria
+	 */
+	public static <A> void validateInputTypeInfo(TypeInformation<A> typeInfo) {
+		Class<A> clazz = typeInfo.getTypeClass();
+		if ((clazz.isMemberClass() && !Modifier.isStatic(clazz.getModifiers())) ||
+			!Modifier.isPublic(clazz.getModifiers()) ||
+			clazz.getCanonicalName() == null) {
+			throw new TableException(format(
+				"Class '%s' described in type information '%s' must be " +
+				"static and globally accessible.", clazz, typeInfo));
+		}
+	}
+
+	/**
+	 * Returns field indexes for a given {@link TypeInformation}.
+	 *
+	 * @param inputType The TypeInformation to extract the field positions from.
+	 * @return An array holding the field positions.
+	 */
+	public static int[] getFieldIndices(TypeInformation<?> inputType) {
+		return IntStream.range(0, getFieldNames(inputType).length).toArray();
+	}
+
+	/**
+	 * Returns field types for a given {@link TypeInformation}.
+	 *
+	 * @param inputType The TypeInformation to extract field types from.
+	 * @return An array holding the field types.
+	 */
+	public static TypeInformation<?>[] getFieldTypes(TypeInformation<?> inputType) {
+		validateInputTypeInfo(inputType);
+
+		final TypeInformation<?>[] fieldTypes;
+		if (inputType instanceof CompositeType) {
+			int arity = inputType.getArity();
+			CompositeType ct = (CompositeType) inputType;
+			fieldTypes = IntStream.range(0, arity).mapToObj(ct::getTypeAt).toArray(TypeInformation[]::new);
+		} else {
+			fieldTypes = new TypeInformation[]{inputType};
+		}
+
+		return fieldTypes;
+	}
+
+	public static <T> TableSchema calculateTableSchema(
+		TypeInformation<T> typeInfo,
+		int[] fieldIndexes,
+		String[] fieldNames) {
+
+		if (fieldIndexes.length != fieldNames.length) {
+			throw new TableException(String.format(
+				"Number of field names and field indexes must be equal.\n" +
+					"Number of names is %s, number of indexes is %s.\n" +
+					"List of column names: %s.\n" +
+					"List of column indexes: %s.",
+				fieldNames.length,
+				fieldIndexes.length,
+				String.join(", ", fieldNames),
+				Arrays.stream(fieldIndexes).mapToObj(Integer::toString).collect(Collectors.joining(", "))));
+		}
+
+		// check uniqueness of field names
+		Set<String> duplicatedNames = findDuplicates(fieldNames);
+		if (!duplicatedNames.isEmpty()) {
+			throw new TableException(String.format(
+				"Field names must be unique.\n" +
+					"List of duplicate fields: [%s].\n" +
+					"List of all fields: [%s].",
+				String.join(", ", duplicatedNames),
+				String.join(", ", fieldNames)));
+		}
+
+		final TypeInformation[] types;
+		long fieldIndicesCount = Arrays.stream(fieldIndexes).filter(i -> i >= 0).count();
+		if (typeInfo instanceof CompositeType) {
+			CompositeType ct = (CompositeType) typeInfo;
+			// it is ok to leave out fields
+			if (fieldIndicesCount > ct.getArity()) {
+				throw new TableException(String.format(
+					"Number of fields (%s) must not be greater than the arity of type (%s).",
+					Arrays.toString(fieldNames),
+					Arrays.toString(ct.getFieldNames())));
+			}
+
+			types = Arrays.stream(fieldIndexes)
+				.mapToObj(idx -> extractTimeMarkerType(idx).orElseGet(() -> ct.getTypeAt(idx)))
+				.toArray(TypeInformation[]::new);
+		} else {
+			if (fieldIndicesCount > 1) {
+				throw new TableException(
+					"Non-composite input type may have only a single field and its index must be 0.");
+			}
+
+			types = Arrays.stream(fieldIndexes)
+				.mapToObj(idx -> extractTimeMarkerType(idx).orElse(typeInfo))
+				.toArray(TypeInformation[]::new);
+		}
+
+		return new TableSchema(fieldNames, types);
+	}
+
+	private static Optional<TypeInformation<?>> extractTimeMarkerType(int idx) {
+		switch (idx) {
+			case TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER:
+				return Optional.of(TimeIndicatorTypeInfo.ROWTIME_INDICATOR);
+			case TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER:
+				return Optional.of(TimeIndicatorTypeInfo.PROCTIME_INDICATOR);
+			case TimeIndicatorTypeInfo.ROWTIME_BATCH_MARKER:
+			case TimeIndicatorTypeInfo.PROCTIME_BATCH_MARKER:
+				return Optional.of(Types.SQL_TIMESTAMP);
+			default:
+				return Optional.empty();
+		}
+	}
+
+
+	/* Utility methods */
+
+	private static Set<FieldInfo> extractFieldInfoFromAtomicType(Expression[] exprs) {
+		boolean referenced = false;
+		FieldInfo fieldInfo = null;
+		for (Expression expr : exprs) {
+			if (expr instanceof UnresolvedReferenceExpression) {
+				if (referenced) {
+					throw new TableException("Only the first field can reference an atomic type.");
+				} else {
+					referenced = true;
+					fieldInfo = new FieldInfo(((UnresolvedReferenceExpression) expr).getName(), 0);
+				}
+			} else if (!isTimeAttribute(expr)) { // time attributes are ignored here
+				throw new TableException("Field reference expression expected.");
+			}
+		}
+
+		if (fieldInfo != null) {
+			return Collections.singleton(fieldInfo);
+		}
+
+		return Collections.emptySet();
+	}
+
+	private static Set<FieldInfo> extractFieldInfosByNameReference(CompositeType<?> inputType, Expression[] exprs) {
+		ExprToFieldInfo exprToFieldInfo = new ExprToFieldInfo(inputType);
+		return Arrays.stream(exprs)
+			.map(expr -> expr.accept(exprToFieldInfo))
+			.filter(Optional::isPresent)
+			.map(Optional::get)
+			.collect(Collectors.toCollection(LinkedHashSet::new));
+	}
+
+	private static Set<FieldInfo> extractFieldInfosFromTupleType(CompositeType<?> inputType, Expression[] exprs) {
+		boolean isRefByPos = isReferenceByPosition(inputType, exprs);
+
+		if (isRefByPos) {
+			return IntStream.range(0, exprs.length)
+				.mapToObj(idx -> exprs[idx].accept(new IndexedExprToFieldInfo(idx)))
+				.filter(Optional::isPresent)
+				.map(Optional::get)
+				.collect(Collectors.toCollection(LinkedHashSet::new));
+		} else {
+			return extractFieldInfosByNameReference(inputType, exprs);
+		}
+	}
+
+	private static class FieldInfo {
+		private final String fieldName;
+		private final int index;
+
+		FieldInfo(String fieldName, int index) {
+			this.fieldName = fieldName;
+			this.index = index;
+		}
+
+		public String getFieldName() {
+			return fieldName;
+		}
+
+		public int getIndex() {
+			return index;
+		}
+	}
+
+	private static class IndexedExprToFieldInfo extends ApiExpressionDefaultVisitor<Optional<FieldInfo>> {
+
+		private final int index;
+
+		private IndexedExprToFieldInfo(int index) {
+			this.index = index;
+		}
+
+		@Override
+		public Optional<FieldInfo> visitUnresolvedReference(UnresolvedReferenceExpression unresolvedReference) {
+			String fieldName = unresolvedReference.getName();
+			return Optional.of(new FieldInfo(fieldName, index));
+		}
+
+		@Override
+		public Optional<FieldInfo> visitCall(CallExpression call) {
+			if (call.getFunctionDefinition() == BuiltInFunctionDefinitions.AS) {
+				List<Expression> children = call.getChildren();
+				Expression origExpr = children.get(0);
+				String newName = ExpressionUtils.extractValue(children.get(1), Types.STRING)
+					.orElseThrow(() ->
+						new TableException("Alias expects string literal as new name. Got: " + children.get(1)));
+
+				if (origExpr instanceof UnresolvedReferenceExpression) {
+					throw new TableException(
+						format("Alias '%s' is not allowed if other fields are referenced by position.", newName));
+				} else if (isTimeAttribute(origExpr)) {
+					return Optional.empty();
+				}
+			} else if (isTimeAttribute(call)) {
+				return Optional.empty();
+			}
+
+			return defaultMethod(call);
+		}
+
+		@Override
+		protected Optional<FieldInfo> defaultMethod(Expression expression) {
+			throw new TableException("Field reference expression or alias on field expression expected.");
+		}
+	}
+
+	private static class ExprToFieldInfo extends ApiExpressionDefaultVisitor<Optional<FieldInfo>> {
+
+		private final CompositeType ct;
+
+		private ExprToFieldInfo(CompositeType ct) {
+			this.ct = ct;
+		}
+
+		@Override
+		public Optional<FieldInfo> visitUnresolvedReference(UnresolvedReferenceExpression unresolvedReference) {
+			String fieldName = unresolvedReference.getName();
+			return referenceByName(fieldName, ct).map(idx -> new FieldInfo(fieldName, idx));
+		}
+
+		@Override
+		public Optional<FieldInfo> visitCall(CallExpression call) {
+			if (call.getFunctionDefinition() == BuiltInFunctionDefinitions.AS) {
+				List<Expression> children = call.getChildren();
+				Expression origExpr = children.get(0);
+				String newName = ExpressionUtils.extractValue(children.get(1), Types.STRING)
+					.orElseThrow(() ->
+						new TableException("Alias expects string literal as new name. Got: " + children.get(1)));
+
+				if (origExpr instanceof UnresolvedReferenceExpression) {
+					return referenceByName(((UnresolvedReferenceExpression) origExpr).getName(), ct)
+						.map(idx -> new FieldInfo(newName, idx));
+				} else if (isTimeAttribute(origExpr)) {
+					return Optional.empty();
+				}
+			} else if (isTimeAttribute(call)) {
+				return Optional.empty();
+			}
+
+			return defaultMethod(call);
+		}
+
+		@Override
+		protected Optional<FieldInfo> defaultMethod(Expression expression) {
+			throw new TableException("Field reference expression or alias on field expression expected.");
+		}
+	}
+
+	private static boolean isTimeAttribute(Expression origExpr) {
+		return origExpr instanceof CallExpression &&
+			TIME_ATTRIBUTES.contains(((CallExpression) origExpr).getFunctionDefinition());
+	}
+
+	private static Optional<Integer> referenceByName(String name, CompositeType<?> ct) {
+		int inputIdx = ct.getFieldIndex(name);
+		if (inputIdx < 0) {
+			throw new TableException(format(
+				"%s is not a field of type %s. Expected: %s",
+				name,
+				ct,
+				String.join(", ", ct.getFieldNames())));
+		} else {
+			return Optional.of(inputIdx);
+		}
+	}
+
+	private static <T> Set<T> findDuplicates(T[] array) {
+		Set<T> duplicates = new HashSet<>();
+		Set<T> seenElements = new HashSet<>();
+
+		for (T t : array) {
+			if (seenElements.contains(t)) {
+				duplicates.add(t);
+			} else {
+				seenElements.add(t);
+			}
+		}
+
+		return duplicates;
+	}
+
+	private FieldInfoUtils() {
+	}
+}
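
To make the two reference modes documented on isReferenceByPosition concrete, a hypothetical sketch for a Tuple2<Long, String> input (constructing UnresolvedReferenceExpression directly is assumed here for brevity; in the Table API the expressions come from parsed field strings):

    TupleTypeInfo<Tuple2<Long, String>> tupleType =
        new TupleTypeInfo<>(Types.LONG, Types.STRING);

    // ('id, 'name): neither name exists in the input (f0, f1),
    // so the fields are renamed by position: f0 -> id, f1 -> name
    Expression[] renamed = {
        new UnresolvedReferenceExpression("id"),
        new UnresolvedReferenceExpression("name")};
    FieldInfoUtils.isReferenceByPosition(tupleType, renamed);   // true

    // ('f1, 'f0): existing names are referenced,
    // so the fields are reordered by name rather than renamed
    Expression[] reordered = {
        new UnresolvedReferenceExpression("f1"),
        new UnresolvedReferenceExpression("f0")};
    FieldInfoUtils.isReferenceByPosition(tupleType, reordered); // false
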
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/BuiltInFunctionDefinitions.java
index a326be8..e74e129 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/BuiltInFunctionDefinitions.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/BuiltInFunctionDefinitions.java
@@ -343,6 +343,10 @@ public final class BuiltInFunctionDefinitions {
 		WINDOW_START, WINDOW_END, PROCTIME, ROWTIME
 	));
 
+	public static final Set<FunctionDefinition> TIME_ATTRIBUTES = new HashSet<>(Arrays.asList(
+		PROCTIME, ROWTIME
+	));
+
 	public static final List<FunctionDefinition> ORDERING = Arrays.asList(ORDER_ASC, ORDER_DESC);
 
 	public static List<FunctionDefinition> getDefinitions() {
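
The new TIME_ATTRIBUTES set lets callers test for .rowtime/.proctime calls without depending on planner expression classes; this is exactly the check that FieldInfoUtils.isTimeAttribute above performs (expr stands for any Expression):

    boolean isTimeAttribute = expr instanceof CallExpression &&
        BuiltInFunctionDefinitions.TIME_ATTRIBUTES.contains(
            ((CallExpression) expr).getFunctionDefinition());
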
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/CalculatedTableFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/CalculatedTableFactory.java
index 3f2953b..860b551 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/CalculatedTableFactory.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/CalculatedTableFactory.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.operations;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.table.api.TableEnvImpl$;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
@@ -30,6 +29,7 @@ import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionUtils;
 import org.apache.flink.table.expressions.FunctionDefinition;
 import org.apache.flink.table.expressions.TableFunctionDefinition;
+import org.apache.flink.table.typeutils.FieldInfoUtils;
 
 import java.util.Collections;
 import java.util.List;
@@ -103,7 +103,7 @@ public class CalculatedTableFactory {
 
 			String[] fieldNames;
 			if (aliasesSize == 0) {
-				fieldNames = TableEnvImpl$.MODULE$.getFieldNames(resultType);
+				fieldNames = FieldInfoUtils.getFieldNames(resultType);
 			} else if (aliasesSize != callArity) {
 				throw new ValidationException(String.format(
 					"List of column aliases must have same degree as table; " +
@@ -116,7 +116,7 @@ public class CalculatedTableFactory {
 				fieldNames = aliases.toArray(new String[aliasesSize]);
 			}
 
-			TypeInformation<?>[] fieldTypes = TableEnvImpl$.MODULE$.getFieldTypes(resultType);
+			TypeInformation<?>[] fieldTypes = FieldInfoUtils.getFieldTypes(resultType);
 
 			return new CalculatedTableOperation(
 				tableFunctionDefinition.getTableFunction(),
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala
index 312aed9..d0ee440 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala
@@ -33,7 +33,8 @@ import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
 import org.apache.flink.table.catalog.CatalogManager
 import org.apache.flink.table.descriptors.{BatchTableDescriptor, ConnectorDescriptor}
 import org.apache.flink.table.explain.PlanJsonParser
-import org.apache.flink.table.expressions.{Expression, TimeAttribute}
+import org.apache.flink.table.expressions.BuiltInFunctionDefinitions.TIME_ATTRIBUTES
+import org.apache.flink.table.expressions.{CallExpression, Expression}
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.dataset.DataSetRel
 import org.apache.flink.table.plan.rules.FlinkRuleSets
@@ -41,6 +42,7 @@ import org.apache.flink.table.plan.schema._
 import org.apache.flink.table.runtime.MapRunner
 import org.apache.flink.table.sinks._
 import org.apache.flink.table.sources.{BatchTableSource, TableSource}
+import org.apache.flink.table.typeutils.FieldInfoUtils.{getFieldsInfo, validateInputTypeInfo}
 import org.apache.flink.types.Row
 
 /**
@@ -319,11 +321,11 @@ abstract class BatchTableEnvImpl(
     */
   protected def registerDataSetInternal[T](name: String, dataSet: DataSet[T]): Unit = {
 
-    val (fieldNames, fieldIndexes) = getFieldInfo[T](dataSet.getType)
+    val fieldInfo = getFieldsInfo[T](dataSet.getType)
     val dataSetTable = new DataSetTable[T](
       dataSet,
-      fieldIndexes,
-      fieldNames
+      fieldInfo.getIndices,
+      fieldInfo.getFieldNames
     )
     registerTableInternal(name, dataSetTable)
   }
@@ -341,18 +343,19 @@ abstract class BatchTableEnvImpl(
       name: String, dataSet: DataSet[T], fields: Array[Expression]): Unit = {
 
     val inputType = dataSet.getType
-    val bridgedFields = fields.map(expressionBridge.bridge).toArray[Expression]
 
-    val (fieldNames, fieldIndexes) = getFieldInfo[T](
+    val fieldsInfo = getFieldsInfo[T](
       inputType,
-      bridgedFields)
+      fields)
 
-    if (bridgedFields.exists(_.isInstanceOf[TimeAttribute])) {
+    if (fields.exists(f =>
+      f.isInstanceOf[CallExpression] &&
+        TIME_ATTRIBUTES.contains(f.asInstanceOf[CallExpression].getFunctionDefinition))) {
       throw new ValidationException(
         ".rowtime and .proctime time indicators are not allowed in a batch environment.")
     }
 
-    val dataSetTable = new DataSetTable[T](dataSet, fieldIndexes, fieldNames)
+    val dataSetTable = new DataSetTable[T](dataSet, fieldsInfo.getIndices, fieldsInfo.getFieldNames)
     registerTableInternal(name, dataSetTable)
   }
 
@@ -416,7 +419,7 @@ abstract class BatchTableEnvImpl(
       logicalPlan: RelNode,
       logicalType: RelDataType,
       queryConfig: BatchQueryConfig)(implicit tpe: TypeInformation[A]): DataSet[A] = {
-    TableEnvImpl.validateType(tpe)
+    validateInputTypeInfo(tpe)
 
     logicalPlan match {
       case node: DataSetRel =>
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala
index 62d44a2..40ecada 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala
@@ -52,6 +52,7 @@ import org.apache.flink.table.runtime.{CRowMapRunner, OutputRowtimeProcessFuncti
 import org.apache.flink.table.sinks._
 import org.apache.flink.table.sources.{StreamTableSource, TableSource, TableSourceUtil}
 import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, TypeCheckUtils}
+import org.apache.flink.table.typeutils.FieldInfoUtils.{getFieldsInfo, isReferenceByPosition}
 
 import _root_.scala.collection.JavaConverters._
 
@@ -443,11 +444,11 @@ abstract class StreamTableEnvImpl(
     name: String,
     dataStream: DataStream[T]): Unit = {
 
-    val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType)
+    val fieldInfo = getFieldsInfo[T](dataStream.getType)
     val dataStreamTable = new DataStreamTable[T](
       dataStream,
-      fieldIndexes,
-      fieldNames
+      fieldInfo.getIndices,
+      fieldInfo.getFieldNames
     )
     registerTableInternal(name, dataStreamTable)
   }
@@ -468,13 +469,12 @@ abstract class StreamTableEnvImpl(
     : Unit = {
 
     val streamType = dataStream.getType
-    val bridgedFields = fields.map(expressionBridge.bridge).toArray[Expression]
 
     // get field names and types for all non-replaced fields
-    val (fieldNames, fieldIndexes) = getFieldInfo[T](streamType, bridgedFields)
+    val fieldsInfo = getFieldsInfo[T](streamType, fields)
 
     // validate and extract time attributes
-    val (rowtime, proctime) = validateAndExtractTimeAttributes(streamType, bridgedFields)
+    val (rowtime, proctime) = validateAndExtractTimeAttributes(streamType, fields)
 
     // check if event-time is enabled
     if (rowtime.isDefined && execEnv.getStreamTimeCharacteristic != TimeCharacteristic.EventTime) {
@@ -484,8 +484,8 @@ abstract class StreamTableEnvImpl(
     }
 
     // adjust field indexes and field names
-    val indexesWithIndicatorFields = adjustFieldIndexes(fieldIndexes, rowtime, proctime)
-    val namesWithIndicatorFields = adjustFieldNames(fieldNames, rowtime, proctime)
+    val indexesWithIndicatorFields = adjustFieldIndexes(fieldsInfo.getIndices, rowtime, proctime)
+    val namesWithIndicatorFields = adjustFieldNames(fieldsInfo.getFieldNames, rowtime, proctime)
 
     val dataStreamTable = new DataStreamTable[T](
       dataStream,
@@ -593,7 +593,8 @@ abstract class StreamTableEnvImpl(
       }
     }
 
-    exprs.zipWithIndex.foreach {
+    val bridgedFields = exprs.map(expressionBridge.bridge).toArray[Expression]
+    bridgedFields.zipWithIndex.foreach {
       case (RowtimeAttribute(UnresolvedFieldReference(name)), idx) =>
         extractRowtime(idx, name, None)
 
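
The adjusted indices can carry the negative marker values from TimeIndicatorTypeInfo, which calculateTableSchema later resolves back into types via extractTimeMarkerType. A hypothetical illustration for an atomic input with an appended rowtime column "ts":

    int[] fieldIndexes = {0, TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER};
    String[] fieldNames = {"id", "ts"};

    TableSchema schema = FieldInfoUtils.calculateTableSchema(
        Types.LONG,   // atomic input type, mapped to index 0
        fieldIndexes,
        fieldNames);
    // schema: id -> LONG, ts -> TimeIndicatorTypeInfo.ROWTIME_INDICATOR
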
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/TableEnvImpl.scala
index 5ada652..5dd46cc 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/TableEnvImpl.scala
@@ -37,9 +37,7 @@ import org.apache.calcite.tools._
 import org.apache.flink.annotation.VisibleForTesting
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.typeutils.{RowTypeInfo, _}
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, PojoTypeInfo, TupleTypeInfoBase}
 import org.apache.flink.table.calcite._
 import org.apache.flink.table.catalog._
 import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
@@ -720,153 +718,6 @@ abstract class TableEnvImpl(
     planningConfigurationBuilder.createFlinkPlanner(currentCatalogName, currentDatabase)
   }
 
-  /**
-    * Reference input fields by name:
-    * All fields in the schema definition are referenced by name
-    * (and possibly renamed using an alias (as). In this mode, fields can be reordered and
-    * projected out. Moreover, we can define proctime and rowtime attributes at arbitrary
-    * positions using arbitrary names (except those that exist in the result schema). This mode
-    * can be used for any input type, including POJOs.
-    *
-    * Reference input fields by position:
-    * In this mode, fields are simply renamed. Event-time attributes can
-    * replace the field on their position in the input data (if it is of correct type) or be
-    * appended at the end. Proctime attributes must be appended at the end. This mode can only be
-    * used if the input type has a defined field order (tuple, case class, Row) and no of fields
-    * references a field of the input type.
-    */
-  protected def isReferenceByPosition(ct: CompositeType[_], fields: Array[Expression]): Boolean = {
-    if (!ct.isInstanceOf[TupleTypeInfoBase[_]]) {
-      return false
-    }
-
-    val inputNames = ct.getFieldNames
-
-    // Use the by-position mode if no of the fields exists in the input.
-    // This prevents confusing cases like ('f2, 'f0, 'myName) for a Tuple3 where fields are renamed
-    // by position but the user might assume reordering instead of renaming.
-    fields.forall {
-      case UnresolvedFieldReference(name) => !inputNames.contains(name)
-      case _ => true
-    }
-  }
-
-  /**
-    * Returns field names and field positions for a given [[TypeInformation]].
-    *
-    * @param inputType The TypeInformation extract the field names and positions from.
-    * @tparam A The type of the TypeInformation.
-    * @return A tuple of two arrays holding the field names and corresponding field positions.
-    */
-  protected[flink] def getFieldInfo[A](inputType: TypeInformation[A]):
-  (Array[String], Array[Int]) = {
-
-    if (inputType.isInstanceOf[GenericTypeInfo[A]] && inputType.getTypeClass == classOf[Row]) {
-      throw new TableException(
-        "An input of GenericTypeInfo<Row> cannot be converted to Table. " +
-          "Please specify the type of the input with a RowTypeInfo.")
-    } else {
-      (TableEnvImpl.getFieldNames(inputType), TableEnvImpl.getFieldIndices(inputType))
-    }
-  }
-
-  /**
-    * Returns field names and field positions for a given [[TypeInformation]] and [[Array]] of
-    * [[Expression]]. It does not handle time attributes but considers them in indices.
-    *
-    * @param inputType The [[TypeInformation]] against which the [[Expression]]s are evaluated.
-    * @param exprs     The expressions that define the field names.
-    * @tparam A The type of the TypeInformation.
-    * @return A tuple of two arrays holding the field names and corresponding field positions.
-    */
-  protected def getFieldInfo[A](
-      inputType: TypeInformation[A],
-      exprs: Array[Expression])
-    : (Array[String], Array[Int]) = {
-
-    TableEnvImpl.validateType(inputType)
-
-    def referenceByName(name: String, ct: CompositeType[_]): Option[Int] = {
-      val inputIdx = ct.getFieldIndex(name)
-      if (inputIdx < 0) {
-        throw new TableException(s"$name is not a field of type $ct. " +
-                s"Expected: ${ct.getFieldNames.mkString(", ")}")
-      } else {
-        Some(inputIdx)
-      }
-    }
-
-    val indexedNames: Array[(Int, String)] = inputType match {
-
-      case g: GenericTypeInfo[A] if g.getTypeClass == classOf[Row] =>
-        throw new TableException(
-          "An input of GenericTypeInfo<Row> cannot be converted to Table. " +
-            "Please specify the type of the input with a RowTypeInfo.")
-
-      case t: TupleTypeInfoBase[A] if t.isInstanceOf[TupleTypeInfo[A]] ||
-        t.isInstanceOf[CaseClassTypeInfo[A]] || t.isInstanceOf[RowTypeInfo] =>
-
-        // determine schema definition mode (by position or by name)
-        val isRefByPos = isReferenceByPosition(t, exprs)
-
-        exprs.zipWithIndex flatMap {
-          case (UnresolvedFieldReference(name: String), idx) =>
-            if (isRefByPos) {
-              Some((idx, name))
-            } else {
-              referenceByName(name, t).map((_, name))
-            }
-          case (Alias(UnresolvedFieldReference(origName), name: String, _), _) =>
-            if (isRefByPos) {
-              throw new TableException(
-                s"Alias '$name' is not allowed if other fields are referenced by position.")
-            } else {
-              referenceByName(origName, t).map((_, name))
-            }
-          case (_: TimeAttribute, _) | (Alias(_: TimeAttribute, _, _), _) =>
-            None
-          case _ => throw new TableException(
-            "Field reference expression or alias on field expression expected.")
-        }
-
-      case p: PojoTypeInfo[A] =>
-        exprs flatMap {
-          case UnresolvedFieldReference(name: String) =>
-            referenceByName(name, p).map((_, name))
-          case Alias(UnresolvedFieldReference(origName), name: String, _) =>
-            referenceByName(origName, p).map((_, name))
-          case _: TimeAttribute | Alias(_: TimeAttribute, _, _) =>
-            None
-          case _ => throw new TableException(
-            "Field reference expression or alias on field expression expected.")
-        }
-
-      case _: TypeInformation[_] => // atomic or other custom type information
-        var referenced = false
-        exprs flatMap {
-          case _: TimeAttribute =>
-            None
-          case UnresolvedFieldReference(_) if referenced =>
-            // only accept the first field for an atomic type
-            throw new TableException("Only the first field can reference an atomic type.")
-          case UnresolvedFieldReference(name: String) =>
-            referenced = true
-            // first field reference is mapped to atomic type
-            Some((0, name))
-          case _ => throw new TableException(
-            "Field reference expression expected.")
-        }
-    }
-
-    val (fieldIndexes, fieldNames) = indexedNames.unzip
-
-    if (fieldNames.contains("*")) {
-      throw new TableException("Field name can not be '*'.")
-    }
-
-    (fieldNames, fieldIndexes)
-  }
-
   protected def generateRowConverterFunction[OUT](
       inputTypeInfo: TypeInformation[Row],
       schema: RowSchema,
@@ -980,72 +831,3 @@ abstract class TableEnvImpl(
     Some(generated)
   }
 }
-
-/**
-  * Object to instantiate a [[TableEnvImpl]] depending on the batch or stream execution environment.
-  */
-object TableEnvImpl {
-
-  /**
-    * Returns field names for a given [[TypeInformation]].
-    *
-    * @param inputType The TypeInformation extract the field names.
-    * @tparam A The type of the TypeInformation.
-    * @return An array holding the field names
-    */
-  def getFieldNames[A](inputType: TypeInformation[A]): Array[String] = {
-    validateType(inputType)
-
-    val fieldNames: Array[String] = inputType match {
-      case t: CompositeType[_] => t.getFieldNames
-      case _: TypeInformation[_] => Array("f0")
-    }
-
-    if (fieldNames.contains("*")) {
-      throw new TableException("Field name can not be '*'.")
-    }
-
-    fieldNames
-  }
-
-  /**
-    * Validate if class represented by the typeInfo is static and globally accessible
-    * @param typeInfo type to check
-    * @throws TableException if type does not meet these criteria
-    */
-  def validateType(typeInfo: TypeInformation[_]): Unit = {
-    val clazz = typeInfo.getTypeClass
-    if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
-      !Modifier.isPublic(clazz.getModifiers) ||
-      clazz.getCanonicalName == null) {
-      throw new TableException(
-        s"Class '$clazz' described in type information '$typeInfo' must be " +
-        s"static and globally accessible.")
-    }
-  }
-
-  /**
-    * Returns field indexes for a given [[TypeInformation]].
-    *
-    * @param inputType The TypeInformation extract the field positions from.
-    * @return An array holding the field positions
-    */
-  def getFieldIndices(inputType: TypeInformation[_]): Array[Int] = {
-    getFieldNames(inputType).indices.toArray
-  }
-
-  /**
-    * Returns field types for a given [[TypeInformation]].
-    *
-    * @param inputType The TypeInformation to extract field types from.
-    * @return An array holding the field types.
-    */
-  def getFieldTypes(inputType: TypeInformation[_]): Array[TypeInformation[_]] = {
-    validateType(inputType)
-
-    inputType match {
-      case ct: CompositeType[_] => 0.until(ct.getArity).map(i => ct.getTypeAt(i)).toArray
-      case t: TypeInformation[_] => Array(t.asInstanceOf[TypeInformation[_]])
-    }
-  }
-}
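
The removed Scala validateType and its Java replacement FieldInfoUtils.validateInputTypeInfo enforce the same rule; a sketch with hypothetical classes of what passes and what throws:

    public class Outer {
        // accepted: public, static, globally accessible
        public static class ValidPojo {
            public long id;
        }

        // rejected: member class that is not static
        public class InvalidPojo {
            public long id;
        }
    }

    FieldInfoUtils.validateInputTypeInfo(
        TypeExtractor.createTypeInfo(Outer.ValidPojo.class));    // passes
    FieldInfoUtils.validateInputTypeInfo(
        TypeExtractor.createTypeInfo(Outer.InvalidPojo.class));  // throws TableException
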
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvImpl.scala
index d876195..235370e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvImpl.scala
@@ -17,17 +17,18 @@
  */
 package org.apache.flink.table.api.java
 
+import _root_.java.lang.{Boolean => JBool}
+
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
-import org.apache.flink.table.api._
-import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, TableFunction, UserDefinedAggregateFunction}
-import org.apache.flink.table.expressions.ExpressionParser
+import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import _root_.java.lang.{Boolean => JBool}
-
+import org.apache.flink.table.api._
 import org.apache.flink.table.catalog.CatalogManager
+import org.apache.flink.table.expressions.ExpressionParser
+import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, TableFunction, UserDefinedAggregateFunction}
+import org.apache.flink.table.typeutils.FieldInfoUtils
 
 import _root_.scala.collection.JavaConverters._
 
@@ -95,7 +96,7 @@ class StreamTableEnvImpl(
       clazz: Class[T],
       queryConfig: StreamQueryConfig): DataStream[T] = {
     val typeInfo = TypeExtractor.createTypeInfo(clazz)
-    TableEnvImpl.validateType(typeInfo)
+    FieldInfoUtils.validateInputTypeInfo(typeInfo)
     translate[T](table, queryConfig, updatesAsRetraction = false, withChangeFlag = false)(typeInfo)
   }
 
@@ -103,7 +104,7 @@ class StreamTableEnvImpl(
       table: Table,
       typeInfo: TypeInformation[T],
       queryConfig: StreamQueryConfig): DataStream[T] = {
-    TableEnvImpl.validateType(typeInfo)
+    FieldInfoUtils.validateInputTypeInfo(typeInfo)
     translate[T](table, queryConfig, updatesAsRetraction = false, withChangeFlag = false)(typeInfo)
   }
 
@@ -127,7 +128,7 @@ class StreamTableEnvImpl(
       queryConfig: StreamQueryConfig): DataStream[JTuple2[JBool, T]] = {
 
     val typeInfo = TypeExtractor.createTypeInfo(clazz)
-    TableEnvImpl.validateType(typeInfo)
+    FieldInfoUtils.validateInputTypeInfo(typeInfo)
     val resultType = new TupleTypeInfo[JTuple2[JBool, T]](Types.BOOLEAN, typeInfo)
     translate[JTuple2[JBool, T]](
       table,
@@ -141,7 +142,7 @@ class StreamTableEnvImpl(
       typeInfo: TypeInformation[T],
       queryConfig: StreamQueryConfig): DataStream[JTuple2[JBool, T]] = {
 
-    TableEnvImpl.validateType(typeInfo)
+    FieldInfoUtils.validateInputTypeInfo(typeInfo)
     val resultTypeInfo = new TupleTypeInfo[JTuple2[JBool, T]](
       Types.BOOLEAN,
       typeInfo
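
All four conversion paths above now delegate validation to FieldInfoUtils.validateInputTypeInfo before the retract variants wrap the user type into a change-flagged tuple; restated as a minimal Java sketch (the Row result type is an assumption):

    TypeInformation<Row> typeInfo = Types.ROW(Types.LONG, Types.STRING);
    FieldInfoUtils.validateInputTypeInfo(typeInfo);

    // (true, row) encodes an insert/accumulate message, (false, row) a retraction
    TupleTypeInfo<Tuple2<Boolean, Row>> retractType =
        new TupleTypeInfo<>(Types.BOOLEAN, typeInfo);
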
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
index f8443b2..777aa8e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
@@ -34,11 +34,12 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo, TypeExtractor}
 import org.apache.flink.table.api.dataview._
-import org.apache.flink.table.api.{TableEnvImpl, TableException, ValidationException}
+import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.dataview._
 import org.apache.flink.table.functions._
 import org.apache.flink.table.plan.schema.FlinkTableFunctionImpl
+import org.apache.flink.table.typeutils.FieldInfoUtils
 import org.apache.flink.util.InstantiationUtil
 
 import scala.collection.mutable
@@ -699,9 +700,9 @@ object UserDefinedFunctionUtils {
   def getFieldInfo(inputType: TypeInformation[_])
     : (Array[String], Array[Int], Array[TypeInformation[_]]) = {
 
-    (TableEnvImpl.getFieldNames(inputType),
-    TableEnvImpl.getFieldIndices(inputType),
-    TableEnvImpl.getFieldTypes(inputType))
+    (FieldInfoUtils.getFieldNames(inputType),
+      FieldInfoUtils.getFieldIndices(inputType),
+      FieldInfoUtils.getFieldTypes(inputType))
   }
 
   /**