Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/04/23 03:27:01 UTC

[GitHub] [flink] wuchong commented on a change in pull request #11837: [FLINK-16160][table-planner-blink] Fix proctime()/rowtime() doesn't w…

wuchong commented on a change in pull request #11837:
URL: https://github.com/apache/flink/pull/11837#discussion_r413470860



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
##########
@@ -153,6 +166,27 @@ private static RelDataType getRowType(RelDataTypeFactory typeFactory,
 				}
 			}
 		}
+
+		// The following block is a workaround to support tables defined by TableEnvironment.connect() and
+		// the actual table sources implement DefinedProctimeAttribute/DefinedRowtimeAttributes.
+		// It should be removed after we remove DefinedProctimeAttribute/DefinedRowtimeAttributes.
+		Optional<TableSource> sourceOpt = findAndCreateTableSource(new TableConfig().getConfiguration());
+		if (tableSchema.getTableColumns().stream().noneMatch(TableColumn::isGenerated)
+			&& tableSchema.getWatermarkSpecs().isEmpty()
+			&& sourceOpt.isPresent()) {
+			TableSource source = sourceOpt.get();
+			if ((source instanceof DefinedProctimeAttribute
+					&& ((DefinedProctimeAttribute) source).getProctimeAttribute() != null)
+					||
+					(source instanceof DefinedRowtimeAttributes
+							&& ((DefinedRowtimeAttributes) source).getRowtimeAttributeDescriptors() != null
+							&& !((DefinedRowtimeAttributes) source).getRowtimeAttributeDescriptors().isEmpty())) {

Review comment:
       Add a `hasProctimeAttribute` method to `TableSourceValidation` so that the condition can be simplified to:
   
   ```java
   if (hasRowtimeAttribute(source) || hasProctimeAttribute(source))
   ```
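
To make the suggested helper concrete, here is a minimal, self-contained sketch. The interfaces are stand-ins that only mirror the names in the diff (the real ones live in flink-table, and the real descriptor type is not `String`). `LegacyTimeAttributes`, `hasLegacyTimeAttribute`, and the three sample sources are hypothetical names used for illustration. The diff accepts a source when either attribute is defined, so the helpers are combined with `||`.

```java
import java.util.Collections;
import java.util.List;

// Stand-ins (assumptions) for the Flink interfaces named in the diff,
// reduced to the methods the condition actually calls.
interface TableSource<T> {}

interface DefinedProctimeAttribute {
    String getProctimeAttribute();
}

interface DefinedRowtimeAttributes {
    // Placeholder element type; the real method returns descriptor objects.
    List<String> getRowtimeAttributeDescriptors();
}

// Hypothetical holder class; the review proposes TableSourceValidation instead.
final class LegacyTimeAttributes {
    private LegacyTimeAttributes() {}

    // True if the source declares a non-null processing-time attribute.
    static boolean hasProctimeAttribute(TableSource<?> source) {
        return source instanceof DefinedProctimeAttribute
            && ((DefinedProctimeAttribute) source).getProctimeAttribute() != null;
    }

    // True if the source declares at least one rowtime attribute descriptor.
    static boolean hasRowtimeAttribute(TableSource<?> source) {
        if (!(source instanceof DefinedRowtimeAttributes)) {
            return false;
        }
        List<String> descriptors =
            ((DefinedRowtimeAttributes) source).getRowtimeAttributeDescriptors();
        return descriptors != null && !descriptors.isEmpty();
    }

    // Same truth table as the nested instanceof condition in the diff.
    static boolean hasLegacyTimeAttribute(TableSource<?> source) {
        return hasRowtimeAttribute(source) || hasProctimeAttribute(source);
    }
}

// Hypothetical sample sources used only to exercise the helpers.
final class PlainSource implements TableSource<Object> {}

final class ProctimeOnlySource implements TableSource<Object>, DefinedProctimeAttribute {
    @Override
    public String getProctimeAttribute() {
        return "proctime";
    }
}

final class RowtimeOnlySource implements TableSource<Object>, DefinedRowtimeAttributes {
    @Override
    public List<String> getRowtimeAttributeDescriptors() {
        return Collections.singletonList("rowtime");
    }
}
```

With these stand-ins, a source that defines only a proctime attribute or only a rowtime attribute passes `hasLegacyTimeAttribute`, while a plain source does not.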

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
##########
@@ -153,6 +166,27 @@ private static RelDataType getRowType(RelDataTypeFactory typeFactory,
 				}
 			}
 		}
+
+		// The following block is a workaround to support tables defined by TableEnvironment.connect() and
+		// the actual table sources implement DefinedProctimeAttribute/DefinedRowtimeAttributes.
+		// It should be removed after we remove DefinedProctimeAttribute/DefinedRowtimeAttributes.
+		Optional<TableSource> sourceOpt = findAndCreateTableSource(new TableConfig().getConfiguration());

Review comment:
       ```suggestion
   		Optional<TableSource<?>> sourceOpt = findAndCreateTableSource(new TableConfig().getConfiguration());
   ```
   
   Add `<?>` to `TableSource` to avoid the raw-type warning in IDEA.
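
For context, a small sketch of why the wildcard helps. `TableSource` here is a one-line stand-in for the generic Flink interface, and `WildcardDemo`, `StringSource`, and `findSource` are hypothetical names. Declaring the result as `Optional<TableSource<?>>` keeps the type parameterized, so the compiler and IDE have no raw type to warn about.

```java
import java.util.Optional;

// Stand-in (assumption) for the generic Flink interface.
interface TableSource<T> {}

final class WildcardDemo {
    private WildcardDemo() {}

    // Hypothetical concrete source.
    static final class StringSource implements TableSource<String> {}

    // Hypothetical stand-in for findAndCreateTableSource(...).
    // The unbounded wildcard accepts a source of any element type
    // without falling back to the raw TableSource type.
    static Optional<TableSource<?>> findSource(boolean available) {
        if (available) {
            return Optional.of(new StringSource());
        }
        return Optional.empty();
    }
}
```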

##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
##########
@@ -130,6 +131,60 @@ class TableSourceTest extends TableTestBase {
     util.verifyPlan(sqlQuery)
   }
 
+
+  @Test
+  def testLegacyRowTimeTableGroupWindow(): Unit = {
+    util.tableEnv.connect(new ConnectorDescriptor("TestTableSourceWithTime", 1, false) {
+      override protected def toConnectorProperties: JMap[String, String] = {
+        Collections.emptyMap()
+      }

Review comment:
       Can we have a dedicated descriptor for `TestTableSourceWithTime`? This code looks confusing. 

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
##########
@@ -153,6 +166,27 @@ private static RelDataType getRowType(RelDataTypeFactory typeFactory,
 				}
 			}
 		}
+
+		// The following block is a workaround to support tables defined by TableEnvironment.connect() and
+		// the actual table sources implement DefinedProctimeAttribute/DefinedRowtimeAttributes.
+		// It should be removed after we remove DefinedProctimeAttribute/DefinedRowtimeAttributes.
+		Optional<TableSource> sourceOpt = findAndCreateTableSource(new TableConfig().getConfiguration());
+		if (tableSchema.getTableColumns().stream().noneMatch(TableColumn::isGenerated)
+			&& tableSchema.getWatermarkSpecs().isEmpty()

Review comment:
       Add `isStreamingMode` to this condition, and call `findAndCreateTableSource` only when the condition is satisfied.
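
A rough sketch of the intended control flow, using plain booleans in place of the schema checks. `LazySourceLookup`, `resolve`, and the `lookups` counter are hypothetical, and the `String` payload stands in for the table source. The point is that the cheap checks run first and the source is created only when all of them pass.

```java
import java.util.Optional;

final class LazySourceLookup {
    private LazySourceLookup() {}

    // Counts factory invocations; exists only to make the laziness observable.
    static int lookups = 0;

    // Hypothetical stand-in for the (potentially expensive) source creation.
    static Optional<String> findAndCreateTableSource() {
        lookups++;
        return Optional.of("legacy-source");
    }

    // Evaluate the cheap conditions first and create the source only
    // when every one of them holds.
    static Optional<String> resolve(
            boolean isStreamingMode,
            boolean hasGeneratedColumns,
            boolean hasWatermarkSpecs) {
        if (isStreamingMode && !hasGeneratedColumns && !hasWatermarkSpecs) {
            return findAndCreateTableSource();
        }
        return Optional.empty();
    }
}
```

In batch mode, or when the schema already declares generated columns or watermark specs, `resolve` returns early and the factory is never invoked.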

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
##########
@@ -153,6 +166,27 @@ private static RelDataType getRowType(RelDataTypeFactory typeFactory,
 				}
 			}
 		}
+
+		// The following block is a workaround to support tables defined by TableEnvironment.connect() and
+		// the actual table sources implement DefinedProctimeAttribute/DefinedRowtimeAttributes.
+		// It should be removed after we remove DefinedProctimeAttribute/DefinedRowtimeAttributes.
+		Optional<TableSource> sourceOpt = findAndCreateTableSource(new TableConfig().getConfiguration());

Review comment:
       If the `ReadableConfig` is always an empty configuration, please remove the parameter and construct it inside the `findAndCreateTableSource` method, with a comment explaining why we use an empty configuration.

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
##########
@@ -210,20 +208,37 @@ class CatalogSourceTable[T](
    */
   private def eraseTimeIndicator(
       relDataType: RelDataType,
-      factory: FlinkTypeFactory): RelDataType = {
-    val logicalRowType = FlinkTypeFactory.toLogicalRowType(relDataType)
-    val fieldNames = logicalRowType.getFieldNames
-    val fieldTypes = logicalRowType.getFields.map { f =>
-      if (FlinkTypeFactory.isTimeIndicatorType(f.getType)) {
-        val timeIndicatorType = f.getType.asInstanceOf[TimestampType]
-        new TimestampType(
-          timeIndicatorType.isNullable,
-          TimestampKind.REGULAR,
-          timeIndicatorType.getPrecision)
-      } else {
-        f.getType
+      factory: FlinkTypeFactory,
+      tableSource: TableSource[_]): RelDataType = {
+    val isLegacySource = tableSource match {
+      case rts: DefinedRowtimeAttributes
+        if (rts.getRowtimeAttributeDescriptors != null
+          && rts.getRowtimeAttributeDescriptors.nonEmpty) =>
+        true
+      case pts: DefinedProctimeAttribute if pts.getProctimeAttribute != null =>
+         true
+      case _ => false
+    }

Review comment:
       `val legacyTimeAttributeDefined = hasRowtimeAttribute(source) || hasProctimeAttribute(source)`

##########
File path: flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
##########
@@ -20,3 +20,5 @@ org.apache.flink.table.planner.utils.TestFilterableTableSourceFactory
 org.apache.flink.table.planner.utils.TestProjectableTableSourceFactory
 org.apache.flink.table.planner.utils.TestCsvFileSystemFormatFactory
 org.apache.flink.table.planner.utils.TestOptionsTableFactory
+

Review comment:
       Remove the empty line?

##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala
##########
@@ -200,6 +199,65 @@ class TestTableSourceWithTime[T](
   }
 }
 
+class TestTableSourceWithTimeFactory[T] extends StreamTableSourceFactory[T] {
+  override def createStreamTableSource(properties: JMap[String, String]): StreamTableSource[T] = {
+    val dp = new DescriptorProperties()
+    dp.putProperties(properties)
+
+    val isBounded = dp.getOptionalBoolean("is-bounded").orElse(false)
+    val tableSchema = dp.getTableSchema(Schema.SCHEMA)
+    val serializedData = dp.getOptionalString("data").orElse(null)
+    val data = if (serializedData != null) {
+      EncodingUtils.decodeStringToObject(serializedData, classOf[List[T]])
+    } else {
+      Seq.empty[T]
+    }
+    val rowtimeAttributes = SchemaValidator.deriveRowtimeAttributes(dp)
+    val rowtime = if (rowtimeAttributes.isEmpty) {
+      null
+    } else {
+      rowtimeAttributes.head.getAttributeName
+    }
+    val proctimeAttribute = SchemaValidator.deriveProctimeAttribute(dp)
+    val proctime = if (proctimeAttribute.isPresent) {
+      proctimeAttribute.get()
+    } else {
+      null
+    }
+
+    val serializedMapKeys = dp.getOptionalString("map-keys").orElse(null)
+    val serializedMapVals = dp.getOptionalString("map-vals").orElse(null)
+    val mapping = if (serializedMapKeys != null && serializedMapVals != null) {
+      val mapKeys = EncodingUtils.decodeStringToObject(serializedMapKeys, classOf[List[String]])
+      val mapVals = EncodingUtils.decodeStringToObject(serializedMapVals, classOf[List[String]])
+      if (mapKeys.length != mapVals.length) {
+        null
+      } else {
+        mapKeys.zip(mapVals).toMap
+      }
+    } else {
+      null
+    }
+
+    val existingTs = dp.getOptionalString("existingTs").orElse(null)
+
+    new TestTableSourceWithTime[T](
+      isBounded, tableSchema, null, data, rowtime, proctime, mapping, existingTs)

Review comment:
       If we only use this for planning, do we need to support extracting `data`, `mapping`, and `existingTs`?

##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala
##########
@@ -200,6 +199,65 @@ class TestTableSourceWithTime[T](
   }
 }
 
+class TestTableSourceWithTimeFactory[T] extends StreamTableSourceFactory[T] {
+  override def createStreamTableSource(properties: JMap[String, String]): StreamTableSource[T] = {
+    val dp = new DescriptorProperties()
+    dp.putProperties(properties)
+
+    val isBounded = dp.getOptionalBoolean("is-bounded").orElse(false)
+    val tableSchema = dp.getTableSchema(Schema.SCHEMA)
+    val serializedData = dp.getOptionalString("data").orElse(null)
+    val data = if (serializedData != null) {
+      EncodingUtils.decodeStringToObject(serializedData, classOf[List[T]])
+    } else {
+      Seq.empty[T]
+    }
+    val rowtimeAttributes = SchemaValidator.deriveRowtimeAttributes(dp)
+    val rowtime = if (rowtimeAttributes.isEmpty) {
+      null
+    } else {
+      rowtimeAttributes.head.getAttributeName
+    }
+    val proctimeAttribute = SchemaValidator.deriveProctimeAttribute(dp)
+    val proctime = if (proctimeAttribute.isPresent) {
+      proctimeAttribute.get()
+    } else {
+      null
+    }
+
+    val serializedMapKeys = dp.getOptionalString("map-keys").orElse(null)
+    val serializedMapVals = dp.getOptionalString("map-vals").orElse(null)
+    val mapping = if (serializedMapKeys != null && serializedMapVals != null) {
+      val mapKeys = EncodingUtils.decodeStringToObject(serializedMapKeys, classOf[List[String]])
+      val mapVals = EncodingUtils.decodeStringToObject(serializedMapVals, classOf[List[String]])
+      if (mapKeys.length != mapVals.length) {
+        null
+      } else {
+        mapKeys.zip(mapVals).toMap
+      }
+    } else {
+      null
+    }
+
+    val existingTs = dp.getOptionalString("existingTs").orElse(null)
+
+    new TestTableSourceWithTime[T](
+      isBounded, tableSchema, null, data, rowtime, proctime, mapping, existingTs)

Review comment:
       Why is the `returnType` null? Could we use `tableSchema.toRowType` instead?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org