You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/16 07:30:40 UTC
[flink] branch release-1.9 updated:
[FLINK-13216][FLINK-13153][table-planner-blink] Fix Max_Retract and
Min_Retract may produce incorrect result
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new db38ed0 [FLINK-13216][FLINK-13153][table-planner-blink] Fix Max_Retract and Min_Retract may produce incorrect result
db38ed0 is described below
commit db38ed0e503ebd244a52144e3c320df571943dd9
Author: Jark Wu <im...@gmail.com>
AuthorDate: Mon Jul 15 22:07:53 2019 +0800
[FLINK-13216][FLINK-13153][table-planner-blink] Fix Max_Retract and Min_Retract may produce incorrect result
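The cause is that we did not reset acc.max/acc.min to null when recomputing the max/min from an empty MapView, so the field could still hold a stale value.
That stale value was then emitted downstream instead of null, which corrupted the final result. In addition, merge() now ignores accumulators whose mapSize is not positive, and getValue() returns null unless mapSize > 0.
This made several test cases unstable, including: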
- AggregateITCase.testNestedGroupByAgg
- SplitAggregateITCase.testMinMaxWithRetraction
This closes #9120
---
.../aggfunctions/MaxWithRetractAggFunction.java | 6 ++--
.../aggfunctions/MinWithRetractAggFunction.java | 6 ++--
.../aggfunctions/AggFunctionTestBase.java | 42 ++++++++++++++++++++++
3 files changed, 50 insertions(+), 4 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunction.java
index 0ca7bf9..4c076f6 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunction.java
@@ -135,6 +135,8 @@ public abstract class MaxWithRetractAggFunction<T extends Comparable>
// when both of them are expired.
if (!hasMax) {
acc.mapSize = 0L;
+ // we should also override max value, because it may have an old value.
+ acc.max = null;
}
}
@@ -142,7 +144,7 @@ public abstract class MaxWithRetractAggFunction<T extends Comparable>
boolean needUpdateMax = false;
for (MaxWithRetractAccumulator<T> a : its) {
// set max element
- if (acc.mapSize == 0 || (a.max != null && acc.max.compareTo(a.max) < 0)) {
+ if (acc.mapSize == 0 || (a.mapSize > 0 && a.max != null && acc.max.compareTo(a.max) < 0)) {
acc.max = a.max;
}
// merge the count for each key
@@ -195,7 +197,7 @@ public abstract class MaxWithRetractAggFunction<T extends Comparable>
@Override
public T getValue(MaxWithRetractAccumulator<T> acc) {
- if (acc.mapSize != 0) {
+ if (acc.mapSize > 0) {
return acc.max;
} else {
return null;
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunction.java
index 8636692..3daaf45 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunction.java
@@ -135,6 +135,8 @@ public abstract class MinWithRetractAggFunction<T extends Comparable>
// when both of them are expired.
if (!hasMin) {
acc.mapSize = 0L;
+ // we should also override min value, because it may have an old value.
+ acc.min = null;
}
}
@@ -142,7 +144,7 @@ public abstract class MinWithRetractAggFunction<T extends Comparable>
boolean needUpdateMin = false;
for (MinWithRetractAccumulator<T> a : its) {
// set min element
- if (acc.mapSize == 0 || (a.min != null && acc.min.compareTo(a.min) > 0)) {
+ if (acc.mapSize == 0 || (a.mapSize > 0 && a.min != null && acc.min.compareTo(a.min) > 0)) {
acc.min = a.min;
}
// merge the count for each key
@@ -195,7 +197,7 @@ public abstract class MinWithRetractAggFunction<T extends Comparable>
@Override
public T getValue(MinWithRetractAccumulator<T> acc) {
- if (acc.mapSize != 0) {
+ if (acc.mapSize > 0) {
return acc.min;
} else {
return null;
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.java
index c448cfd..b909f58 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.java
@@ -32,6 +32,7 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
@@ -143,6 +144,47 @@ public abstract class AggFunctionTestBase<T, ACC> {
}
@Test
+ public void testMergeReservedAccumulator() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+ AggregateFunction<T, ACC> aggregator = getAggregator();
+ boolean hasMerge = UserDefinedFunctionUtils.ifMethodExistInFunction("merge", aggregator);
+ boolean hasRetract = UserDefinedFunctionUtils.ifMethodExistInFunction("retract", aggregator);
+ if (!hasMerge || !hasRetract) {
+ // this test only verify AggregateFunctions which has merge() and retract() method
+ return;
+ }
+
+ Method mergeFunc = aggregator.getClass().getMethod("merge", getAccClass(), Iterable.class);
+ List<List<T>> inputValueSets = getInputValueSets();
+ int size = getInputValueSets().size();
+
+ // iterate over input sets
+ for (int i = 0; i < size; ++i) {
+ List<T> inputValues = inputValueSets.get(i);
+ List<ACC> accumulators = new ArrayList<>();
+ List<ACC> reversedAccumulators = new ArrayList<>();
+ // prepare accumulators
+ accumulators.add(accumulateValues(inputValues));
+ // prepare reversed accumulators
+ ACC retractedAcc = aggregator.createAccumulator();
+ retractValues(retractedAcc, inputValues);
+ reversedAccumulators.add(retractedAcc);
+ // prepare accumulator only contain two elements
+ ACC accWithSubset = accumulateValues(inputValues.subList(0, 2));
+ T expectedValue = aggregator.getValue(accWithSubset);
+
+ // merge
+ ACC acc = aggregator.createAccumulator();
+ mergeFunc.invoke(aggregator, acc, accumulators);
+ mergeFunc.invoke(aggregator, acc, reversedAccumulators);
+ mergeFunc.invoke(aggregator, accWithSubset, Collections.singleton(acc));
+
+ // getValue
+ T result = aggregator.getValue(accWithSubset);
+ validateResult(expectedValue, result);
+ }
+ }
+
+ @Test
public void testResetAccumulator() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
AggregateFunction<T, ACC> aggregator = getAggregator();
if (UserDefinedFunctionUtils.ifMethodExistInFunction("resetAccumulator", aggregator)) {