You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2019/05/31 11:45:04 UTC

[flink] branch master updated (3710bcb -> 3934a7f)

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 3710bcb  [FLINK-6962][table] Add sql parser module and support CREATE / DROP table
     new 44c2d76  [FLINK-12431][table-api-java] Port utility methods for extracting fields information from TypeInformation
     new 3934a7f  [hotfix][table-planner] Removed TableOperationConverterSupplier.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/table/typeutils/FieldInfoUtils.java      | 496 +++++++++++++++++++++
 .../expressions/BuiltInFunctionDefinitions.java    |   4 +
 .../table/operations/CalculatedTableFactory.java   |   6 +-
 .../flink/table/plan/TableOperationConverter.java  |  26 +-
 .../planner/PlanningConfigurationBuilder.java      |  11 +-
 .../apache/flink/table/api/BatchTableEnvImpl.scala |  23 +-
 .../flink/table/api/StreamTableEnvImpl.scala       |  19 +-
 .../org/apache/flink/table/api/TableEnvImpl.scala  | 220 +--------
 .../flink/table/api/java/StreamTableEnvImpl.scala  |  21 +-
 .../functions/utils/UserDefinedFunctionUtils.scala |   9 +-
 .../LogicalCorrelateToTemporalTableJoinRule.scala  |  16 +-
 11 files changed, 560 insertions(+), 291 deletions(-)
 create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java


[flink] 02/02: [hotfix][table-planner] Removed TableOperationConverterSupplier.

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3934a7f3e7abce4f2ef25391bf62a5754fcfdbcf
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu May 23 11:06:49 2019 +0200

    [hotfix][table-planner] Removed TableOperationConverterSupplier.
    
    Rather than passing TableOperationConverterSupplier, we just create
    FlinkRelBuilder whenever we need to convert from TableOperation to
    RelNode.
---
 .../flink/table/plan/TableOperationConverter.java  | 26 ++++------------------
 .../planner/PlanningConfigurationBuilder.java      | 11 +++------
 .../LogicalCorrelateToTemporalTableJoinRule.scala  | 16 ++++++++-----
 3 files changed, 17 insertions(+), 36 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/TableOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/TableOperationConverter.java
index 2a32fe3..565bca7 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/TableOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/TableOperationConverter.java
@@ -64,7 +64,6 @@ import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.logical.LogicalTableFunctionScan;
 import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.tools.RelBuilder;
 import org.apache.calcite.tools.RelBuilder.AggCall;
 import org.apache.calcite.tools.RelBuilder.GroupKey;
 
