You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/05/31 06:46:08 UTC

[GitHub] [flink] twalthr commented on a change in pull request #8521: [FLINK-12601][table] Make the DataStream & DataSet conversion to a Table independent from Calcite

twalthr commented on a change in pull request #8521: [FLINK-12601][table] Make the DataStream & DataSet conversion to a Table independent from Calcite
URL: https://github.com/apache/flink/pull/8521#discussion_r289271852
 
 

 ##########
 File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java
 ##########
 @@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionUtils;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.types.Row;
+
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static java.lang.String.format;
+import static org.apache.flink.table.expressions.BuiltInFunctionDefinitions.TIME_ATTRIBUTES;
+
+/**
+ * Utility class for extracting names and indices of fields from different {@link TypeInformation}s.
+ */
+public class FieldInfoUtils {
+
+	/**
+	 * Describes extracted fields and corresponding indices from a {@link TypeInformation}.
+	 *
+	 * <p>An instance holds the two arrays passed to its constructor. The arrays are stored and
+	 * returned by reference (no defensive copies are made), so callers that modify a returned
+	 * array also modify the state of this object.
+	 */
+	public static class FieldsInfo {
+		private final String[] fieldNames;
+		private final int[] indices;
+
+		FieldsInfo(String[] fieldNames, int[] indices) {
+			this.fieldNames = fieldNames;
+			this.indices = indices;
+		}
+
+		public String[] getFieldNames() {
+			return fieldNames;
+		}
+
+		public int[] getIndices() {
+			return indices;
+		}
+	}
+
+	/**
+	 * Decides whether the given field expressions reference the fields of the input type by
+	 * position rather than by name.
+	 *
+	 * <p>Reference input fields by name:
+	 * All fields in the schema definition are referenced by name
+	 * (and possibly renamed using an alias (as)). In this mode, fields can be reordered and
+	 * projected out. Moreover, we can define proctime and rowtime attributes at arbitrary
+	 * positions using arbitrary names (except those that exist in the result schema). This mode
+	 * can be used for any input type, including POJOs.
+	 *
+	 * <p>Reference input fields by position:
+	 * In this mode, fields are simply renamed. Event-time attributes can
+	 * replace the field on their position in the input data (if it is of correct type) or be
+	 * appended at the end. Proctime attributes must be appended at the end. This mode can only be
+	 * used if the input type has a defined field order (tuple, case class, Row) and none of the
+	 * fields references a field of the input type.
+	 *
+	 * @param ct The composite input type whose field names are compared against the references.
+	 * @param fields The field expressions of the schema definition.
+	 * @return {@code false} if {@code ct} is not a {@link TupleTypeInfoBase}; otherwise {@code true}
+	 *         if no {@link UnresolvedReferenceExpression} in {@code fields} carries the name of a
+	 *         field of {@code ct}. Expressions of any other kind never rule out the by-position mode.
+	 */
+	public static boolean isReferenceByPosition(CompositeType<?> ct, Expression[] fields) {
+		if (!(ct instanceof TupleTypeInfoBase)) {
+			return false;
+		}
+
+		List<String> inputNames = Arrays.asList(ct.getFieldNames());
+
+		// Use the by-position mode only if none of the fields exists in the input.
+		// This prevents confusing cases like ('f2, 'f0, 'myName) for a Tuple3 where fields are renamed
+		// by position but the user might assume reordering instead of renaming.
+		return Arrays.stream(fields).allMatch(f -> {
+			if (f instanceof UnresolvedReferenceExpression) {
+				return !inputNames.contains(((UnresolvedReferenceExpression) f).getName());
+			}
+
+			return true;
+		});
+	}
+
+	/**
+	 * Returns field names and field positions for a given {@link TypeInformation}.
+	 *
+	 * @param inputType The TypeInformation to extract the field names and positions from.
+	 * @param <A> The type of the TypeInformation.
+	 * @return A {@link FieldsInfo} holding the field names and corresponding field positions.
+	 */
+	public static <A> FieldsInfo getFieldInfo(TypeInformation<A> inputType) {
 
 Review comment:
   nit: `getFieldsInfo`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services