You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/09 20:16:43 UTC

[4/5] flink git commit: [FLINK-2648] [tests] Fix flaky CombineTaskTest and improve cancelling in GroupReduceCombineDriver

[FLINK-2648] [tests] Fix flaky CombineTaskTest and improve cancelling in GroupReduceCombineDriver


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/361947d6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/361947d6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/361947d6

Branch: refs/heads/master
Commit: 361947d6c5490f0c6d04ffec709a353995aad373
Parents: 4b6eae5
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Sep 9 18:59:43 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Sep 9 19:10:51 2015 +0200

----------------------------------------------------------------------
 .../runtime/operators/GroupReduceCombineDriver.java      |  4 +++-
 .../apache/flink/runtime/operators/CombineTaskTest.java  | 11 +++++++++--
 2 files changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/361947d6/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
index 028ed95..7115a4d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
@@ -184,7 +184,9 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
 		}
 
 		// sort, combine, and send the final batch
-		sortAndCombine();
+		if (running) {
+			sortAndCombine();
+		}
 	}
 
 	private void sortAndCombine() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/361947d6/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
index 932e746..b6ce2d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
@@ -53,10 +53,12 @@ public class CombineTaskTest
 	
 	private final ArrayList<Tuple2<Integer, Integer>> outList = new ArrayList<Tuple2<Integer, Integer>>();
 
+	@SuppressWarnings("unchecked")
 	private final TypeSerializer<Tuple2<Integer, Integer>> serializer = new TupleSerializer<Tuple2<Integer, Integer>>(
 			(Class<Tuple2<Integer, Integer>>) (Class<?>) Tuple2.class,
 			new TypeSerializer<?>[] { IntSerializer.INSTANCE, IntSerializer.INSTANCE });
 	
+	
 	private final TypeComparator<Tuple2<Integer, Integer>> comparator = new TupleComparator<Tuple2<Integer, Integer>>(
 			new int[]{0},
 			new TypeComparator<?>[] { new IntComparator(true) },
@@ -179,9 +181,14 @@ public class CombineTaskTest
 			testTask.cancel();
 			
 			// make sure it reacts to the canceling in some time
-			taskRunner.join(5000);
+			long deadline = System.currentTimeMillis() + 10000;
+			do {
+				taskRunner.interrupt();
+				taskRunner.join(5000);
+			}
+			while (taskRunner.isAlive() && System.currentTimeMillis() < deadline);
 			
-			assertFalse("Task did not cancel properly within in 5 seconds.", taskRunner.isAlive());
+			assertFalse("Task did not cancel properly within in 10 seconds.", taskRunner.isAlive());
 		}
 		catch (Exception e) {
 			e.printStackTrace();