Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/21 03:29:47 UTC

[GitHub] [flink] danny0405 commented on a change in pull request #13331: [FLINK-19079][table-runtime] Import rowtime deduplicate operator

danny0405 commented on a change in pull request #13331:
URL: https://github.com/apache/flink/pull/13331#discussion_r508472527



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDeduplicate.scala
##########
@@ -102,44 +110,109 @@ class StreamExecDeduplicate(
       .asInstanceOf[Transformation[RowData]]
 
     val rowTypeInfo = inputTransform.getOutputType.asInstanceOf[InternalTypeInfo[RowData]]
+    val inputFieldTypes = rowTypeInfo.toRowFieldTypes
+    val keyFieldTypes = new Array[LogicalType](uniqueKeys.length)
+    for (i <- 0 until uniqueKeys.length) {
+      keyFieldTypes(i) = inputFieldTypes(uniqueKeys(i))
+    }
+

Review comment:
       You can use `uniqueKeys.map(idx => inputFieldTypes(idx))` directly instead of the loop.

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDeduplicate.scala
##########
@@ -102,44 +110,109 @@ class StreamExecDeduplicate(
       .asInstanceOf[Transformation[RowData]]
 
     val rowTypeInfo = inputTransform.getOutputType.asInstanceOf[InternalTypeInfo[RowData]]
+    val inputFieldTypes = rowTypeInfo.toRowFieldTypes
+    val keyFieldTypes = new Array[LogicalType](uniqueKeys.length)
+    for (i <- 0 until uniqueKeys.length) {
+      keyFieldTypes(i) = inputFieldTypes(uniqueKeys(i))
+    }
+
     val generateUpdateBefore = ChangelogPlanUtils.generateUpdateBefore(this)
     val tableConfig = planner.getTableConfig
     val generateInsert = tableConfig.getConfiguration
       .getBoolean(TABLE_EXEC_INSERT_AND_UPDATE_AFTER_SENSITIVE)
     val isMiniBatchEnabled = tableConfig.getConfiguration.getBoolean(
       ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED)
     val minRetentionTime = tableConfig.getMinIdleStateRetentionTime
-    val operator = if (isMiniBatchEnabled) {
-      val exeConfig = planner.getExecEnv.getConfig
-      val rowSerializer = rowTypeInfo.createSerializer(exeConfig)
+
+    val rowtimeField = input.getRowType.getFieldList
+      .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+    val rowtimeIndex = if (isRowtime) {
+      Preconditions.checkArgument(rowtimeField.nonEmpty)
+      rowtimeField.get(0).getIndex
+    } else {
+      -1
+    }
+
+    val miniBatchsize = if (isMiniBatchEnabled) {
+      val size = tableConfig.getConfiguration.getLong(
+        ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE)
+      Preconditions.checkArgument(size > 0)
+      size
+    } else {
+      -1L
+    }
+    val exeConfig = planner.getExecEnv.getConfig
+    val rowSerializer = rowTypeInfo.createSerializer(exeConfig)
+
+    val operator = if (isRowtime) {
+      if(isMiniBatchEnabled) {
+        val processFunction = if (keepLastRow) {
+          new RowTimeMiniBatchDeduplicateKeepLastRowFunction(
+            rowTypeInfo,

Review comment:
       Can we have two sub-classes here? One for `proctime` and another for `rowtime`; each class can have a method that returns the row function, e.g. `getRowFunction(isMiniBatchEnabled, keepLastRow)`.
   
   There are too many if/else branches here, which makes the code hard to maintain.
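
A minimal sketch of the two-sub-class structure suggested above, written in Java with stand-in types rather than the Scala planner code. Everything here is an assumption for illustration: `Selector`, `ProcTimeSelector`, `RowTimeSelector` and `forTimeAttribute` are hypothetical names, and strings stand in for the real function instances. Only `RowTimeMiniBatchDeduplicateKeepLastRowFunction` appears in the diff; the other composed names are placeholders.

```java
final class DeduplicateSelectorSketch {

    /** Hypothetical base type: one sub-class per time attribute. */
    abstract static class Selector {
        /** Returns a label standing in for the row function that matches the flags. */
        abstract String getRowFunction(boolean isMiniBatchEnabled, boolean keepLastRow);

        /** Composes a placeholder name; not a list of confirmed Flink classes. */
        static String name(String prefix, boolean isMiniBatchEnabled, boolean keepLastRow) {
            return prefix
                + (isMiniBatchEnabled ? "MiniBatch" : "")
                + "Deduplicate"
                + (keepLastRow ? "KeepLastRow" : "KeepFirstRow")
                + "Function";
        }
    }

    /** Handles every proctime combination. */
    static final class ProcTimeSelector extends Selector {
        @Override
        String getRowFunction(boolean isMiniBatchEnabled, boolean keepLastRow) {
            return name("ProcTime", isMiniBatchEnabled, keepLastRow);
        }
    }

    /** Handles every rowtime combination. */
    static final class RowTimeSelector extends Selector {
        @Override
        String getRowFunction(boolean isMiniBatchEnabled, boolean keepLastRow) {
            return name("RowTime", isMiniBatchEnabled, keepLastRow);
        }
    }

    /** The only branch left at the call site: proctime versus rowtime. */
    static Selector forTimeAttribute(boolean isRowtime) {
        return isRowtime ? new RowTimeSelector() : new ProcTimeSelector();
    }

    public static void main(String[] args) {
        Selector selector = forTimeAttribute(true);
        // prints "RowTimeMiniBatchDeduplicateKeepLastRowFunction"
        System.out.println(selector.getRowFunction(true, true));
    }
}
```

With this shape the call site chooses a sub-class once, and the mini-batch and keep-last-row decisions live inside each sub-class instead of in one nested if/else chain.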

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
##########
@@ -93,6 +95,239 @@ static void processFirstRow(
 		out.collect(currentRow);
 	}
 
+	/**
+	 * Processes element to deduplicate on keys with row time semantic, sends current element if it is last row,
+	 * retracts previous element if needed.
+	 *
+	 * @param state state of function
+	 * @param currentRow latest row received by deduplicate function
+	 * @param serializer serializer to serialize the data
+	 * @param out underlying collector
+	 * @param rowtimeIndex index of row time field
+	 * @param generateUpdateBefore flag to generate UPDATE_BEFORE message or not
+	 * @param generateInsert flag to generate INSERT message or not
+	 */
+	static void processLastRowOnRowtime(
+			ValueState<RowData> state,
+			RowData currentRow,
+			TypeSerializer<RowData> serializer,
+			Collector<RowData> out,
+			int rowtimeIndex,
+			boolean generateUpdateBefore,
+			boolean generateInsert) throws Exception {
+
+		checkInsertOnly(currentRow);
+		RowData prevRow = state.value();
+		if (!isLastRow(prevRow, currentRow, rowtimeIndex)) {
+			return;
+		}
+		state.update(currentRow);
+
+		// store all needed data to state
+		if (generateUpdateBefore || generateInsert) {
+			if (prevRow == null) {
+				// the first row, send INSERT message
+				currentRow.setRowKind(RowKind.INSERT);
+				out.collect(currentRow);
+			} else {
+				if (generateUpdateBefore) {
+					RowData copyRow = serializer.copy(prevRow);
+					copyRow.setRowKind(RowKind.UPDATE_BEFORE);

Review comment:
       No copy is needed here.
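
A minimal sketch of what dropping the copy could look like, in Java with stand-in types. `Row`, `Kind` and `emitUpdate` are hypothetical and only mimic `RowData`, `RowKind` and the emission branch of the diff. The sketch assumes the collector consumes each row at the moment it is emitted, and it does not model whether anything else still references `prevRow`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class NoCopySketch {

    /** Stand-in for RowKind. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER }

    /** Stand-in for RowData: a mutable kind plus a payload. */
    static final class Row {
        Kind kind = Kind.INSERT;
        final String payload;

        Row(String payload) {
            this.payload = payload;
        }

        @Override
        public String toString() {
            return kind + "(" + payload + ")";
        }
    }

    /** Emits the optional retraction and the update without copying prevRow. */
    static void emitUpdate(
            Row prevRow, Row currentRow, boolean generateUpdateBefore, Consumer<Row> out) {
        if (generateUpdateBefore) {
            // Set the kind on prevRow itself instead of on serializer.copy(prevRow).
            prevRow.kind = Kind.UPDATE_BEFORE;
            out.accept(prevRow);
        }
        currentRow.kind = Kind.UPDATE_AFTER;
        out.accept(currentRow);
    }

    /** Runs one update and records each row as text at the moment it is emitted. */
    static List<String> run(boolean generateUpdateBefore) {
        List<String> emitted = new ArrayList<>();
        emitUpdate(new Row("old"), new Row("new"), generateUpdateBefore,
            row -> emitted.add(row.toString()));
        return emitted;
    }

    public static void main(String[] args) {
        // prints "[UPDATE_BEFORE(old), UPDATE_AFTER(new)]"
        System.out.println(run(true));
    }
}
```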

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
##########
@@ -93,6 +95,239 @@ static void processFirstRow(
 		out.collect(currentRow);
 	}
 
+	/**
+	 * Processes element to deduplicate on keys with row time semantic, sends current element if it is last row,
+	 * retracts previous element if needed.
+	 *
+	 * @param state state of function
+	 * @param currentRow latest row received by deduplicate function
+	 * @param serializer serializer to serialize the data
+	 * @param out underlying collector
+	 * @param rowtimeIndex index of row time field
+	 * @param generateUpdateBefore flag to generate UPDATE_BEFORE message or not
+	 * @param generateInsert flag to generate INSERT message or not
+	 */
+	static void processLastRowOnRowtime(
+			ValueState<RowData> state,
+			RowData currentRow,
+			TypeSerializer<RowData> serializer,
+			Collector<RowData> out,
+			int rowtimeIndex,
+			boolean generateUpdateBefore,
+			boolean generateInsert) throws Exception {
+
+		checkInsertOnly(currentRow);
+		RowData prevRow = state.value();
+		if (!isLastRow(prevRow, currentRow, rowtimeIndex)) {
+			return;
+		}
+		state.update(currentRow);
+
+		// store all needed data to state
+		if (generateUpdateBefore || generateInsert) {
+			if (prevRow == null) {
+				// the first row, send INSERT message
+				currentRow.setRowKind(RowKind.INSERT);
+				out.collect(currentRow);
+			} else {
+				if (generateUpdateBefore) {
+					RowData copyRow = serializer.copy(prevRow);
+					copyRow.setRowKind(RowKind.UPDATE_BEFORE);
+					out.collect(copyRow);
+				}
+				currentRow.setRowKind(RowKind.UPDATE_AFTER);
+				out.collect(currentRow);
+			}
+		} else {
+			currentRow.setRowKind(RowKind.UPDATE_AFTER);
+			out.collect(currentRow);
+		}
+	}
+
+	/**
+	 * Processes element to deduplicate on keys with row time semantic, sends current element if it is last row,
+	 * retracts previous element if needed.
+	 *
+	 * @param state state of function
+	 * @param bufferedRows latest rows received by deduplicate function
+	 * @param serializer serializer to serialize the data
+	 * @param out underlying collector
+	 * @param rowtimeIndex index of row time field
+	 * @param generateUpdateBefore flag to generate UPDATE_BEFORE message or not
+	 * @param generateInsert flag to generate INSERT message or not
+	 */
+	static void processLastRowOnRowtime(
+		ValueState<RowData> state,
+		List<RowData> bufferedRows,
+		TypeSerializer<RowData> serializer,
+		Collector<RowData> out,
+		int rowtimeIndex,
+		boolean generateUpdateBefore,
+		boolean generateInsert) throws Exception {
+
+		if (bufferedRows == null) {
+			return;
+		}
+
+		RowData prevRow = state.value();
+		for (RowData currentRow: bufferedRows) {
+			checkInsertOnly(currentRow);
+			if (!isLastRow(prevRow, currentRow, rowtimeIndex)) {
+				continue;
+			}
+			// store all needed data to state
+			if (generateUpdateBefore || generateInsert) {
+				if (prevRow == null) {
+					// the first row, send INSERT message
+					currentRow.setRowKind(RowKind.INSERT);
+					out.collect(currentRow);
+				} else {
+					if (generateUpdateBefore) {
+						RowData copyRow = serializer.copy(prevRow);
+						copyRow.setRowKind(RowKind.UPDATE_BEFORE);
+						out.collect(copyRow);
+					}

Review comment:
       This can reuse the code in
   ```java
   static void processLastRowOnRowtime(
   			ValueState<RowData> state,
   			RowData currentRow,
   			TypeSerializer<RowData> serializer,
   			Collector<RowData> out,
   			int rowtimeIndex,
   			boolean generateUpdateBefore,
   			boolean generateInsert)
   ```
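
One way the reuse could look, sketched in Java with stand-in types: the batch overload delegates each buffered row to the single-row overload, so the emission logic exists only once. `Row`, `State` and the `isLastRow` semantics (a row wins when nothing is stored or its rowtime is not older) are assumptions for illustration, not the PR's code. Unlike the batch method in the diff, this version reads and updates state once per row.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ReuseSketch {

    /** Stand-in for RowData: a rowtime and a payload. */
    static final class Row {
        final long rowtime;
        final String payload;

        Row(long rowtime, String payload) {
            this.rowtime = rowtime;
            this.payload = payload;
        }
    }

    /** Stand-in for ValueState<RowData>. */
    static final class State {
        Row value;
    }

    /** Assumed semantics: no stored row, or the stored row is not newer. */
    static boolean isLastRow(Row prevRow, Row currentRow) {
        return prevRow == null || prevRow.rowtime <= currentRow.rowtime;
    }

    /** Single-row variant: owns the state update and all emission branches. */
    static void processLastRowOnRowtime(
            State state, Row currentRow, List<String> out,
            boolean generateUpdateBefore, boolean generateInsert) {
        Row prevRow = state.value;
        if (!isLastRow(prevRow, currentRow)) {
            return;
        }
        state.value = currentRow;
        if (generateUpdateBefore || generateInsert) {
            if (prevRow == null) {
                out.add("INSERT " + currentRow.payload);
            } else {
                if (generateUpdateBefore) {
                    out.add("UPDATE_BEFORE " + prevRow.payload);
                }
                out.add("UPDATE_AFTER " + currentRow.payload);
            }
        } else {
            out.add("UPDATE_AFTER " + currentRow.payload);
        }
    }

    /** Batch variant: no emission logic of its own, it delegates per row. */
    static void processLastRowOnRowtime(
            State state, List<Row> bufferedRows, List<String> out,
            boolean generateUpdateBefore, boolean generateInsert) {
        if (bufferedRows == null) {
            return;
        }
        for (Row row : bufferedRows) {
            processLastRowOnRowtime(state, row, out, generateUpdateBefore, generateInsert);
        }
    }

    /** Feeds a small batch, including one late row, through the batch variant. */
    static List<String> run() {
        List<String> out = new ArrayList<>();
        List<Row> batch = Arrays.asList(new Row(1L, "a"), new Row(3L, "b"), new Row(2L, "late"));
        processLastRowOnRowtime(new State(), batch, out, true, true);
        return out;
    }

    public static void main(String[] args) {
        // prints "[INSERT a, UPDATE_BEFORE a, UPDATE_AFTER b]"
        System.out.println(run());
    }
}
```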

##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
##########
@@ -635,7 +635,25 @@ class FlinkRelMdHandlerTestBase {
   //  select a, b, c, proctime
   //  ROW_NUMBER() over (partition by b, c order by proctime desc) rn from TemporalTable3
   // ) t where rn <= 1
-  protected lazy val (streamDeduplicateFirstRow, streamDeduplicateLastRow) = {
+  protected lazy val (streamProcTimeDeduplicateFirstRow, streamProcTimeDeduplicateLastRow) = {
+    buildFirstRowAndLastRowDeduplicateNode(false)
+  }
+
+  // equivalent SQL is
+  // select a, b, c from (
+  //  select a, b, c, rowtime
+  //  ROW_NUMBER() over (partition by b order by rowtime) rn from TemporalTable3
+  // ) t where rn <= 1
+  //
+  // select a, b, c from (
+  //  select a, b, c, proctime
+  //  ROW_NUMBER() over (partition by b, c order by rowtime desc) rn from TemporalTable3

Review comment:
       `proctime` => `rowtime`?



