You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/07/28 14:49:06 UTC

[3/3] flink git commit: [FLINK-7287] [kafka, tests] Fix test instabilities in KafkaConsumerTestBase

[FLINK-7287] [kafka, tests] Fix test instabilities in KafkaConsumerTestBase

This closes #4414.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/452f5d10
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/452f5d10
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/452f5d10

Branch: refs/heads/release-1.3
Commit: 452f5d1032f61a29726dd484453a256c7d57d052
Parents: 4382464
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Jul 27 18:31:13 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jul 28 22:47:46 2017 +0800

----------------------------------------------------------------------
 .../connectors/kafka/KafkaConsumerTestBase.java | 21 ++++++++++----------
 1 file changed, 11 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/452f5d10/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 1de8e6f..3467237 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -216,7 +216,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 					env.execute();
 				}
 				catch (Throwable t) {
-					if (!(t.getCause() instanceof JobCancellationException)) {
+					if (!(t instanceof JobCancellationException)) {
 						errorRef.set(t);
 					}
 				}
@@ -242,8 +242,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		}
 		while (System.nanoTime() < deadline);
 
-		// cancel the job
+		// cancel the job & wait for the job to finish
 		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+		runner.join();
 
 		final Throwable t = errorRef.get();
 		if (t != null) {
@@ -302,7 +303,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 					env.execute();
 				}
 				catch (Throwable t) {
-					if (!(t.getCause() instanceof JobCancellationException)) {
+					if (!(t instanceof JobCancellationException)) {
 						errorRef.set(t);
 					}
 				}
@@ -327,8 +328,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		}
 		while (System.nanoTime() < deadline);
 
-		// cancel the job
+		// cancel the job & wait for the job to finish
 		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+		runner.join();
 
 		final Throwable t = errorRef.get();
 		if (t != null) {
@@ -1611,9 +1613,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 					kafkaServer.produceIntoKafka(fromGen, topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null);
 
 					env1.execute("Metrics test job");
-				} catch(Throwable t) {
-					LOG.warn("Got exception during execution", t);
-					if(!(t instanceof JobCancellationException)) { // we'll cancel the job
+				} catch (Throwable t) {
+					if (!(t instanceof JobCancellationException)) { // we'll cancel the job
+						LOG.warn("Got exception during execution", t);
 						error.f0 = t;
 					}
 				}
@@ -1664,11 +1666,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		} finally {
 			// cancel
 			JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+			// wait for the job to finish (it should due to the cancel command above)
+			jobThread.join();
 		}
 
-		while (jobThread.isAlive()) {
-			Thread.sleep(50);
-		}
 		if (error.f0 != null) {
 			throw error.f0;
 		}