Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/04/20 06:44:38 UTC

[GitHub] [flink] godfreyhe commented on a change in pull request #11797: [FLINK-17169][table-blink] Refactor BaseRow to use RowKind instead of byte header

godfreyhe commented on a change in pull request #11797:
URL: https://github.com/apache/flink/pull/11797#discussion_r411073864



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
##########
@@ -129,13 +130,15 @@ public int getArity() {
 	}
 
 	@Override
-	public byte getHeader() {
+	public RowKind getRowKind() {
 		// first nullBitsSizeInBytes byte is header.
-		return segments[0].get(offset);
+		byte header = segments[0].get(offset);
+		return RowKind.values()[header];

Review comment:
       Big +1. If we add a new enum value in the middle, the ordinals of all later values will change too, so the stored header byte would decode to the wrong kind.
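A minimal sketch of the point above, assuming the enum stores an explicit byte per constant so that decoding does not depend on declaration order the way `RowKind.values()[header]` does. `StableRowKind`, `toByteValue` and `fromByteValue` are hypothetical names used only for illustration, not the API in this PR.

```java
/** Hypothetical row-kind enum whose serialized byte is fixed explicitly, not derived from ordinal(). */
enum StableRowKind {
    INSERT((byte) 0),
    UPDATE_BEFORE((byte) 1),
    UPDATE_AFTER((byte) 2),
    DELETE((byte) 3);

    private final byte value;

    StableRowKind(byte value) {
        this.value = value;
    }

    /** Byte written into the row header; unaffected if constants are reordered or added. */
    byte toByteValue() {
        return value;
    }

    /** Decodes a header byte with an explicit switch instead of values()[header]. */
    static StableRowKind fromByteValue(byte value) {
        switch (value) {
            case 0: return INSERT;
            case 1: return UPDATE_BEFORE;
            case 2: return UPDATE_AFTER;
            case 3: return DELETE;
            default:
                throw new IllegalArgumentException("Unsupported row kind byte: " + value);
        }
    }
}
```

With such an enum, `getRowKind()` would decode the header through `fromByteValue(...)` rather than `values()[header]`, and adding a constant in the middle would not change any existing byte.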

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGlobalGroupAggFunction.java
##########
@@ -199,27 +196,27 @@ public void finishBundle(Map<BaseRow, BaseRow> buffer, Collector<BaseRow> out) t
 						// new row is not same with prev row
 						if (generateUpdateBefore) {
 							// prepare retraction message for previous row

Review comment:
       Please update this comment.

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ObjectArrayRow.java
##########
@@ -113,13 +113,7 @@ public BaseRow getRow(int ordinal, int numFields) {
 	@Override
 	public String toString() {
 		StringBuilder sb = new StringBuilder();
-		sb.append("(");
-		if (BaseRowUtil.isAccumulateMsg(this)) {
-			sb.append("+");
-		} else {
-			sb.append("-");
-		}
-		sb.append("|");
+		sb.append(rowKind.shortString()).append("(");

Review comment:
       Please unify the format of `toString`. In `BinaryRow`, the format is `[UA|...]`.
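One way to unify the formats is a single shared formatter that every row implementation calls from `toString()`. This sketch assumes short strings such as `UA` (as in the `[UA|...]` example above) and uses the `KIND(f0,f1,...)` layout from the `ObjectArrayRow` diff; `ToStringKind` and `RowFormat` are hypothetical names, not part of the PR.

```java
import java.util.StringJoiner;

/** Hypothetical row kind with a short display string, mirroring the idea of RowKind#shortString(). */
enum ToStringKind {
    INSERT("I"), UPDATE_BEFORE("UB"), UPDATE_AFTER("UA"), DELETE("D");

    private final String shortString;

    ToStringKind(String shortString) {
        this.shortString = shortString;
    }

    String shortString() {
        return shortString;
    }
}

/** One shared formatter so every row implementation prints the same "KIND(f0,f1,...)" layout. */
final class RowFormat {
    private RowFormat() {}

    static String format(ToStringKind kind, Object... fields) {
        // prefix is the kind's short string plus "(", suffix is ")", fields separated by ","
        StringJoiner joiner = new StringJoiner(",", kind.shortString() + "(", ")");
        for (Object field : fields) {
            joiner.add(String.valueOf(field));
        }
        return joiner.toString();
    }
}
```

For example, `RowFormat.format(ToStringKind.UPDATE_AFTER, "aaa", 1L)` yields `UA(aaa,1)`, whichever row class calls it.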

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java
##########
@@ -127,6 +127,10 @@ public void processElement2(StreamRecord<BaseRow> element) throws Exception {
 	 * method is too complex, so we provide the pseudo code to help understand the logic. We should
 	 * keep sync the following pseudo code with the real logic of the method.
 	 *
+	 * <p>Note: "+" represents "INSERT", "-" represents "DELETE", "*" represents input row kind.

Review comment:
       Should this be consistent with the `shortString` values of `RowKind`?

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/JoinedRow.java
##########
@@ -17,14 +17,16 @@
 
 package org.apache.flink.table.dataformat;
 
+import org.apache.flink.types.RowKind;
+
 /**
  * Join two row to one row.
  */
 public final class JoinedRow implements BaseRow {
 
 	private BaseRow row1;
 	private BaseRow row2;
-	private byte header;
+	private RowKind rowKind = RowKind.INSERT;

Review comment:
       It's better to remove the default value, so that we don't silently forget to set the RowKind somewhere.
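A minimal sketch of what dropping the default could look like, assuming the field starts as `null` and every read checks that a kind was set; a forgotten `setRowKind` then fails fast instead of silently emitting INSERT. `KindedRow` and `Kind` are hypothetical stand-ins for `JoinedRow` and `RowKind`.

```java
import java.util.Objects;

enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

/** Hypothetical row holder with no default row kind. */
final class KindedRow {
    // Deliberately no initializer: the kind must be set explicitly before use.
    private Kind rowKind;

    void setRowKind(Kind kind) {
        this.rowKind = Objects.requireNonNull(kind, "kind");
    }

    Kind getRowKind() {
        if (rowKind == null) {
            throw new IllegalStateException("RowKind was never set on this row");
        }
        return rowKind;
    }
}
```

The extra null check costs one comparison per read; the same motivation is behind the later request to call `outRow.setRowKind(RowKind.INSERT)` explicitly in `TemporalRowTimeJoinOperator#open`.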

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
##########
@@ -35,25 +37,29 @@
 	 *
 	 * @param currentRow latest row received by deduplicate function
 	 * @param generateUpdateBefore whether need to send UPDATE_BEFORE message for updates
-	 * @param state state of function
+	 * @param state state of function, null if generateUpdateBefore is false
 	 * @param out underlying collector
 	 */
 	static void processLastRow(
 			BaseRow currentRow,
 			boolean generateUpdateBefore,
-			ValueState<BaseRow> state,
+			@Nullable ValueState<BaseRow> state,
 			Collector<BaseRow> out) throws Exception {
-		// Check message should be accumulate
-		Preconditions.checkArgument(BaseRowUtil.isAccumulateMsg(currentRow));
+		// check message should be insert only.
+		Preconditions.checkArgument(currentRow.getRowKind() == RowKind.INSERT);
 		if (generateUpdateBefore) {
-			// state stores complete row if generateUpdateBefore is true
+			// state is not null when generateUpdateBefore is enabled,
+			// the state stores complete row
 			BaseRow preRow = state.value();
 			state.update(currentRow);
 			if (preRow != null) {
-				preRow.setHeader(BaseRowUtil.RETRACT_MSG);
+				preRow.setRowKind(RowKind.UPDATE_BEFORE);
 				out.collect(preRow);
 			}
 		}
+		// in order for better performance, we don't have state for LastRow
+		// if not generate UPDATE_BEFORE, thus, we can't produce INSERT messages for first row.

Review comment:
       If the downstream system is a database, the sink will execute an INSERT or an UPDATE statement for each row based on its RowKind. If there is no INSERT message, how is the first row inserted?
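To make the concern concrete, here is a hypothetical sink that chooses the SQL statement from the row kind alone. The table `t`, its columns `id`/`v`, and `SinkKind`/`KindToSql` are assumptions for illustration only.

```java
enum SinkKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

/** Hypothetical sink logic that picks a SQL statement purely from the row kind. */
final class KindToSql {
    private KindToSql() {}

    static String statementFor(SinkKind kind) {
        switch (kind) {
            case INSERT:
                return "INSERT INTO t (id, v) VALUES (?, ?)";
            case UPDATE_AFTER:
                return "UPDATE t SET v = ? WHERE id = ?";
            case DELETE:
                return "DELETE FROM t WHERE id = ?";
            case UPDATE_BEFORE:
                // The old image is superseded by the following UPDATE_AFTER; nothing to execute.
                return null;
            default:
                throw new IllegalArgumentException("Unknown kind: " + kind);
        }
    }
}
```

If the first row for a key arrives as UPDATE_AFTER instead of INSERT, such a sink issues an UPDATE against a row that does not exist yet, so the row is never written. A sink that writes every accumulate message as an upsert would avoid this, but that cannot be assumed for every sink.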

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java
##########
@@ -182,14 +188,16 @@ private void processElement(
 			boolean inputIsLeft) throws Exception {
 		boolean inputIsOuter = inputIsLeft ? leftIsOuter : rightIsOuter;
 		boolean otherIsOuter = inputIsLeft ? rightIsOuter : leftIsOuter;
+		RowKind inputRowKind = input.getRowKind();
+		input.setRowKind(RowKind.INSERT); // erase RowKind for later state updating
 
 		AssociatedRecords associatedRecords = AssociatedRecords.of(input, inputIsLeft, otherSideStateView, joinCondition);
-		if (BaseRowUtil.isAccumulateMsg(input)) { // record is accumulate
+		if (inputRowKind == RowKind.INSERT || inputRowKind == RowKind.UPDATE_AFTER) { // record is accumulate

Review comment:
       boolean isAccumulateMsg = BaseRowUtil.isAccumulateMsg(input);
       ...
       if (isAccumulateMsg) ...
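A self-contained sketch of the suggested shape: capture the flag once, before the kind is erased for state updates, and branch on the local variable. Following the diff, INSERT and UPDATE_AFTER are treated as accumulate messages. `JoinKind`, `MutableRow` and `AccumulateCheck` are hypothetical stand-ins for `RowKind`, `BaseRow` and `BaseRowUtil`.

```java
enum JoinKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

/** Hypothetical mutable row carrying only its kind. */
final class MutableRow {
    JoinKind kind;

    MutableRow(JoinKind kind) {
        this.kind = kind;
    }
}

final class AccumulateCheck {
    private AccumulateCheck() {}

    /** INSERT and UPDATE_AFTER add data; UPDATE_BEFORE and DELETE retract it. */
    static boolean isAccumulateMsg(JoinKind kind) {
        return kind == JoinKind.INSERT || kind == JoinKind.UPDATE_AFTER;
    }

    /** Mirrors processElement: read the flag first, then erase the kind for state updates. */
    static String classify(MutableRow input) {
        boolean isAccumulateMsg = isAccumulateMsg(input.kind);
        input.kind = JoinKind.INSERT; // erase RowKind for later state updating
        if (isAccumulateMsg) {
            return "accumulate";
        }
        return "retract";
    }
}
```

The ordering matters: if the flag were computed after the kind is reset to INSERT, every input would look like an accumulate message.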

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ObjectArrayRow.java
##########
@@ -27,7 +27,7 @@
  */
 public abstract class ObjectArrayRow implements BaseRow {
 
-	private byte header;
+	private RowKind rowKind = RowKind.INSERT; // INSERT as default

Review comment:
       ditto

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
##########
@@ -153,7 +153,6 @@ public void open() throws Exception {
 			TIMERS_STATE_NAME, VoidNamespaceSerializer.INSTANCE, this);
 		collector = new TimestampedCollector<>(output);
 		outRow = new JoinedRow();
-		outRow.setHeader(BaseRowUtil.ACCUMULATE_MSG);

Review comment:
       Explicitly call `outRow.setRowKind(RowKind.INSERT)` here.

##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala
##########
@@ -75,57 +75,57 @@ class GroupAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(
     // register cleanup timer with 3001
     testHarness.setProcessingTime(1)
 
-    // accumulate
-    testHarness.processElement(new StreamRecord(binaryrow("aaa", 1L: JLong), 1))
-    expectedOutput.add(new StreamRecord(binaryrow("aaa", 1L: JLong), 1))
+    // insertion
+    testHarness.processElement(binaryRecord(INSERT,"aaa", 1L: JLong))

Review comment:
       Is the `timestamp` no longer needed here?

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingSemiAntiJoinOperator.java
##########
@@ -147,8 +154,11 @@ public void processElement1(StreamRecord<BaseRow> element) throws Exception {
 	@Override
 	public void processElement2(StreamRecord<BaseRow> element) throws Exception {
 		BaseRow input = element.getValue();
+		RowKind inputRowKind = input.getRowKind();
+		input.setRowKind(RowKind.INSERT); // erase RowKind for later state updating
+
 		AssociatedRecords associatedRecords = AssociatedRecords.of(input, false, leftRecordStateView, joinCondition);
-		if (BaseRowUtil.isAccumulateMsg(input)) {
+		if (inputRowKind == RowKind.INSERT || inputRowKind == RowKind.UPDATE_AFTER) { // record is accumulate

Review comment:
       ditto

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/WindowOperatorBuilder.java
##########
@@ -156,13 +156,13 @@ public WindowOperatorBuilder withAllowedLateness(Duration allowedLateness) {
 		if (allowedLateness.toMillis() > 0) {
 			this.allowedLateness = allowedLateness.toMillis();
 			// allow late element, which means this window will send retractions
-			this.sendRetraction = true;
+			this.produceUpdates = true;
 		}
 		return this;
 	}
 
-	public WindowOperatorBuilder withSendRetraction() {
-		this.sendRetraction = true;
+	public WindowOperatorBuilder withProduceUpdates() {

Review comment:
       How about renaming it to `produceUpdates`?
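A stripped-down sketch of the builder with the suggested naming, keeping the rule from the diff that a positive allowed lateness implies producing updates. `WindowBuilderSketch` and its accessors are hypothetical and omit everything else the real `WindowOperatorBuilder` configures.

```java
import java.time.Duration;

/** Hypothetical, stripped-down version of the builder flags touched by the diff. */
final class WindowBuilderSketch {
    private long allowedLateness = 0L;
    private boolean produceUpdates = false;

    WindowBuilderSketch withAllowedLateness(Duration allowedLateness) {
        if (allowedLateness.toMillis() > 0) {
            this.allowedLateness = allowedLateness.toMillis();
            // late elements can change an already-fired window, so updates must be emitted
            this.produceUpdates = true;
        }
        return this;
    }

    WindowBuilderSketch produceUpdates() {
        this.produceUpdates = true;
        return this;
    }

    boolean isProducingUpdates() {
        return produceUpdates;
    }

    long getAllowedLateness() {
        return allowedLateness;
    }
}
```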




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org