You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/06 13:23:49 UTC

[4/7] flink git commit: [FLINK-8703][tests] Port KafkaTestBase to MiniClusterResource

[FLINK-8703][tests] Port KafkaTestBase to MiniClusterResource

This closes #5669.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7d6afed8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7d6afed8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7d6afed8

Branch: refs/heads/master
Commit: 7d6afed83e370c8e36f6a690704041417e580498
Parents: 4c337dc
Author: zentol <ch...@apache.org>
Authored: Wed Mar 7 13:39:25 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Fri Apr 6 15:23:01 2018 +0200

----------------------------------------------------------------------
 .../connectors/kafka/KafkaConsumerTestBase.java | 219 +++++++++++--------
 .../connectors/kafka/KafkaTestBase.java         |  25 +--
 .../testutils/ClusterCommunicationUtils.java    |  56 +++++
 .../testutils/JobManagerCommunicationUtils.java | 147 -------------
 4 files changed, 186 insertions(+), 261 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7d6afed8/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 959d6f1..6ed9143 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
@@ -34,6 +35,7 @@ import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.DataInputView;
@@ -42,7 +44,8 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.client.JobCancellationException;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -54,11 +57,11 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
 import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
-import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
 import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2FlinkPartitioner;
@@ -72,6 +75,9 @@ import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.testutils.junit.RetryOnException;
 import org.apache.flink.testutils.junit.RetryRule;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
 
 import kafka.consumer.Consumer;
 import kafka.consumer.ConsumerConfig;
@@ -106,10 +112,12 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.flink.streaming.connectors.kafka.testutils.ClusterCommunicationUtils.getRunningJobs;
+import static org.apache.flink.streaming.connectors.kafka.testutils.ClusterCommunicationUtils.waitUntilJobIsRunning;
+import static org.apache.flink.streaming.connectors.kafka.testutils.ClusterCommunicationUtils.waitUntilNoJobIsRunning;
 import static org.apache.flink.test.util.TestUtils.tryExecute;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -123,6 +131,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	@Rule
 	public RetryRule retryRule = new RetryRule();
 
+	private ClusterClient<?> client;
+
 	// ------------------------------------------------------------------------
 	//  Common Test Preparation
 	// ------------------------------------------------------------------------
@@ -132,8 +142,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	 * the same mini cluster. Otherwise, missing slots may happen.
 	 */
 	@Before
-	public void ensureNoJobIsLingering() throws Exception {
-		JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+	public void setClientAndEnsureNoJobIsLingering() throws Exception {
+		client = flink.getClusterClient();
+		waitUntilNoJobIsRunning(client);
 	}
 
 	// ------------------------------------------------------------------------
@@ -244,7 +255,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		while (System.nanoTime() < deadline);
 
 		// cancel the job & wait for the job to finish
-		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+		client.cancel(Iterables.getOnlyElement(getRunningJobs(client)));
 		runner.join();
 
 		final Throwable t = errorRef.get();
@@ -330,7 +341,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		while (System.nanoTime() < deadline);
 
 		// cancel the job & wait for the job to finish
-		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+		client.cancel(Iterables.getOnlyElement(getRunningJobs(client)));
 		runner.join();
 
 		final Throwable t = errorRef.get();
@@ -443,14 +454,18 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			}).setParallelism(1)
 			.addSink(new DiscardingSink<>());
 
