You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/06 13:24:15 UTC

[1/7] flink git commit: [hotfix][tests] Remove unused methods in KafkaConsumerTestBase

Repository: flink
Updated Branches:
  refs/heads/release-1.5 ff3a0ec69 -> 0ec37e44c


[hotfix][tests] Remove unused methods in KafkaConsumerTestBase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/69b8a920
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/69b8a920
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/69b8a920

Branch: refs/heads/release-1.5
Commit: 69b8a920b3fbb1969269b335dd4633d780f3468c
Parents: f4e0cdf
Author: zentol <ch...@apache.org>
Authored: Tue Apr 3 11:42:43 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Apr 6 15:24:02 2018 +0200

----------------------------------------------------------------------
 .../connectors/kafka/KafkaConsumerTestBase.java | 26 --------------------
 1 file changed, 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/69b8a920/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index f9d745e..959d6f1 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -2121,32 +2121,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		return result;
 	}
 
-	private static void printTopic(String topicName, ConsumerConfig config,
-								DeserializationSchema<?> deserializationSchema,
-								int stopAfter) throws IOException {
-
-		List<MessageAndMetadata<byte[], byte[]>> contents = readTopicToList(topicName, config, stopAfter);
-		LOG.info("Printing contents of topic {} in consumer grouo {}", topicName, config.groupId());
-
-		for (MessageAndMetadata<byte[], byte[]> message: contents) {
-			Object out = deserializationSchema.deserialize(message.message());
-			LOG.info("Message: partition: {} offset: {} msg: {}", message.partition(), message.offset(), out.toString());
-		}
-	}
-
-	private static void printTopic(String topicName, int elements, DeserializationSchema<?> deserializer)
-			throws IOException {
-		// write the sequence to log for debugging purposes
-		Properties newProps = new Properties(standardProps);
-		newProps.setProperty("group.id", "topic-printer" + UUID.randomUUID().toString());
-		newProps.setProperty("auto.offset.reset", "smallest");
-		newProps.setProperty("zookeeper.connect", standardProps.getProperty("zookeeper.connect"));
-		newProps.putAll(secureProps);
-
-		ConsumerConfig printerConfig = new ConsumerConfig(newProps);
-		printTopic(topicName, printerConfig, deserializer, elements);
-	}
-
 	private static class BrokerKillingMapper<T> extends RichMapFunction<T, T>
 			implements ListCheckpointed<Integer>, CheckpointListener {
 


[7/7] flink git commit: [FLINK-8837][annotations] Add @Experimental annotation and annotate some classes

Posted by ch...@apache.org.
[FLINK-8837][annotations] Add @Experimental annotation and annotate some classes

This closes #5800.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c8e0a314
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c8e0a314
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c8e0a314

Branch: refs/heads/release-1.5
Commit: c8e0a314dae31b62578bab9ea4cf823b7e70b014
Parents: 84ad2cd
Author: Bowen Li <bo...@gmail.com>
Authored: Mon Apr 2 00:59:34 2018 -0700
Committer: zentol <ch...@apache.org>
Committed: Fri Apr 6 15:24:03 2018 +0200

----------------------------------------------------------------------
 .../apache/flink/annotation/Experimental.java   | 40 ++++++++++++++++++++
 .../api/datastream/DataStreamUtils.java         |  7 +---
 .../streaming/experimental/CollectSink.java     |  4 +-
 .../experimental/SocketStreamIterator.java      |  4 +-
 .../streaming/api/scala/DataStreamUtils.scala   |  4 +-
 5 files changed, 48 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c8e0a314/flink-annotations/src/main/java/org/apache/flink/annotation/Experimental.java
----------------------------------------------------------------------
diff --git a/flink-annotations/src/main/java/org/apache/flink/annotation/Experimental.java b/flink-annotations/src/main/java/org/apache/flink/annotation/Experimental.java
new file mode 100644
index 0000000..7ddcb1a
--- /dev/null
+++ b/flink-annotations/src/main/java/org/apache/flink/annotation/Experimental.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.annotation;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation to mark classes for experimental use.
+ *
+ * <p>Classes with this annotation are neither battle-tested nor stable, and may be changed or removed
+ * in future versions.
+ *
+ * <p>This annotation also excludes classes with evolving interfaces / signatures
+ * annotated with {@link Public} and {@link PublicEvolving}.
+ *
+ */
+@Documented
+@Target(ElementType.TYPE)
+@Public
+public @interface Experimental {
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c8e0a314/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
index 351c456..32271b6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -39,11 +39,8 @@ import java.util.Iterator;
 
 /**
  * A collection of utilities for {@link DataStream DataStreams}.
- *
- * <p>This experimental class is relocated from flink-streaming-contrib. Please see package-info.java
- * for more information.
  */
-@PublicEvolving
+@Experimental
 public final class DataStreamUtils {
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/c8e0a314/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/CollectSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/CollectSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/CollectSink.java
index 981f8e1..195d705 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/CollectSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/CollectSink.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.experimental;
 
-import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
@@ -34,7 +34,7 @@ import java.net.Socket;
  * <p>This experimental class is relocated from flink-streaming-contrib. Please see package-info.java
  * for more information.
  */
-@Internal
+@Experimental
 public class CollectSink<IN> extends RichSinkFunction<IN> {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/c8e0a314/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java
index 62d46e1..bc730cb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.experimental;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 
@@ -41,7 +41,7 @@ import java.util.NoSuchElementException;
  *
  * @param <T> The type of elements returned from the iterator.
  */
-@PublicEvolving
+@Experimental
 public class SocketStreamIterator<T> implements Iterator<T> {
 
 	/** Server socket to listen at. */

http://git-wip-us.apache.org/repos/asf/flink/blob/c8e0a314/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStreamUtils.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStreamUtils.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStreamUtils.scala
index 0f42aa2..fd48f9c 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStreamUtils.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStreamUtils.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.api.scala
 
-import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.annotation.Experimental
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.{DataStreamUtils => JavaStreamUtils}
 
@@ -33,7 +33,7 @@ import scala.reflect.ClassTag
   *
   * @param self DataStream
   */
-@PublicEvolving
+@Experimental
 class DataStreamUtils[T: TypeInformation : ClassTag](val self: DataStream[T]) {
 
   /**


[5/7] flink git commit: [FLINK-8703][tests] Port KafkaTestBase to MiniClusterResource

Posted by ch...@apache.org.
[FLINK-8703][tests] Port KafkaTestBase to MiniClusterResource

This closes #5669.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84ad2cd4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84ad2cd4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84ad2cd4

Branch: refs/heads/release-1.5
Commit: 84ad2cd4b13db2dbe4a144aa2e3a2802e79f77b9
Parents: 69b8a92
Author: zentol <ch...@apache.org>
Authored: Wed Mar 7 13:39:25 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Fri Apr 6 15:24:03 2018 +0200

----------------------------------------------------------------------
 .../connectors/kafka/KafkaConsumerTestBase.java | 219 +++++++++++--------
 .../connectors/kafka/KafkaTestBase.java         |  25 +--
 .../testutils/ClusterCommunicationUtils.java    |  56 +++++
 .../testutils/JobManagerCommunicationUtils.java | 147 -------------
 4 files changed, 186 insertions(+), 261 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/84ad2cd4/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 959d6f1..6ed9143 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
@@ -34,6 +35,7 @@ import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.DataInputView;
@@ -42,7 +44,8 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.client.JobCancellationException;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -54,11 +57,11 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
 import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
-import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
 import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2FlinkPartitioner;
@@ -72,6 +75,9 @@ import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.testutils.junit.RetryOnException;
 import org.apache.flink.testutils.junit.RetryRule;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
 
 import kafka.consumer.Consumer;
 import kafka.consumer.ConsumerConfig;
@@ -106,10 +112,12 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.flink.streaming.connectors.kafka.testutils.ClusterCommunicationUtils.getRunningJobs;
+import static org.apache.flink.streaming.connectors.kafka.testutils.ClusterCommunicationUtils.waitUntilJobIsRunning;
+import static org.apache.flink.streaming.connectors.kafka.testutils.ClusterCommunicationUtils.waitUntilNoJobIsRunning;
 import static org.apache.flink.test.util.TestUtils.tryExecute;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -123,6 +131,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	@Rule
 	public RetryRule retryRule = new RetryRule();
 
+	private ClusterClient<?> client;
+
 	// ------------------------------------------------------------------------
 	//  Common Test Preparation
 	// ------------------------------------------------------------------------
@@ -132,8 +142,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	 * the same mini cluster. Otherwise, missing slots may happen.
 	 */
 	@Before
-	public void ensureNoJobIsLingering() throws Exception {
-		JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+	public void setClientAndEnsureNoJobIsLingering() throws Exception {
+		client = flink.getClusterClient();
+		waitUntilNoJobIsRunning(client);
 	}
 
 	// ------------------------------------------------------------------------
@@ -244,7 +255,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		while (System.nanoTime() < deadline);
 
 		// cancel the job & wait for the job to finish
-		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+		client.cancel(Iterables.getOnlyElement(getRunningJobs(client)));
 		runner.join();
 
 		final Throwable t = errorRef.get();
@@ -330,7 +341,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		while (System.nanoTime() < deadline);
 
 		// cancel the job & wait for the job to finish
-		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+		client.cancel(Iterables.getOnlyElement(getRunningJobs(client)));
 		runner.join();
 
 		final Throwable t = errorRef.get();
@@ -443,14 +454,18 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			}).setParallelism(1)
 			.addSink(new DiscardingSink<>());
 
+		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+		final JobID consumeJobId = jobGraph.getJobID();
+
 		final AtomicReference<Throwable> error = new AtomicReference<>();
 		Thread consumeThread = new Thread(new Runnable() {
 			@Override
 			public void run() {
 				try {
-					env.execute(consumeExtraRecordsJobName);
+					client.setDetached(false);
+					client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader());
 				} catch (Throwable t) {
-					if (!(t instanceof JobCancellationException)) {
+					if (!ExceptionUtils.findThrowable(t, JobCancellationException.class).isPresent()) {
 						error.set(t);
 					}
 				}
@@ -459,9 +474,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		consumeThread.start();
 
 		// wait until the consuming job has started, to be extra safe
-		JobManagerCommunicationUtils.waitUntilJobIsRunning(
-			flink.getLeaderGateway(timeout),
-			consumeExtraRecordsJobName);
+		waitUntilJobIsRunning(client);
 
 		// setup the extra records writing job
 		final StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -500,9 +513,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		}
 
 		// cancel the consume job after all extra records are written
-		JobManagerCommunicationUtils.cancelCurrentJob(
-			flink.getLeaderGateway(timeout),
-			consumeExtraRecordsJobName);
+		client.cancel(consumeJobId);
 		consumeThread.join();
 
 		kafkaOffsetHandler.close();
@@ -989,23 +1000,27 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		final AtomicReference<Throwable> jobError = new AtomicReference<>();
 
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(parallelism);
+		env.enableCheckpointing(100);
+		env.getConfig().disableSysoutLogging();
+
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
+		FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props);
+
+		env.addSource(source).addSink(new DiscardingSink<String>());
+
+		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+		final JobID jobId = jobGraph.getJobID();
+
 		final Runnable jobRunner = new Runnable() {
 			@Override
 			public void run() {
 				try {
-					final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-					env.setParallelism(parallelism);
-					env.enableCheckpointing(100);
-					env.getConfig().disableSysoutLogging();
-
-					Properties props = new Properties();
-					props.putAll(standardProps);
-					props.putAll(secureProps);
-					FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props);
-
-					env.addSource(source).addSink(new DiscardingSink<String>());
-
-					env.execute("Runner for CancelingOnFullInputTest");
+					client.setDetached(false);
+					client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader());
 				}
 				catch (Throwable t) {
 					jobError.set(t);
@@ -1026,14 +1041,12 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		}
 
 		// cancel
-		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout), "Runner for CancelingOnFullInputTest");
+		client.cancel(jobId);
 
 		// wait for the program to be done and validate that we failed with the right exception
 		runnerThread.join();
 
-		failueCause = jobError.get();
-		assertNotNull("program did not fail properly due to canceling", failueCause);
-		assertTrue(failueCause.getMessage().contains("Job was cancelled"));
+		assertEquals(JobStatus.CANCELED, client.getJobStatus(jobId).get());
 
 		if (generator.isAlive()) {
 			generator.shutdown();
@@ -1063,23 +1076,27 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		final AtomicReference<Throwable> error = new AtomicReference<>();
 
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(parallelism);
+		env.enableCheckpointing(100);
+		env.getConfig().disableSysoutLogging();
+
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
+		FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props);
+
+		env.addSource(source).addSink(new DiscardingSink<String>());
+
+		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+		final JobID jobId = jobGraph.getJobID();
+
 		final Runnable jobRunner = new Runnable() {
 			@Override
 			public void run() {
 				try {
-					final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-					env.setParallelism(parallelism);
-					env.enableCheckpointing(100);
-					env.getConfig().disableSysoutLogging();
-
-					Properties props = new Properties();
-					props.putAll(standardProps);
-					props.putAll(secureProps);
-					FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props);
-
-					env.addSource(source).addSink(new DiscardingSink<String>());
-
-					env.execute("CancelingOnEmptyInputTest");
+					client.setDetached(false);
+					client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader());
 				}
 				catch (Throwable t) {
 					LOG.error("Job Runner failed with exception", t);
@@ -1100,14 +1117,12 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			Assert.fail("Test failed prematurely with: " + failueCause.getMessage());
 		}
 		// cancel
-		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+		client.cancel(jobId);
 
 		// wait for the program to be done and validate that we failed with the right exception
 		runnerThread.join();
 
-		failueCause = error.get();
-		assertNotNull("program did not fail properly due to canceling", failueCause);
-		assertTrue(failueCause.getMessage().contains("Job was cancelled"));
+		assertEquals(JobStatus.CANCELED, client.getJobStatus(jobId).get());
 
 		deleteTestTopic(topic);
 	}
@@ -1558,52 +1573,53 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		createTestTopic(topic, 5, 1);
 
 		final Tuple1<Throwable> error = new Tuple1<>(null);
-		Runnable job = new Runnable() {
+
+		// start job writing & reading data.
+		final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment();
+		env1.setParallelism(1);
+		env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+		env1.getConfig().disableSysoutLogging();
+		env1.disableOperatorChaining(); // let the source read everything into the network buffers
+
+		TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig());
+		DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
+		fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
 			@Override
-			public void run() {
-				try {
-					// start job writing & reading data.
-					final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment();
-					env1.setParallelism(1);
-					env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-					env1.getConfig().disableSysoutLogging();
-					env1.disableOperatorChaining(); // let the source read everything into the network buffers
-
-					Properties props = new Properties();
-					props.putAll(standardProps);
-					props.putAll(secureProps);
-
-					TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig());
-					DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
-					fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
-						@Override
-						public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {// no op
-						}
-					});
-
-					DataStream<Tuple2<Integer, Integer>> fromGen = env1.addSource(new RichSourceFunction<Tuple2<Integer, Integer>>() {
-						boolean running = true;
-
-						@Override
-						public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
-							int i = 0;
-							while (running) {
-								ctx.collect(Tuple2.of(i++, getRuntimeContext().getIndexOfThisSubtask()));
-								Thread.sleep(1);
-							}
-						}
+			public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {// no op
+			}
+		});
 
-						@Override
-						public void cancel() {
-							running = false;
-						}
-					});
+		DataStream<Tuple2<Integer, Integer>> fromGen = env1.addSource(new RichSourceFunction<Tuple2<Integer, Integer>>() {
+			boolean running = true;
+
+			@Override
+			public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+				int i = 0;
+				while (running) {
+					ctx.collect(Tuple2.of(i++, getRuntimeContext().getIndexOfThisSubtask()));
+					Thread.sleep(1);
+				}
+			}
+
+			@Override
+			public void cancel() {
+				running = false;
+			}
+		});
+
+		kafkaServer.produceIntoKafka(fromGen, topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null);
 
