You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/16 07:30:40 UTC

[flink] branch release-1.9 updated: [FLINK-13216][FLINK-13153][table-planner-blink] Fix Max_Retract and Min_Retract may produce incorrect result

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new db38ed0  [FLINK-13216][FLINK-13153][table-planner-blink] Fix Max_Retract and Min_Retract may produce incorrect result
db38ed0 is described below

commit db38ed0e503ebd244a52144e3c320df571943dd9
Author: Jark Wu <im...@gmail.com>
AuthorDate: Mon Jul 15 22:07:53 2019 +0800

    [FLINK-13216][FLINK-13153][table-planner-blink] Fix Max_Retract and Min_Retract may produce incorrect result
    
    The cause is that we didn't reset acc.max/acc.min to null, so the accumulator could keep an old value when a new max/min had to be computed from an empty MapView.
    That stale value was then emitted downstream instead of null, which corrupted the final result.
    
    This caused several unstable test cases, including:
    - AggregateITCase.testNestedGroupByAgg
    - SplitAggregateITCase.testMinMaxWithRetraction
    
    This closes #9120
---
 .../aggfunctions/MaxWithRetractAggFunction.java    |  6 ++--
 .../aggfunctions/MinWithRetractAggFunction.java    |  6 ++--
 .../aggfunctions/AggFunctionTestBase.java          | 42 ++++++++++++++++++++++
 3 files changed, 50 insertions(+), 4 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunction.java
index 0ca7bf9..4c076f6 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunction.java
@@ -135,6 +135,8 @@ public abstract class MaxWithRetractAggFunction<T extends Comparable>
 		// when both of them are expired.
 		if (!hasMax) {
 			acc.mapSize = 0L;
+			// we should also override max value, because it may have an old value.
+			acc.max = null;
 		}
 	}
 
@@ -142,7 +144,7 @@ public abstract class MaxWithRetractAggFunction<T extends Comparable>
 		boolean needUpdateMax = false;
 		for (MaxWithRetractAccumulator<T> a : its) {
 			// set max element
-			if (acc.mapSize == 0 || (a.max != null && acc.max.compareTo(a.max) < 0)) {
+			if (acc.mapSize == 0 || (a.mapSize > 0 && a.max != null && acc.max.compareTo(a.max) < 0)) {
 				acc.max = a.max;
 			}
 			// merge the count for each key
@@ -195,7 +197,7 @@ public abstract class MaxWithRetractAggFunction<T extends Comparable>
 
 	@Override
 	public T getValue(MaxWithRetractAccumulator<T> acc) {
-		if (acc.mapSize != 0) {
+		if (acc.mapSize > 0) {
 			return acc.max;
 		} else {
 			return null;
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunction.java
index 8636692..3daaf45 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunction.java
@@ -135,6 +135,8 @@ public abstract class MinWithRetractAggFunction<T extends Comparable>
 		// when both of them are expired.
 		if (!hasMin) {
 			acc.mapSize = 0L;
+			// we should also override min value, because it may have an old value.
+			acc.min = null;
 		}
 	}
 
@@ -142,7 +144,7 @@ public abstract class MinWithRetractAggFunction<T extends Comparable>
 		boolean needUpdateMin = false;
 		for (MinWithRetractAccumulator<T> a : its) {
 			// set min element
-			if (acc.mapSize == 0 || (a.min != null && acc.min.compareTo(a.min) > 0)) {
+			if (acc.mapSize == 0 || (a.mapSize > 0 && a.min != null && acc.min.compareTo(a.min) > 0)) {
 				acc.min = a.min;
 			}
 			// merge the count for each key
@@ -195,7 +197,7 @@ public abstract class MinWithRetractAggFunction<T extends Comparable>
 
 	@Override
 	public T getValue(MinWithRetractAccumulator<T> acc) {
-		if (acc.mapSize != 0) {
+		if (acc.mapSize > 0) {
 			return acc.min;
 		} else {
 			return null;
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.java
index c448cfd..b909f58 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.java
@@ -32,6 +32,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.math.BigDecimal;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
@@ -143,6 +144,47 @@ public abstract class AggFunctionTestBase<T, ACC> {
 	}
 
 	@Test
+	public void testMergeReservedAccumulator() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+		AggregateFunction<T, ACC> aggregator = getAggregator();
+		boolean hasMerge = UserDefinedFunctionUtils.ifMethodExistInFunction("merge", aggregator);
+		boolean hasRetract = UserDefinedFunctionUtils.ifMethodExistInFunction("retract", aggregator);
+		if (!hasMerge || !hasRetract) {
+			// this test only verify AggregateFunctions which has merge() and retract() method
+			return;
+		}
+
+		Method mergeFunc = aggregator.getClass().getMethod("merge", getAccClass(), Iterable.class);
+		List<List<T>> inputValueSets = getInputValueSets();
+		int size = getInputValueSets().size();
+
+		// iterate over input sets
+		for (int i = 0; i < size; ++i) {
+			List<T> inputValues = inputValueSets.get(i);
+			List<ACC> accumulators = new ArrayList<>();
+			List<ACC> reversedAccumulators = new ArrayList<>();
+			// prepare accumulators
+			accumulators.add(accumulateValues(inputValues));
+			// prepare reversed accumulators
+			ACC retractedAcc = aggregator.createAccumulator();
+			retractValues(retractedAcc, inputValues);
+			reversedAccumulators.add(retractedAcc);
+			// prepare accumulator only contain two elements
+			ACC accWithSubset = accumulateValues(inputValues.subList(0, 2));
+			T expectedValue = aggregator.getValue(accWithSubset);
+
+			// merge
+			ACC acc = aggregator.createAccumulator();
+			mergeFunc.invoke(aggregator, acc, accumulators);
+			mergeFunc.invoke(aggregator, acc, reversedAccumulators);
+			mergeFunc.invoke(aggregator, accWithSubset, Collections.singleton(acc));
+
+			// getValue
+			T result = aggregator.getValue(accWithSubset);
+			validateResult(expectedValue, result);
+		}
+	}
+
+	@Test
 	public void testResetAccumulator() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
 		AggregateFunction<T, ACC> aggregator = getAggregator();
 		if (UserDefinedFunctionUtils.ifMethodExistInFunction("resetAccumulator", aggregator)) {