You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/09 20:16:43 UTC
[4/5] flink git commit: [FLINK-2648] [tests] Fix flaky
CombineTaskTest and improve cancelling in GroupReduceCombineDriver
[FLINK-2648] [tests] Fix flaky CombineTaskTest and improve cancelling in GroupReduceCombineDriver
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/361947d6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/361947d6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/361947d6
Branch: refs/heads/master
Commit: 361947d6c5490f0c6d04ffec709a353995aad373
Parents: 4b6eae5
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Sep 9 18:59:43 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Sep 9 19:10:51 2015 +0200
----------------------------------------------------------------------
.../runtime/operators/GroupReduceCombineDriver.java | 4 +++-
.../apache/flink/runtime/operators/CombineTaskTest.java | 11 +++++++++--
2 files changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/361947d6/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
index 028ed95..7115a4d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
@@ -184,7 +184,9 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
}
// sort, combine, and send the final batch
- sortAndCombine();
+ if (running) {
+ sortAndCombine();
+ }
}
private void sortAndCombine() throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/361947d6/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
index 932e746..b6ce2d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
@@ -53,10 +53,12 @@ public class CombineTaskTest
private final ArrayList<Tuple2<Integer, Integer>> outList = new ArrayList<Tuple2<Integer, Integer>>();
+ @SuppressWarnings("unchecked")
private final TypeSerializer<Tuple2<Integer, Integer>> serializer = new TupleSerializer<Tuple2<Integer, Integer>>(
(Class<Tuple2<Integer, Integer>>) (Class<?>) Tuple2.class,
new TypeSerializer<?>[] { IntSerializer.INSTANCE, IntSerializer.INSTANCE });
+
private final TypeComparator<Tuple2<Integer, Integer>> comparator = new TupleComparator<Tuple2<Integer, Integer>>(
new int[]{0},
new TypeComparator<?>[] { new IntComparator(true) },
@@ -179,9 +181,14 @@ public class CombineTaskTest
testTask.cancel();
// make sure it reacts to the canceling in some time
- taskRunner.join(5000);
+ long deadline = System.currentTimeMillis() + 10000;
+ do {
+ taskRunner.interrupt();
+ taskRunner.join(5000);
+ }
+ while (taskRunner.isAlive() && System.currentTimeMillis() < deadline);
- assertFalse("Task did not cancel properly within in 5 seconds.", taskRunner.isAlive());
+ assertFalse("Task did not cancel properly within in 10 seconds.", taskRunner.isAlive());
}
catch (Exception e) {
e.printStackTrace();