-					kafkaServer.produceIntoKafka(fromGen, topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null);
+		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env1.getStreamGraph());
+		final JobID jobId = jobGraph.getJobID();
 
-					env1.execute("Metrics test job");
+		Runnable job = new Runnable() {
+			@Override
+			public void run() {
+				try {
+					client.setDetached(false);
+					client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader());
 				} catch (Throwable t) {
-					if (!(t instanceof JobCancellationException)) { // we'll cancel the job
+					if (!ExceptionUtils.findThrowable(t, JobCancellationException.class).isPresent()) {
 						LOG.warn("Got exception during execution", t);
 						error.f0 = t;
 					}
@@ -1653,7 +1669,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			LOG.info("Found all JMX metrics. Cancelling job.");
 		} finally {
 			// cancel
-			JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+			client.cancel(jobId);
 			// wait for the job to finish (it should due to the cancel command above)
 			jobThread.join();
 		}
@@ -1903,7 +1919,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			catch (Exception e) {
 				LOG.error("Write attempt failed, trying again", e);
 				deleteTestTopic(topicName);
-				JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+				waitUntilNoJobIsRunning(client);
 				continue;
 			}
 
@@ -1914,7 +1930,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			// we need to validate the sequence, because kafka's producers are not exactly once
 			LOG.info("Validating sequence");
 
-			JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+			waitUntilNoJobIsRunning(client);
 
 			if (validateSequence(topicName, parallelism, deserSchema, numElements)) {
 				// everything is good!
@@ -1996,7 +2012,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		// we need to validate the sequence, because kafka's producers are not exactly once
 		LOG.info("Validating sequence");
-		JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+		while (!getRunningJobs(client).isEmpty()){
+			Thread.sleep(50);
+		}
 
 		if (!validateSequence(topicName, parallelism, deserSchema, originalNumElements + numElementsToAppend)) {
 			throw new Exception("Could not append a valid sequence to Kafka.");
@@ -2040,13 +2058,20 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
 
+		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(readEnv.getStreamGraph());
+		final JobID jobId = jobGraph.getJobID();
+
 		Thread runner = new Thread() {
 			@Override
 			public void run() {
 				try {
+					client.setDetached(false);
+					client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader());
 					tryExecute(readEnv, "sequence validation");
 				} catch (Throwable t) {
-					errorRef.set(t);
+					if (!ExceptionUtils.findThrowable(t, SuccessException.class).isPresent()) {
+						errorRef.set(t);
+					}
 				}
 			}
 		};
@@ -2064,7 +2089,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			// did not finish in time, maybe the producer dropped one or more records and
 			// the validation did not reach the exit point
 			success = false;
-			JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+			client.cancel(jobId);
 		}
 		else {
 			Throwable error = errorRef.get();
@@ -2077,7 +2102,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			}
 		}
 
-		JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+		waitUntilNoJobIsRunning(client);
 
 		return success;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/84ad2cd4/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index f471cd4..697e075 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -24,9 +24,9 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.metrics.jmx.JMXReporter;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
@@ -75,13 +75,17 @@ public abstract class KafkaTestBase extends TestLogger {
 
 	protected static final int TM_SLOTS = 8;
 
-	protected static final int PARALLELISM = NUM_TMS * TM_SLOTS;
-
 	protected static String brokerConnectionStrings;
 
 	protected static Properties standardProps;
 
-	protected static LocalFlinkMiniCluster flink;
+	@ClassRule
+	public static MiniClusterResource flink = new MiniClusterResource(
+		new MiniClusterResource.MiniClusterResourceConfiguration(
+			getFlinkConfiguration(),
+			NUM_TMS,
+			TM_SLOTS),
+		true);
 
 	protected static FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
 
@@ -107,8 +111,6 @@ public abstract class KafkaTestBase extends TestLogger {
 		LOG.info("-------------------------------------------------------------------------");
 
 		startClusters(false, hideKafkaBehindProxy);
-
-		TestStreamEnvironment.setAsContext(flink, PARALLELISM);
 	}
 
 	@AfterClass
@@ -131,8 +133,6 @@ public abstract class KafkaTestBase extends TestLogger {
 		Configuration flinkConfig = new Configuration();
 		flinkConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "5 s");
 		flinkConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1 s");
-		flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
-		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
 		flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
 		flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
 		flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
@@ -163,18 +163,9 @@ public abstract class KafkaTestBase extends TestLogger {
 			}
 			secureProps = kafkaServer.getSecureProperties();
 		}
-
-		// start also a re-usable Flink mini cluster
-		flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
-		flink.start();
 	}
 
 	protected static void shutdownClusters() throws Exception {
-
-		if (flink != null) {
-			flink.stop();
-		}
-
 		if (secureProps != null) {
 			secureProps.clear();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/84ad2cd4/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ClusterCommunicationUtils.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ClusterCommunicationUtils.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ClusterCommunicationUtils.java
new file mode 100644
index 0000000..41f9d1e
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ClusterCommunicationUtils.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Utilities for communicating with a cluster through a {@link ClusterClient}.
+ */
+public class ClusterCommunicationUtils {
+
+	public static void waitUntilJobIsRunning(ClusterClient<?> client) throws Exception {
+		while (getRunningJobs(client).isEmpty()) {
+			Thread.sleep(50);
+		}
+	}
+
+	public static void waitUntilNoJobIsRunning(ClusterClient<?> client) throws Exception {
+		while (!getRunningJobs(client).isEmpty()) {
+			Thread.sleep(50);
+		}
+	}
+
+	public static List<JobID> getRunningJobs(ClusterClient<?> client) throws Exception {
+		Collection<JobStatusMessage> statusMessages = client.listJobs().get();
+		return statusMessages.stream()
+			.filter(status -> !status.getJobState().isGloballyTerminalState())
+			.map(JobStatusMessage::getJobId)
+			.collect(Collectors.toList());
+	}
+
+	private ClusterCommunicationUtils() {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/84ad2cd4/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
deleted file mode 100644
index 9bbe1d3..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * Utilities for communicating with a jobmanager through a {@link ActorGateway}.
- */
-public class JobManagerCommunicationUtils {
-
-	private static final FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
-
-	public static void waitUntilNoJobIsRunning(ActorGateway jobManager) throws Exception {
-		while (true) {
-			// find the jobID
-			Future<Object> listResponse = jobManager.ask(
-					JobManagerMessages.getRequestRunningJobsStatus(), askTimeout);
-
-			Object result = Await.result(listResponse, askTimeout);
-			List<JobStatusMessage> jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
-
-			if (jobs.isEmpty()) {
-				return;
-			}
-
-			Thread.sleep(50);
-		}
-	}
-
-	public static void waitUntilJobIsRunning(ActorGateway jobManager, String name) throws Exception {
-		while (true) {
-			Future<Object> listResponse = jobManager.ask(
-				JobManagerMessages.getRequestRunningJobsStatus(),
-				askTimeout);
-
-			List<JobStatusMessage> jobs;
-			try {
-				Object result = Await.result(listResponse, askTimeout);
-				jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
-			}
-			catch (Exception e) {
-				throw new Exception("Could not wait for job to start - failed to retrieve running jobs from the JobManager.", e);
-			}
-
-			// see if the running jobs contain the requested job
-			for (JobStatusMessage job : jobs) {
-				if (job.getJobName().equals(name)) {
-					return;
-				}
-			}
-
-			Thread.sleep(50);
-		}
-	}
-
-	public static void cancelCurrentJob(ActorGateway jobManager) throws Exception {
-		cancelCurrentJob(jobManager, null);
-	}
-
-	public static void cancelCurrentJob(ActorGateway jobManager, String name) throws Exception {
-		JobStatusMessage status = null;
-
-		for (int i = 0; i < 200; i++) {
-			// find the jobID
-			Future<Object> listResponse = jobManager.ask(
-					JobManagerMessages.getRequestRunningJobsStatus(),
-					askTimeout);
-
-			List<JobStatusMessage> jobs;
-			try {
-				Object result = Await.result(listResponse, askTimeout);
-				jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
-			}
-			catch (Exception e) {
-				throw new Exception("Could not cancel job - failed to retrieve running jobs from the JobManager.", e);
-			}
-
-			if (jobs.isEmpty()) {
-				// try again, fall through the loop
-				Thread.sleep(50);
-			}
-			else if (jobs.size() == 1) {
-				status = jobs.get(0);
-			}
-			else if (name != null) {
-				for (JobStatusMessage msg: jobs) {
-					if (msg.getJobName().equals(name)) {
-						status = msg;
-					}
-				}
-				if (status == null) {
-					throw new Exception("Could not cancel job - no job matched expected name = '" + name + "' in " + jobs);
-				}
-			} else {
-				String jobNames = "";
-				for (JobStatusMessage jsm: jobs) {
-					jobNames += jsm.getJobName() + ", ";
-				}
-				throw new Exception("Could not cancel job - more than one running job: " + jobNames);
-			}
-		}
-
-		if (status == null) {
-			throw new Exception("Could not cancel job - no running jobs");
-		}
-		else if (status.getJobState().isGloballyTerminalState()) {
-			throw new Exception("Could not cancel job - job is not running any more");
-		}
-
-		JobID jobId = status.getJobId();
-
-		Future<Object> response = jobManager.ask(new JobManagerMessages.CancelJob(jobId), askTimeout);
-		try {
-			Await.result(response, askTimeout);
-		}
-		catch (Exception e) {
-			throw new Exception("Sending the 'cancel' message failed.", e);
-		}
-	}
-}


[3/7] flink git commit: [FLINK-8835] [taskmanager] Cleanup TaskManager config keys

Posted by ch...@apache.org.
[FLINK-8835] [taskmanager] Cleanup TaskManager config keys

This closes #5808.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e2581e0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e2581e0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e2581e0

Branch: refs/heads/release-1.5
Commit: 9e2581e0443ff47124de41a8cdcd9c18e64b0fab
Parents: c8e0a31
Author: zhangminglei <zm...@163.com>
Authored: Wed Apr 4 17:05:22 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Fri Apr 6 15:24:03 2018 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    | 20 +++----
 .../flink/configuration/TaskManagerOptions.java | 55 +++++++++++---------
 .../clusterframework/BootstrapTools.java        |  2 +-
 .../runtime/io/network/netty/NettyConfig.java   |  2 +-
 .../taskexecutor/TaskManagerConfiguration.java  | 16 +++---
 .../flink/runtime/taskmanager/TaskManager.scala |  2 +-
 .../TaskManagerRegistrationTest.java            |  8 +--
 .../runtime/io/InputProcessorUtil.java          |  2 +-
 .../jobmanager/JobManagerFailsITCase.scala      |  4 +-
 9 files changed, 59 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9e2581e0/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index b716d9e..f148360 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -299,7 +299,7 @@ public final class ConfigConstants {
 	/**
 	 * Flag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM.
 	 *
-	 * @deprecated use {@link TaskManagerOptions#DEBUG_MEMORY_USAGE_START_LOG_THREAD} instead
+	 * @deprecated use {@link TaskManagerOptions#DEBUG_MEMORY_LOG} instead
 	 */
 	@Deprecated
 	public static final String TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD = "taskmanager.debug.memory.startLogThread";
@@ -316,7 +316,7 @@ public final class ConfigConstants {
 	 * Defines the maximum time it can take for the TaskManager registration. If the duration is
 	 * exceeded without a successful registration, then the TaskManager terminates.
 	 *
-	 * @deprecated use {@link TaskManagerOptions#MAX_REGISTRATION_DURATION} instead
+	 * @deprecated use {@link TaskManagerOptions#REGISTRATION_TIMEOUT} instead
 	 */
 	@Deprecated
 	public static final String TASK_MANAGER_MAX_REGISTRATION_DURATION = "taskmanager.maxRegistrationDuration";
@@ -325,7 +325,7 @@ public final class ConfigConstants {
 	 * The initial registration pause between two consecutive registration attempts. The pause
 	 * is doubled for each new registration attempt until it reaches the maximum registration pause.
 	 *
-	 * @deprecated use {@link TaskManagerOptions#INITIAL_REGISTRATION_PAUSE} instead
+	 * @deprecated use {@link TaskManagerOptions#INITIAL_REGISTRATION_BACKOFF} instead
 	 */
 	@Deprecated
 	public static final String TASK_MANAGER_INITIAL_REGISTRATION_PAUSE = "taskmanager.initial-registration-pause";
@@ -333,7 +333,7 @@ public final class ConfigConstants {
 	/**
 	 * The maximum registration pause between two consecutive registration attempts.
 	 *
-	 * @deprecated use {@link TaskManagerOptions#MAX_REGISTRATION_PAUSE} instead
+	 * @deprecated use {@link TaskManagerOptions#REGISTRATION_MAX_BACKOFF} instead
 	 */
 	@Deprecated
 	public static final String TASK_MANAGER_MAX_REGISTARTION_PAUSE = "taskmanager.max-registration-pause";
@@ -341,7 +341,7 @@ public final class ConfigConstants {
 	/**
 	 * The pause after a registration has been refused by the job manager before retrying to connect.
 	 *
-	 * @deprecated use {@link TaskManagerOptions#REFUSED_REGISTRATION_PAUSE} instead
+	 * @deprecated use {@link TaskManagerOptions#REFUSED_REGISTRATION_BACKOFF} instead
 	 */
 	@Deprecated
 	public static final String TASK_MANAGER_REFUSED_REGISTRATION_PAUSE = "taskmanager.refused-registration-pause";
@@ -1441,7 +1441,7 @@ public final class ConfigConstants {
 	/**
 	 * Flag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM.
 	 *
-	 * @deprecated use {@link TaskManagerOptions#DEBUG_MEMORY_USAGE_START_LOG_THREAD} instead
+	 * @deprecated use {@link TaskManagerOptions#DEBUG_MEMORY_LOG} instead
 	 */
 	@Deprecated
 	public static final boolean DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD = false;
@@ -1457,7 +1457,7 @@ public final class ConfigConstants {
 	/**
 	 * The default task manager's maximum registration duration.
 	 *
-	 * @deprecated use {@link TaskManagerOptions#MAX_REGISTRATION_DURATION} instead
+	 * @deprecated use {@link TaskManagerOptions#REGISTRATION_TIMEOUT} instead
 	 */
 	@Deprecated
 	public static final String DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION = "Inf";
@@ -1465,7 +1465,7 @@ public final class ConfigConstants {
 	/**
 	 * The default task manager's initial registration pause.
 	 *
-	 * @deprecated use {@link TaskManagerOptions#INITIAL_REGISTRATION_PAUSE} instead
+	 * @deprecated use {@link TaskManagerOptions#INITIAL_REGISTRATION_BACKOFF} instead
 	 */
 	@Deprecated
 	public static final String DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE = "500 ms";
@@ -1473,7 +1473,7 @@ public final class ConfigConstants {
 	/**
 	 * The default task manager's maximum registration pause.
 	 *
-	 * @deprecated use {@link TaskManagerOptions#MAX_REGISTRATION_PAUSE} instead
+	 * @deprecated use {@link TaskManagerOptions#REGISTRATION_MAX_BACKOFF} instead
 	 */
 	@Deprecated
 	public static final String DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE = "30 s";
@@ -1481,7 +1481,7 @@ public final class ConfigConstants {
 	/**
 	 * The default task manager's refused registration pause.
 	 *
-	 * @deprecated use {@link TaskManagerOptions#REFUSED_REGISTRATION_PAUSE} instead
+	 * @deprecated use {@link TaskManagerOptions#REFUSED_REGISTRATION_BACKOFF} instead
 	 */
 	@Deprecated
 	public static final String DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE = "10 s";

http://git-wip-us.apache.org/repos/asf/flink/blob/9e2581e0/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index c7b0782..2bd3091 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -104,40 +104,44 @@ public class TaskManagerOptions {
 				" global ssl flag " + SecurityOptions.SSL_ENABLED.key() + " is set to true");
 
 	/**
-	 * The initial registration pause between two consecutive registration attempts. The pause
-	 * is doubled for each new registration attempt until it reaches the maximum registration pause.
+	 * The initial registration backoff between two consecutive registration attempts. The backoff
+	 * is doubled for each new registration attempt until it reaches the maximum registration backoff.
 	 */
-	public static final ConfigOption<String> INITIAL_REGISTRATION_PAUSE =
-		key("taskmanager.initial-registration-pause")
+	public static final ConfigOption<String> INITIAL_REGISTRATION_BACKOFF =
+		key("taskmanager.registration.initial-backoff")
 			.defaultValue("500 ms")
-			.withDescription("The initial registration pause between two consecutive registration attempts. The pause" +
-				" is doubled for each new registration attempt until it reaches the maximum registration pause.");
+			.withDeprecatedKeys("taskmanager.initial-registration-pause")
+			.withDescription("The initial registration backoff between two consecutive registration attempts. The backoff" +
+				" is doubled for each new registration attempt until it reaches the maximum registration backoff.");
 
 	/**
-	 * The maximum registration pause between two consecutive registration attempts.
+	 * The maximum registration backoff between two consecutive registration attempts.
 	 */
-	public static final ConfigOption<String> MAX_REGISTRATION_PAUSE =
-		key("taskmanager.max-registration-pause")
+	public static final ConfigOption<String> REGISTRATION_MAX_BACKOFF =
+		key("taskmanager.registration.max-backoff")
 			.defaultValue("30 s")
-			.withDescription("The maximum registration pause between two consecutive registration attempts. The max" +
-				" registration pause requires a time unit specifier (ms/s/min/h/d).");
+			.withDeprecatedKeys("taskmanager.max-registration-pause")
+			.withDescription("The maximum registration backoff between two consecutive registration attempts. The max" +
+				" registration backoff requires a time unit specifier (ms/s/min/h/d).");
 
 	/**
-	 * The pause after a registration has been refused by the job manager before retrying to connect.
+	 * The backoff after a registration has been refused by the job manager before retrying to connect.
 	 */
-	public static final ConfigOption<String> REFUSED_REGISTRATION_PAUSE =
-		key("taskmanager.refused-registration-pause")
+	public static final ConfigOption<String> REFUSED_REGISTRATION_BACKOFF =
+		key("taskmanager.registration.refused-backoff")
 			.defaultValue("10 s")
-			.withDescription("The pause after a registration has been refused by the job manager before retrying to connect.");
+			.withDeprecatedKeys("taskmanager.refused-registration-pause")
+			.withDescription("The backoff after a registration has been refused by the job manager before retrying to connect.");
 
 	/**
-	 * Defines the maximum time it can take for the TaskManager registration. If the duration is
+	 * Defines the timeout for the TaskManager registration. If the duration is
 	 * exceeded without a successful registration, then the TaskManager terminates.
 	 */
-	public static final ConfigOption<String> MAX_REGISTRATION_DURATION =
-		key("taskmanager.maxRegistrationDuration")
+	public static final ConfigOption<String> REGISTRATION_TIMEOUT =
+		key("taskmanager.registration.timeout")
 			.defaultValue("Inf")
-			.withDescription("Defines the maximum time it can take for the TaskManager registration. If the duration is" +
+			.withDeprecatedKeys("taskmanager.maxRegistrationDuration")
+			.withDescription("Defines the timeout for the TaskManager registration. If the duration is" +
 				" exceeded without a successful registration, then the TaskManager terminates.");
 
 	/**
@@ -153,14 +157,16 @@ public class TaskManagerOptions {
 				" is typically proportional to the number of physical CPU cores that the TaskManager's machine has" +
 				" (e.g., equal to the number of cores, or half the number of cores).");
 
-	public static final ConfigOption<Boolean> DEBUG_MEMORY_USAGE_START_LOG_THREAD =
-		key("taskmanager.debug.memory.startLogThread")
+	public static final ConfigOption<Boolean> DEBUG_MEMORY_LOG =
+		key("taskmanager.debug.memory.log")
 			.defaultValue(false)
+			.withDeprecatedKeys("taskmanager.debug.memory.startLogThread")
 			.withDescription("Flag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM.");
 
 	public static final ConfigOption<Long> DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS =
-		key("taskmanager.debug.memory.logIntervalMs")
+		key("taskmanager.debug.memory.log-interval")
 			.defaultValue(5000L)
+			.withDeprecatedKeys("taskmanager.debug.memory.logIntervalMs")
 			.withDescription("The interval (in ms) for the log thread to log the current memory usage.");
 
 	// ------------------------------------------------------------------------
@@ -321,9 +327,10 @@ public class TaskManagerOptions {
 	 * credit-based flow control.
 	 */
 	@Deprecated
-	public static final ConfigOption<Boolean> NETWORK_CREDIT_BASED_FLOW_CONTROL_ENABLED =
-			key("taskmanager.network.credit-based-flow-control.enabled")
+	public static final ConfigOption<Boolean> NETWORK_CREDIT_MODEL =
+			key("taskmanager.network.credit-model")
 			.defaultValue(true)
+			.withDeprecatedKeys("taskmanager.network.credit-based-flow-control.enabled")
 			.withDescription("Boolean flag to enable/disable network credit-based flow control.");
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/9e2581e0/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index eab7382..102274d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -245,7 +245,7 @@ public class BootstrapTools {
 			cfg.setInteger(JobManagerOptions.PORT, jobManagerPort);
 		}
 
-		cfg.setString(TaskManagerOptions.MAX_REGISTRATION_DURATION, registrationTimeout.toString());
+		cfg.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, registrationTimeout.toString());
 		if (numSlots != -1){
 			cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/9e2581e0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
index 8572361..18527c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
@@ -225,7 +225,7 @@ public class NettyConfig {
 	}
 
 	public boolean isCreditBasedEnabled() {
-		return config.getBoolean(TaskManagerOptions.NETWORK_CREDIT_BASED_FLOW_CONTROL_ENABLED);
+		return config.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9e2581e0/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index cb6fe51..1bf42ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -185,7 +185,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 		final Time finiteRegistrationDuration;
 
 		try {
-			Duration maxRegistrationDuration = Duration.create(configuration.getString(TaskManagerOptions.MAX_REGISTRATION_DURATION));
+			Duration maxRegistrationDuration = Duration.create(configuration.getString(TaskManagerOptions.REGISTRATION_TIMEOUT));
 			if (maxRegistrationDuration.isFinite()) {
 				finiteRegistrationDuration = Time.milliseconds(maxRegistrationDuration.toMillis());
 			} else {
@@ -193,12 +193,12 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 			}
 		} catch (NumberFormatException e) {
 			throw new IllegalArgumentException("Invalid format for parameter " +
-				TaskManagerOptions.MAX_REGISTRATION_DURATION.key(), e);
+				TaskManagerOptions.REGISTRATION_TIMEOUT.key(), e);
 		}
 
 		final Time initialRegistrationPause;
 		try {
-			Duration pause = Duration.create(configuration.getString(TaskManagerOptions.INITIAL_REGISTRATION_PAUSE));
+			Duration pause = Duration.create(configuration.getString(TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF));
 			if (pause.isFinite()) {
 				initialRegistrationPause = Time.milliseconds(pause.toMillis());
 			} else {
@@ -206,13 +206,13 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 			}
 		} catch (NumberFormatException e) {
 			throw new IllegalArgumentException("Invalid format for parameter " +
-				TaskManagerOptions.INITIAL_REGISTRATION_PAUSE.key(), e);
+				TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF.key(), e);
 		}
 
 		final Time maxRegistrationPause;
 		try {
 			Duration pause = Duration.create(configuration.getString(
-				TaskManagerOptions.MAX_REGISTRATION_PAUSE));
+				TaskManagerOptions.REGISTRATION_MAX_BACKOFF));
 			if (pause.isFinite()) {
 				maxRegistrationPause = Time.milliseconds(pause.toMillis());
 			} else {
@@ -220,12 +220,12 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 			}
 		} catch (NumberFormatException e) {
 			throw new IllegalArgumentException("Invalid format for parameter " +
-				TaskManagerOptions.INITIAL_REGISTRATION_PAUSE.key(), e);
+				TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF.key(), e);
 		}
 
 		final Time refusedRegistrationPause;
 		try {
-			Duration pause = Duration.create(configuration.getString(TaskManagerOptions.REFUSED_REGISTRATION_PAUSE));
+			Duration pause = Duration.create(configuration.getString(TaskManagerOptions.REFUSED_REGISTRATION_BACKOFF));
 			if (pause.isFinite()) {
 				refusedRegistrationPause = Time.milliseconds(pause.toMillis());
 			} else {
@@ -233,7 +233,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 			}
 		} catch (NumberFormatException e) {
 			throw new IllegalArgumentException("Invalid format for parameter " +
-				TaskManagerOptions.INITIAL_REGISTRATION_PAUSE.key(), e);
+				TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF.key(), e);
 		}
 
 		final boolean exitOnOom = configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY);

http://git-wip-us.apache.org/repos/asf/flink/blob/9e2581e0/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 0aaeae3..071a333 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1884,7 +1884,7 @@ object TaskManager {
       // if desired, start the logging daemon that periodically logs the
       // memory usage information
       if (LOG.isInfoEnabled && configuration.getBoolean(
-        TaskManagerOptions.DEBUG_MEMORY_USAGE_START_LOG_THREAD))
+        TaskManagerOptions.DEBUG_MEMORY_LOG))
       {
         LOG.info("Starting periodic memory usage logger")
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9e2581e0/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index 6b65095..ad32a4f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -267,7 +267,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 			try {
 				// registration timeout of 1 second
 				Configuration tmConfig = new Configuration();
-				tmConfig.setString(TaskManagerOptions.MAX_REGISTRATION_DURATION, "500 ms");
+				tmConfig.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, "500 ms");
 
 				highAvailabilityServices.setJobMasterLeaderRetriever(
 					HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -325,7 +325,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 
 				FiniteDuration refusedRegistrationPause = new FiniteDuration(500, TimeUnit.MILLISECONDS);
 				Configuration tmConfig = new Configuration(config);
-				tmConfig.setString(TaskManagerOptions.REFUSED_REGISTRATION_PAUSE, refusedRegistrationPause.toString());
+				tmConfig.setString(TaskManagerOptions.REFUSED_REGISTRATION_BACKOFF, refusedRegistrationPause.toString());
 
 				highAvailabilityServices.setJobMasterLeaderRetriever(
 					HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -407,8 +407,8 @@ public class TaskManagerRegistrationTest extends TestLogger {
 				long maxDelay = 30000;
 
 				Configuration tmConfig = new Configuration(config);
-				tmConfig.setString(TaskManagerOptions.REFUSED_REGISTRATION_PAUSE, refusedRegistrationPause + " ms");
-				tmConfig.setString(TaskManagerOptions.INITIAL_REGISTRATION_PAUSE, initialRegistrationPause + " ms");
+				tmConfig.setString(TaskManagerOptions.REFUSED_REGISTRATION_BACKOFF, refusedRegistrationPause + " ms");
+				tmConfig.setString(TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF, initialRegistrationPause + " ms");
 
 				// we make the test actor (the test kit) the JobManager to intercept
 				// the messages

http://git-wip-us.apache.org/repos/asf/flink/blob/9e2581e0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index 1ae34b3..d1c5b72 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -51,7 +51,7 @@ public class InputProcessorUtil {
 					+ " must be positive or -1 (infinite)");
 			}
 
-			if (taskManagerConfig.getBoolean(TaskManagerOptions.NETWORK_CREDIT_BASED_FLOW_CONTROL_ENABLED)) {
+			if (taskManagerConfig.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL)) {
 				barrierHandler = new BarrierBuffer(inputGate, new CachedBufferBlocker(inputGate.getPageSize()), maxAlign);
 			} else {
 				barrierHandler = new BarrierBuffer(inputGate, new BufferSpiller(ioManager, inputGate.getPageSize()), maxAlign);

http://git-wip-us.apache.org/repos/asf/flink/blob/9e2581e0/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index 1a3419b..5fe7b1d 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -136,8 +136,8 @@ class JobManagerFailsITCase(_system: ActorSystem)
     config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskmanagers)
     config.setInteger(JobManagerOptions.PORT, 0)
-    config.setString(TaskManagerOptions.INITIAL_REGISTRATION_PAUSE, "50 ms")
-    config.setString(TaskManagerOptions.MAX_REGISTRATION_PAUSE, "100 ms")
+    config.setString(TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF, "50 ms")
+    config.setString(TaskManagerOptions.REGISTRATION_MAX_BACKOFF, "100 ms")
 
     val cluster = new TestingCluster(config, singleActorSystem = false)
 


[4/7] flink git commit: [hotfix][docs] Update maven version in building flink docs

Posted by ch...@apache.org.
[hotfix][docs] Update maven version in building flink docs

This closes #5760.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ec37e44
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ec37e44
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ec37e44

Branch: refs/heads/release-1.5
Commit: 0ec37e44ceb1b39ee3c1b2f2480d52134f1f5b42
Parents: 14f0c56
Author: Bowen Li <bl...@Bowens-MacBook-Pro.local>
Authored: Sat Mar 24 23:11:16 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Fri Apr 6 15:24:03 2018 +0200

----------------------------------------------------------------------
 README.md              | 2 +-
 docs/start/building.md | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0ec37e44/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 1e00e65..095f652a 100644
--- a/README.md
+++ b/README.md
@@ -69,7 +69,7 @@ Prerequisites for building Flink:
 
 * Unix-like environment (We use Linux, Mac OS X, Cygwin)
 * git
-* Maven (we recommend version 3.0.4)
+* Maven (we recommend version 3.2.5)
 * Java 8
 
 ```

http://git-wip-us.apache.org/repos/asf/flink/blob/0ec37e44/docs/start/building.md
----------------------------------------------------------------------
diff --git a/docs/start/building.md b/docs/start/building.md
index cbf1d1f..568c4b8 100644
--- a/docs/start/building.md
+++ b/docs/start/building.md
@@ -33,7 +33,7 @@ In order to build Flink you need the source code. Either [download the source of
 
 In addition you need **Maven 3** and a **JDK** (Java Development Kit). Flink requires **at least Java 8** to build.
 
-*NOTE: Maven 3.3.x can build Flink, but will not properly shade away certain dependencies. Maven 3.0.3 creates the libraries properly.
+*NOTE: Maven 3.3.x can build Flink, but will not properly shade away certain dependencies. Maven 3.2.5 creates the libraries properly.
 To build unit tests use Java 8u51 or above to prevent failures in unit tests that use the PowerMock runner.*
 
 To clone from git, enter:


[6/7] flink git commit: [FLINK-8742][docs] Move docs generator annotations to flink-annotations

Posted by ch...@apache.org.
[FLINK-8742][docs] Move docs generator annotations to flink-annotations

This closes #5821.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/14f0c569
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/14f0c569
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/14f0c569

Branch: refs/heads/release-1.5
Commit: 14f0c5694ab73ba1fdb6038ce23c10f17d31d8d8
Parents: 9e2581e
Author: zentol <ch...@apache.org>
Authored: Thu Apr 5 12:17:27 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Apr 6 15:24:03 2018 +0200

----------------------------------------------------------------------
 .../flink/annotation/docs/ConfigGroup.java      | 36 ++++++++++++++++++
 .../flink/annotation/docs/ConfigGroups.java     | 39 ++++++++++++++++++++
 .../apache/flink/configuration/ConfigGroup.java | 36 ------------------
 .../flink/configuration/ConfigGroups.java       | 39 --------------------
 .../apache/flink/configuration/CoreOptions.java |  2 +
 .../configuration/HighAvailabilityOptions.java  |  2 +
 .../configuration/ResourceManagerOptions.java   |  2 +
 .../flink/configuration/SecurityOptions.java    |  2 +
 flink-docs/pom.xml                              |  5 +++
 .../ConfigOptionsDocGenerator.java              |  4 +-
 .../ConfigOptionsDocGeneratorTest.java          |  4 +-
 11 files changed, 92 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/14f0c569/flink-annotations/src/main/java/org/apache/flink/annotation/docs/ConfigGroup.java
----------------------------------------------------------------------
diff --git a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/ConfigGroup.java b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/ConfigGroup.java
new file mode 100644
index 0000000..2b70bd6
--- /dev/null
+++ b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/ConfigGroup.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.annotation.docs;
+
+import org.apache.flink.annotation.Internal;
+
+import java.lang.annotation.Target;
+
+/**
+ * A class that specifies a group of config options. The name of the group will be used as the basis for the
+ * filename of the generated html file, as defined in {@link ConfigOptionsDocGenerator}.
+ *
+ * @see ConfigGroups
+ */
+@Target({})
+@Internal
+public @interface ConfigGroup {
+	String name();
+	String keyPrefix();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/14f0c569/flink-annotations/src/main/java/org/apache/flink/annotation/docs/ConfigGroups.java
----------------------------------------------------------------------
diff --git a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/ConfigGroups.java b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/ConfigGroups.java
new file mode 100644
index 0000000..53bf856
--- /dev/null
+++ b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/ConfigGroups.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.annotation.docs;
+
+import org.apache.flink.annotation.Internal;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation used on classes containing config options that enables the separation of options into different
+ * tables based on key prefixes. A config option is assigned to a {@link ConfigGroup} if the option key matches
+ * the group prefix. If a key matches multiple prefixes the longest matching prefix takes priority. An option is never
+ * assigned to multiple groups. Options that don't match any group are implicitly added to a default group.
+ */
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@Internal
+public @interface ConfigGroups {
+	ConfigGroup[] groups() default {};
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/14f0c569/flink-core/src/main/java/org/apache/flink/configuration/ConfigGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigGroup.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigGroup.java
deleted file mode 100644
index 3cd1d7f..0000000
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigGroup.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.configuration;
-
-import org.apache.flink.annotation.Internal;
-
-import java.lang.annotation.Target;
-
-/**
- * A class that specifies a group of {@link ConfigOption}. The name of the group will be used as the basis for the
- * filename of the generated html file, as defined in {@link ConfigOptionsDocGenerator}.
- *
- * @see ConfigGroups
- */
-@Target({})
-@Internal
-public @interface ConfigGroup {
-	String name();
-	String keyPrefix();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/14f0c569/flink-core/src/main/java/org/apache/flink/configuration/ConfigGroups.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigGroups.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigGroups.java
deleted file mode 100644
index 94e52ee..0000000
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigGroups.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.configuration;
-
-import org.apache.flink.annotation.Internal;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- * Annotation used on classes containing {@link ConfigOption}s that enables the separation of options into different
- * tables based on key prefixes. A {@link ConfigOption} is assigned to a {@link ConfigGroup} if the option key matches
- * the group prefix. If a key matches multiple prefixes the longest matching prefix takes priority. An option is never
- * assigned to multiple groups. Options that don't match any group are implicitly added to a default group.
- */
-@Target(ElementType.TYPE)
-@Retention(RetentionPolicy.RUNTIME)
-@Internal
-public @interface ConfigGroups {
-	ConfigGroup[] groups() default {};
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/14f0c569/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 4f8c074..343e2d2 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -19,6 +19,8 @@
 package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.ConfigGroup;
+import org.apache.flink.annotation.docs.ConfigGroups;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/14f0c569/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
index 8ef605b..f9d4ad9 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
@@ -19,6 +19,8 @@
 package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.ConfigGroup;
+import org.apache.flink.annotation.docs.ConfigGroups;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/14f0c569/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
index dfaf89f..4ce4981 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
@@ -19,6 +19,8 @@
 package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.ConfigGroup;
+import org.apache.flink.annotation.docs.ConfigGroups;
 
 /**
  * The set of configuration options relating to the ResourceManager.

http://git-wip-us.apache.org/repos/asf/flink/blob/14f0c569/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
index 17ff622..0f25c6c 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
@@ -19,6 +19,8 @@
 package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.ConfigGroup;
+import org.apache.flink.annotation.docs.ConfigGroups;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/14f0c569/flink-docs/pom.xml
----------------------------------------------------------------------
diff --git a/flink-docs/pom.xml b/flink-docs/pom.xml
index 94716ab..ad53007 100644
--- a/flink-docs/pom.xml
+++ b/flink-docs/pom.xml
@@ -35,6 +35,11 @@ under the License.
 	<dependencies>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-annotations</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-core</artifactId>
 			<version>${project.version}</version>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/14f0c569/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
----------------------------------------------------------------------
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
index 93112af..3bec09c 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
@@ -19,9 +19,9 @@
 package org.apache.flink.docs.configuration;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.annotation.docs.ConfigGroup;
+import org.apache.flink.annotation.docs.ConfigGroups;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigGroup;
-import org.apache.flink.configuration.ConfigGroups;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.WebOptions;

http://git-wip-us.apache.org/repos/asf/flink/blob/14f0c569/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocGeneratorTest.java b/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocGeneratorTest.java
index c8969f2..ba30617 100644
--- a/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocGeneratorTest.java
+++ b/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocGeneratorTest.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.docs.configuration;
 
+import org.apache.flink.annotation.docs.ConfigGroup;
+import org.apache.flink.annotation.docs.ConfigGroups;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigGroup;
-import org.apache.flink.configuration.ConfigGroups;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 


[2/7] flink git commit: [hotfix][tests] Remove Kafka testFailOnDeploy test

Posted by ch...@apache.org.
[hotfix][tests] Remove Kafka testFailOnDeploy test


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f4e0cdf6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f4e0cdf6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f4e0cdf6

Branch: refs/heads/release-1.5
Commit: f4e0cdf6d82d4acbbf91a45c58c72dd48822833a
Parents: ff3a0ec
Author: zentol <ch...@apache.org>
Authored: Wed Mar 7 13:38:03 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Fri Apr 6 15:24:02 2018 +0200

----------------------------------------------------------------------
 .../connectors/kafka/Kafka010ITCase.java        |  6 ---
 .../connectors/kafka/Kafka011ITCase.java        |  5 --
 .../connectors/kafka/Kafka08ITCase.java         |  5 --
 .../connectors/kafka/Kafka09ITCase.java         |  5 --
 .../connectors/kafka/KafkaConsumerTestBase.java | 49 --------------------
 5 files changed, 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f4e0cdf6/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
index 06f627d..a038c8e 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
@@ -81,12 +81,6 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 	public void testCancelingFullTopic() throws Exception {
 		runCancelingOnFullInputTest();
 	}
-
-	@Test(timeout = 60000)
-	public void testFailOnDeploy() throws Exception {
-		runFailOnDeployTest();
-	}
-
 	// --- source to partition mappings and exactly once ---
 
 	@Test(timeout = 60000)

http://git-wip-us.apache.org/repos/asf/flink/blob/f4e0cdf6/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
index fd6eb61..f48f87a 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
@@ -90,11 +90,6 @@ public class Kafka011ITCase extends KafkaConsumerTestBase {
 		runCancelingOnFullInputTest();
 	}
 
-	@Test(timeout = 60000)
-	public void testFailOnDeploy() throws Exception {
-		runFailOnDeployTest();
-	}
-
 	// --- source to partition mappings and exactly once ---
 
 	@Test(timeout = 60000)

http://git-wip-us.apache.org/repos/asf/flink/blob/f4e0cdf6/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index 6abccde..5af219e 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -75,11 +75,6 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 	}
 
 	@Test(timeout = 60000)
-	public void testFailOnDeploy() throws Exception {
-		runFailOnDeployTest();
-	}
-
-	@Test(timeout = 60000)
 	public void testInvalidOffset() throws Exception {
 
 		final int parallelism = 1;

http://git-wip-us.apache.org/repos/asf/flink/blob/f4e0cdf6/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index 3594854..f022c8e 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -55,11 +55,6 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
 		runCancelingOnFullInputTest();
 	}
 
-	@Test(timeout = 60000)
-	public void testFailOnDeploy() throws Exception {
-		runFailOnDeployTest();
-	}
-
 	// --- source to partition mappings and exactly once ---
 
 	@Test(timeout = 60000)

http://git-wip-us.apache.org/repos/asf/flink/blob/f4e0cdf6/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index f07c0bb..f9d745e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -1113,55 +1113,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	}
 
 	/**
-	 * Tests that the source can be properly canceled when reading full partitions.
-	 */
-	public void runFailOnDeployTest() throws Exception {
-		final String topic = "failOnDeployTopic";
-
-		createTestTopic(topic, 2, 1);
-
-		DeserializationSchema<Integer> schema =
-				new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(12); // needs to be more that the mini cluster has slots
-		env.getConfig().disableSysoutLogging();
-
-		Properties props = new Properties();
-		props.putAll(standardProps);
-		props.putAll(secureProps);
-		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props);
-
-		env
-				.addSource(kafkaSource)
-				.addSink(new DiscardingSink<Integer>());
-
-		try {
-			env.execute("test fail on deploy");
-			fail("this test should fail with an exception");
-		}
-		catch (JobExecutionException e) {
-
-			// validate that we failed due to a NoResourceAvailableException
-			Throwable cause = e.getCause();
-			int depth = 0;
-			boolean foundResourceException = false;
-
-			while (cause != null && depth++ < 20) {
-				if (cause instanceof NoResourceAvailableException) {
-					foundResourceException = true;
-					break;
-				}
-				cause = cause.getCause();
-			}
-
-			assertTrue("Wrong exception", foundResourceException);
-		}
-
-		deleteTestTopic(topic);
-	}
-
-	/**
 	 * Test producing and consuming into multiple topics.
 	 * @throws Exception
 	 */