Posted to issues@flink.apache.org by "Yuval Itzchakov (Jira)" <ji...@apache.org> on 2020/05/10 15:19:00 UTC
[jira] [Commented] (FLINK-17600) Blink Planner fails to generate RowtimeAttribute based on TableSource's DefinedRowtimeAttributes implementation
[ https://issues.apache.org/jira/browse/FLINK-17600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17103836#comment-17103836 ]
Yuval Itzchakov commented on FLINK-17600:
-----------------------------------------
Related? [https://github.com/apache/flink/pull/11204/|https://github.com/apache/flink/pull/11204/files]
> Blink Planner fails to generate RowtimeAttribute based on TableSource's DefinedRowtimeAttributes implementation
> ---------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-17600
> URL: https://issues.apache.org/jira/browse/FLINK-17600
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.10.0
> Reporter: Yuval Itzchakov
> Priority: Major
>
> Given the following SQL statement:
> {code:java}
> tableEnv.sqlQuery("SELECT EVENT_TIME, B, C FROM FOO"){code}
> Where FOO is a table originating from a custom StreamTableSource[Row] which implements `DefinedRowtimeAttributes.getRowtimeAttributeDescriptors`, Blink Planner fails to mark the selected field with a `RowtimeAttribute`.
> This happens because `TableSourceUtil.getSourceRowType` receives a `None` TableSource from `CatalogSchemaTable.getRowType`, presumably because the Catalog has yet to create the underlying TableSource, whose creation is deferred to the implementing TableFactory (in this case my own custom one).
> *This does not reproduce in the old Flink planner*, because the old planner uses `TableSourceTable`, which explicitly holds a reference to the underlying `TableSource` and extracts its rowtime attributes from it.
> Relevant code:
> *CatalogSchemaTable*:
>
> {code:java}
> private static RelDataType getRowType(RelDataTypeFactory typeFactory,
>         CatalogBaseTable catalogBaseTable,
>         boolean isStreamingMode) {
>     final FlinkTypeFactory flinkTypeFactory = (FlinkTypeFactory) typeFactory;
>     TableSchema tableSchema = catalogBaseTable.getSchema();
>     final DataType[] fieldDataTypes = tableSchema.getFieldDataTypes();
>     if (!isStreamingMode
>             && catalogBaseTable instanceof ConnectorCatalogTable
>             && ((ConnectorCatalogTable) catalogBaseTable).getTableSource().isPresent()) {
>         // If the table source is bounded, materialize the time attributes to normal TIMESTAMP type.
>         // Now for ConnectorCatalogTable, there is no way to deduce if it is bounded
>         // in the table environment, so the data types in TableSchema are always
>         // patched with TimeAttribute.
>         // See ConnectorCatalogTable#calculateSourceSchema for details.
>         // Remove the patched time attributes type to let the TableSourceTable handle it.
>         // We should remove this logic if the isBatch flag in ConnectorCatalogTable is fixed.
>         // TODO: Fix FLINK-14844.
>         for (int i = 0; i < fieldDataTypes.length; i++) {
>             LogicalType lt = fieldDataTypes[i].getLogicalType();
>             if (lt instanceof TimestampType
>                     && (((TimestampType) lt).getKind() == TimestampKind.PROCTIME
>                     || ((TimestampType) lt).getKind() == TimestampKind.ROWTIME)) {
>                 int precision = ((TimestampType) lt).getPrecision();
>                 fieldDataTypes[i] = DataTypes.TIMESTAMP(precision);
>             }
>         }
>     }
>     return TableSourceUtil.getSourceRowType(flinkTypeFactory,
>         tableSchema,
>         scala.Option.empty(),
>         isStreamingMode);
> }
> {code}
> *TableSourceUtil:*
>
>
> {code:java}
> def getSourceRowType(
>     typeFactory: FlinkTypeFactory,
>     tableSchema: TableSchema,
>     tableSource: Option[TableSource[_]],
>     streaming: Boolean): RelDataType = {
>   val fieldNames = tableSchema.getFieldNames
>   val fieldDataTypes = tableSchema.getFieldDataTypes
>   if (tableSchema.getWatermarkSpecs.nonEmpty) {
>     getSourceRowType(typeFactory, fieldNames, fieldDataTypes,
>       tableSchema.getWatermarkSpecs.head, streaming)
>   } else if (tableSource.isDefined) {
>     getSourceRowType(typeFactory, fieldNames, fieldDataTypes, tableSource.get, streaming)
>   } else {
>     val fieldTypes = fieldDataTypes.map(fromDataTypeToLogicalType)
>     typeFactory.buildRelNodeRowType(fieldNames, fieldTypes)
>   }
> }
> {code}
> *TableSourceTable:*
> {code:java}
> // We must enrich logical schema from catalog table with physical type coming from table source.
> // Schema coming from catalog table might not have proper conversion classes. Those must be
> // extracted from produced type, before converting to RelDataType
> def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
>   val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
>   val fieldNames = tableSchema.getFieldNames
>   val nameMapping: JFunction[String, String] = tableSource match {
>     case mapping: DefinedFieldMapping if mapping.getFieldMapping != null =>
>       new JFunction[String, String] {
>         override def apply(t: String): String = mapping.getFieldMapping.get(t)
>       }
>     case _ => JFunction.identity()
>   }
>   val producedDataType = tableSource.getProducedDataType
>   val fieldIndexes = TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(
>     tableSource,
>     tableSchema.getTableColumns,
>     isStreamingMode,
>     nameMapping
>   )
>   val typeInfos = if (LogicalTypeChecks.isCompositeType(producedDataType.getLogicalType)) {
>     val physicalSchema = DataTypeUtils.expandCompositeTypeToSchema(producedDataType)
>     fieldIndexes.map(mapIndex(_, idx =>
>       TypeConversions.fromDataTypeToLegacyInfo(physicalSchema.getFieldDataType(idx).get())))
>   } else {
>     fieldIndexes.map(mapIndex(_, _ => TypeConversions.fromDataTypeToLegacyInfo(producedDataType)))
>   }
>   flinkTypeFactory.buildLogicalRowType(fieldNames, typeInfos)
> }
> {code}
>
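The failure mode described above comes down to the three-way dispatch in `TableSourceUtil.getSourceRowType`: watermark specs win, else a present TableSource's rowtime descriptors win, else the fields get plain types. The toy sketch below models that dispatch in stand-alone Java (no Flink dependencies; `Field`, `deriveRowType`, and the `*ROWTIME*` marker string are illustrative stand-ins, not Flink API) to show how passing an empty source silently drops the rowtime attribute.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Toy model (NOT Flink code) of the dispatch in TableSourceUtil.getSourceRowType.
public class RowtimeDispatchSketch {

    /** Stand-in for a schema column; "type" is just a display string here. */
    public record Field(String name, String type) {}

    /**
     * Mirrors the three branches of getSourceRowType:
     * 1. DDL watermark specs mark the rowtime field;
     * 2. else a present TableSource's rowtime descriptors mark it;
     * 3. else (the bug path) the schema is returned with plain types,
     *    so the rowtime attribute is lost.
     */
    public static List<Field> deriveRowType(
            List<Field> schema,
            List<String> watermarkSpecs,
            Optional<List<String>> sourceRowtimeFields) {
        if (!watermarkSpecs.isEmpty()) {
            return mark(schema, watermarkSpecs);
        } else if (sourceRowtimeFields.isPresent()) {
            return mark(schema, sourceRowtimeFields.get());
        } else {
            return schema; // no source, no specs: rowtime marker silently dropped
        }
    }

    private static List<Field> mark(List<Field> schema, List<String> rowtimeFields) {
        return schema.stream()
                .map(f -> rowtimeFields.contains(f.name())
                        ? new Field(f.name(), "TIMESTAMP *ROWTIME*")
                        : f)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Field> schema = List.of(
                new Field("EVENT_TIME", "TIMESTAMP"),
                new Field("B", "STRING"));

        // Old-planner path: TableSourceTable holds the source, descriptors apply.
        List<Field> withSource =
                deriveRowType(schema, List.of(), Optional.of(List.of("EVENT_TIME")));
        // Blink path in this bug: CatalogSchemaTable passes Option.empty().
        List<Field> withoutSource =
                deriveRowType(schema, List.of(), Optional.empty());

        System.out.println(withSource.get(0).type());    // TIMESTAMP *ROWTIME*
        System.out.println(withoutSource.get(0).type()); // TIMESTAMP
    }
}
```

With a source present, `EVENT_TIME` keeps its rowtime marker; with `Optional.empty()` it degrades to a plain `TIMESTAMP`, which is the behaviour the report observes for the Blink planner.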
--
This message was sent by Atlassian Jira
(v8.3.4#803005)