Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/30 22:04:29 UTC

[18/50] [abbrv] flink git commit: [FLINK-5890] [gelly] GatherSumApply broken when object reuse enabled

[FLINK-5890] [gelly] GatherSumApply broken when object reuse enabled

The initial fix for this ticket does not work on larger data sets.

Reduce supports returning the left input, the right input, a new object,
or a locally reused object. The trouble with the initial fix was that
the returned local object reused fields from the input tuples.
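A minimal sketch of that pitfall follows. It uses hypothetical stand-ins (Box for a mutable message type, Pair for Tuple2<K, M>) rather than Flink classes, and readInto is an assumed model of a deserializer that writes into an existing object's mutable fields. The reduce returns a new tuple whose f1 is the same object as an input's f1. Once the runtime reads the next record into that input, the previously returned result changes. Object reuse permits this for an input the UDF did not return.

```java
public class SharedFieldDemo {
    // Hypothetical mutable message type (stand-in for a reusable value such as LongValue).
    static final class Box { long v; Box(long v) { this.v = v; } }

    // Hypothetical stand-in for Tuple2<K, M>; not Flink's class.
    static final class Pair { long f0; Box f1; Pair(long k, Box m) { f0 = k; f1 = m; } }

    // Sums into the left field and returns a new tuple that borrows that field object.
    static Pair reduce(Pair arg0, Pair arg1) {
        arg0.f1.v += arg1.f1.v;
        return new Pair(arg0.f0, arg0.f1); // BUG: output shares arg0.f1 with the input
    }

    // Assumed model of deserializing into an existing object, reusing its field objects.
    static void readInto(Pair reuse, long key, long msg) {
        reuse.f0 = key;
        reuse.f1.v = msg;
    }

    // Returns the result's message before and after the left input is reused.
    static long[] demo() {
        Pair a = new Pair(1, new Box(2));
        Pair b = new Pair(1, new Box(3));
        Pair out = reduce(a, b);
        long before = out.f1.v;   // 5
        readInto(a, 2, 100);      // a was not returned, so it may be overwritten
        long after = out.f1.v;    // 100: the stored result was clobbered
        return new long[] { before, after };
    }

    public static void main(String[] args) {
        long[] r = demo();
        System.out.println(r[0] + " -> " + r[1]); // prints "5 -> 100"
    }
}
```

Copying the message into a fresh Box would avoid the sharing. This commit instead avoids extra objects by returning one of the inputs.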

The problem lies in ReduceDriver#run managing two values (reuse1 and
reuse2) together with a third, local value returned by
GatherSumApplyIteration.SumUDF. After the first grouping, value.f1 ==
reuse1.f1. Subsequent UDF calls may swap value.f1 and reuse2.f1, which
causes reuse1.f1 == reuse2.f1. With an odd number of swaps, the next
grouping will reduce with reuse1 and reuse2 sharing a field, and
deserialization will overwrite the stored values.

The simple fix is to use and return only the provided inputs.
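The aliasing sequence and the fix can be reproduced with a simplified, single-threaded model. The run method is only loosely modelled on the behaviour described in this message, using two reuse objects that are swapped when the UDF returns the right one. It is an assumption, not Flink's actual ReduceDriver code, and Box, Pair, Reducer and run are hypothetical. The sum always mutates and returns its right argument, so the field swap path is taken on every call. For input (1,1) (1,2) (1,3) (2,10) (2,20), the initial fix produces [6, 40] instead of [6, 30]. Returning only the provided inputs produces [6, 30].

```java
import java.util.ArrayList;
import java.util.List;

public class ReuseAliasingDemo {
    // Hypothetical mutable message type, standing in for a reusable value such as LongValue.
    static final class Box { long v; Box(long v) { this.v = v; } }

    // Hypothetical stand-in for Flink's Tuple2<K, M>; not the real class.
    static final class Pair { long f0; Box f1; Pair(long k, Box m) { f0 = k; f1 = m; } }

    interface Reducer { Pair reduce(Pair arg0, Pair arg1); }

    // User sum that mutates and returns its right argument, forcing the swap path.
    static Box sum(Box left, Box right) { right.v += left.v; return right; }

    // Initial fix: swap fields, then return a new tuple that borrows an input's field.
    static Pair initialFix(Pair arg0, Pair arg1) {
        Box result = sum(arg0.f1, arg1.f1);
        if (result == arg1.f1) { Box tmp = arg1.f1; arg1.f1 = arg0.f1; arg0.f1 = tmp; }
        return new Pair(arg0.f0, result);
    }

    // This commit's approach: only use and return the provided inputs.
    static Pair finalFix(Pair arg0, Pair arg1) {
        Box result = sum(arg0.f1, arg1.f1);
        if (result == arg1.f1) { Box tmp = arg1.f1; arg1.f1 = arg0.f1; arg0.f1 = tmp; }
        else { arg0.f1 = result; }
        return arg0;
    }

    // Deserialize a {key, msg} record into 'reuse', writing into its existing field object.
    static Pair readInto(Pair reuse, long[] rec) { reuse.f0 = rec[0]; reuse.f1.v = rec[1]; return reuse; }

    // Simplified model of an object-reusing reduce driver over key-sorted input (assumption).
    static List<Long> run(long[][] input, Reducer fn) {
        List<Long> out = new ArrayList<>();
        Pair reuse1 = readInto(new Pair(0, new Box(0)), input[0]);
        Pair reuse2 = new Pair(0, new Box(0));
        Pair value = reuse1;
        int i = 1;
        boolean moreGroups = true;
        while (moreGroups) {
            moreGroups = false;
            long key = value.f0;
            while (i < input.length) {
                reuse2 = readInto(reuse2, input[i++]);
                if (reuse2.f0 != key) { moreGroups = true; break; }
                value = fn.reduce(value, reuse2);
                // never read the next record into the object the UDF returned
                if (value == reuse2) { Pair t = reuse1; reuse1 = reuse2; reuse2 = t; }
            }
            out.add(value.f1.v); // "collect": the group result is read out immediately
            // the first record of the next group sits in reuse2; move it to reuse1
            Pair t = reuse1; reuse1 = reuse2; reuse2 = t;
            value = reuse1;
        }
        return out;
    }

    static final long[][] INPUT = { {1, 1}, {1, 2}, {1, 3}, {2, 10}, {2, 20} };

    public static void main(String[] args) {
        System.out.println(run(INPUT, ReuseAliasingDemo::initialFix)); // [6, 40]
        System.out.println(run(INPUT, ReuseAliasingDemo::finalFix));   // [6, 30]
    }
}
```

In the initialFix run, the second call in the first group swaps the local value's f1 with reuse2.f1. That leaves reuse1 and reuse2 sharing one Box, so reading (2,20) also overwrites the stored 10.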

This closes #3515


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/524b20f2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/524b20f2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/524b20f2

Branch: refs/heads/table-retraction
Commit: 524b20f2db70fc4afba3a539fbf249c6d768ab4f
Parents: 4b19e27
Author: Greg Hogan <co...@greghogan.com>
Authored: Fri Mar 10 16:44:27 2017 -0500
Committer: Greg Hogan <co...@greghogan.com>
Committed: Fri Mar 24 11:03:12 2017 -0400

----------------------------------------------------------------------
 .../org/apache/flink/graph/gsa/GatherSumApplyIteration.java     | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/524b20f2/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
index e941b7b..5c07a73 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -330,7 +330,6 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
 
 		@Override
 		public Tuple2<K, M> reduce(Tuple2<K, M> arg0, Tuple2<K, M> arg1) throws Exception {
-			K key = arg0.f0;
 			M result = this.sumFunction.sum(arg0.f1, arg1.f1);
 
 			// if the user returns value from the right argument then swap as
@@ -339,9 +338,11 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
 				M tmp = arg1.f1;
 				arg1.f1 = arg0.f1;
 				arg0.f1 = tmp;
+			} else {
+				arg0.f1 = result;
 			}
 
-			return new Tuple2<>(key, result);
+			return arg0;
 		}
 
 		@Override