+		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+		final JobID consumeJobId = jobGraph.getJobID();
+
 		final AtomicReference<Throwable> error = new AtomicReference<>();
 		Thread consumeThread = new Thread(new Runnable() {
 			@Override
 			public void run() {
 				try {
-					env.execute(consumeExtraRecordsJobName);
+					client.setDetached(false);
+					client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader());
 				} catch (Throwable t) {
-					if (!(t instanceof JobCancellationException)) {
+					if (!ExceptionUtils.findThrowable(t, JobCancellationException.class).isPresent()) {
 						error.set(t);
 					}
 				}
@@ -459,9 +474,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		consumeThread.start();
 
 		// wait until the consuming job has started, to be extra safe
-		JobManagerCommunicationUtils.waitUntilJobIsRunning(
-			flink.getLeaderGateway(timeout),
-			consumeExtraRecordsJobName);
+		waitUntilJobIsRunning(client);
 
 		// setup the extra records writing job
 		final StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -500,9 +513,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		}
 
 		// cancel the consume job after all extra records are written
-		JobManagerCommunicationUtils.cancelCurrentJob(
-			flink.getLeaderGateway(timeout),
-			consumeExtraRecordsJobName);
+		client.cancel(consumeJobId);
 		consumeThread.join();
 
 		kafkaOffsetHandler.close();
@@ -989,23 +1000,27 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		final AtomicReference<Throwable> jobError = new AtomicReference<>();
 
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(parallelism);
+		env.enableCheckpointing(100);
+		env.getConfig().disableSysoutLogging();
+
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
+		FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props);
+
+		env.addSource(source).addSink(new DiscardingSink<String>());
+
+		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+		final JobID jobId = jobGraph.getJobID();
+
 		final Runnable jobRunner = new Runnable() {
 			@Override
 			public void run() {
 				try {
-					final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-					env.setParallelism(parallelism);
-					env.enableCheckpointing(100);
-					env.getConfig().disableSysoutLogging();
-
-					Properties props = new Properties();
-					props.putAll(standardProps);
-					props.putAll(secureProps);
-					FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props);
-
-					env.addSource(source).addSink(new DiscardingSink<String>());
-
-					env.execute("Runner for CancelingOnFullInputTest");
+					client.setDetached(false);
+					client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader());
 				}
 				catch (Throwable t) {
 					jobError.set(t);
@@ -1026,14 +1041,12 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		}
 
 		// cancel
-		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout), "Runner for CancelingOnFullInputTest");
+		client.cancel(jobId);
 
 		// wait for the program to be done and validate that we failed with the right exception
 		runnerThread.join();
 
-		failueCause = jobError.get();
-		assertNotNull("program did not fail properly due to canceling", failueCause);
-		assertTrue(failueCause.getMessage().contains("Job was cancelled"));
+		assertEquals(JobStatus.CANCELED, client.getJobStatus(jobId).get());
 
 		if (generator.isAlive()) {
 			generator.shutdown();
@@ -1063,23 +1076,27 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		final AtomicReference<Throwable> error = new AtomicReference<>();
 
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(parallelism);
+		env.enableCheckpointing(100);
+		env.getConfig().disableSysoutLogging();
+
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
+		FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props);
+
+		env.addSource(source).addSink(new DiscardingSink<String>());
+
+		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+		final JobID jobId = jobGraph.getJobID();
+
 		final Runnable jobRunner = new Runnable() {
 			@Override
 			public void run() {
 				try {
-					final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-					env.setParallelism(parallelism);
-					env.enableCheckpointing(100);
-					env.getConfig().disableSysoutLogging();
-
-					Properties props = new Properties();
-					props.putAll(standardProps);
-					props.putAll(secureProps);
-					FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props);
-
-					env.addSource(source).addSink(new DiscardingSink<String>());
-
-					env.execute("CancelingOnEmptyInputTest");
+					client.setDetached(false);
+					client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader());
 				}
 				catch (Throwable t) {
 					LOG.error("Job Runner failed with exception", t);
@@ -1100,14 +1117,12 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			Assert.fail("Test failed prematurely with: " + failueCause.getMessage());
 		}
 		// cancel
-		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+		client.cancel(jobId);
 
 		// wait for the program to be done and validate that we failed with the right exception
 		runnerThread.join();
 
-		failueCause = error.get();
-		assertNotNull("program did not fail properly due to canceling", failueCause);
-		assertTrue(failueCause.getMessage().contains("Job was cancelled"));
+		assertEquals(JobStatus.CANCELED, client.getJobStatus(jobId).get());
 
 		deleteTestTopic(topic);
 	}
