You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/06 13:24:19 UTC
[5/7] flink git commit: [FLINK-8703][tests] Port KafkaTestBase to MiniClusterResource
[FLINK-8703][tests] Port KafkaTestBase to MiniClusterResource
This closes #5669.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84ad2cd4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84ad2cd4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84ad2cd4
Branch: refs/heads/release-1.5
Commit: 84ad2cd4b13db2dbe4a144aa2e3a2802e79f77b9
Parents: 69b8a92
Author: zentol <ch...@apache.org>
Authored: Wed Mar 7 13:39:25 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Fri Apr 6 15:24:03 2018 +0200
----------------------------------------------------------------------
.../connectors/kafka/KafkaConsumerTestBase.java | 219 +++++++++++--------
.../connectors/kafka/KafkaTestBase.java | 25 +--
.../testutils/ClusterCommunicationUtils.java | 56 +++++
.../testutils/JobManagerCommunicationUtils.java | 147 -------------
4 files changed, 186 insertions(+), 261 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/84ad2cd4/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 959d6f1..6ed9143 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
@@ -34,6 +35,7 @@ import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputView;
@@ -42,7 +44,8 @@ import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.client.JobCancellationException;
import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -54,11 +57,11 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
-import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper;
import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper;
import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2FlinkPartitioner;
@@ -72,6 +75,9 @@ import org.apache.flink.test.util.SuccessException;
import org.apache.flink.testutils.junit.RetryOnException;
import org.apache.flink.testutils.junit.RetryRule;
import org.apache.flink.util.Collector;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
@@ -106,10 +112,12 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.flink.streaming.connectors.kafka.testutils.ClusterCommunicationUtils.getRunningJobs;
+import static org.apache.flink.streaming.connectors.kafka.testutils.ClusterCommunicationUtils.waitUntilJobIsRunning;
+import static org.apache.flink.streaming.connectors.kafka.testutils.ClusterCommunicationUtils.waitUntilNoJobIsRunning;
import static org.apache.flink.test.util.TestUtils.tryExecute;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -123,6 +131,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
@Rule
public RetryRule retryRule = new RetryRule();
+ private ClusterClient<?> client;
+
// ------------------------------------------------------------------------
// Common Test Preparation
// ------------------------------------------------------------------------
@@ -132,8 +142,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
* the same mini cluster. Otherwise, missing slots may happen.
*/
@Before
- public void ensureNoJobIsLingering() throws Exception {
- JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+ public void setClientAndEnsureNoJobIsLingering() throws Exception {
+ client = flink.getClusterClient();
+ waitUntilNoJobIsRunning(client);
}
// ------------------------------------------------------------------------
@@ -244,7 +255,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
while (System.nanoTime() < deadline);
// cancel the job & wait for the job to finish
- JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+ client.cancel(Iterables.getOnlyElement(getRunningJobs(client)));
runner.join();
final Throwable t = errorRef.get();
@@ -330,7 +341,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
while (System.nanoTime() < deadline);
// cancel the job & wait for the job to finish
- JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+ client.cancel(Iterables.getOnlyElement(getRunningJobs(client)));
runner.join();
final Throwable t = errorRef.get();
@@ -443,14 +454,18 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}).setParallelism(1)
.addSink(new DiscardingSink<>());
+ JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+ final JobID consumeJobId = jobGraph.getJobID();
+
final AtomicReference<Throwable> error = new AtomicReference<>();
Thread consumeThread = new Thread(new Runnable() {
@Override
public void run() {
try {
- env.execute(consumeExtraRecordsJobName);
+ client.setDetached(false);
+ client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader());
} catch (Throwable t) {
- if (!(t instanceof JobCancellationException)) {
+ if (!ExceptionUtils.findThrowable(t, JobCancellationException.class).isPresent()) {
error.set(t);
}
}
@@ -459,9 +474,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
consumeThread.start();
// wait until the consuming job has started, to be extra safe
- JobManagerCommunicationUtils.waitUntilJobIsRunning(
- flink.getLeaderGateway(timeout),
- consumeExtraRecordsJobName);
+ waitUntilJobIsRunning(client);
// setup the extra records writing job
final StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -500,9 +513,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
// cancel the consume job after all extra records are written
- JobManagerCommunicationUtils.cancelCurrentJob(
- flink.getLeaderGateway(timeout),
- consumeExtraRecordsJobName);
+ client.cancel(consumeJobId);
consumeThread.join();
kafkaOffsetHandler.close();
@@ -989,23 +1000,27 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
final AtomicReference<Throwable> jobError = new AtomicReference<>();
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(parallelism);
+ env.enableCheckpointing(100);
+ env.getConfig().disableSysoutLogging();
+
+ Properties props = new Properties();
+ props.putAll(standardProps);
+ props.putAll(secureProps);
+ FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props);
+
+ env.addSource(source).addSink(new DiscardingSink<String>());
+
+ JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+ final JobID jobId = jobGraph.getJobID();
+
final Runnable jobRunner = new Runnable() {
@Override
public void run() {
try {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(parallelism);
- env.enableCheckpointing(100);
- env.getConfig().disableSysoutLogging();
-
- Properties props = new Properties();
- props.putAll(standardProps);
- props.putAll(secureProps);
- FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props);
-
- env.addSource(source).addSink(new DiscardingSink<String>());
-
- env.execute("Runner for CancelingOnFullInputTest");
+ client.setDetached(false);
+ client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader());
}
catch (Throwable t) {
jobError.set(t);
@@ -1026,14 +1041,12 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
// cancel
- JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout), "Runner for CancelingOnFullInputTest");
+ client.cancel(jobId);
// wait for the program to be done and validate that we failed with the right exception
runnerThread.join();
- failueCause = jobError.get();
- assertNotNull("program did not fail properly due to canceling", failueCause);
- assertTrue(failueCause.getMessage().contains("Job was cancelled"));
+ assertEquals(JobStatus.CANCELED, client.getJobStatus(jobId).get());
if (generator.isAlive()) {
generator.shutdown();
@@ -1063,23 +1076,27 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
final AtomicReference<Throwable> error = new AtomicReference<>();
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(parallelism);
+ env.enableCheckpointing(100);
+ env.getConfig().disableSysoutLogging();
+
+ Properties props = new Properties();
+ props.putAll(standardProps);
+ props.putAll(secureProps);
+ FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props);
+
+ env.addSource(source).addSink(new DiscardingSink<String>());
+
+ JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+ final JobID jobId = jobGraph.getJobID();
+
final Runnable jobRunner = new Runnable() {
@Override
public void run() {
try {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(parallelism);
- env.enableCheckpointing(100);
- env.getConfig().disableSysoutLogging();
-
- Properties props = new Properties();
- props.putAll(standardProps);
- props.putAll(secureProps);
- FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props);
-
- env.addSource(source).addSink(new DiscardingSink<String>());
-
- env.execute("CancelingOnEmptyInputTest");
+ client.setDetached(false);
+ client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader());
}
catch (Throwable t) {
LOG.error("Job Runner failed with exception", t);
@@ -1100,14 +1117,12 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
Assert.fail("Test failed prematurely with: " + failueCause.getMessage());
}
// cancel
- JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+ client.cancel(jobId);
// wait for the program to be done and validate that we failed with the right exception
runnerThread.join();
- failueCause = error.get();
- assertNotNull("program did not fail properly due to canceling", failueCause);
- assertTrue(failueCause.getMessage().contains("Job was cancelled"));
+ assertEquals(JobStatus.CANCELED, client.getJobStatus(jobId).get());
deleteTestTopic(topic);
}
@@ -1558,52 +1573,53 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
createTestTopic(topic, 5, 1);
final Tuple1<Throwable> error = new Tuple1<>(null);
- Runnable job = new Runnable() {
+
+ // start job writing & reading data.
+ final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment();
+ env1.setParallelism(1);
+ env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+ env1.getConfig().disableSysoutLogging();
+ env1.disableOperatorChaining(); // let the source read everything into the network buffers
+
+ TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig());
+ DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
+ fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
@Override
- public void run() {
- try {
- // start job writing & reading data.
- final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment();
- env1.setParallelism(1);
- env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
- env1.getConfig().disableSysoutLogging();
- env1.disableOperatorChaining(); // let the source read everything into the network buffers
-
- Properties props = new Properties();
- props.putAll(standardProps);
- props.putAll(secureProps);
-
- TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig());
- DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
- fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
- @Override
- public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {// no op
- }
- });
-
- DataStream<Tuple2<Integer, Integer>> fromGen = env1.addSource(new RichSourceFunction<Tuple2<Integer, Integer>>() {
- boolean running = true;
-
- @Override
- public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
- int i = 0;
- while (running) {
- ctx.collect(Tuple2.of(i++, getRuntimeContext().getIndexOfThisSubtask()));
- Thread.sleep(1);
- }
- }
+ public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {// no op
+ }
+ });
- @Override
- public void cancel() {
- running = false;
- }
- });
+ DataStream<Tuple2<Integer, Integer>> fromGen = env1.addSource(new RichSourceFunction<Tuple2<Integer, Integer>>() {
+ boolean running = true;
+
+ @Override
+ public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+ int i = 0;
+ while (running) {
+ ctx.collect(Tuple2.of(i++, getRuntimeContext().getIndexOfThisSubtask()));
+ Thread.sleep(1);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ });
+
+ kafkaServer.produceIntoKafka(fromGen, topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null);
- kafkaServer.produceIntoKafka(fromGen, topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null);
+ JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env1.getStreamGraph());
+ final JobID jobId = jobGraph.getJobID();
- env1.execute("Metrics test job");
+ Runnable job = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ client.setDetached(false);
+ client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader());
} catch (Throwable t) {
- if (!(t instanceof JobCancellationException)) { // we'll cancel the job
+ if (!ExceptionUtils.findThrowable(t, JobCancellationException.class).isPresent()) {
LOG.warn("Got exception during execution", t);
error.f0 = t;
}
@@ -1653,7 +1669,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
LOG.info("Found all JMX metrics. Cancelling job.");
} finally {
// cancel
- JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+ client.cancel(jobId);
// wait for the job to finish (it should due to the cancel command above)
jobThread.join();
}
@@ -1903,7 +1919,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
catch (Exception e) {
LOG.error("Write attempt failed, trying again", e);
deleteTestTopic(topicName);
- JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+ waitUntilNoJobIsRunning(client);
continue;
}
@@ -1914,7 +1930,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
// we need to validate the sequence, because kafka's producers are not exactly once
LOG.info("Validating sequence");
- JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+ waitUntilNoJobIsRunning(client);
if (validateSequence(topicName, parallelism, deserSchema, numElements)) {
// everything is good!
@@ -1996,7 +2012,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
// we need to validate the sequence, because kafka's producers are not exactly once
LOG.info("Validating sequence");
- JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+ while (!getRunningJobs(client).isEmpty()){
+ Thread.sleep(50);
+ }
if (!validateSequence(topicName, parallelism, deserSchema, originalNumElements + numElementsToAppend)) {
throw new Exception("Could not append a valid sequence to Kafka.");
@@ -2040,13 +2058,20 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+ JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(readEnv.getStreamGraph());
+ final JobID jobId = jobGraph.getJobID();
+
Thread runner = new Thread() {
@Override
public void run() {
try {
+ client.setDetached(false);
+ client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader());
tryExecute(readEnv, "sequence validation");
} catch (Throwable t) {
- errorRef.set(t);
+ if (!ExceptionUtils.findThrowable(t, SuccessException.class).isPresent()) {
+ errorRef.set(t);
+ }
}
}
};
@@ -2064,7 +2089,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
// did not finish in time, maybe the producer dropped one or more records and
// the validation did not reach the exit point
success = false;
- JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+ client.cancel(jobId);
}
else {
Throwable error = errorRef.get();
@@ -2077,7 +2102,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
}
- JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+ waitUntilNoJobIsRunning(client);
return success;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/84ad2cd4/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index f471cd4..697e075 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -24,9 +24,9 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.metrics.jmx.JMXReporter;
import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;
@@ -75,13 +75,17 @@ public abstract class KafkaTestBase extends TestLogger {
protected static final int TM_SLOTS = 8;
- protected static final int PARALLELISM = NUM_TMS * TM_SLOTS;
-
protected static String brokerConnectionStrings;
protected static Properties standardProps;
- protected static LocalFlinkMiniCluster flink;
+ @ClassRule
+ public static MiniClusterResource flink = new MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ getFlinkConfiguration(),
+ NUM_TMS,
+ TM_SLOTS),
+ true);
protected static FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
@@ -107,8 +111,6 @@ public abstract class KafkaTestBase extends TestLogger {
LOG.info("-------------------------------------------------------------------------");
startClusters(false, hideKafkaBehindProxy);
-
- TestStreamEnvironment.setAsContext(flink, PARALLELISM);
}
@AfterClass
@@ -131,8 +133,6 @@ public abstract class KafkaTestBase extends TestLogger {
Configuration flinkConfig = new Configuration();
flinkConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "5 s");
flinkConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1 s");
- flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
- flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
@@ -163,18 +163,9 @@ public abstract class KafkaTestBase extends TestLogger {
}
secureProps = kafkaServer.getSecureProperties();
}
-
- // start also a re-usable Flink mini cluster
- flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
- flink.start();
}
protected static void shutdownClusters() throws Exception {
-
- if (flink != null) {
- flink.stop();
- }
-
if (secureProps != null) {
secureProps.clear();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/84ad2cd4/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ClusterCommunicationUtils.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ClusterCommunicationUtils.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ClusterCommunicationUtils.java
new file mode 100644
index 0000000..41f9d1e
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ClusterCommunicationUtils.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Utilities for communicating with a cluster through a {@link ClusterClient}.
+ */
+public class ClusterCommunicationUtils {
+
+ public static void waitUntilJobIsRunning(ClusterClient<?> client) throws Exception {
+ while (getRunningJobs(client).isEmpty()) {
+ Thread.sleep(50);
+ }
+ }
+
+ public static void waitUntilNoJobIsRunning(ClusterClient<?> client) throws Exception {
+ while (!getRunningJobs(client).isEmpty()) {
+ Thread.sleep(50);
+ }
+ }
+
+ public static List<JobID> getRunningJobs(ClusterClient<?> client) throws Exception {
+ Collection<JobStatusMessage> statusMessages = client.listJobs().get();
+ return statusMessages.stream()
+ .filter(status -> !status.getJobState().isGloballyTerminalState())
+ .map(JobStatusMessage::getJobId)
+ .collect(Collectors.toList());
+ }
+
+ private ClusterCommunicationUtils() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/84ad2cd4/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
deleted file mode 100644
index 9bbe1d3..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * Utilities for communicating with a jobmanager through a {@link ActorGateway}.
- */
-public class JobManagerCommunicationUtils {
-
- private static final FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
-
- public static void waitUntilNoJobIsRunning(ActorGateway jobManager) throws Exception {
- while (true) {
- // find the jobID
- Future<Object> listResponse = jobManager.ask(
- JobManagerMessages.getRequestRunningJobsStatus(), askTimeout);
-
- Object result = Await.result(listResponse, askTimeout);
- List<JobStatusMessage> jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
-
- if (jobs.isEmpty()) {
- return;
- }
-
- Thread.sleep(50);
- }
- }
-
- public static void waitUntilJobIsRunning(ActorGateway jobManager, String name) throws Exception {
- while (true) {
- Future<Object> listResponse = jobManager.ask(
- JobManagerMessages.getRequestRunningJobsStatus(),
- askTimeout);
-
- List<JobStatusMessage> jobs;
- try {
- Object result = Await.result(listResponse, askTimeout);
- jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
- }
- catch (Exception e) {
- throw new Exception("Could not wait for job to start - failed to retrieve running jobs from the JobManager.", e);
- }
-
- // see if the running jobs contain the requested job
- for (JobStatusMessage job : jobs) {
- if (job.getJobName().equals(name)) {
- return;
- }
- }
-
- Thread.sleep(50);
- }
- }
-
- public static void cancelCurrentJob(ActorGateway jobManager) throws Exception {
- cancelCurrentJob(jobManager, null);
- }
-
- public static void cancelCurrentJob(ActorGateway jobManager, String name) throws Exception {
- JobStatusMessage status = null;
-
- for (int i = 0; i < 200; i++) {
- // find the jobID
- Future<Object> listResponse = jobManager.ask(
- JobManagerMessages.getRequestRunningJobsStatus(),
- askTimeout);
-
- List<JobStatusMessage> jobs;
- try {
- Object result = Await.result(listResponse, askTimeout);
- jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
- }
- catch (Exception e) {
- throw new Exception("Could not cancel job - failed to retrieve running jobs from the JobManager.", e);
- }
-
- if (jobs.isEmpty()) {
- // try again, fall through the loop
- Thread.sleep(50);
- }
- else if (jobs.size() == 1) {
- status = jobs.get(0);
- }
- else if (name != null) {
- for (JobStatusMessage msg: jobs) {
- if (msg.getJobName().equals(name)) {
- status = msg;
- }
- }
- if (status == null) {
- throw new Exception("Could not cancel job - no job matched expected name = '" + name + "' in " + jobs);
- }
- } else {
- String jobNames = "";
- for (JobStatusMessage jsm: jobs) {
- jobNames += jsm.getJobName() + ", ";
- }
- throw new Exception("Could not cancel job - more than one running job: " + jobNames);
- }
- }
-
- if (status == null) {
- throw new Exception("Could not cancel job - no running jobs");
- }
- else if (status.getJobState().isGloballyTerminalState()) {
- throw new Exception("Could not cancel job - job is not running any more");
- }
-
- JobID jobId = status.getJobId();
-
- Future<Object> response = jobManager.ask(new JobManagerMessages.CancelJob(jobId), askTimeout);
- try {
- Await.result(response, askTimeout);
- }
- catch (Exception e) {
- throw new Exception("Sending the 'cancel' message failed.", e);
- }
- }
-}