Posted to issues@flink.apache.org by "Yuval Itzchakov (Jira)" <ji...@apache.org> on 2020/05/10 15:19:00 UTC

[jira] [Commented] (FLINK-17600) Blink Planner fails to generate RowtimeAttribute based on TableSource's DefinedRowtimeAttributes implementation

    [ https://issues.apache.org/jira/browse/FLINK-17600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17103836#comment-17103836 ] 

Yuval Itzchakov commented on FLINK-17600:
-----------------------------------------

Related? [https://github.com/apache/flink/pull/11204/|https://github.com/apache/flink/pull/11204/files]

> Blink Planner fails to generate RowtimeAttribute based on TableSource's DefinedRowtimeAttributes implementation
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-17600
>                 URL: https://issues.apache.org/jira/browse/FLINK-17600
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.10.0
>            Reporter: Yuval Itzchakov
>            Priority: Major
>
> Given the following SQL statement: 
> {code:java}
> tableEnv.sqlQuery("SELECT EVENT_TIME, B, C FROM FOO"){code}
> Where FOO is a table originating from a custom StreamTableSource[Row] that implements `DefinedRowtimeAttributes.getRowtimeAttributeDescriptors`, the Blink planner fails to mark the selected field with a `RowtimeAttribute`.
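> For reference, a minimal sketch of such a source. The class name `FooTableSource` and the schema are hypothetical, chosen only to match the query above:
> {code:scala}
> import java.util.Collections
>
> import org.apache.flink.streaming.api.datastream.DataStream
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import org.apache.flink.table.api.{DataTypes, TableSchema}
> import org.apache.flink.table.sources.tsextractors.ExistingField
> import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps
> import org.apache.flink.table.sources.{DefinedRowtimeAttributes, RowtimeAttributeDescriptor, StreamTableSource}
> import org.apache.flink.types.Row
>
> // Hypothetical source backing the FOO table above.
> class FooTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {
>
>   // Declare EVENT_TIME as the rowtime attribute; this is the declaration
>   // the Blink planner fails to pick up.
>   override def getRowtimeAttributeDescriptors: java.util.List[RowtimeAttributeDescriptor] =
>     Collections.singletonList(new RowtimeAttributeDescriptor(
>       "EVENT_TIME",                    // attribute to mark as rowtime
>       new ExistingField("EVENT_TIME"), // extract the timestamp from an existing field
>       new AscendingTimestamps))        // watermark strategy
>
>   override def getTableSchema: TableSchema = TableSchema.builder()
>     .field("EVENT_TIME", DataTypes.TIMESTAMP(3))
>     .field("B", DataTypes.STRING())
>     .field("C", DataTypes.STRING())
>     .build()
>
>   // Stream construction and getProducedDataType/getReturnType elided;
>   // a real source must produce Rows matching the schema above.
>   override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] =
>     throw new UnsupportedOperationException("elided in this sketch")
> }
> {code}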
> This happens because the implementation of `TableSourceUtil.getSourceRowType` receives a `None` TableSource from `CatalogSchemaTable.getRowType`, presumably because the Catalog has yet to create the underlying TableSource; its creation is deferred to the implementing TableFactory (in this case, my own custom one).
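> A minimal sketch of such a factory, assuming the hypothetical `FooTableSource` above and a made-up `connector.type` value:
> {code:scala}
> import java.util.{Collections, HashMap => JHashMap, List => JList, Map => JMap}
>
> import org.apache.flink.table.factories.StreamTableSourceFactory
> import org.apache.flink.table.sources.StreamTableSource
> import org.apache.flink.types.Row
>
> // Hypothetical factory; the planner defers source creation to factories like this,
> // so CatalogSchemaTable.getRowType runs before any TableSource instance exists.
> class FooTableSourceFactory extends StreamTableSourceFactory[Row] {
>
>   override def createStreamTableSource(properties: JMap[String, String]): StreamTableSource[Row] =
>     new FooTableSource
>
>   override def requiredContext(): JMap[String, String] = {
>     val context = new JHashMap[String, String]()
>     context.put("connector.type", "foo") // made-up connector identifier
>     context
>   }
>
>   override def supportedProperties(): JList[String] =
>     Collections.singletonList("*")
> }
> {code}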
> *This does not reproduce in the old Flink planner*, because the old planner uses `TableSourceTable`, which explicitly holds a reference to the underlying `TableSource` and extracts its rowtime attributes.
> Relevant code:
> *CatalogSchemaTable*:
> {code:java}
> private static RelDataType getRowType(RelDataTypeFactory typeFactory,
>       CatalogBaseTable catalogBaseTable,
>       boolean isStreamingMode) {
>    final FlinkTypeFactory flinkTypeFactory = (FlinkTypeFactory) typeFactory;
>    TableSchema tableSchema = catalogBaseTable.getSchema();
>    final DataType[] fieldDataTypes = tableSchema.getFieldDataTypes();
>    if (!isStreamingMode
>       && catalogBaseTable instanceof ConnectorCatalogTable
>       && ((ConnectorCatalogTable) catalogBaseTable).getTableSource().isPresent()) {
>       // If the table source is bounded, materialize the time attributes to normal TIMESTAMP type.
>       // Now for ConnectorCatalogTable, there is no way to
>       // deduce if it is bounded in the table environment, so the data types in TableSchema
>       // are always patched with TimeAttribute.
>       // See ConnectorCatalogTable#calculateSourceSchema
>       // for details.
>       // Remove the patched time attributes type to let the TableSourceTable handle it.
>       // We should remove this logic if the isBatch flag in ConnectorCatalogTable is fixed.
>       // TODO: Fix FLINK-14844.
>       for (int i = 0; i < fieldDataTypes.length; i++) {
>          LogicalType lt = fieldDataTypes[i].getLogicalType();
>          if (lt instanceof TimestampType
>             && (((TimestampType) lt).getKind() == TimestampKind.PROCTIME
>             || ((TimestampType) lt).getKind() == TimestampKind.ROWTIME)) {
>             int precision = ((TimestampType) lt).getPrecision();
>             fieldDataTypes[i] = DataTypes.TIMESTAMP(precision);
>          }
>       }
>    }
>    return TableSourceUtil.getSourceRowType(flinkTypeFactory,
>       tableSchema,
>       scala.Option.empty(),
>       isStreamingMode);
> }
> {code}
> *TableSourceUtil* (note that `CatalogSchemaTable.getRowType` above always passes `scala.Option.empty()` as the `tableSource` argument, so the rowtime-aware `tableSource.isDefined` branch below can never be taken on this path):
> {code:scala}
> def getSourceRowType(
>     typeFactory: FlinkTypeFactory,
>     tableSchema: TableSchema,
>     tableSource: Option[TableSource[_]],
>     streaming: Boolean): RelDataType = {
>   val fieldNames = tableSchema.getFieldNames
>   val fieldDataTypes = tableSchema.getFieldDataTypes
>   if (tableSchema.getWatermarkSpecs.nonEmpty) {
>     getSourceRowType(typeFactory, fieldNames, fieldDataTypes, tableSchema.getWatermarkSpecs.head,
>       streaming)
>   } else if (tableSource.isDefined) {
>     getSourceRowType(typeFactory, fieldNames, fieldDataTypes, tableSource.get,
>       streaming)
>   } else {
>     val fieldTypes = fieldDataTypes.map(fromDataTypeToLogicalType)
>     typeFactory.buildRelNodeRowType(fieldNames, fieldTypes)
>   }
> }
> {code}
> *TableSourceTable:*
> {code:scala}
> // We must enrich logical schema from catalog table with physical type coming from table source.
> // Schema coming from catalog table might not have proper conversion classes. Those must be
> // extracted from produced type, before converting to RelDataType
> def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
>   val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
>   val fieldNames = tableSchema.getFieldNames
>   val nameMapping: JFunction[String, String] = tableSource match {
>     case mapping: DefinedFieldMapping if mapping.getFieldMapping != null =>
>       new JFunction[String, String] {
>         override def apply(t: String): String = mapping.getFieldMapping.get(t)
>       }
>     case _ => JFunction.identity()
>   }
>   val producedDataType = tableSource.getProducedDataType
>   val fieldIndexes = TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(
>     tableSource,
>     tableSchema.getTableColumns,
>     isStreamingMode,
>     nameMapping
>   )
>   val typeInfos = if (LogicalTypeChecks.isCompositeType(producedDataType.getLogicalType)) {
>     val physicalSchema = DataTypeUtils.expandCompositeTypeToSchema(producedDataType)
>     fieldIndexes.map(mapIndex(_,
>       idx =>
>         TypeConversions.fromDataTypeToLegacyInfo(physicalSchema.getFieldDataType(idx).get()))
>     )
>   } else {
>     fieldIndexes.map(mapIndex(_, _ => TypeConversions.fromDataTypeToLegacyInfo(producedDataType)))
>   }
>   flinkTypeFactory.buildLogicalRowType(fieldNames, typeInfos)
> }
> {code}
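> For completeness, a repro sketch, assuming the hypothetical `FooTableSource` above is registered as FOO:
> {code:scala}
> // Register the source and run the query from the description.
> tableEnv.registerTableSource("FOO", new FooTableSource)
> val result = tableEnv.sqlQuery("SELECT EVENT_TIME, B, C FROM FOO")
>
> // With the old planner, EVENT_TIME is reported as TIMESTAMP(3) *ROWTIME*;
> // with the Blink planner, the rowtime marker is missing.
> println(result.getSchema)
> {code}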
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)