@@ -1558,52 +1573,53 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		createTestTopic(topic, 5, 1);
 
 		final Tuple1<Throwable> error = new Tuple1<>(null);
-		Runnable job = new Runnable() {
+
+		// start job writing & reading data.
+		final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment();
+		env1.setParallelism(1);
+		env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+		env1.getConfig().disableSysoutLogging();
+		env1.disableOperatorChaining(); // let the source read everything into the network buffers
+
+		TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig());
+		DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
+		fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
 			@Override
-			public void run() {
-				try {
-					// start job writing & reading data.
-					final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment();
-					env1.setParallelism(1);
-					env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-					env1.getConfig().disableSysoutLogging();
-					env1.disableOperatorChaining(); // let the source read everything into the network buffers
-
-					Properties props = new Properties();
-					props.putAll(standardProps);
-					props.putAll(secureProps);
-
-					TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig());
-					DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
-					fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
-						@Override
-						public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {// no op
-						}
-					});
-
-					DataStream<Tuple2<Integer, Integer>> fromGen = env1.addSource(new RichSourceFunction<Tuple2<Integer, Integer>>() {
-						boolean running = true;
-
-						@Override
-						public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
-							int i = 0;
-							while (running) {
-								ctx.collect(Tuple2.of(i++, getRuntimeContext().getIndexOfThisSubtask()));
-								Thread.sleep(1);
-							}
-						}
+			public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {// no op
+			}
+		});
 
-						@Override
-						public void cancel() {
-							running = false;
-						}
-					});
+		DataStream<Tuple2<Integer, Integer>> fromGen = env1.addSource(new RichSourceFunction<Tuple2<Integer, Integer>>() {
+			boolean running = true;
+
+			@Override
+			public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+				int i = 0;
+				while (running) {
+					ctx.collect(Tuple2.of(i++, getRuntimeContext().getIndexOfThisSubtask()));
+					Thread.sleep(1);
+				}
+			}
+
+			@Override
+			public void cancel() {
+				running = false;
+			}
+		});
+
+		kafkaServer.produceIntoKafka(fromGen, topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null);
 
-					kafkaServer.produceIntoKafka(fromGen, topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null);
+		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env1.getStreamGraph());
+		final JobID jobId = jobGraph.getJobID();
 