@@ -89,23 +88,7 @@ import static org.apache.flink.table.expressions.FunctionDefinition.Type.AGGREGA
 @Internal
 public class TableOperationConverter extends TableOperationDefaultVisitor<RelNode> {
 
-	/**
-	 * Supplier for {@link TableOperationConverter} that can wrap given {@link RelBuilder}.
-	 */
-	@Internal
-	public static class ToRelConverterSupplier {
-		private final ExpressionBridge<PlannerExpression> expressionBridge;
-
-		public ToRelConverterSupplier(ExpressionBridge<PlannerExpression> expressionBridge) {
-			this.expressionBridge = expressionBridge;
-		}
-
-		public TableOperationConverter get(RelBuilder relBuilder) {
-			return new TableOperationConverter(relBuilder, expressionBridge);
-		}
-	}
-
-	private final RelBuilder relBuilder;
+	private final FlinkRelBuilder relBuilder;
 	private final SingleRelVisitor singleRelVisitor = new SingleRelVisitor();
 	private final ExpressionBridge<PlannerExpression> expressionBridge;
 	private final AggregateVisitor aggregateVisitor = new AggregateVisitor();
@@ -113,7 +96,7 @@ public class TableOperationConverter extends TableOperationDefaultVisitor<RelNod
 	private final JoinExpressionVisitor joinExpressionVisitor = new JoinExpressionVisitor();
 
 	public TableOperationConverter(
-			RelBuilder relBuilder,
+			FlinkRelBuilder relBuilder,
 			ExpressionBridge<PlannerExpression> expressionBridge) {
 		this.relBuilder = relBuilder;
 		this.expressionBridge = expressionBridge;
@@ -148,7 +131,6 @@ public class TableOperationConverter extends TableOperationDefaultVisitor<RelNod
 
 		@Override
 		public RelNode visitWindowAggregate(WindowAggregateTableOperation windowAggregate) {
-			FlinkRelBuilder flinkRelBuilder = (FlinkRelBuilder) relBuilder;
 			List<AggCall> aggregations = windowAggregate.getAggregateExpressions()
 				.stream()
 				.map(this::getAggCall)
@@ -161,7 +143,7 @@ public class TableOperationConverter extends TableOperationDefaultVisitor<RelNod
 				.collect(toList());
 			GroupKey groupKey = relBuilder.groupKey(groupings);
 			LogicalWindow logicalWindow = toLogicalWindow(windowAggregate.getGroupWindow());
-			return flinkRelBuilder.windowAggregate(logicalWindow, groupKey, windowProperties, aggregations).build();
+			return relBuilder.windowAggregate(logicalWindow, groupKey, windowProperties, aggregations).build();
 		}
 
 		/**
@@ -237,7 +219,7 @@ public class TableOperationConverter extends TableOperationDefaultVisitor<RelNod
 				fieldNames);
 			TableFunction<?> tableFunction = calculatedTable.getTableFunction();
 
-			FlinkTypeFactory typeFactory = (FlinkTypeFactory) relBuilder.getTypeFactory();
+			FlinkTypeFactory typeFactory = relBuilder.getTypeFactory();
 			TableSqlFunction sqlFunction = new TableSqlFunction(
 				tableFunction.functionIdentifier(),
 				tableFunction.toString(),
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java
index 7c2d79a..09a55f5 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java
@@ -31,7 +31,6 @@ import org.apache.flink.table.catalog.CatalogReader;
 import org.apache.flink.table.codegen.ExpressionReducer;
 import org.apache.flink.table.expressions.ExpressionBridge;
 import org.apache.flink.table.expressions.PlannerExpression;
-import org.apache.flink.table.plan.TableOperationConverter;
 import org.apache.flink.table.plan.cost.DataSetCostFactory;
 import org.apache.flink.table.util.JavaScalaConversionUtil;
 import org.apache.flink.table.validate.FunctionCatalog;
@@ -84,10 +83,9 @@ public class PlanningConfigurationBuilder {
 		this.tableConfig = tableConfig;
 		this.functionCatalog = functionCatalog;
 
-		// create context instances with Flink type factory
-		this.context = Contexts.of(
-			new TableOperationConverter.ToRelConverterSupplier(expressionBridge)
-		);
+		// the converter is needed when calling temporal table functions from SQL, because
+		// they reference a history table represented with a tree of table operations
+		this.context = Contexts.of(expressionBridge);
 
 		this.planner = new VolcanoPlanner(costFactory, context);
 		planner.setExecutor(new ExpressionReducer(tableConfig));
@@ -193,9 +191,6 @@ public class PlanningConfigurationBuilder {
 				getSqlToRelConverterConfig(
 					calciteConfig(tableConfig),
 					expressionBridge))
-			// the converter is needed when calling temporal table functions from SQL, because
-			// they reference a history table represented with a tree of table operations
-			.context(context)
 			// set the executor to evaluate constant expressions
 			.executor(new ExpressionReducer(tableConfig))
 			.build();
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToTemporalTableJoinRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToTemporalTableJoinRule.scala
index f4a6699..0b07f47 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToTemporalTableJoinRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToTemporalTableJoinRule.scala
@@ -34,6 +34,7 @@ import org.apache.flink.table.plan.TableOperationConverter
 import org.apache.flink.table.plan.logical.rel.LogicalTemporalTableJoin
 import org.apache.flink.table.plan.util.RexDefaultVisitor
 import org.apache.flink.util.Preconditions.checkState
+import org.apache.flink.table.calcite.FlinkRelBuilder
 
 class LogicalCorrelateToTemporalTableJoinRule
   extends RelOptRule(
@@ -82,14 +83,17 @@ class LogicalCorrelateToTemporalTableJoinRule
         // If TemporalTableFunction was found, rewrite LogicalCorrelate to TemporalJoin
         val underlyingHistoryTable: TableOperation = rightTemporalTableFunction
           .getUnderlyingHistoryTable
-        val relBuilder = this.relBuilderFactory.create(
-          cluster,
-          leftNode.getTable.getRelOptSchema)
         val rexBuilder = cluster.getRexBuilder
 
-        val converter = call.getPlanner.getContext
-          .unwrap(classOf[TableOperationConverter.ToRelConverterSupplier]).get(relBuilder)
-        val rightNode: RelNode = underlyingHistoryTable.accept(converter)
+        val expressionBridge = call.getPlanner.getContext
+          .unwrap(classOf[ExpressionBridge[PlannerExpression]])
+
+        val relBuilder = new FlinkRelBuilder(call.getPlanner.getContext,
+          cluster,
+          leftNode.getTable.getRelOptSchema,
+          expressionBridge)
+
+        val rightNode: RelNode = relBuilder.tableOperation(underlyingHistoryTable).build()
 
         val rightTimeIndicatorExpression = createRightExpression(
           rexBuilder,


[flink] 01/02: [FLINK-12431][table-api-java] Port utility methods for extracting fields information from TypeInformation

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 44c2d7632577b7eeb70ceae73d122be00a18de44
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue May 7 12:38:58 2019 +0200

    [FLINK-12431][table-api-java] Port utility methods for extracting fields information from TypeInformation
---
 .../flink/table/typeutils/FieldInfoUtils.java      | 496 +++++++++++++++++++++
 .../expressions/BuiltInFunctionDefinitions.java    |   4 +
 .../table/operations/CalculatedTableFactory.java   |   6 +-
 .../apache/flink/table/api/BatchTableEnvImpl.scala |  23 +-
 .../flink/table/api/StreamTableEnvImpl.scala       |  19 +-
 .../org/apache/flink/table/api/TableEnvImpl.scala  | 220 +--------
 .../flink/table/api/java/StreamTableEnvImpl.scala  |  21 +-
 .../functions/utils/UserDefinedFunctionUtils.scala |   9 +-
 8 files changed, 543 insertions(+), 255 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java
new file mode 100644
index 0000000..a0a3cc6
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java
@@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionUtils;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.types.Row;
+
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static java.lang.String.format;
+import static org.apache.flink.table.expressions.BuiltInFunctionDefinitions.TIME_ATTRIBUTES;
+
+/**
+ * Utility classes for extracting names and indices of fields from different {@link TypeInformation}s.
+ */
+public class FieldInfoUtils {
+
+	private static final String ATOMIC_FIELD_NAME = "f0";
+
+	/**
+	 * Describes extracted fields and corresponding indices from a {@link TypeInformation}.
+	 */
+	public static class FieldsInfo {
+		private final String[] fieldNames;
+		private final int[] indices;
+
+		FieldsInfo(String[] fieldNames, int[] indices) {
+			this.fieldNames = fieldNames;
+			this.indices = indices;
+		}
+
+		public String[] getFieldNames() {
+			return fieldNames;
+		}
+
+		public int[] getIndices() {
+			return indices;
+		}
+	}
+
+	/**
+	 * Reference input fields by name:
+	 * All fields in the schema definition are referenced by name
+	 * (and possibly renamed using an alias (as). In this mode, fields can be reordered and
+	 * projected out. Moreover, we can define proctime and rowtime attributes at arbitrary
+	 * positions using arbitrary names (except those that exist in the result schema). This mode
+	 * can be used for any input type, including POJOs.
+	 *
+	 * <p>Reference input fields by position:
+	 * In this mode, fields are simply renamed. Event-time attributes can
+	 * replace the field on their position in the input data (if it is of correct type) or be
+	 * appended at the end. Proctime attributes must be appended at the end. This mode can only be
+	 * used if the input type has a defined field order (tuple, case class, Row) and no of fields
+	 * references a field of the input type.
+	 */
+	public static boolean isReferenceByPosition(CompositeType<?> ct, Expression[] fields) {
+		if (!(ct instanceof TupleTypeInfoBase)) {
+			return false;
+		}
+
+		List<String> inputNames = Arrays.asList(ct.getFieldNames());
+
+		// Use the by-position mode if no of the fields exists in the input.
+		// This prevents confusing cases like ('f2, 'f0, 'myName) for a Tuple3 where fields are renamed
+		// by position but the user might assume reordering instead of renaming.
+		return Arrays.stream(fields).allMatch(f -> {
+			if (f instanceof UnresolvedReferenceExpression) {
+				return !inputNames.contains(((UnresolvedReferenceExpression) f).getName());
+			}
+
+			return true;
+		});
+	}
+
+	/**
+	 * Returns field names and field positions for a given {@link TypeInformation}.
+	 *
+	 * @param inputType The TypeInformation extract the field names and positions from.
+	 * @param <A> The type of the TypeInformation.
+	 * @return A tuple of two arrays holding the field names and corresponding field positions.
+	 */
+	public static <A> FieldsInfo getFieldsInfo(TypeInformation<A> inputType) {
+
+		if (inputType instanceof GenericTypeInfo && inputType.getTypeClass() == Row.class) {
+			throw new TableException(
+				"An input of GenericTypeInfo<Row> cannot be converted to Table. " +
+					"Please specify the type of the input with a RowTypeInfo.");
+		} else {
+			return new FieldsInfo(getFieldNames(inputType), getFieldIndices(inputType));
+		}
+	}
+
+	/**
+	 * Returns field names and field positions for a given {@link TypeInformation} and array of
+	 * {@link Expression}. It does not handle time attributes but considers them in indices.
+	 *
+	 * @param inputType The {@link TypeInformation} against which the {@link Expression}s are evaluated.
+	 * @param exprs     The expressions that define the field names.
+	 * @param <A> The type of the TypeInformation.
+	 * @return A tuple of two arrays holding the field names and corresponding field positions.
+	 */
+	public static <A> FieldsInfo getFieldsInfo(TypeInformation<A> inputType, Expression[] exprs) {
+		validateInputTypeInfo(inputType);
+
+		final Set<FieldInfo> fieldInfos;
+		if (inputType instanceof GenericTypeInfo && inputType.getTypeClass() == Row.class) {
+			throw new TableException(
+				"An input of GenericTypeInfo<Row> cannot be converted to Table. " +
+					"Please specify the type of the input with a RowTypeInfo.");
+		} else if (inputType instanceof TupleTypeInfoBase) {
+			fieldInfos = extractFieldInfosFromTupleType((CompositeType) inputType, exprs);
+		} else if (inputType instanceof PojoTypeInfo) {
+			fieldInfos = extractFieldInfosByNameReference((CompositeType) inputType, exprs);
+		} else {
+			fieldInfos = extractFieldInfoFromAtomicType(exprs);
+		}
+
+		if (fieldInfos.stream().anyMatch(info -> info.getFieldName().equals("*"))) {
+			throw new TableException("Field name can not be '*'.");
+		}
+
+		String[] fieldNames = fieldInfos.stream().map(FieldInfo::getFieldName).toArray(String[]::new);
+		int[] fieldIndices = fieldInfos.stream().mapToInt(FieldInfo::getIndex).toArray();
+		return new FieldsInfo(fieldNames, fieldIndices);
+	}
+
+	/**
+	 * Returns field names for a given {@link TypeInformation}.
+	 *
+	 * @param inputType The TypeInformation extract the field names.
+	 * @param <A> The type of the TypeInformation.
+	 * @return An array holding the field names
+	 */
+	public static <A> String[] getFieldNames(TypeInformation<A> inputType) {
+		validateInputTypeInfo(inputType);
+
+		final String[] fieldNames;
+		if (inputType instanceof CompositeType) {
+			fieldNames = ((CompositeType<A>) inputType).getFieldNames();
+		} else {
+			fieldNames = new String[]{ATOMIC_FIELD_NAME};
+		}
+
+		if (Arrays.asList(fieldNames).contains("*")) {
+			throw new TableException("Field name can not be '*'.");
+		}
+
+		return fieldNames;
+	}
+
+	/**
+	 * Validate if class represented by the typeInfo is static and globally accessible.
+	 *
+	 * @param typeInfo type to check
+	 * @throws TableException if type does not meet these criteria
+	 */
+	public static <A> void validateInputTypeInfo(TypeInformation<A> typeInfo) {
+		Class<A> clazz = typeInfo.getTypeClass();
+		if ((clazz.isMemberClass() && !Modifier.isStatic(clazz.getModifiers())) ||
+			!Modifier.isPublic(clazz.getModifiers()) ||
+			clazz.getCanonicalName() == null) {
+			throw new TableException(format(
+				"Class '%s' described in type information '%s' must be " +
+				"static and globally accessible.", clazz, typeInfo));
+		}
+	}
+
+	/**
+	 * Returns field indexes for a given {@link TypeInformation}.
+	 *
+	 * @param inputType The TypeInformation extract the field positions from.
+	 * @return An array holding the field positions
+	 */
+	public static int[] getFieldIndices(TypeInformation<?> inputType) {
+		return IntStream.range(0, getFieldNames(inputType).length).toArray();
+	}
+
+	/**
+	 * Returns field types for a given {@link TypeInformation}.
+	 *
+	 * @param inputType The TypeInformation to extract field types from.
+	 * @return An array holding the field types.
+	 */
+	public static TypeInformation<?>[] getFieldTypes(TypeInformation<?> inputType) {
+		validateInputTypeInfo(inputType);
+
+		final TypeInformation<?>[] fieldTypes;
+		if (inputType instanceof CompositeType) {
+			int arity = inputType.getArity();
+			CompositeType ct = (CompositeType) inputType;
+			fieldTypes = IntStream.range(0, arity).mapToObj(ct::getTypeAt).toArray(TypeInformation[]::new);
+		} else {
+			fieldTypes = new TypeInformation[]{inputType};
+		}
+
+		return fieldTypes;
+	}
+
+	public static <T> TableSchema calculateTableSchema(
+		TypeInformation<T> typeInfo,
+		int[] fieldIndexes,
+		String[] fieldNames) {
+
+		if (fieldIndexes.length != fieldNames.length) {
+			throw new TableException(String.format(
+				"Number of field names and field indexes must be equal.\n" +
+					"Number of names is %s, number of indexes is %s.\n" +
+					"List of column names: %s.\n" +
+					"List of column indexes: %s.",
+				fieldNames.length,
+				fieldIndexes.length,
+				String.join(", ", fieldNames),
+				Arrays.stream(fieldIndexes).mapToObj(Integer::toString).collect(Collectors.joining(", "))));
+		}
+
+		// check uniqueness of field names
+		Set<String> duplicatedNames = findDuplicates(fieldNames);
+		if (duplicatedNames.size() != 0) {
+
+			throw new TableException(String.format(
+				"Field names must be unique.\n" +
+					"List of duplicate fields: [%s].\n" +
+					"List of all fields: [%s].",
+				String.join(", ", duplicatedNames),
+				String.join(", ", fieldNames)));
+		}
+
+		final TypeInformation[] types;
+		long fieldIndicesCount = Arrays.stream(fieldIndexes).filter(i -> i >= 0).count();
+		if (typeInfo instanceof CompositeType) {
+			CompositeType ct = (CompositeType) typeInfo;
+			// it is ok to leave out fields
+			if (fieldIndicesCount > ct.getArity()) {
+				throw new TableException(String.format(
+					"Arity of type (%s) must not be greater than number of field names %s.",
+					Arrays.toString(ct.getFieldNames()),
+					Arrays.toString(fieldNames)));
+			}
+
+			types = Arrays.stream(fieldIndexes)
+				.mapToObj(idx -> extractTimeMarkerType(idx).orElseGet(() -> ct.getTypeAt(idx)))
+				.toArray(TypeInformation[]::new);
+		} else {
+			if (fieldIndicesCount > 1) {
+				throw new TableException(
+					"Non-composite input type may have only a single field and its index must be 0.");
+			}
+
+			types = Arrays.stream(fieldIndexes)
+				.mapToObj(idx -> extractTimeMarkerType(idx).orElse(typeInfo))
+				.toArray(TypeInformation[]::new);
+		}
+
+		return new TableSchema(fieldNames, types);
+	}
+
+	private static Optional<TypeInformation<?>> extractTimeMarkerType(int idx) {
+		switch (idx) {
+			case TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER:
+				return Optional.of(TimeIndicatorTypeInfo.ROWTIME_INDICATOR);
+			case TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER:
+				return Optional.of(TimeIndicatorTypeInfo.PROCTIME_INDICATOR);
+			case TimeIndicatorTypeInfo.ROWTIME_BATCH_MARKER:
+			case TimeIndicatorTypeInfo.PROCTIME_BATCH_MARKER:
+				return Optional.of(Types.SQL_TIMESTAMP);
+			default:
+				return Optional.empty();
+		}
+	}
+
+
+
+	/* Utility methods */
+
+	private static Set<FieldInfo> extractFieldInfoFromAtomicType(Expression[] exprs) {
+		boolean referenced = false;
+		FieldInfo fieldInfo = null;
+		for (Expression expr : exprs) {
+			if (expr instanceof UnresolvedReferenceExpression) {
+				if (referenced) {
+					throw new TableException("Only the first field can reference an atomic type.");
+				} else {
+					referenced = true;
+					fieldInfo = new FieldInfo(((UnresolvedReferenceExpression) expr).getName(), 0);
+				}
+			} else if (!isTimeAttribute(expr)) { // IGNORE Time attributes
+				throw new TableException("Field reference expression expected.");
+			}
+		}
+
+		if (fieldInfo != null) {
+			return Collections.singleton(fieldInfo);
+		}
+
+		return Collections.emptySet();
+	}
+
+	private static <A> Set<FieldInfo> extractFieldInfosByNameReference(CompositeType inputType, Expression[] exprs) {
+		ExprToFieldInfo exprToFieldInfo = new ExprToFieldInfo(inputType);
+		return Arrays.stream(exprs)
+			.map(expr -> expr.accept(exprToFieldInfo))
+			.filter(Optional::isPresent)
+			.map(Optional::get)
+			.collect(Collectors.toCollection(LinkedHashSet::new));
+	}
+
+	private static <A> Set<FieldInfo> extractFieldInfosFromTupleType(CompositeType inputType, Expression[] exprs) {
+		boolean isRefByPos = isReferenceByPosition((CompositeType<?>) inputType, exprs);
+
+		if (isRefByPos) {
+			return IntStream.range(0, exprs.length)
+				.mapToObj(idx -> exprs[idx].accept(new IndexedExprToFieldInfo(idx)))
+				.filter(Optional::isPresent)
+				.map(Optional::get)
+				.collect(Collectors.toCollection(LinkedHashSet::new));
+		} else {
+			return extractFieldInfosByNameReference(inputType, exprs);
+		}
+	}
+
+	private static class FieldInfo {
+		private final String fieldName;
+		private final int index;
+
+		FieldInfo(String fieldName, int index) {
+			this.fieldName = fieldName;
+			this.index = index;
+		}
+
+		public String getFieldName() {
+			return fieldName;
+		}
+
+		public int getIndex() {
+			return index;
+		}
+	}
+
+	private static class IndexedExprToFieldInfo extends ApiExpressionDefaultVisitor<Optional<FieldInfo>> {
+
+		private final int index;
+
+		private IndexedExprToFieldInfo(int index) {
+			this.index = index;
+		}
+
+		@Override
+		public Optional<FieldInfo> visitUnresolvedReference(UnresolvedReferenceExpression unresolvedReference) {
+			String fieldName = unresolvedReference.getName();
+			return Optional.of(new FieldInfo(fieldName, index));
+		}
+
+		@Override
+		public Optional<FieldInfo> visitCall(CallExpression call) {
+			if (call.getFunctionDefinition() == BuiltInFunctionDefinitions.AS) {
+				List<Expression> children = call.getChildren();
+				Expression origExpr = children.get(0);
+				String newName = ExpressionUtils.extractValue(children.get(1), Types.STRING)
+					.orElseThrow(() ->
+						new TableException("Alias expects string literal as new name. Got: " + children.get(1)));
+
+				if (origExpr instanceof UnresolvedReferenceExpression) {
+					throw new TableException(
+						format("Alias '%s' is not allowed if other fields are referenced by position.", newName));
+				} else if (isTimeAttribute(origExpr)) {
+					return Optional.empty();
+				}
+			} else if (isTimeAttribute(call)) {
+				return Optional.empty();
+			}
+
+			return defaultMethod(call);
+		}
+
+		@Override
+		protected Optional<FieldInfo> defaultMethod(Expression expression) {
+			throw new TableException("Field reference expression or alias on field expression expected.");
+		}
+	}
+
+	private static class ExprToFieldInfo extends ApiExpressionDefaultVisitor<Optional<FieldInfo>> {
+
+		private final CompositeType ct;
+
+		private ExprToFieldInfo(CompositeType ct) {
+			this.ct = ct;
+		}
+
+		@Override
+		public Optional<FieldInfo> visitUnresolvedReference(UnresolvedReferenceExpression unresolvedReference) {
+			String fieldName = unresolvedReference.getName();
+			return referenceByName(fieldName, ct).map(idx -> new FieldInfo(fieldName, idx));
+		}
+
+		@Override
+		public Optional<FieldInfo> visitCall(CallExpression call) {
+			if (call.getFunctionDefinition() == BuiltInFunctionDefinitions.AS) {
+				List<Expression> children = call.getChildren();
+				Expression origExpr = children.get(0);
+				String newName = ExpressionUtils.extractValue(children.get(1), Types.STRING)
+					.orElseThrow(() ->
+						new TableException("Alias expects string literal as new name. Got: " + children.get(1)));
+
+				if (origExpr instanceof UnresolvedReferenceExpression) {
+					return referenceByName(((UnresolvedReferenceExpression) origExpr).getName(), ct)
+						.map(idx -> new FieldInfo(newName, idx));
+				} else if (isTimeAttribute(origExpr)) {
+					return Optional.empty();
+				}
+			} else if (isTimeAttribute(call)) {
+				return Optional.empty();
+			}
+
+			return defaultMethod(call);
+		}
+
+		@Override
+		protected Optional<FieldInfo> defaultMethod(Expression expression) {
+			throw new TableException("Field reference expression or alias on field expression expected.");
+		}
+	}
+
+	private static boolean isTimeAttribute(Expression origExpr) {
+		return origExpr instanceof CallExpression &&
+			TIME_ATTRIBUTES.contains(((CallExpression) origExpr).getFunctionDefinition());
+	}
+
+	private static Optional<Integer> referenceByName(String name, CompositeType<?> ct) {
+		int inputIdx = ct.getFieldIndex(name);
+		if (inputIdx < 0) {
+			throw new TableException(format(
+				"%s is not a field of type %s. Expected: %s}",
+				name,
+				ct,
+				String.join(", ", ct.getFieldNames())));
+		} else {
+			return Optional.of(inputIdx);
+		}
+	}
+
+	private static <T> Set<T> findDuplicates(T[] array) {
+		Set<T> duplicates = new HashSet<>();
+		Set<T> seenElements = new HashSet<>();
+
+		for (T t : array) {
+			if (seenElements.contains(t)) {
+				duplicates.add(t);
+			} else {
+				seenElements.add(t);
+			}
+		}
+
+		return duplicates;
+	}
+
+	private FieldInfoUtils() {
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/BuiltInFunctionDefinitions.java
index a326be8..e74e129 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/BuiltInFunctionDefinitions.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/BuiltInFunctionDefinitions.java
@@ -343,6 +343,10 @@ public final class BuiltInFunctionDefinitions {
 		WINDOW_START, WINDOW_END, PROCTIME, ROWTIME
 	));
 
+	public static final Set<FunctionDefinition> TIME_ATTRIBUTES = new HashSet<>(Arrays.asList(
+		PROCTIME, ROWTIME
+	));
+
 	public static final List<FunctionDefinition> ORDERING = Arrays.asList(ORDER_ASC, ORDER_DESC);
 
 	public static List<FunctionDefinition> getDefinitions() {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/CalculatedTableFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/CalculatedTableFactory.java
index 3f2953b..860b551 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/CalculatedTableFactory.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/CalculatedTableFactory.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.operations;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.table.api.TableEnvImpl$;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
@@ -30,6 +29,7 @@ import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionUtils;
 import org.apache.flink.table.expressions.FunctionDefinition;
 import org.apache.flink.table.expressions.TableFunctionDefinition;
+import org.apache.flink.table.typeutils.FieldInfoUtils;
 
 import java.util.Collections;
 import java.util.List;
@@ -103,7 +103,7 @@ public class CalculatedTableFactory {
 
 			String[] fieldNames;
 			if (aliasesSize == 0) {
-				fieldNames = TableEnvImpl$.MODULE$.getFieldNames(resultType);
+				fieldNames = FieldInfoUtils.getFieldNames(resultType);
 			} else if (aliasesSize != callArity) {
 				throw new ValidationException(String.format(
 					"List of column aliases must have same degree as table; " +
@@ -116,7 +116,7 @@ public class CalculatedTableFactory {
 				fieldNames = aliases.toArray(new String[aliasesSize]);
 			}
 
-			TypeInformation<?>[] fieldTypes = TableEnvImpl$.MODULE$.getFieldTypes(resultType);
+			TypeInformation<?>[] fieldTypes = FieldInfoUtils.getFieldTypes(resultType);
 
 			return new CalculatedTableOperation(
 				tableFunctionDefinition.getTableFunction(),
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala
index 312aed9..d0ee440 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala
@@ -33,7 +33,8 @@ import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
 import org.apache.flink.table.catalog.CatalogManager
 import org.apache.flink.table.descriptors.{BatchTableDescriptor, ConnectorDescriptor}
 import org.apache.flink.table.explain.PlanJsonParser
-import org.apache.flink.table.expressions.{Expression, TimeAttribute}
+import org.apache.flink.table.expressions.BuiltInFunctionDefinitions.TIME_ATTRIBUTES
+import org.apache.flink.table.expressions.{CallExpression, Expression}
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.dataset.DataSetRel
 import org.apache.flink.table.plan.rules.FlinkRuleSets
@@ -41,6 +42,7 @@ import org.apache.flink.table.plan.schema._
 import org.apache.flink.table.runtime.MapRunner
 import org.apache.flink.table.sinks._
 import org.apache.flink.table.sources.{BatchTableSource, TableSource}
+import org.apache.flink.table.typeutils.FieldInfoUtils.{getFieldsInfo, validateInputTypeInfo}
 import org.apache.flink.types.Row
 
 /**
@@ -319,11 +321,11 @@ abstract class BatchTableEnvImpl(
     */
   protected def registerDataSetInternal[T](name: String, dataSet: DataSet[T]): Unit = {
 
-    val (fieldNames, fieldIndexes) = getFieldInfo[T](dataSet.getType)
+    val fieldInfo = getFieldsInfo[T](dataSet.getType)
     val dataSetTable = new DataSetTable[T](
       dataSet,
-      fieldIndexes,
-      fieldNames
+      fieldInfo.getIndices,
+      fieldInfo.getFieldNames
     )
     registerTableInternal(name, dataSetTable)
   }
@@ -341,18 +343,19 @@ abstract class BatchTableEnvImpl(
       name: String, dataSet: DataSet[T], fields: Array[Expression]): Unit = {
 
     val inputType = dataSet.getType
-    val bridgedFields = fields.map(expressionBridge.bridge).toArray[Expression]
 
-    val (fieldNames, fieldIndexes) = getFieldInfo[T](
+    val fieldsInfo = getFieldsInfo[T](
       inputType,
-      bridgedFields)
+      fields)
 
-    if (bridgedFields.exists(_.isInstanceOf[TimeAttribute])) {
+    if (fields.exists(f =>
+      f.isInstanceOf[CallExpression] &&
+        TIME_ATTRIBUTES.contains(f.asInstanceOf[CallExpression].getFunctionDefinition))) {
       throw new ValidationException(
         ".rowtime and .proctime time indicators are not allowed in a batch environment.")
     }
 
-    val dataSetTable = new DataSetTable[T](dataSet, fieldIndexes, fieldNames)
+    val dataSetTable = new DataSetTable[T](dataSet, fieldsInfo.getIndices, fieldsInfo.getFieldNames)
     registerTableInternal(name, dataSetTable)
   }
 
@@ -416,7 +419,7 @@ abstract class BatchTableEnvImpl(
       logicalPlan: RelNode,
       logicalType: RelDataType,
       queryConfig: BatchQueryConfig)(implicit tpe: TypeInformation[A]): DataSet[A] = {
-    TableEnvImpl.validateType(tpe)
+    validateInputTypeInfo(tpe)
 
     logicalPlan match {
       case node: DataSetRel =>
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala
index 62d44a2..40ecada 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala
@@ -52,6 +52,7 @@ import org.apache.flink.table.runtime.{CRowMapRunner, OutputRowtimeProcessFuncti
 import org.apache.flink.table.sinks._
 import org.apache.flink.table.sources.{StreamTableSource, TableSource, TableSourceUtil}
 import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, TypeCheckUtils}
+import org.apache.flink.table.typeutils.FieldInfoUtils.{getFieldsInfo, isReferenceByPosition}
 
 import _root_.scala.collection.JavaConverters._
 
@@ -443,11 +444,11 @@ abstract class StreamTableEnvImpl(
     name: String,
     dataStream: DataStream[T]): Unit = {
 
-    val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType)
+    val fieldInfo = getFieldsInfo[T](dataStream.getType)
     val dataStreamTable = new DataStreamTable[T](
       dataStream,
-      fieldIndexes,
-      fieldNames
+      fieldInfo.getIndices,
+      fieldInfo.getFieldNames
     )
     registerTableInternal(name, dataStreamTable)
   }
@@ -468,13 +469,12 @@ abstract class StreamTableEnvImpl(
     : Unit = {
 
     val streamType = dataStream.getType
-    val bridgedFields = fields.map(expressionBridge.bridge).toArray[Expression]
 
     // get field names and types for all non-replaced fields
-    val (fieldNames, fieldIndexes) = getFieldInfo[T](streamType, bridgedFields)
+    val fieldsInfo = getFieldsInfo[T](streamType, fields)
 
     // validate and extract time attributes
-    val (rowtime, proctime) = validateAndExtractTimeAttributes(streamType, bridgedFields)
+    val (rowtime, proctime) = validateAndExtractTimeAttributes(streamType, fields)
 
     // check if event-time is enabled
     if (rowtime.isDefined && execEnv.getStreamTimeCharacteristic != TimeCharacteristic.EventTime) {
@@ -484,8 +484,8 @@ abstract class StreamTableEnvImpl(
     }
 
     // adjust field indexes and field names
-    val indexesWithIndicatorFields = adjustFieldIndexes(fieldIndexes, rowtime, proctime)
-    val namesWithIndicatorFields = adjustFieldNames(fieldNames, rowtime, proctime)
+    val indexesWithIndicatorFields = adjustFieldIndexes(fieldsInfo.getIndices, rowtime, proctime)
+    val namesWithIndicatorFields = adjustFieldNames(fieldsInfo.getFieldNames, rowtime, proctime)
 
     val dataStreamTable = new DataStreamTable[T](
       dataStream,
@@ -593,7 +593,8 @@ abstract class StreamTableEnvImpl(
       }
     }
 
-    exprs.zipWithIndex.foreach {
+    val bridgedFields = exprs.map(expressionBridge.bridge).toArray[Expression]
+    bridgedFields.zipWithIndex.foreach {
       case (RowtimeAttribute(UnresolvedFieldReference(name)), idx) =>
         extractRowtime(idx, name, None)
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/TableEnvImpl.scala
index 5ada652..5dd46cc 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/TableEnvImpl.scala
@@ -37,9 +37,7 @@ import org.apache.calcite.tools._
 import org.apache.flink.annotation.VisibleForTesting
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.typeutils.{RowTypeInfo, _}
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, PojoTypeInfo, TupleTypeInfoBase}
 import org.apache.flink.table.calcite._
 import org.apache.flink.table.catalog._
 import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
@@ -720,153 +718,6 @@ abstract class TableEnvImpl(
     planningConfigurationBuilder.createFlinkPlanner(currentCatalogName, currentDatabase)
   }
 
-  /**
-    * Reference input fields by name:
-    * All fields in the schema definition are referenced by name
-    * (and possibly renamed using an alias (as). In this mode, fields can be reordered and
-    * projected out. Moreover, we can define proctime and rowtime attributes at arbitrary
-    * positions using arbitrary names (except those that exist in the result schema). This mode
-    * can be used for any input type, including POJOs.
-    *
-    * Reference input fields by position:
-    * In this mode, fields are simply renamed. Event-time attributes can
-    * replace the field on their position in the input data (if it is of correct type) or be
-    * appended at the end. Proctime attributes must be appended at the end. This mode can only be
-    * used if the input type has a defined field order (tuple, case class, Row) and no of fields
-    * references a field of the input type.
-    */
-  protected def isReferenceByPosition(ct: CompositeType[_], fields: Array[Expression]): Boolean = {
-    if (!ct.isInstanceOf[TupleTypeInfoBase[_]]) {
-      return false
-    }
-
-    val inputNames = ct.getFieldNames
-
-    // Use the by-position mode if no of the fields exists in the input.
-    // This prevents confusing cases like ('f2, 'f0, 'myName) for a Tuple3 where fields are renamed
-    // by position but the user might assume reordering instead of renaming.
-    fields.forall {
-      case UnresolvedFieldReference(name) => !inputNames.contains(name)
-      case _ => true
-    }
-  }
-
-  /**
-    * Returns field names and field positions for a given [[TypeInformation]].
-    *
-    * @param inputType The TypeInformation extract the field names and positions from.
-    * @tparam A The type of the TypeInformation.
-    * @return A tuple of two arrays holding the field names and corresponding field positions.
-    */
-  protected[flink] def getFieldInfo[A](inputType: TypeInformation[A]):
-  (Array[String], Array[Int]) = {
-
-    if (inputType.isInstanceOf[GenericTypeInfo[A]] && inputType.getTypeClass == classOf[Row]) {
-      throw new TableException(
-        "An input of GenericTypeInfo<Row> cannot be converted to Table. " +
-          "Please specify the type of the input with a RowTypeInfo.")
-    } else {
-      (TableEnvImpl.getFieldNames(inputType), TableEnvImpl.getFieldIndices(inputType))
-    }
-  }
-
-  /**
-    * Returns field names and field positions for a given [[TypeInformation]] and [[Array]] of
-    * [[Expression]]. It does not handle time attributes but considers them in indices.
-    *
-    * @param inputType The [[TypeInformation]] against which the [[Expression]]s are evaluated.
-    * @param exprs     The expressions that define the field names.
-    * @tparam A The type of the TypeInformation.
-    * @return A tuple of two arrays holding the field names and corresponding field positions.
-    */
-  protected def getFieldInfo[A](
-      inputType: TypeInformation[A],
-      exprs: Array[Expression])
-    : (Array[String], Array[Int]) = {
-
-    TableEnvImpl.validateType(inputType)
-
-    def referenceByName(name: String, ct: CompositeType[_]): Option[Int] = {
-      val inputIdx = ct.getFieldIndex(name)
-      if (inputIdx < 0) {
-        throw new TableException(s"$name is not a field of type $ct. " +
-                s"Expected: ${ct.getFieldNames.mkString(", ")}")
-      } else {
-        Some(inputIdx)
-      }
-    }
-
-    val indexedNames: Array[(Int, String)] = inputType match {
-
-      case g: GenericTypeInfo[A] if g.getTypeClass == classOf[Row] =>
-        throw new TableException(
-          "An input of GenericTypeInfo<Row> cannot be converted to Table. " +
-            "Please specify the type of the input with a RowTypeInfo.")
-
-      case t: TupleTypeInfoBase[A] if t.isInstanceOf[TupleTypeInfo[A]] ||
-        t.isInstanceOf[CaseClassTypeInfo[A]] || t.isInstanceOf[RowTypeInfo] =>
-
-        // determine schema definition mode (by position or by name)
-        val isRefByPos = isReferenceByPosition(t, exprs)
-
-        exprs.zipWithIndex flatMap {
-          case (UnresolvedFieldReference(name: String), idx) =>
-            if (isRefByPos) {
-              Some((idx, name))
-            } else {
-              referenceByName(name, t).map((_, name))
-            }
-          case (Alias(UnresolvedFieldReference(origName), name: String, _), _) =>
-            if (isRefByPos) {
-              throw new TableException(
-                s"Alias '$name' is not allowed if other fields are referenced by position.")
-            } else {
-              referenceByName(origName, t).map((_, name))
-            }
-          case (_: TimeAttribute, _) | (Alias(_: TimeAttribute, _, _), _) =>
-            None
-          case _ => throw new TableException(
-            "Field reference expression or alias on field expression expected.")
-        }
-
-      case p: PojoTypeInfo[A] =>
-        exprs flatMap {
-          case UnresolvedFieldReference(name: String) =>
-            referenceByName(name, p).map((_, name))
-          case Alias(UnresolvedFieldReference(origName), name: String, _) =>
-            referenceByName(origName, p).map((_, name))
-          case _: TimeAttribute | Alias(_: TimeAttribute, _, _) =>
-            None
-          case _ => throw new TableException(
-            "Field reference expression or alias on field expression expected.")
-        }
-
-      case _: TypeInformation[_] => // atomic or other custom type information
-        var referenced = false
-        exprs flatMap {
-          case _: TimeAttribute =>
-            None
-          case UnresolvedFieldReference(_) if referenced =>
-            // only accept the first field for an atomic type
-            throw new TableException("Only the first field can reference an atomic type.")
-          case UnresolvedFieldReference(name: String) =>
-            referenced = true
-            // first field reference is mapped to atomic type
-            Some((0, name))
-          case _ => throw new TableException(
-            "Field reference expression expected.")
-        }
-    }
-
-    val (fieldIndexes, fieldNames) = indexedNames.unzip
-
-    if (fieldNames.contains("*")) {
-      throw new TableException("Field name can not be '*'.")
-    }
-
-    (fieldNames, fieldIndexes)
-  }
-
   protected def generateRowConverterFunction[OUT](
       inputTypeInfo: TypeInformation[Row],
       schema: RowSchema,
@@ -980,72 +831,3 @@ abstract class TableEnvImpl(
     Some(generated)
   }
 }
-
-/**
-  * Object to instantiate a [[TableEnvImpl]] depending on the batch or stream execution environment.
-  */
-object TableEnvImpl {
-
-  /**
-    * Returns field names for a given [[TypeInformation]].
-    *
-    * @param inputType The TypeInformation extract the field names.
-    * @tparam A The type of the TypeInformation.
-    * @return An array holding the field names
-    */
-  def getFieldNames[A](inputType: TypeInformation[A]): Array[String] = {
-    validateType(inputType)
-
-    val fieldNames: Array[String] = inputType match {
-      case t: CompositeType[_] => t.getFieldNames
-      case _: TypeInformation[_] => Array("f0")
-    }
-
-    if (fieldNames.contains("*")) {
-      throw new TableException("Field name can not be '*'.")
-    }
-
-    fieldNames
-  }
-
-  /**
-    * Validate if class represented by the typeInfo is static and globally accessible
-    * @param typeInfo type to check
-    * @throws TableException if type does not meet these criteria
-    */
-  def validateType(typeInfo: TypeInformation[_]): Unit = {
-    val clazz = typeInfo.getTypeClass
-    if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
-      !Modifier.isPublic(clazz.getModifiers) ||
-      clazz.getCanonicalName == null) {
-      throw new TableException(
-        s"Class '$clazz' described in type information '$typeInfo' must be " +
-        s"static and globally accessible.")
-    }
-  }
-
-  /**
-    * Returns field indexes for a given [[TypeInformation]].
-    *
-    * @param inputType The TypeInformation extract the field positions from.
-    * @return An array holding the field positions
-    */
-  def getFieldIndices(inputType: TypeInformation[_]): Array[Int] = {
-    getFieldNames(inputType).indices.toArray
-  }
-
-  /**
-    * Returns field types for a given [[TypeInformation]].
-    *
-    * @param inputType The TypeInformation to extract field types from.
-    * @return An array holding the field types.
-    */
-  def getFieldTypes(inputType: TypeInformation[_]): Array[TypeInformation[_]] = {
-    validateType(inputType)
-
-    inputType match {
-      case ct: CompositeType[_] => 0.until(ct.getArity).map(i => ct.getTypeAt(i)).toArray
-      case t: TypeInformation[_] => Array(t.asInstanceOf[TypeInformation[_]])
-    }
-  }
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvImpl.scala
index d876195..235370e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvImpl.scala
@@ -17,17 +17,18 @@
  */
 package org.apache.flink.table.api.java
 
+import _root_.java.lang.{Boolean => JBool}
+
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
-import org.apache.flink.table.api._
-import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, TableFunction, UserDefinedAggregateFunction}
-import org.apache.flink.table.expressions.ExpressionParser
+import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import _root_.java.lang.{Boolean => JBool}
-
+import org.apache.flink.table.api._
 import org.apache.flink.table.catalog.CatalogManager
+import org.apache.flink.table.expressions.ExpressionParser
+import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, TableFunction, UserDefinedAggregateFunction}
+import org.apache.flink.table.typeutils.FieldInfoUtils
 
 import _root_.scala.collection.JavaConverters._
 
@@ -95,7 +96,7 @@ class StreamTableEnvImpl(
       clazz: Class[T],
       queryConfig: StreamQueryConfig): DataStream[T] = {
     val typeInfo = TypeExtractor.createTypeInfo(clazz)
-    TableEnvImpl.validateType(typeInfo)
+    FieldInfoUtils.validateInputTypeInfo(typeInfo)
     translate[T](table, queryConfig, updatesAsRetraction = false, withChangeFlag = false)(typeInfo)
   }
 
@@ -103,7 +104,7 @@ class StreamTableEnvImpl(
       table: Table,
       typeInfo: TypeInformation[T],
       queryConfig: StreamQueryConfig): DataStream[T] = {
-    TableEnvImpl.validateType(typeInfo)
+    FieldInfoUtils.validateInputTypeInfo(typeInfo)
     translate[T](table, queryConfig, updatesAsRetraction = false, withChangeFlag = false)(typeInfo)
   }
 
@@ -127,7 +128,7 @@ class StreamTableEnvImpl(
       queryConfig: StreamQueryConfig): DataStream[JTuple2[JBool, T]] = {
 
     val typeInfo = TypeExtractor.createTypeInfo(clazz)
-    TableEnvImpl.validateType(typeInfo)
+    FieldInfoUtils.validateInputTypeInfo(typeInfo)
     val resultType = new TupleTypeInfo[JTuple2[JBool, T]](Types.BOOLEAN, typeInfo)
     translate[JTuple2[JBool, T]](
       table,
@@ -141,7 +142,7 @@ class StreamTableEnvImpl(
       typeInfo: TypeInformation[T],
       queryConfig: StreamQueryConfig): DataStream[JTuple2[JBool, T]] = {
 
-    TableEnvImpl.validateType(typeInfo)
+    FieldInfoUtils.validateInputTypeInfo(typeInfo)
     val resultTypeInfo = new TupleTypeInfo[JTuple2[JBool, T]](
       Types.BOOLEAN,
       typeInfo
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
index f8443b2..777aa8e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
@@ -34,11 +34,12 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo, TypeExtractor}
 import org.apache.flink.table.api.dataview._
-import org.apache.flink.table.api.{TableEnvImpl, TableException, ValidationException}
+import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.dataview._
 import org.apache.flink.table.functions._
 import org.apache.flink.table.plan.schema.FlinkTableFunctionImpl
+import org.apache.flink.table.typeutils.FieldInfoUtils
 import org.apache.flink.util.InstantiationUtil
 
 import scala.collection.mutable
@@ -699,9 +700,9 @@ object UserDefinedFunctionUtils {
   def getFieldInfo(inputType: TypeInformation[_])
     : (Array[String], Array[Int], Array[TypeInformation[_]]) = {
 
-    (TableEnvImpl.getFieldNames(inputType),
-    TableEnvImpl.getFieldIndices(inputType),
-    TableEnvImpl.getFieldTypes(inputType))
+    (FieldInfoUtils.getFieldNames(inputType),
+      FieldInfoUtils.getFieldIndices(inputType),
+      FieldInfoUtils.getFieldTypes(inputType))
   }
 
   /**