You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/07/28 14:49:06 UTC
[3/3] flink git commit: [FLINK-7287] [kafka, tests] Fix test instabilities in KafkaConsumerTestBase
[FLINK-7287] [kafka, tests] Fix test instabilities in KafkaConsumerTestBase
This closes #4414.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/452f5d10
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/452f5d10
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/452f5d10
Branch: refs/heads/release-1.3
Commit: 452f5d1032f61a29726dd484453a256c7d57d052
Parents: 4382464
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Jul 27 18:31:13 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jul 28 22:47:46 2017 +0800
----------------------------------------------------------------------
.../connectors/kafka/KafkaConsumerTestBase.java | 21 ++++++++++----------
1 file changed, 11 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/452f5d10/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 1de8e6f..3467237 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -216,7 +216,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
env.execute();
}
catch (Throwable t) {
- if (!(t.getCause() instanceof JobCancellationException)) {
+ if (!(t instanceof JobCancellationException)) {
errorRef.set(t);
}
}
@@ -242,8 +242,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
while (System.nanoTime() < deadline);
- // cancel the job
+ // cancel the job & wait for the job to finish
JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+ runner.join();
final Throwable t = errorRef.get();
if (t != null) {
@@ -302,7 +303,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
env.execute();
}
catch (Throwable t) {
- if (!(t.getCause() instanceof JobCancellationException)) {
+ if (!(t instanceof JobCancellationException)) {
errorRef.set(t);
}
}
@@ -327,8 +328,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
while (System.nanoTime() < deadline);
- // cancel the job
+ // cancel the job & wait for the job to finish
JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+ runner.join();
final Throwable t = errorRef.get();
if (t != null) {
@@ -1611,9 +1613,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
kafkaServer.produceIntoKafka(fromGen, topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null);
env1.execute("Metrics test job");
- } catch(Throwable t) {
- LOG.warn("Got exception during execution", t);
- if(!(t instanceof JobCancellationException)) { // we'll cancel the job
+ } catch (Throwable t) {
+ if (!(t instanceof JobCancellationException)) { // we'll cancel the job
+ LOG.warn("Got exception during execution", t);
error.f0 = t;
}
}
@@ -1664,11 +1666,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
} finally {
// cancel
JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+ // wait for the job to finish (it should due to the cancel command above)
+ jobThread.join();
}
- while (jobThread.isAlive()) {
- Thread.sleep(50);
- }
if (error.f0 != null) {
throw error.f0;
}