-					env1.execute("Metrics test job");
+		Runnable job = new Runnable() {
+			@Override
+			public void run() {
+				try {
+					client.setDetached(false);
+					client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader());
 				} catch (Throwable t) {
-					if (!(t instanceof JobCancellationException)) { // we'll cancel the job
+					if (!ExceptionUtils.findThrowable(t, JobCancellationException.class).isPresent()) {
 						LOG.warn("Got exception during execution", t);
 						error.f0 = t;
 					}
@@ -1653,7 +1669,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			LOG.info("Found all JMX metrics. Cancelling job.");
 		} finally {
 			// cancel
-			JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+			client.cancel(jobId);
 			// wait for the job to finish (it should due to the cancel command above)
 			jobThread.join();
 		}
@@ -1903,7 +1919,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			catch (Exception e) {
 				LOG.error("Write attempt failed, trying again", e);
 				deleteTestTopic(topicName);
-				JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+				waitUntilNoJobIsRunning(client);
 				continue;
 			}
 
@@ -1914,7 +1930,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			// we need to validate the sequence, because kafka's producers are not exactly once
 			LOG.info("Validating sequence");
 
-			JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+			waitUntilNoJobIsRunning(client);
 
 			if (validateSequence(topicName, parallelism, deserSchema, numElements)) {
 				// everything is good!
@@ -1996,7 +2012,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		// we need to validate the sequence, because kafka's producers are not exactly once
 		LOG.info("Validating sequence");
-		JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+		while (!getRunningJobs(client).isEmpty()){
+			Thread.sleep(50);
+		}
 
 		if (!validateSequence(topicName, parallelism, deserSchema, originalNumElements + numElementsToAppend)) {
 			throw new Exception("Could not append a valid sequence to Kafka.");
@@ -2040,13 +2058,20 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
 
+		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(readEnv.getStreamGraph());
+		final JobID jobId = jobGraph.getJobID();
+
 		Thread runner = new Thread() {
 			@Override
 			public void run() {
 				try {
+					client.setDetached(false);
+					client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader());
 					tryExecute(readEnv, "sequence validation");
 				} catch (Throwable t) {
-					errorRef.set(t);
+					if (!ExceptionUtils.findThrowable(t, SuccessException.class).isPresent()) {
+						errorRef.set(t);
+					}
 				}
 			}
 		};
@@ -2064,7 +2089,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			// did not finish in time, maybe the producer dropped one or more records and
 			// the validation did not reach the exit point
 			success = false;
-			JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+			client.cancel(jobId);
 		}
 		else {
 			Throwable error = errorRef.get();
@@ -2077,7 +2102,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			}
 		}
 
-		JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+		waitUntilNoJobIsRunning(client);
 
 		return success;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/7d6afed8/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index f471cd4..697e075 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -24,9 +24,9 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.metrics.jmx.JMXReporter;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
@@ -75,13 +75,17 @@ public abstract class KafkaTestBase extends TestLogger {
 
 	protected static final int TM_SLOTS = 8;
 
-	protected static final int PARALLELISM = NUM_TMS * TM_SLOTS;
-
 	protected static String brokerConnectionStrings;
 
 	protected static Properties standardProps;
 
-	protected static LocalFlinkMiniCluster flink;
+	@ClassRule
+	public static MiniClusterResource flink = new MiniClusterResource(
+		new MiniClusterResource.MiniClusterResourceConfiguration(
+			getFlinkConfiguration(),
+			NUM_TMS,
+			TM_SLOTS),
+		true);
 
 	protected static FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
 
@@ -107,8 +111,6 @@ public abstract class KafkaTestBase extends TestLogger {
 		LOG.info("-------------------------------------------------------------------------");
 
 		startClusters(false, hideKafkaBehindProxy);
-
-		TestStreamEnvironment.setAsContext(flink, PARALLELISM);
 	}
 
 	@AfterClass
@@ -131,8 +133,6 @@ public abstract class KafkaTestBase extends TestLogger {
 		Configuration flinkConfig = new Configuration();
 		flinkConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "5 s");
 		flinkConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1 s");
-		flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
-		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
 		flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
 		flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
 		flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
@@ -163,18 +163,9 @@ public abstract class KafkaTestBase extends TestLogger {
 			}
 			secureProps = kafkaServer.getSecureProperties();
 		}
-
-		// start also a re-usable Flink mini cluster
-		flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
-		flink.start();
 	}
 
 	protected static void shutdownClusters() throws Exception {
-
-		if (flink != null) {
-			flink.stop();
-		}
-
 		if (secureProps != null) {
 			secureProps.clear();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/7d6afed8/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ClusterCommunicationUtils.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ClusterCommunicationUtils.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ClusterCommunicationUtils.java
new file mode 100644
index 0000000..41f9d1e
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ClusterCommunicationUtils.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Utilities for communicating with a cluster through a {@link ClusterClient}.
+ */
+public class ClusterCommunicationUtils {
+
+	public static void waitUntilJobIsRunning(ClusterClient<?> client) throws Exception {
+		while (getRunningJobs(client).isEmpty()) {
+			Thread.sleep(50);
+		}
+	}
+
+	public static void waitUntilNoJobIsRunning(ClusterClient<?> client) throws Exception {
+		while (!getRunningJobs(client).isEmpty()) {
+			Thread.sleep(50);
+		}
+	}
+
+	public static List<JobID> getRunningJobs(ClusterClient<?> client) throws Exception {
+		Collection<JobStatusMessage> statusMessages = client.listJobs().get();
+		return statusMessages.stream()
+			.filter(status -> !status.getJobState().isGloballyTerminalState())
+			.map(JobStatusMessage::getJobId)
+			.collect(Collectors.toList());
+	}
+
+	private ClusterCommunicationUtils() {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7d6afed8/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
deleted file mode 100644
index 9bbe1d3..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * Utilities for communicating with a jobmanager through a {@link ActorGateway}.
- */
-public class JobManagerCommunicationUtils {
-
-	private static final FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
-
-	public static void waitUntilNoJobIsRunning(ActorGateway jobManager) throws Exception {
-		while (true) {
-			// find the jobID
-			Future<Object> listResponse = jobManager.ask(
-					JobManagerMessages.getRequestRunningJobsStatus(), askTimeout);
-
-			Object result = Await.result(listResponse, askTimeout);
-			List<JobStatusMessage> jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
-
-			if (jobs.isEmpty()) {
-				return;
-			}
-
-			Thread.sleep(50);
-		}
-	}
-
-	public static void waitUntilJobIsRunning(ActorGateway jobManager, String name) throws Exception {
-		while (true) {
-			Future<Object> listResponse = jobManager.ask(
-				JobManagerMessages.getRequestRunningJobsStatus(),
-				askTimeout);
-
-			List<JobStatusMessage> jobs;
-			try {
-				Object result = Await.result(listResponse, askTimeout);
-				jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
-			}
-			catch (Exception e) {
-				throw new Exception("Could not wait for job to start - failed to retrieve running jobs from the JobManager.", e);
-			}
-
-			// see if the running jobs contain the requested job
-			for (JobStatusMessage job : jobs) {
-				if (job.getJobName().equals(name)) {
-					return;
-				}
-			}
-
-			Thread.sleep(50);
-		}
-	}
-
-	public static void cancelCurrentJob(ActorGateway jobManager) throws Exception {
-		cancelCurrentJob(jobManager, null);
-	}
-
-	public static void cancelCurrentJob(ActorGateway jobManager, String name) throws Exception {
-		JobStatusMessage status = null;
-
-		for (int i = 0; i < 200; i++) {
-			// find the jobID
-			Future<Object> listResponse = jobManager.ask(
-					JobManagerMessages.getRequestRunningJobsStatus(),
-					askTimeout);
-
-			List<JobStatusMessage> jobs;
-			try {
-				Object result = Await.result(listResponse, askTimeout);
-				jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
-			}
-			catch (Exception e) {
-				throw new Exception("Could not cancel job - failed to retrieve running jobs from the JobManager.", e);
-			}
-
-			if (jobs.isEmpty()) {
-				// try again, fall through the loop
-				Thread.sleep(50);
-			}
-			else if (jobs.size() == 1) {
-				status = jobs.get(0);
-			}
-			else if (name != null) {
-				for (JobStatusMessage msg: jobs) {
-					if (msg.getJobName().equals(name)) {
-						status = msg;
-					}
-				}
-				if (status == null) {
-					throw new Exception("Could not cancel job - no job matched expected name = '" + name + "' in " + jobs);
-				}
-			} else {
-				String jobNames = "";
-				for (JobStatusMessage jsm: jobs) {
-					jobNames += jsm.getJobName() + ", ";
-				}
-				throw new Exception("Could not cancel job - more than one running job: " + jobNames);
-			}
-		}
-
-		if (status == null) {
-			throw new Exception("Could not cancel job - no running jobs");
-		}
-		else if (status.getJobState().isGloballyTerminalState()) {
-			throw new Exception("Could not cancel job - job is not running any more");
-		}
-
-		JobID jobId = status.getJobId();
-
-		Future<Object> response = jobManager.ask(new JobManagerMessages.CancelJob(jobId), askTimeout);
-		try {
-			Await.result(response, askTimeout);
-		}
-		catch (Exception e) {
-			throw new Exception("Sending the 'cancel' message failed.", e);
-		}
-	}
-}