Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/12/18 04:07:31 UTC

[GitHub] [flink] wuchong commented on a change in pull request #10605: [FLINK-15168][table-planner] Fix physical indices computing.

URL: https://github.com/apache/flink/pull/10605#discussion_r359144019
 
 

 ##########
 File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSourceUtils.java
 ##########
 @@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.sources.DefinedProctimeAttribute;
+import org.apache.flink.table.sources.DefinedRowtimeAttributes;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasFamily;
+
+/**
+ * Utility methods for dealing with {@link org.apache.flink.table.sources.TableSource}.
+ */
+@Internal
+public final class TableSourceUtils {
+
+	/**
+	 * Computes indices of physical fields corresponding to the selected logical fields of a {@link TableSchema}.
+	 *
+	 * @param logicalColumns         Logical columns that describe the physical type.
+	 * @param physicalType           Physical type to retrieve indices from.
+	 * @param nameRemapping          Additional remapping of a logical to a physical field name.
+	 *                               TimestampExtractor works with logical names, but accesses physical
+	 *                               fields
+	 * @return Physical indices of logical fields selected with {@code projectedLogicalFields} mask.
+	 */
+	public static int[] computePhysicalIndices(
+			List<TableColumn> logicalColumns,
+			DataType physicalType,
+			Function<String, String> nameRemapping) {
+
+		Map<TableColumn, Integer> physicalIndexLookup = computePhysicalIndices(
+			logicalColumns.stream(),
+			physicalType,
+			nameRemapping);
+
+		return logicalColumns.stream().mapToInt(physicalIndexLookup::get).toArray();
+	}
+
+	/**
+	 * Computes indices of physical fields corresponding to the selected logical fields of a {@link TableSchema}.
+	 *
+	 * <p>It puts markers ({@code idx < 0}) for time attributes extracted from {@link DefinedProctimeAttribute}
+	 * and {@link DefinedRowtimeAttributes}.
+	 *
+	 * <p>{@link TableSourceUtils#computePhysicalIndices(List, DataType, Function)} should be preferred. The
+	 * time attribute markers should not be used anymore.
+	 *
+	 * @param tableSource     Used to extract {@link DefinedRowtimeAttributes}, {@link DefinedProctimeAttribute}
+	 *                        and {@link TableSource#getProducedDataType()}.
+	 * @param logicalColumns  Logical columns that describe the physical type.
+	 * @param streamMarkers   If true, puts stream markers; otherwise, puts batch markers.
+	 * @param nameRemapping   Additional remapping of a logical to a physical field name.
+	 *                        TimestampExtractor works with logical names, but accesses physical
+	 *                        fields.
+	 * @return Physical indices of the given logical columns, with time attribute markers in place of
+	 *         the proctime and rowtime columns.
+	 */
+	public static int[] computePhysicalIndicesOrTimeAttributeMarkers(
+			TableSource<?> tableSource,
+			List<TableColumn> logicalColumns,
+			boolean streamMarkers,
+			Function<String, String> nameRemapping) {
+		Optional<String> proctimeAttribute = getProctimeAttribute(tableSource);
+		List<String> rowtimeAttributes = getRowtimeAttributes(tableSource);
+
+		List<TableColumn> columnsWithoutTimeAttributes = logicalColumns.stream().filter(col ->
+			!rowtimeAttributes.contains(col.getName())
+				&& proctimeAttribute.map(attr -> !attr.equals(col.getName())).orElse(true))
+			.collect(Collectors.toList());
+
+		Map<TableColumn, Integer> columnsToPhysicalIndices = TableSourceUtils.computePhysicalIndices(
+			columnsWithoutTimeAttributes.stream(),
+			tableSource.getProducedDataType(),
+			nameRemapping
+		);
+
+		return logicalColumns.stream().mapToInt(logicalColumn -> {
+			if (proctimeAttribute.map(attr -> attr.equals(logicalColumn.getName())).orElse(false)) {
+				verifyTimeAttributeType(logicalColumn, "Proctime");
+
+				if (streamMarkers) {
+					return TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER;
+				} else {
+					return TimeIndicatorTypeInfo.PROCTIME_BATCH_MARKER;
+				}
+			} else if (rowtimeAttributes.contains(logicalColumn.getName())) {
+				verifyTimeAttributeType(logicalColumn, "Rowtime");
+
+				if (streamMarkers) {
+					return TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER;
+				} else {
+					return TimeIndicatorTypeInfo.ROWTIME_BATCH_MARKER;
+				}
+			} else {
+				return columnsToPhysicalIndices.get(logicalColumn);
+			}
+		}).toArray();
+	}
+
+	private static void verifyTimeAttributeType(TableColumn logicalColumn, String rowtimeOrProctime) {
+		if (!hasFamily(logicalColumn.getType().getLogicalType(), LogicalTypeFamily.TIMESTAMP)) {
+			throw new ValidationException(String.format(
+				"%s field '%s' has invalid type %s. %s attributes must be of a Timestamp family.",
+				rowtimeOrProctime,
+				logicalColumn.getName(),
+				logicalColumn.getType(),
+				rowtimeOrProctime));
+		}
+	}
+
+	private static Map<TableColumn, Integer> computePhysicalIndices(
+			Stream<TableColumn> columns,
+			DataType physicalType,
+			Function<String, String> nameRemappingFunction) {
+		if (DataTypeUtils.isCompositeType(physicalType)) {
+			TableSchema physicalSchema = TypeConversions.expandCompositeTypeToSchema(physicalType);
+			return computeInCompositeType(columns, physicalSchema, nameRemappingFunction);
+		} else {
+			return computeInSimpleType(columns, physicalType);
+		}
+	}
+
+	private static Map<TableColumn, Integer> computeInCompositeType(
+			Stream<TableColumn> columns,
+			TableSchema physicalSchema,
+			Function<String, String> nameRemappingFunction) {
+		return columns.collect(
+			Collectors.toMap(
+				Function.identity(),
+				column -> {
+					String remappedName = nameRemappingFunction.apply(column.getName());
+
+					int idx = IntStream.range(0, physicalSchema.getFieldCount())
+						.filter(i -> physicalSchema.getFieldName(i).get().equals(remappedName))
+						.findFirst()
+						.orElseThrow(() -> new TableException(String.format(
+							"Could not map %s column to the underlying physical type %s. No such field.",
+							column.getName(),
+							physicalSchema
+						)));
+
+					LogicalType physicalFieldType = physicalSchema.getFieldDataType(idx).get().getLogicalType();
+					LogicalType logicalFieldType = column.getType().getLogicalType();
+
+					if (!physicalFieldType.equals(logicalFieldType)) {
 
 Review comment:
   Hi @dawidwys, thanks for fixing this problem.
   
   I looked into the changed code. The test fails because it uses the new `computePhysicalIndices` in table-common. In the blink planner, we use `PlannerTypeUtils#isAssignable` to check whether the physical type matches the logical type. `isAssignable` is very loose: it only checks the type root. `LogicalType#equals`, on the other hand, is the strictest check: it also compares field names, precision, and nullability. Previously, the old planner compared the physical and logical types using TypeInformation, which behaves the same as `isAssignable`.
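To make the difference between the two checks concrete, here is a minimal, self-contained sketch. It deliberately does not use Flink's classes: `SimpleType`, `Root`, and `rootOnlyMatch` are hypothetical stand-ins that model only a type root, precision, scale, and nullability. The root-only check models `isAssignable` as described above, and `equals` models the strict `LogicalType#equals` comparison.

```java
import java.util.Objects;

public class TypeCheckSketch {

    // Hypothetical type roots; NOT Flink's LogicalTypeRoot.
    enum Root { DECIMAL, VARCHAR, TIMESTAMP }

    // Hypothetical stand-in for a logical type; NOT Flink's LogicalType.
    static final class SimpleType {
        final Root root;
        final int precision;
        final int scale;
        final boolean nullable;

        SimpleType(Root root, int precision, int scale, boolean nullable) {
            this.root = root;
            this.precision = precision;
            this.scale = scale;
            this.nullable = nullable;
        }

        // Strict comparison, similar in spirit to LogicalType#equals:
        // every attribute has to match.
        @Override
        public boolean equals(Object o) {
            if (!(o instanceof SimpleType)) {
                return false;
            }
            SimpleType other = (SimpleType) o;
            return root == other.root
                && precision == other.precision
                && scale == other.scale
                && nullable == other.nullable;
        }

        @Override
        public int hashCode() {
            return Objects.hash(root, precision, scale, nullable);
        }
    }

    // Loose comparison, modeling the described behavior of isAssignable:
    // only the type root is compared.
    static boolean rootOnlyMatch(SimpleType physical, SimpleType logical) {
        return physical.root == logical.root;
    }

    public static void main(String[] args) {
        // Type produced by a reader vs. the type declared in the DDL.
        SimpleType physical = new SimpleType(Root.DECIMAL, 38, 18, true);
        SimpleType logical = new SimpleType(Root.DECIMAL, 10, 2, false);

        System.out.println("strict equals: " + physical.equals(logical));         // false
        System.out.println("root only:     " + rootOnlyMatch(physical, logical)); // true
    }
}
```

With the strict check, a source producing DECIMAL(38, 18) is rejected against a DDL column declared as a non-null DECIMAL(10, 2), while the root-only check accepts it.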
   
   In my view, we don't need to check precision and nullability, because we are using TypeInformation at runtime, e.g. for BigDecimal. It's fine for the reader to use DECIMAL(38, 18), because we convert the value to the correct precision (the precision declared in the DDL) in `SourceConversion` via `DataFormatConverter`.
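The following sketch illustrates the kind of conversion this argument relies on: a value read with scale 18 is rescaled to the precision and scale declared in the DDL. `toDeclaredPrecision` is a hypothetical helper, not the actual `DataFormatConverter` code; the rounding mode (HALF_UP) and returning `null` when the value does not fit are assumptions made for illustration.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

public class DecimalRescaleSketch {

    /**
     * Hypothetical helper: rescales a value to DECIMAL(precision, scale).
     * Returns null if the rescaled value needs more digits than the precision
     * allows (an assumed overflow policy for this sketch).
     */
    static BigDecimal toDeclaredPrecision(BigDecimal value, int precision, int scale) {
        BigDecimal rescaled = value.setScale(scale, RoundingMode.HALF_UP);
        return rescaled.precision() > precision ? null : rescaled;
    }

    public static void main(String[] args) {
        // A value as a reader might produce it with scale 18.
        BigDecimal fromReader = new BigDecimal("123.456789000000000000");

        // The DDL declares DECIMAL(10, 2): the value is rounded to 123.46.
        System.out.println(toDeclaredPrecision(fromReader, 10, 2)); // 123.46

        // DECIMAL(4, 2) cannot hold 123.46 (5 digits), so the sketch returns null.
        System.out.println(toDeclaredPrecision(fromReader, 4, 2));  // null
    }
}
```

The point of the sketch is that the precision used while reading does not have to match the DDL, as long as a conversion step like this runs before the value reaches downstream operators.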

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services