Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/08 03:17:56 UTC

[GitHub] [flink] wuchong commented on a change in pull request #11985: [FLINK-16989][table] Support ScanTableSource in blink planner

wuchong commented on a change in pull request #11985:
URL: https://github.com/apache/flink/pull/11985#discussion_r421913683



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
##########
@@ -92,177 +73,21 @@ class StreamExecTableSourceScan(
     replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode])
   }
 
-  override protected def translateToPlanInternal(
-      planner: StreamPlanner): Transformation[RowData] = {
-    val config = planner.getTableConfig
-    val inputTransform = getSourceTransformation(planner.getExecEnv)
-
-    val fieldIndexes = computeIndexMapping()
-
-    val inputDataType = inputTransform.getOutputType
-    val producedDataType = tableSource.getProducedDataType
-
-    // check that declared and actual type of table source DataStream are identical
-    if (inputDataType !=
-        TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(producedDataType)) {
-      throw new TableException(s"TableSource of type ${tableSource.getClass.getCanonicalName} " +
-        s"returned a DataStream of data type $inputDataType that does not match with the " +
-        s"data type $producedDataType declared by the TableSource.getProducedDataType() method. " +
-        s"Please validate the implementation of the TableSource.")
-    }
-
-    // get expression to extract rowtime attribute
-    val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeAttributeDescriptor(
-      tableSource,
-      tableSourceTable.getRowType
-    ).map(desc =>
-      TableSourceUtil.getRowtimeExtractionExpression(
-        desc.getTimestampExtractor,
-        producedDataType,
-        planner.getRelBuilder,
-        nameMapping
-      )
-    )
-
-    val streamTransformation = if (needInternalConversion) {
-      // extract the time attribute if a field index is a time-attribute marker (-1 or -2).
-      val (extractElement, resetElement) =
-        if (ScanUtil.hasTimeAttributeField(fieldIndexes)) {
-          (s"ctx.$ELEMENT = $ELEMENT;", s"ctx.$ELEMENT = null;")
-        } else {
-          ("", "")
-        }
-      val ctx = CodeGeneratorContext(config).setOperatorBaseClass(
-        classOf[AbstractProcessStreamOperator[RowData]])
-      // The produced type may not carry the correct precision the user defined in the DDL,
-      // because it may be converted from a legacy type. Fix the precision using the logical
-      // schema from the DDL. Code generation requires the correct precision of input fields.
-      val fixedProducedDataType = TableSourceUtil.fixPrecisionForProducedDataType(
-        tableSource,
-        FlinkTypeFactory.toLogicalRowType(tableSourceTable.getRowType))
-      val conversionTransform = ScanUtil.convertToInternalRow(
-        ctx,
-        inputTransform.asInstanceOf[Transformation[Any]],
-        fieldIndexes,
-        fixedProducedDataType,
-        getRowType,
-        getTable.getQualifiedName,
-        config,
-        rowtimeExpression,
-        beforeConvert = extractElement,
-        afterConvert = resetElement)
-      conversionTransform
-    } else {
-      inputTransform.asInstanceOf[Transformation[RowData]]
-    }
-
-    val ingestedTable = new DataStream(planner.getExecEnv, streamTransformation)
-
-    // generate watermarks for rowtime indicator
-    val rowtimeDescOption: Option[RowtimeAttributeDescriptor] =
-      TableSourceUtil.getRowtimeAttributeDescriptor(tableSource, tableSourceTable.getRowType)
-
-    val withWatermarks = rowtimeDescOption match {
-      case Some(rowtimeDesc) =>
-        val rowtimeFieldIdx = getRowType.getFieldNames.indexOf(rowtimeDesc.getAttributeName)
-        val watermarkStrategy = rowtimeDesc.getWatermarkStrategy
-        watermarkStrategy match {
-          case p: PeriodicWatermarkAssigner =>
-            val watermarkGenerator = new PeriodicWatermarkAssignerWrapper(rowtimeFieldIdx, p)
-            ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
-          case p: PunctuatedWatermarkAssigner =>
-            val watermarkGenerator =
-              new PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p, producedDataType)
-            ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
-          case _: PreserveWatermarks =>
-            // The watermarks have already been provided by the underlying DataStream.
-            ingestedTable
-        }
-      case None =>
-        // No need to generate watermarks if no rowtime attribute is specified.
-        ingestedTable
-    }
-    withWatermarks.getTransformation
-  }
-
-  private def needInternalConversion: Boolean = {
-    val fieldIndexes = computeIndexMapping()
-    ScanUtil.hasTimeAttributeField(fieldIndexes) ||
-      ScanUtil.needsConversion(tableSource.getProducedDataType)
-  }
-
-  override def createInput[IN](
+  override protected def createInputFormatTransformation(
       env: StreamExecutionEnvironment,
-      format: InputFormat[IN, _ <: InputSplit],
-      t: TypeInformation[IN]): Transformation[IN] = {
-    // See StreamExecutionEnvironment.createInput; it is better at dealing with checkpointing.
-    // The disadvantage is that streaming does not support multiple paths.
-    env.createInput(format, t).name(tableSource.explainSource()).getTransformation
-  }
-
-  private def computeIndexMapping()
-    : Array[Int] = {
-    TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(
-      tableSource,
-      FlinkTypeFactory.toTableSchema(getRowType).getTableColumns,
-      true,
-      nameMapping
-    )
-  }
-
-  private lazy val nameMapping: JFunction[String, String] = tableSource match {
-    case mapping: DefinedFieldMapping if mapping.getFieldMapping != null =>
-      new JFunction[String, String] {
-        override def apply(t: String): String = mapping.getFieldMapping.get(t)
-      }
-    case _ => JFunction.identity()
+      inputFormat: InputFormat[RowData, _],
+      name: String,
+      outTypeInfo: RowDataTypeInfo): Transformation[RowData] = {
+    // It's better to use StreamExecutionEnvironment.createInput()
+    // rather than addSource() for streaming, because it takes care of checkpointing.
+    env
+      .createInput(inputFormat, outTypeInfo)
+      .name(name)
+      .getTransformation
   }
-}
-
-/**
-  * Generates periodic watermarks based on a [[PeriodicWatermarkAssigner]].
-  *
-  * @param timeFieldIdx the index of the rowtime attribute.
-  * @param assigner the watermark assigner.
-  */
-private class PeriodicWatermarkAssignerWrapper(
-    timeFieldIdx: Int,
-    assigner: PeriodicWatermarkAssigner)
-  extends AssignerWithPeriodicWatermarks[RowData] {
-
-  override def getCurrentWatermark: Watermark = assigner.getWatermark
-
-  override def extractTimestamp(row: RowData, previousElementTimestamp: Long): Long = {
-    val timestamp: Long = row.getTimestamp(timeFieldIdx, 3).getMillisecond
-    assigner.nextTimestamp(timestamp)
-    0L
-  }
-}
 
-/**
-  * Generates punctuated watermarks based on a [[PunctuatedWatermarkAssigner]].
-  *
-  * @param timeFieldIdx the index of the rowtime attribute.
-  * @param assigner the watermark assigner.
-  */
-private class PunctuatedWatermarkAssignerWrapper(
-    timeFieldIdx: Int,
-    assigner: PunctuatedWatermarkAssigner,
-    sourceType: DataType)
-  extends AssignerWithPunctuatedWatermarks[RowData] {
-
-  private val converter =
-    DataFormatConverters.getConverterForDataType((sourceType match {
-      case _: FieldsDataType => sourceType
-      case _ => DataTypes.ROW(DataTypes.FIELD("f0", sourceType))
-    }).bridgedTo(classOf[Row])).asInstanceOf[DataFormatConverter[RowData, Row]]
-
-  override def checkAndGetNextWatermark(row: RowData, ts: Long): Watermark = {
-    val timestamp: Long = row.getLong(timeFieldIdx)
-    assigner.getWatermark(converter.toExternal(row), timestamp)
-  }
-
-  override def extractTimestamp(element: RowData, previousElementTimestamp: Long): Long = {
-    0L
+  override protected def translateToPlanInternal(
+      planner: StreamPlanner): Transformation[RowData] = {
+    createSourceTransformation(planner.getExecEnv, getRelDetailedDescription)
   }

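For context on the `createInput()` preference stated in the new method's comment, here is a minimal, self-contained sketch (not code from this PR; the input path, object name, and job name are illustrative):

```scala
// Sketch only: wiring an InputFormat into a streaming job through
// StreamExecutionEnvironment.createInput(), which builds a checkpoint-aware
// source, instead of wrapping the format in an ad-hoc addSource() function.
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.scala._

object CreateInputSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(10000L) // checkpoint every 10 seconds

    val format = new TextInputFormat(new Path("/tmp/input")) // illustrative path
    env.createInput(format)
      .name("text-input") // mirrors the .name(name) call in the new method
      .print()

    env.execute("createInput sketch")
  }
}
```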
Review comment:
       Yes, this is on purpose. We should prefer `getRelDetailedDescription`, which has more detailed and formatted information about this node. Besides, `DynamicTableSource#asSummaryString` does not have the same meaning as `TableSource#explainSource`: `asSummaryString` is only used for logging (such as `Kafka_0.9`) and doesn't contain detailed information about this node.
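       To make the distinction concrete, here is a minimal, hypothetical `ScanTableSource` (the class name and summary tag are invented for illustration): `asSummaryString` returns only a short connector tag for logs, whereas `getRelDetailedDescription` renders the full digest of the RelNode (table name, selected fields, and so on).

```scala
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.table.connector.source.{DynamicTableSource, ScanTableSource}

// Hypothetical connector used only to illustrate the point above.
class MyKafkaSource extends ScanTableSource {

  override def getChangelogMode: ChangelogMode = ChangelogMode.insertOnly()

  // The runtime provider is omitted; a real connector would return e.g.
  // a SourceFunctionProvider or InputFormatProvider here.
  override def getScanRuntimeProvider(
      ctx: ScanTableSource.ScanContext): ScanTableSource.ScanRuntimeProvider =
    throw new UnsupportedOperationException("omitted in this sketch")

  override def copy(): DynamicTableSource = new MyKafkaSource

  // Only a short tag for logging, e.g. shown as "Kafka_0.9"; it carries no
  // schema, projection, or watermark details, so it is unsuitable as the
  // node description in plan explanations.
  override def asSummaryString(): String = "Kafka_0.9"
}
```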



