Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/06 13:23:46 UTC

[1/7] flink git commit: [FLINK-8837][annotations] Add @Experimental annotation and annotate some classes

Repository: flink
Updated Branches:
  refs/heads/master bc9982c36 -> 022cd628c


[FLINK-8837][annotations] Add @Experimental annotation and annotate some classes

This closes #5800.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cdd20221
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cdd20221
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cdd20221

Branch: refs/heads/master
Commit: cdd202217449cc7db7abc3c9a9c8f29f6c463c18
Parents: 7d6afed
Author: Bowen Li <bo...@gmail.com>
Authored: Mon Apr 2 00:59:34 2018 -0700
Committer: zentol <ch...@apache.org>
Committed: Fri Apr 6 15:23:01 2018 +0200

----------------------------------------------------------------------
 .../apache/flink/annotation/Experimental.java   | 40 ++++++++++++++++++++
 .../api/datastream/DataStreamUtils.java         |  7 +---
 .../streaming/experimental/CollectSink.java     |  4 +-
 .../experimental/SocketStreamIterator.java      |  4 +-
 .../streaming/api/scala/DataStreamUtils.scala   |  4 +-
 5 files changed, 48 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cdd20221/flink-annotations/src/main/java/org/apache/flink/annotation/Experimental.java
----------------------------------------------------------------------
diff --git a/flink-annotations/src/main/java/org/apache/flink/annotation/Experimental.java b/flink-annotations/src/main/java/org/apache/flink/annotation/Experimental.java
new file mode 100644
index 0000000..7ddcb1a
--- /dev/null
+++ b/flink-annotations/src/main/java/org/apache/flink/annotation/Experimental.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.annotation;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation to mark classes for experimental use.
+ *
+ * <p>Classes with this annotation are neither battle-tested nor stable, and may be changed or removed
+ * in future versions.
+ *
+ * <p>This annotation also exempts the annotated classes from the stability
+ * guarantees otherwise implied by {@link Public} and {@link PublicEvolving}.
+ *
+ */
+@Documented
+@Target(ElementType.TYPE)
+@Public
+public @interface Experimental {
+}
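
For reference, a minimal usage sketch of the new annotation (the class name is
hypothetical, not part of this commit):

    import org.apache.flink.annotation.Experimental;

    /**
     * A utility that is neither battle-tested nor stable; it may change or be
     * removed in future Flink versions. Users opt into that instability.
     */
    @Experimental
    public class SomeExperimentalUtil {
    }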

http://git-wip-us.apache.org/repos/asf/flink/blob/cdd20221/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
index 351c456..32271b6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -39,11 +39,8 @@ import java.util.Iterator;
 
 /**
  * A collection of utilities for {@link DataStream DataStreams}.
- *
- * <p>This experimental class is relocated from flink-streaming-contrib. Please see package-info.java
- * for more information.
  */
-@PublicEvolving
+@Experimental
 public final class DataStreamUtils {
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/cdd20221/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/CollectSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/CollectSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/CollectSink.java
index 981f8e1..195d705 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/CollectSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/CollectSink.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.experimental;
 
-import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
@@ -34,7 +34,7 @@ import java.net.Socket;
  * <p>This experimental class is relocated from flink-streaming-contrib. Please see package-info.java
  * for more information.
  */
-@Internal
+@Experimental
 public class CollectSink<IN> extends RichSinkFunction<IN> {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/cdd20221/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java
index 62d46e1..bc730cb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.experimental;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 
@@ -41,7 +41,7 @@ import java.util.NoSuchElementException;
  *
  * @param <T> The type of elements returned from the iterator.
  */
-@PublicEvolving
+@Experimental
 public class SocketStreamIterator<T> implements Iterator<T> {
 
 	/** Server socket to listen at. */

http://git-wip-us.apache.org/repos/asf/flink/blob/cdd20221/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStreamUtils.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStreamUtils.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStreamUtils.scala
index 0f42aa2..fd48f9c 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStreamUtils.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStreamUtils.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.api.scala
 
-import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.annotation.Experimental
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.{DataStreamUtils => JavaStreamUtils}
 
@@ -33,7 +33,7 @@ import scala.reflect.ClassTag
   *
   * @param self DataStream
   */
-@PublicEvolving
+@Experimental
 class DataStreamUtils[T: TypeInformation : ClassTag](val self: DataStream[T]) {
 
   /**


[4/7] flink git commit: [FLINK-8703][tests] Port KafkaTestBase to MiniClusterResource

Posted by ch...@apache.org.
[FLINK-8703][tests] Port KafkaTestBase to MiniClusterResource

This closes #5669.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7d6afed8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7d6afed8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7d6afed8

Branch: refs/heads/master
Commit: 7d6afed83e370c8e36f6a690704041417e580498
Parents: 4c337dc
Author: zentol <ch...@apache.org>
Authored: Wed Mar 7 13:39:25 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Fri Apr 6 15:23:01 2018 +0200

----------------------------------------------------------------------
 .../connectors/kafka/KafkaConsumerTestBase.java | 219 +++++++++++--------
 .../connectors/kafka/KafkaTestBase.java         |  25 +--
 .../testutils/ClusterCommunicationUtils.java    |  56 +++++
 .../testutils/JobManagerCommunicationUtils.java | 147 -------------
 4 files changed, 186 insertions(+), 261 deletions(-)
----------------------------------------------------------------------
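
The recurring pattern in this commit: each test builds its JobGraph up front (so the
JobID is known before submission), submits it through the MiniClusterResource's
ClusterClient from a background thread, and then cancels or polls by JobID instead of
asking the JobManager actor by job name. A condensed sketch assembled from the hunks
below (variable names are illustrative; 'env' and 'client' are set up as in the test base):

    JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
    final JobID jobId = jobGraph.getJobID(); // known up front; no name-based lookup needed

    Thread runner = new Thread(() -> {
        try {
            client.setDetached(false); // blocking submission: returns when the job terminates
            client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader());
        } catch (Throwable t) {
            // the tests filter out the JobCancellationException expected on cancel
        }
    });
    runner.start();

    waitUntilJobIsRunning(client); // polls ClusterClient#listJobs() in 50 ms steps
    client.cancel(jobId);          // cancel by JobID rather than by job name
    runner.join();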


http://git-wip-us.apache.org/repos/asf/flink/blob/7d6afed8/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 959d6f1..6ed9143 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
@@ -34,6 +35,7 @@ import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.DataInputView;
@@ -42,7 +44,8 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.client.JobCancellationException;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -54,11 +57,11 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
 import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
-import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
 import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2FlinkPartitioner;
@@ -72,6 +75,9 @@ import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.testutils.junit.RetryOnException;
 import org.apache.flink.testutils.junit.RetryRule;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
 
 import kafka.consumer.Consumer;
 import kafka.consumer.ConsumerConfig;
@@ -106,10 +112,12 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.flink.streaming.connectors.kafka.testutils.ClusterCommunicationUtils.getRunningJobs;
+import static org.apache.flink.streaming.connectors.kafka.testutils.ClusterCommunicationUtils.waitUntilJobIsRunning;
+import static org.apache.flink.streaming.connectors.kafka.testutils.ClusterCommunicationUtils.waitUntilNoJobIsRunning;
 import static org.apache.flink.test.util.TestUtils.tryExecute;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -123,6 +131,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	@Rule
 	public RetryRule retryRule = new RetryRule();
 
+	private ClusterClient<?> client;
+
 	// ------------------------------------------------------------------------
 	//  Common Test Preparation
 	// ------------------------------------------------------------------------
@@ -132,8 +142,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	 * the same mini cluster. Otherwise, missing slots may happen.
 	 */
 	@Before
-	public void ensureNoJobIsLingering() throws Exception {
-		JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+	public void setClientAndEnsureNoJobIsLingering() throws Exception {
+		client = flink.getClusterClient();
+		waitUntilNoJobIsRunning(client);
 	}
 
 	// ------------------------------------------------------------------------
@@ -244,7 +255,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		while (System.nanoTime() < deadline);
 
 		// cancel the job & wait for the job to finish
-		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+		client.cancel(Iterables.getOnlyElement(getRunningJobs(client)));
 		runner.join();
 
 		final Throwable t = errorRef.get();
@@ -330,7 +341,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		while (System.nanoTime() < deadline);
 
 		// cancel the job & wait for the job to finish
-		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+		client.cancel(Iterables.getOnlyElement(getRunningJobs(client)));
 		runner.join();
 
 		final Throwable t = errorRef.get();
@@ -443,14 +454,18 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			}).setParallelism(1)
 			.addSink(new DiscardingSink<>());
 
+		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+		final JobID consumeJobId = jobGraph.getJobID();
+
 		final AtomicReference<Throwable> error = new AtomicReference<>();
 		Thread consumeThread = new Thread(new Runnable() {
 			@Override
 			public void run() {
 				try {
-					env.execute(consumeExtraRecordsJobName);
+					client.setDetached(false);
+					client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader());
 				} catch (Throwable t) {
-					if (!(t instanceof JobCancellationException)) {
+					if (!ExceptionUtils.findThrowable(t, JobCancellationException.class).isPresent()) {
 						error.set(t);
 					}
 				}
@@ -459,9 +474,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		consumeThread.start();
 
 		// wait until the consuming job has started, to be extra safe
-		JobManagerCommunicationUtils.waitUntilJobIsRunning(
-			flink.getLeaderGateway(timeout),
-			consumeExtraRecordsJobName);
+		waitUntilJobIsRunning(client);
 
 		// setup the extra records writing job
 		final StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -500,9 +513,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		}
 
 		// cancel the consume job after all extra records are written
-		JobManagerCommunicationUtils.cancelCurrentJob(
-			flink.getLeaderGateway(timeout),
-			consumeExtraRecordsJobName);
+		client.cancel(consumeJobId);
 		consumeThread.join();
 
 		kafkaOffsetHandler.close();
@@ -989,23 +1000,27 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		final AtomicReference<Throwable> jobError = new AtomicReference<>();
 
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(parallelism);
+		env.enableCheckpointing(100);
+		env.getConfig().disableSysoutLogging();
+
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
+		FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props);
+
+		env.addSource(source).addSink(new DiscardingSink<String>());
+
+		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+		final JobID jobId = jobGraph.getJobID();
+
 		final Runnable jobRunner = new Runnable() {
 			@Override
 			public void run() {
 				try {
-					final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-					env.setParallelism(parallelism);
-					env.enableCheckpointing(100);
-					env.getConfig().disableSysoutLogging();
-
-					Properties props = new Properties();
-					props.putAll(standardProps);
-					props.putAll(secureProps);
-					FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props);
-
-					env.addSource(source).addSink(new DiscardingSink<String>());
-
-					env.execute("Runner for CancelingOnFullInputTest");
+					client.setDetached(false);
+					client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader());
 				}
 				catch (Throwable t) {
 					jobError.set(t);
@@ -1026,14 +1041,12 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		}
 
 		// cancel
-		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout), "Runner for CancelingOnFullInputTest");
+		client.cancel(jobId);
 
 		// wait for the program to be done and validate that we failed with the right exception
 		runnerThread.join();
 
-		failueCause = jobError.get();
-		assertNotNull("program did not fail properly due to canceling", failueCause);
-		assertTrue(failueCause.getMessage().contains("Job was cancelled"));
+		assertEquals(JobStatus.CANCELED, client.getJobStatus(jobId).get());
 
 		if (generator.isAlive()) {
 			generator.shutdown();
@@ -1063,23 +1076,27 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		final AtomicReference<Throwable> error = new AtomicReference<>();
 
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(parallelism);
+		env.enableCheckpointing(100);
+		env.getConfig().disableSysoutLogging();
+
+		Properties props = new Properties();
+		props.putAll(standardProps);
+		props.putAll(secureProps);
+		FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props);
+
+		env.addSource(source).addSink(new DiscardingSink<String>());
+
+		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+		final JobID jobId = jobGraph.getJobID();
+
 		final Runnable jobRunner = new Runnable() {
 			@Override
 			public void run() {
 				try {
-					final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-					env.setParallelism(parallelism);
-					env.enableCheckpointing(100);
-					env.getConfig().disableSysoutLogging();
-
-					Properties props = new Properties();
-					props.putAll(standardProps);
-					props.putAll(secureProps);
-					FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props);
-
-					env.addSource(source).addSink(new DiscardingSink<String>());
-
-					env.execute("CancelingOnEmptyInputTest");
+					client.setDetached(false);
+					client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader());
 				}
 				catch (Throwable t) {
 					LOG.error("Job Runner failed with exception", t);
@@ -1100,14 +1117,12 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			Assert.fail("Test failed prematurely with: " + failueCause.getMessage());
 		}
 		// cancel
-		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+		client.cancel(jobId);
 
 		// wait for the program to be done and validate that we failed with the right exception
 		runnerThread.join();
 
-		failueCause = error.get();
-		assertNotNull("program did not fail properly due to canceling", failueCause);
-		assertTrue(failueCause.getMessage().contains("Job was cancelled"));
+		assertEquals(JobStatus.CANCELED, client.getJobStatus(jobId).get());
 
 		deleteTestTopic(topic);
 	}
@@ -1558,52 +1573,53 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		createTestTopic(topic, 5, 1);
 
 		final Tuple1<Throwable> error = new Tuple1<>(null);
-		Runnable job = new Runnable() {
+
+		// start job writing & reading data.
+		final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment();
+		env1.setParallelism(1);
+		env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+		env1.getConfig().disableSysoutLogging();
+		env1.disableOperatorChaining(); // let the source read everything into the network buffers
+
+		TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig());
+		DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
+		fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
 			@Override
-			public void run() {
-				try {
-					// start job writing & reading data.
-					final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment();
-					env1.setParallelism(1);
-					env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-					env1.getConfig().disableSysoutLogging();
-					env1.disableOperatorChaining(); // let the source read everything into the network buffers
-
-					Properties props = new Properties();
-					props.putAll(standardProps);
-					props.putAll(secureProps);
-
-					TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig());
-					DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
-					fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
-						@Override
-						public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {// no op
-						}
-					});
-
-					DataStream<Tuple2<Integer, Integer>> fromGen = env1.addSource(new RichSourceFunction<Tuple2<Integer, Integer>>() {
-						boolean running = true;
-
-						@Override
-						public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
-							int i = 0;
-							while (running) {
-								ctx.collect(Tuple2.of(i++, getRuntimeContext().getIndexOfThisSubtask()));
-								Thread.sleep(1);
-							}
-						}
+			public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {// no op
+			}
+		});
 
-						@Override
-						public void cancel() {
-							running = false;
-						}
-					});
+		DataStream<Tuple2<Integer, Integer>> fromGen = env1.addSource(new RichSourceFunction<Tuple2<Integer, Integer>>() {
+			boolean running = true;
+
+			@Override
+			public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+				int i = 0;
+				while (running) {
+					ctx.collect(Tuple2.of(i++, getRuntimeContext().getIndexOfThisSubtask()));
+					Thread.sleep(1);
+				}
+			}
+
+			@Override
+			public void cancel() {
+				running = false;
+			}
+		});
+
+		kafkaServer.produceIntoKafka(fromGen, topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null);
 
-					kafkaServer.produceIntoKafka(fromGen, topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null);
+		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env1.getStreamGraph());
+		final JobID jobId = jobGraph.getJobID();
 
-					env1.execute("Metrics test job");
+		Runnable job = new Runnable() {
+			@Override
+			public void run() {
+				try {
+					client.setDetached(false);
+					client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader());
 				} catch (Throwable t) {
-					if (!(t instanceof JobCancellationException)) { // we'll cancel the job
+					if (!ExceptionUtils.findThrowable(t, JobCancellationException.class).isPresent()) {
 						LOG.warn("Got exception during execution", t);
 						error.f0 = t;
 					}
@@ -1653,7 +1669,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			LOG.info("Found all JMX metrics. Cancelling job.");
 		} finally {
 			// cancel
-			JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+			client.cancel(jobId);
 			// wait for the job to finish (it should due to the cancel command above)
 			jobThread.join();
 		}
@@ -1903,7 +1919,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			catch (Exception e) {
 				LOG.error("Write attempt failed, trying again", e);
 				deleteTestTopic(topicName);
-				JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+				waitUntilNoJobIsRunning(client);
 				continue;
 			}
 
@@ -1914,7 +1930,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			// we need to validate the sequence, because kafka's producers are not exactly once
 			LOG.info("Validating sequence");
 
-			JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+			waitUntilNoJobIsRunning(client);
 
 			if (validateSequence(topicName, parallelism, deserSchema, numElements)) {
 				// everything is good!
@@ -1996,7 +2012,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		// we need to validate the sequence, because kafka's producers are not exactly once
 		LOG.info("Validating sequence");
-		JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+		while (!getRunningJobs(client).isEmpty()){
+			Thread.sleep(50);
+		}
 
 		if (!validateSequence(topicName, parallelism, deserSchema, originalNumElements + numElementsToAppend)) {
 			throw new Exception("Could not append a valid sequence to Kafka.");
@@ -2040,13 +2058,20 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
 
+		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(readEnv.getStreamGraph());
+		final JobID jobId = jobGraph.getJobID();
+
 		Thread runner = new Thread() {
 			@Override
 			public void run() {
 				try {
+					client.setDetached(false);
+					client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader());
 					tryExecute(readEnv, "sequence validation");
 				} catch (Throwable t) {
-					errorRef.set(t);
+					if (!ExceptionUtils.findThrowable(t, SuccessException.class).isPresent()) {
+						errorRef.set(t);
+					}
 				}
 			}
 		};
@@ -2064,7 +2089,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			// did not finish in time, maybe the producer dropped one or more records and
 			// the validation did not reach the exit point
 			success = false;
-			JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+			client.cancel(jobId);
 		}
 		else {
 			Throwable error = errorRef.get();
@@ -2077,7 +2102,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			}
 		}
 
-		JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+		waitUntilNoJobIsRunning(client);
 
 		return success;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/7d6afed8/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index f471cd4..697e075 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -24,9 +24,9 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.metrics.jmx.JMXReporter;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
@@ -75,13 +75,17 @@ public abstract class KafkaTestBase extends TestLogger {
 
 	protected static final int TM_SLOTS = 8;
 
-	protected static final int PARALLELISM = NUM_TMS * TM_SLOTS;
-
 	protected static String brokerConnectionStrings;
 
 	protected static Properties standardProps;
 
-	protected static LocalFlinkMiniCluster flink;
+	@ClassRule
+	public static MiniClusterResource flink = new MiniClusterResource(
+		new MiniClusterResource.MiniClusterResourceConfiguration(
+			getFlinkConfiguration(),
+			NUM_TMS,
+			TM_SLOTS),
+		true);
 
 	protected static FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
 
@@ -107,8 +111,6 @@ public abstract class KafkaTestBase extends TestLogger {
 		LOG.info("-------------------------------------------------------------------------");
 
 		startClusters(false, hideKafkaBehindProxy);
-
-		TestStreamEnvironment.setAsContext(flink, PARALLELISM);
 	}
 
 	@AfterClass
@@ -131,8 +133,6 @@ public abstract class KafkaTestBase extends TestLogger {
 		Configuration flinkConfig = new Configuration();
 		flinkConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "5 s");
 		flinkConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1 s");
-		flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
-		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
 		flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
 		flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
 		flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
@@ -163,18 +163,9 @@ public abstract class KafkaTestBase extends TestLogger {
 			}
 			secureProps = kafkaServer.getSecureProperties();
 		}
-
-		// start also a re-usable Flink mini cluster
-		flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
-		flink.start();
 	}
 
 	protected static void shutdownClusters() throws Exception {
-
-		if (flink != null) {
-			flink.stop();
-		}
-
 		if (secureProps != null) {
 			secureProps.clear();
 		}
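
A hedged sketch of what a test built on this rule looks like (test class and body are
hypothetical; the constructor arguments mirror the @ClassRule above, and the trailing
boolean presumably enables ClusterClient support, since getClusterClient() is called
in the @Before hook of KafkaConsumerTestBase):

    @ClassRule
    public static final MiniClusterResource MINI_CLUSTER = new MiniClusterResource(
        new MiniClusterResource.MiniClusterResourceConfiguration(
            new Configuration(), // cluster configuration
            1,                   // number of TaskManagers
            4),                  // slots per TaskManager
        true);                   // ClusterClient support

    @Test
    public void runsAgainstMiniCluster() throws Exception {
        // The rule registers the mini cluster as the default execution context,
        // which is why the explicit TestStreamEnvironment.setAsContext call
        // could be removed in the hunk above.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).addSink(new DiscardingSink<>());
        env.execute();
    }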

http://git-wip-us.apache.org/repos/asf/flink/blob/7d6afed8/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ClusterCommunicationUtils.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ClusterCommunicationUtils.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ClusterCommunicationUtils.java
new file mode 100644
index 0000000..41f9d1e
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ClusterCommunicationUtils.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Utilities for communicating with a cluster through a {@link ClusterClient}.
+ */
+public class ClusterCommunicationUtils {
+
+	public static void waitUntilJobIsRunning(ClusterClient<?> client) throws Exception {
+		while (getRunningJobs(client).isEmpty()) {
+			Thread.sleep(50);
+		}
+	}
+
+	public static void waitUntilNoJobIsRunning(ClusterClient<?> client) throws Exception {
+		while (!getRunningJobs(client).isEmpty()) {
+			Thread.sleep(50);
+		}
+	}
+
+	public static List<JobID> getRunningJobs(ClusterClient<?> client) throws Exception {
+		Collection<JobStatusMessage> statusMessages = client.listJobs().get();
+		return statusMessages.stream()
+			.filter(status -> !status.getJobState().isGloballyTerminalState())
+			.map(JobStatusMessage::getJobId)
+			.collect(Collectors.toList());
+	}
+
+	private ClusterCommunicationUtils() {
+	}
+}
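
Note on the filter above: JobStatus#isGloballyTerminalState() is true only for
FINISHED, CANCELED and FAILED, so jobs that are still CANCELLING or FAILING count as
running, which keeps waitUntilNoJobIsRunning from letting the next test start too
early. Typical use (a sketch mirroring the @Before hook in KafkaConsumerTestBase):

    ClusterClient<?> client = flink.getClusterClient();
    waitUntilNoJobIsRunning(client); // spins in 50 ms steps while any job is non-terminal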

http://git-wip-us.apache.org/repos/asf/flink/blob/7d6afed8/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
deleted file mode 100644
index 9bbe1d3..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * Utilities for communicating with a jobmanager through a {@link ActorGateway}.
- */
-public class JobManagerCommunicationUtils {
-
-	private static final FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
-
-	public static void waitUntilNoJobIsRunning(ActorGateway jobManager) throws Exception {
-		while (true) {
-			// find the jobID
-			Future<Object> listResponse = jobManager.ask(
-					JobManagerMessages.getRequestRunningJobsStatus(), askTimeout);
-
-			Object result = Await.result(listResponse, askTimeout);
-			List<JobStatusMessage> jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
-
-			if (jobs.isEmpty()) {
-				return;
-			}
-
-			Thread.sleep(50);
-		}
-	}
-
-	public static void waitUntilJobIsRunning(ActorGateway jobManager, String name) throws Exception {
-		while (true) {
-			Future<Object> listResponse = jobManager.ask(
-				JobManagerMessages.getRequestRunningJobsStatus(),
-				askTimeout);
-
-			List<JobStatusMessage> jobs;
-			try {
-				Object result = Await.result(listResponse, askTimeout);
-				jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
-			}
-			catch (Exception e) {
-				throw new Exception("Could not wait for job to start - failed to retrieve running jobs from the JobManager.", e);
-			}
-
-			// see if the running jobs contain the requested job
-			for (JobStatusMessage job : jobs) {
-				if (job.getJobName().equals(name)) {
-					return;
-				}
-			}
-
-			Thread.sleep(50);
-		}
-	}
-
-	public static void cancelCurrentJob(ActorGateway jobManager) throws Exception {
-		cancelCurrentJob(jobManager, null);
-	}
-
-	public static void cancelCurrentJob(ActorGateway jobManager, String name) throws Exception {
-		JobStatusMessage status = null;
-
-		for (int i = 0; i < 200; i++) {
-			// find the jobID
-			Future<Object> listResponse = jobManager.ask(
-					JobManagerMessages.getRequestRunningJobsStatus(),
-					askTimeout);
-
-			List<JobStatusMessage> jobs;
-			try {
-				Object result = Await.result(listResponse, askTimeout);
-				jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
-			}
-			catch (Exception e) {
-				throw new Exception("Could not cancel job - failed to retrieve running jobs from the JobManager.", e);
-			}
-
-			if (jobs.isEmpty()) {
-				// try again, fall through the loop
-				Thread.sleep(50);
-			}
-			else if (jobs.size() == 1) {
-				status = jobs.get(0);
-			}
-			else if (name != null) {
-				for (JobStatusMessage msg: jobs) {
-					if (msg.getJobName().equals(name)) {
-						status = msg;
-					}
-				}
-				if (status == null) {
-					throw new Exception("Could not cancel job - no job matched expected name = '" + name + "' in " + jobs);
-				}
-			} else {
-				String jobNames = "";
-				for (JobStatusMessage jsm: jobs) {
-					jobNames += jsm.getJobName() + ", ";
-				}
-				throw new Exception("Could not cancel job - more than one running job: " + jobNames);
-			}
-		}
-
-		if (status == null) {
-			throw new Exception("Could not cancel job - no running jobs");
-		}
-		else if (status.getJobState().isGloballyTerminalState()) {
-			throw new Exception("Could not cancel job - job is not running any more");
-		}
-
-		JobID jobId = status.getJobId();
-
-		Future<Object> response = jobManager.ask(new JobManagerMessages.CancelJob(jobId), askTimeout);
-		try {
-			Await.result(response, askTimeout);
-		}
-		catch (Exception e) {
-			throw new Exception("Sending the 'cancel' message failed.", e);
-		}
-	}
-}


[6/7] flink git commit: [FLINK-8742][docs] Move docs generator annotations to flink-annotations

Posted by ch...@apache.org.
[FLINK-8742][docs] Move docs generator annotations to flink-annotations

This closes #5821.
This closes #5787.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/47ac3684
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/47ac3684
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/47ac3684

Branch: refs/heads/master
Commit: 47ac3684a195eef5ce319f3e1ac3f966d98b418c
Parents: 8b0f590
Author: zentol <ch...@apache.org>
Authored: Thu Apr 5 12:17:27 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Apr 6 15:23:02 2018 +0200

----------------------------------------------------------------------
 .../flink/annotation/docs/ConfigGroup.java      | 36 ++++++++++++++++++
 .../flink/annotation/docs/ConfigGroups.java     | 39 ++++++++++++++++++++
 .../apache/flink/configuration/ConfigGroup.java | 36 ------------------
 .../flink/configuration/ConfigGroups.java       | 39 --------------------
 .../apache/flink/configuration/CoreOptions.java |  2 +
 .../configuration/HighAvailabilityOptions.java  |  2 +
 .../configuration/ResourceManagerOptions.java   |  2 +
 .../flink/configuration/SecurityOptions.java    |  2 +
 flink-docs/pom.xml                              |  5 +++
 .../ConfigOptionsDocGenerator.java              |  4 +-
 .../ConfigOptionsDocGeneratorTest.java          |  4 +-
 11 files changed, 92 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/47ac3684/flink-annotations/src/main/java/org/apache/flink/annotation/docs/ConfigGroup.java
----------------------------------------------------------------------
diff --git a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/ConfigGroup.java b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/ConfigGroup.java
new file mode 100644
index 0000000..2b70bd6
--- /dev/null
+++ b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/ConfigGroup.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.annotation.docs;
+
+import org.apache.flink.annotation.Internal;
+
+import java.lang.annotation.Target;
+
+/**
+ * A class that specifies a group of config options. The name of the group will be used as the basis for the
+ * filename of the generated html file, as defined in {@link ConfigOptionsDocGenerator}.
+ *
+ * @see ConfigGroups
+ */
+@Target({})
+@Internal
+public @interface ConfigGroup {
+	String name();
+	String keyPrefix();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/47ac3684/flink-annotations/src/main/java/org/apache/flink/annotation/docs/ConfigGroups.java
----------------------------------------------------------------------
diff --git a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/ConfigGroups.java b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/ConfigGroups.java
new file mode 100644
index 0000000..53bf856
--- /dev/null
+++ b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/ConfigGroups.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.annotation.docs;
+
+import org.apache.flink.annotation.Internal;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation used on classes containing config options that enables the separation of options into different
+ * tables based on key prefixes. A config option is assigned to a {@link ConfigGroup} if the option key matches
+ * the group prefix. If a key matches multiple prefixes the longest matching prefix takes priority. An option is never
+ * assigned to multiple groups. Options that don't match any group are implicitly added to a default group.
+ */
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@Internal
+public @interface ConfigGroups {
+	ConfigGroup[] groups() default {};
+}
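
A usage sketch under the prefix-matching semantics described above (class name and
keys are made up for illustration):

    import org.apache.flink.annotation.docs.ConfigGroup;
    import org.apache.flink.annotation.docs.ConfigGroups;

    // Options whose keys start with "example.netty" are documented in a separate
    // "Netty" table; everything else falls into the implicit default group. Since
    // the longest matching prefix wins, adding an "example.netty.client" group
    // would capture the key "example.netty.client.timeout" instead.
    @ConfigGroups(groups = {
        @ConfigGroup(name = "Netty", keyPrefix = "example.netty")
    })
    public class ExampleOptions {
        // ConfigOption constants defined via ConfigOptions.key(...) go here.
    }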

http://git-wip-us.apache.org/repos/asf/flink/blob/47ac3684/flink-core/src/main/java/org/apache/flink/configuration/ConfigGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigGroup.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigGroup.java
deleted file mode 100644
index 3cd1d7f..0000000
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigGroup.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.configuration;
-
-import org.apache.flink.annotation.Internal;
-
-import java.lang.annotation.Target;
-
-/**
- * A class that specifies a group of {@link ConfigOption}. The name of the group will be used as the basis for the
- * filename of the generated html file, as defined in {@link ConfigOptionsDocGenerator}.
- *
- * @see ConfigGroups
- */
-@Target({})
-@Internal
-public @interface ConfigGroup {
-	String name();
-	String keyPrefix();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/47ac3684/flink-core/src/main/java/org/apache/flink/configuration/ConfigGroups.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigGroups.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigGroups.java
deleted file mode 100644
index 94e52ee..0000000
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigGroups.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.configuration;
-
-import org.apache.flink.annotation.Internal;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- * Annotation used on classes containing {@link ConfigOption}s that enables the separation of options into different
- * tables based on key prefixes. A {@link ConfigOption} is assigned to a {@link ConfigGroup} if the option key matches
- * the group prefix. If a key matches multiple prefixes the longest matching prefix takes priority. An option is never
- * assigned to multiple groups. Options that don't match any group are implicitly added to a default group.
- */
-@Target(ElementType.TYPE)
-@Retention(RetentionPolicy.RUNTIME)
-@Internal
-public @interface ConfigGroups {
-	ConfigGroup[] groups() default {};
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/47ac3684/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 4f8c074..343e2d2 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -19,6 +19,8 @@
 package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.ConfigGroup;
+import org.apache.flink.annotation.docs.ConfigGroups;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/47ac3684/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
index 8ef605b..f9d4ad9 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
@@ -19,6 +19,8 @@
 package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.ConfigGroup;
+import org.apache.flink.annotation.docs.ConfigGroups;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/47ac3684/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
index dfaf89f..4ce4981 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
@@ -19,6 +19,8 @@
 package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.ConfigGroup;
+import org.apache.flink.annotation.docs.ConfigGroups;
 
 /**
  * The set of configuration options relating to the ResourceManager.

http://git-wip-us.apache.org/repos/asf/flink/blob/47ac3684/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
index 17ff622..0f25c6c 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
@@ -19,6 +19,8 @@
 package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.ConfigGroup;
+import org.apache.flink.annotation.docs.ConfigGroups;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/47ac3684/flink-docs/pom.xml
----------------------------------------------------------------------
diff --git a/flink-docs/pom.xml b/flink-docs/pom.xml
index 33d1a2c..b06e587 100644
--- a/flink-docs/pom.xml
+++ b/flink-docs/pom.xml
@@ -35,6 +35,11 @@ under the License.
 	<dependencies>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-annotations</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-core</artifactId>
 			<version>${project.version}</version>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/47ac3684/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
----------------------------------------------------------------------
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
index 93112af..3bec09c 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
@@ -19,9 +19,9 @@
 package org.apache.flink.docs.configuration;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.annotation.docs.ConfigGroup;
+import org.apache.flink.annotation.docs.ConfigGroups;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigGroup;
-import org.apache.flink.configuration.ConfigGroups;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.WebOptions;

http://git-wip-us.apache.org/repos/asf/flink/blob/47ac3684/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocGeneratorTest.java b/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocGeneratorTest.java
index c8969f2..ba30617 100644
--- a/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocGeneratorTest.java
+++ b/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocGeneratorTest.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.docs.configuration;
 
+import org.apache.flink.annotation.docs.ConfigGroup;
+import org.apache.flink.annotation.docs.ConfigGroups;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigGroup;
-import org.apache.flink.configuration.ConfigGroups;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 

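Editor's note: the commit above only relocates ConfigGroup/ConfigGroups from flink-core
into the flink-annotations package org.apache.flink.annotation.docs. For readers
unfamiliar with these annotations, here is a minimal sketch of how an options class
uses them after the move. The class and keys are hypothetical; only the imports and
builder calls are taken from the diff. The doc generator renders options whose keys
match a group's keyPrefix as a separate table named after the group:

    import org.apache.flink.annotation.docs.ConfigGroup;
    import org.apache.flink.annotation.docs.ConfigGroups;
    import org.apache.flink.configuration.ConfigOption;

    import static org.apache.flink.configuration.ConfigOptions.key;

    @ConfigGroups(groups = {
        @ConfigGroup(name = "ExampleSsl", keyPrefix = "example.ssl")
    })
    public class ExampleOptions {

        // Rendered in the "ExampleSsl" table because of its key prefix.
        public static final ConfigOption<Boolean> SSL_ENABLED =
            key("example.ssl.enabled")
                .defaultValue(false)
                .withDescription("Whether SSL is enabled for the example component.");

        // No matching prefix, so this option stays in the default table.
        public static final ConfigOption<Integer> PORT =
            key("example.port")
                .defaultValue(8080)
                .withDescription("Server port of the example component.");
    }
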

[3/7] flink git commit: [hotfix][tests] Remove Kafka testFailOnDeploy test

Posted by ch...@apache.org.
[hotfix][tests] Remove Kafka testFailOnDeploy test


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cead9a97
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cead9a97
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cead9a97

Branch: refs/heads/master
Commit: cead9a97306b9c957008d8fbbab9f60d39ee70ad
Parents: bc9982c
Author: zentol <ch...@apache.org>
Authored: Wed Mar 7 13:38:03 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Fri Apr 6 15:23:01 2018 +0200

----------------------------------------------------------------------
 .../connectors/kafka/Kafka010ITCase.java        |  6 ---
 .../connectors/kafka/Kafka011ITCase.java        |  5 --
 .../connectors/kafka/Kafka08ITCase.java         |  5 --
 .../connectors/kafka/Kafka09ITCase.java         |  5 --
 .../connectors/kafka/KafkaConsumerTestBase.java | 49 --------------------
 5 files changed, 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cead9a97/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
index 06f627d..a038c8e 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
@@ -81,12 +81,6 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 	public void testCancelingFullTopic() throws Exception {
 		runCancelingOnFullInputTest();
 	}
-
-	@Test(timeout = 60000)
-	public void testFailOnDeploy() throws Exception {
-		runFailOnDeployTest();
-	}
-
 	// --- source to partition mappings and exactly once ---
 
 	@Test(timeout = 60000)

http://git-wip-us.apache.org/repos/asf/flink/blob/cead9a97/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
index fd6eb61..f48f87a 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
@@ -90,11 +90,6 @@ public class Kafka011ITCase extends KafkaConsumerTestBase {
 		runCancelingOnFullInputTest();
 	}
 
-	@Test(timeout = 60000)
-	public void testFailOnDeploy() throws Exception {
-		runFailOnDeployTest();
-	}
-
 	// --- source to partition mappings and exactly once ---
 
 	@Test(timeout = 60000)

http://git-wip-us.apache.org/repos/asf/flink/blob/cead9a97/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index 6abccde..5af219e 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -75,11 +75,6 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 	}
 
 	@Test(timeout = 60000)
-	public void testFailOnDeploy() throws Exception {
-		runFailOnDeployTest();
-	}
-
-	@Test(timeout = 60000)
 	public void testInvalidOffset() throws Exception {
 
 		final int parallelism = 1;

http://git-wip-us.apache.org/repos/asf/flink/blob/cead9a97/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index 3594854..f022c8e 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -55,11 +55,6 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
 		runCancelingOnFullInputTest();
 	}
 
-	@Test(timeout = 60000)
-	public void testFailOnDeploy() throws Exception {
-		runFailOnDeployTest();
-	}
-
 	// --- source to partition mappings and exactly once ---
 
 	@Test(timeout = 60000)

http://git-wip-us.apache.org/repos/asf/flink/blob/cead9a97/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index f07c0bb..f9d745e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -1113,55 +1113,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	}
 
 	/**
-	 * Tests that the job fails during deployment when more slots are requested than the cluster provides.
-	 */
-	public void runFailOnDeployTest() throws Exception {
-		final String topic = "failOnDeployTopic";
-
-		createTestTopic(topic, 2, 1);
-
-		DeserializationSchema<Integer> schema =
-				new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(12); // needs to be more than the mini cluster has slots
-		env.getConfig().disableSysoutLogging();
-
-		Properties props = new Properties();
-		props.putAll(standardProps);
-		props.putAll(secureProps);
-		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props);
-
-		env
-				.addSource(kafkaSource)
-				.addSink(new DiscardingSink<Integer>());
-
-		try {
-			env.execute("test fail on deploy");
-			fail("this test should fail with an exception");
-		}
-		catch (JobExecutionException e) {
-
-			// validate that we failed due to a NoResourceAvailableException
-			Throwable cause = e.getCause();
-			int depth = 0;
-			boolean foundResourceException = false;
-
-			while (cause != null && depth++ < 20) {
-				if (cause instanceof NoResourceAvailableException) {
-					foundResourceException = true;
-					break;
-				}
-				cause = cause.getCause();
-			}
-
-			assertTrue("Wrong exception", foundResourceException);
-		}
-
-		deleteTestTopic(topic);
-	}
-
-	/**
 	 * Test producing and consuming into multiple topics.
 	 * @throws Exception
 	 */

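Editor's note: the removed runFailOnDeployTest validated the expected failure by
walking the exception cause chain by hand. A self-contained sketch of that pattern,
with the same depth limit the test used (the helper name is ours, not Flink API):

    public final class CauseChainCheck {

        /**
         * Returns true if any cause in the chain (up to maxDepth) is an instance
         * of the given type. The depth limit guards against cyclic cause chains.
         */
        public static boolean hasCauseOfType(Throwable root, Class<? extends Throwable> type, int maxDepth) {
            Throwable cause = root;
            int depth = 0;
            while (cause != null && depth++ < maxDepth) {
                if (type.isInstance(cause)) {
                    return true;
                }
                cause = cause.getCause();
            }
            return false;
        }
    }

The removed assertion then becomes a one-liner:
    assertTrue("Wrong exception", CauseChainCheck.hasCauseOfType(e, NoResourceAvailableException.class, 20));
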

[5/7] flink git commit: [FLINK-8835] [taskmanager] Cleanup TaskManager config keys

Posted by ch...@apache.org.
[FLINK-8835] [taskmanager] Cleanup TaskManager config keys

This closes #5808.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8b0f590c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8b0f590c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8b0f590c

Branch: refs/heads/master
Commit: 8b0f590c52d698b3439a2c3524889802c893e985
Parents: cdd2022
Author: zhangminglei <zm...@163.com>
Authored: Wed Apr 4 17:05:22 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Fri Apr 6 15:23:02 2018 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    | 20 +++----
 .../flink/configuration/TaskManagerOptions.java | 55 +++++++++++---------
 .../clusterframework/BootstrapTools.java        |  2 +-
 .../runtime/io/network/netty/NettyConfig.java   |  2 +-
 .../taskexecutor/TaskManagerConfiguration.java  | 16 +++---
 .../flink/runtime/taskmanager/TaskManager.scala |  2 +-
 .../TaskManagerRegistrationTest.java            |  8 +--
 .../runtime/io/InputProcessorUtil.java          |  2 +-
 .../jobmanager/JobManagerFailsITCase.scala      |  4 +-
 9 files changed, 59 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8b0f590c/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index b716d9e..f148360 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -299,7 +299,7 @@ public final class ConfigConstants {
 	/**
 	 * Flag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM.
 	 *
-	 * @deprecated use {@link TaskManagerOptions#DEBUG_MEMORY_USAGE_START_LOG_THREAD} instead
+	 * @deprecated use {@link TaskManagerOptions#DEBUG_MEMORY_LOG} instead
 	 */
 	@Deprecated
 	public static final String TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD = "taskmanager.debug.memory.startLogThread";
@@ -316,7 +316,7 @@ public final class ConfigConstants {
 	 * Defines the maximum time it can take for the TaskManager registration. If the duration is
 	 * exceeded without a successful registration, then the TaskManager terminates.
 	 *
-	 * @deprecated use {@link TaskManagerOptions#MAX_REGISTRATION_DURATION} instead
+	 * @deprecated use {@link TaskManagerOptions#REGISTRATION_TIMEOUT} instead
 	 */
 	@Deprecated
 	public static final String TASK_MANAGER_MAX_REGISTRATION_DURATION = "taskmanager.maxRegistrationDuration";
@@ -325,7 +325,7 @@ public final class ConfigConstants {
 	 * The initial registration pause between two consecutive registration attempts. The pause
 	 * is doubled for each new registration attempt until it reaches the maximum registration pause.
 	 *
-	 * @deprecated use {@link TaskManagerOptions#INITIAL_REGISTRATION_PAUSE} instead
+	 * @deprecated use {@link TaskManagerOptions#INITIAL_REGISTRATION_BACKOFF} instead
 	 */
 	@Deprecated
 	public static final String TASK_MANAGER_INITIAL_REGISTRATION_PAUSE = "taskmanager.initial-registration-pause";
@@ -333,7 +333,7 @@ public final class ConfigConstants {
 	/**
 	 * The maximum registration pause between two consecutive registration attempts.
 	 *
-	 * @deprecated use {@link TaskManagerOptions#MAX_REGISTRATION_PAUSE} instead
+	 * @deprecated use {@link TaskManagerOptions#REGISTRATION_MAX_BACKOFF} instead
 	 */
 	@Deprecated
 	public static final String TASK_MANAGER_MAX_REGISTARTION_PAUSE = "taskmanager.max-registration-pause";
@@ -341,7 +341,7 @@ public final class ConfigConstants {
 	/**
 	 * The pause after a registration has been refused by the job manager before retrying to connect.
 	 *
-	 * @deprecated use {@link TaskManagerOptions#REFUSED_REGISTRATION_PAUSE} instead
+	 * @deprecated use {@link TaskManagerOptions#REFUSED_REGISTRATION_BACKOFF} instead
 	 */
 	@Deprecated
 	public static final String TASK_MANAGER_REFUSED_REGISTRATION_PAUSE = "taskmanager.refused-registration-pause";
@@ -1441,7 +1441,7 @@ public final class ConfigConstants {
 	/**
 	 * Flag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM.
 	 *
-	 * @deprecated use {@link TaskManagerOptions#DEBUG_MEMORY_USAGE_START_LOG_THREAD} instead
+	 * @deprecated use {@link TaskManagerOptions#DEBUG_MEMORY_LOG} instead
 	 */
 	@Deprecated
 	public static final boolean DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD = false;
@@ -1457,7 +1457,7 @@ public final class ConfigConstants {
 	/**
 	 * The default task manager's maximum registration duration.
 	 *
-	 * @deprecated use {@link TaskManagerOptions#MAX_REGISTRATION_DURATION} instead
+	 * @deprecated use {@link TaskManagerOptions#REGISTRATION_TIMEOUT} instead
 	 */
 	@Deprecated
 	public static final String DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION = "Inf";
@@ -1465,7 +1465,7 @@ public final class ConfigConstants {
 	/**
 	 * The default task manager's initial registration pause.
 	 *
-	 * @deprecated use {@link TaskManagerOptions#INITIAL_REGISTRATION_PAUSE} instead
+	 * @deprecated use {@link TaskManagerOptions#INITIAL_REGISTRATION_BACKOFF} instead
 	 */
 	@Deprecated
 	public static final String DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE = "500 ms";
@@ -1473,7 +1473,7 @@ public final class ConfigConstants {
 	/**
 	 * The default task manager's maximum registration pause.
 	 *
-	 * @deprecated use {@link TaskManagerOptions#MAX_REGISTRATION_PAUSE} instead
+	 * @deprecated use {@link TaskManagerOptions#REGISTRATION_MAX_BACKOFF} instead
 	 */
 	@Deprecated
 	public static final String DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE = "30 s";
@@ -1481,7 +1481,7 @@ public final class ConfigConstants {
 	/**
 	 * The default task manager's refused registration pause.
 	 *
-	 * @deprecated use {@link TaskManagerOptions#REFUSED_REGISTRATION_PAUSE} instead
+	 * @deprecated use {@link TaskManagerOptions#REFUSED_REGISTRATION_BACKOFF} instead
 	 */
 	@Deprecated
 	public static final String DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE = "10 s";

http://git-wip-us.apache.org/repos/asf/flink/blob/8b0f590c/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index c7b0782..2bd3091 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -104,40 +104,44 @@ public class TaskManagerOptions {
 				" global ssl flag " + SecurityOptions.SSL_ENABLED.key() + " is set to true");
 
 	/**
-	 * The initial registration pause between two consecutive registration attempts. The pause
-	 * is doubled for each new registration attempt until it reaches the maximum registration pause.
+	 * The initial registration backoff between two consecutive registration attempts. The backoff
+	 * is doubled for each new registration attempt until it reaches the maximum registration backoff.
 	 */
-	public static final ConfigOption<String> INITIAL_REGISTRATION_PAUSE =
-		key("taskmanager.initial-registration-pause")
+	public static final ConfigOption<String> INITIAL_REGISTRATION_BACKOFF =
+		key("taskmanager.registration.initial-backoff")
 			.defaultValue("500 ms")
-			.withDescription("The initial registration pause between two consecutive registration attempts. The pause" +
-				" is doubled for each new registration attempt until it reaches the maximum registration pause.");
+			.withDeprecatedKeys("taskmanager.initial-registration-pause")
+			.withDescription("The initial registration backoff between two consecutive registration attempts. The backoff" +
+				" is doubled for each new registration attempt until it reaches the maximum registration backoff.");
 
 	/**
-	 * The maximum registration pause between two consecutive registration attempts.
+	 * The maximum registration backoff between two consecutive registration attempts.
 	 */
-	public static final ConfigOption<String> MAX_REGISTRATION_PAUSE =
-		key("taskmanager.max-registration-pause")
+	public static final ConfigOption<String> REGISTRATION_MAX_BACKOFF =
+		key("taskmanager.registration.max-backoff")
 			.defaultValue("30 s")
-			.withDescription("The maximum registration pause between two consecutive registration attempts. The max" +
-				" registration pause requires a time unit specifier (ms/s/min/h/d).");
+			.withDeprecatedKeys("taskmanager.max-registration-pause")
+			.withDescription("The maximum registration backoff between two consecutive registration attempts. The max" +
+				" registration backoff requires a time unit specifier (ms/s/min/h/d).");
 
 	/**
-	 * The pause after a registration has been refused by the job manager before retrying to connect.
+	 * The backoff after a registration has been refused by the job manager before retrying to connect.
 	 */
-	public static final ConfigOption<String> REFUSED_REGISTRATION_PAUSE =
-		key("taskmanager.refused-registration-pause")
+	public static final ConfigOption<String> REFUSED_REGISTRATION_BACKOFF =
+		key("taskmanager.registration.refused-backoff")
 			.defaultValue("10 s")
-			.withDescription("The pause after a registration has been refused by the job manager before retrying to connect.");
+			.withDeprecatedKeys("taskmanager.refused-registration-pause")
+			.withDescription("The backoff after a registration has been refused by the job manager before retrying to connect.");
 
 	/**
-	 * Defines the maximum time it can take for the TaskManager registration. If the duration is
+	 * Defines the timeout for the TaskManager registration. If the duration is
 	 * exceeded without a successful registration, then the TaskManager terminates.
 	 */
-	public static final ConfigOption<String> MAX_REGISTRATION_DURATION =
-		key("taskmanager.maxRegistrationDuration")
+	public static final ConfigOption<String> REGISTRATION_TIMEOUT =
+		key("taskmanager.registration.timeout")
 			.defaultValue("Inf")
-			.withDescription("Defines the maximum time it can take for the TaskManager registration. If the duration is" +
+			.withDeprecatedKeys("taskmanager.maxRegistrationDuration")
+			.withDescription("Defines the timeout for the TaskManager registration. If the duration is" +
 				" exceeded without a successful registration, then the TaskManager terminates.");
 
 	/**
@@ -153,14 +157,16 @@ public class TaskManagerOptions {
 				" is typically proportional to the number of physical CPU cores that the TaskManager's machine has" +
 				" (e.g., equal to the number of cores, or half the number of cores).");
 
-	public static final ConfigOption<Boolean> DEBUG_MEMORY_USAGE_START_LOG_THREAD =
-		key("taskmanager.debug.memory.startLogThread")
+	public static final ConfigOption<Boolean> DEBUG_MEMORY_LOG =
+		key("taskmanager.debug.memory.log")
 			.defaultValue(false)
+			.withDeprecatedKeys("taskmanager.debug.memory.startLogThread")
 			.withDescription("Flag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM.");
 
 	public static final ConfigOption<Long> DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS =
-		key("taskmanager.debug.memory.logIntervalMs")
+		key("taskmanager.debug.memory.log-interval")
 			.defaultValue(5000L)
+			.withDeprecatedKeys("taskmanager.debug.memory.logIntervalMs")
 			.withDescription("The interval (in ms) for the log thread to log the current memory usage.");
 
 	// ------------------------------------------------------------------------
@@ -321,9 +327,10 @@ public class TaskManagerOptions {
 	 * credit-based flow control.
 	 */
 	@Deprecated
-	public static final ConfigOption<Boolean> NETWORK_CREDIT_BASED_FLOW_CONTROL_ENABLED =
-			key("taskmanager.network.credit-based-flow-control.enabled")
+	public static final ConfigOption<Boolean> NETWORK_CREDIT_MODEL =
+			key("taskmanager.network.credit-model")
 			.defaultValue(true)
+			.withDeprecatedKeys("taskmanager.network.credit-based-flow-control.enabled")
 			.withDescription("Boolean flag to enable/disable network credit-based flow control.");
 
 	// ------------------------------------------------------------------------

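Editor's note: every rename in the hunk above follows the same pattern: the option
gets a new key, and the old key is registered via withDeprecatedKeys so that existing
flink-conf.yaml files keep working. A minimal sketch of the mechanism with a
hypothetical option (only the builder calls mirror the diff); when the new key is
absent, reading through the ConfigOption falls back to the deprecated key:

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.Configuration;

    import static org.apache.flink.configuration.ConfigOptions.key;

    public class DeprecatedKeyExample {

        public static final ConfigOption<String> EXAMPLE_BACKOFF =
            key("example.registration.initial-backoff")
                .defaultValue("500 ms")
                .withDeprecatedKeys("example.initial-registration-pause")
                .withDescription("The initial registration backoff of the example component.");

        public static void main(String[] args) {
            Configuration config = new Configuration();
            // Only the old key is set, as in a pre-upgrade flink-conf.yaml ...
            config.setString("example.initial-registration-pause", "250 ms");
            // ... but reading through the ConfigOption still resolves it.
            System.out.println(config.getString(EXAMPLE_BACKOFF)); // prints "250 ms"
        }
    }
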
http://git-wip-us.apache.org/repos/asf/flink/blob/8b0f590c/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index eab7382..102274d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -245,7 +245,7 @@ public class BootstrapTools {
 			cfg.setInteger(JobManagerOptions.PORT, jobManagerPort);
 		}
 
-		cfg.setString(TaskManagerOptions.MAX_REGISTRATION_DURATION, registrationTimeout.toString());
+		cfg.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, registrationTimeout.toString());
 		if (numSlots != -1){
 			cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b0f590c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
index 8572361..18527c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
@@ -225,7 +225,7 @@ public class NettyConfig {
 	}
 
 	public boolean isCreditBasedEnabled() {
-		return config.getBoolean(TaskManagerOptions.NETWORK_CREDIT_BASED_FLOW_CONTROL_ENABLED);
+		return config.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8b0f590c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index cb6fe51..1bf42ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -185,7 +185,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 		final Time finiteRegistrationDuration;
 
 		try {
-			Duration maxRegistrationDuration = Duration.create(configuration.getString(TaskManagerOptions.MAX_REGISTRATION_DURATION));
+			Duration maxRegistrationDuration = Duration.create(configuration.getString(TaskManagerOptions.REGISTRATION_TIMEOUT));
 			if (maxRegistrationDuration.isFinite()) {
 				finiteRegistrationDuration = Time.milliseconds(maxRegistrationDuration.toMillis());
 			} else {
@@ -193,12 +193,12 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 			}
 		} catch (NumberFormatException e) {
 			throw new IllegalArgumentException("Invalid format for parameter " +
-				TaskManagerOptions.MAX_REGISTRATION_DURATION.key(), e);
+				TaskManagerOptions.REGISTRATION_TIMEOUT.key(), e);
 		}
 
 		final Time initialRegistrationPause;
 		try {
-			Duration pause = Duration.create(configuration.getString(TaskManagerOptions.INITIAL_REGISTRATION_PAUSE));
+			Duration pause = Duration.create(configuration.getString(TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF));
 			if (pause.isFinite()) {
 				initialRegistrationPause = Time.milliseconds(pause.toMillis());
 			} else {
@@ -206,13 +206,13 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 			}
 		} catch (NumberFormatException e) {
 			throw new IllegalArgumentException("Invalid format for parameter " +
-				TaskManagerOptions.INITIAL_REGISTRATION_PAUSE.key(), e);
+				TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF.key(), e);
 		}
 
 		final Time maxRegistrationPause;
 		try {
 			Duration pause = Duration.create(configuration.getString(
-				TaskManagerOptions.MAX_REGISTRATION_PAUSE));
+				TaskManagerOptions.REGISTRATION_MAX_BACKOFF));
 			if (pause.isFinite()) {
 				maxRegistrationPause = Time.milliseconds(pause.toMillis());
 			} else {
@@ -220,12 +220,12 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 			}
 		} catch (NumberFormatException e) {
 			throw new IllegalArgumentException("Invalid format for parameter " +
-				TaskManagerOptions.INITIAL_REGISTRATION_PAUSE.key(), e);
+				TaskManagerOptions.REGISTRATION_MAX_BACKOFF.key(), e);
 		}
 
 		final Time refusedRegistrationPause;
 		try {
-			Duration pause = Duration.create(configuration.getString(TaskManagerOptions.REFUSED_REGISTRATION_PAUSE));
+			Duration pause = Duration.create(configuration.getString(TaskManagerOptions.REFUSED_REGISTRATION_BACKOFF));
 			if (pause.isFinite()) {
 				refusedRegistrationPause = Time.milliseconds(pause.toMillis());
 			} else {
@@ -233,7 +233,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 			}
 		} catch (NumberFormatException e) {
 			throw new IllegalArgumentException("Invalid format for parameter " +
-				TaskManagerOptions.INITIAL_REGISTRATION_PAUSE.key(), e);
+				TaskManagerOptions.REFUSED_REGISTRATION_BACKOFF.key(), e);
 		}
 
 		final boolean exitOnOom = configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY);

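Editor's note: the four try/catch blocks above differ only in the option they parse
and could share a helper. A sketch of one; this is not Flink API, and it assumes the
imports the surrounding file already uses (org.apache.flink.api.common.time.Time,
scala.concurrent.duration.Duration, org.apache.flink.configuration.Configuration):

    // Hypothetical helper collapsing the repeated parse blocks above.
    private static Time parseDurationOption(Configuration configuration, ConfigOption<String> option) {
        try {
            Duration duration = Duration.create(configuration.getString(option));
            if (duration.isFinite()) {
                return Time.milliseconds(duration.toMillis());
            }
            return null; // callers may treat a non-finite value as "no limit"
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid format for parameter " + option.key(), e);
        }
    }

Each block then reduces to a single call, e.g.
    final Time initialRegistrationPause = parseDurationOption(configuration, TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF);
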
http://git-wip-us.apache.org/repos/asf/flink/blob/8b0f590c/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 0aaeae3..071a333 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1884,7 +1884,7 @@ object TaskManager {
       // if desired, start the logging daemon that periodically logs the
       // memory usage information
       if (LOG.isInfoEnabled && configuration.getBoolean(
-        TaskManagerOptions.DEBUG_MEMORY_USAGE_START_LOG_THREAD))
+        TaskManagerOptions.DEBUG_MEMORY_LOG))
       {
         LOG.info("Starting periodic memory usage logger")
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8b0f590c/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index 6b65095..ad32a4f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -267,7 +267,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 			try {
 				// registration timeout of 1 second
 				Configuration tmConfig = new Configuration();
-				tmConfig.setString(TaskManagerOptions.MAX_REGISTRATION_DURATION, "500 ms");
+				tmConfig.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, "500 ms");
 
 				highAvailabilityServices.setJobMasterLeaderRetriever(
 					HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -325,7 +325,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 
 				FiniteDuration refusedRegistrationPause = new FiniteDuration(500, TimeUnit.MILLISECONDS);
 				Configuration tmConfig = new Configuration(config);
-				tmConfig.setString(TaskManagerOptions.REFUSED_REGISTRATION_PAUSE, refusedRegistrationPause.toString());
+				tmConfig.setString(TaskManagerOptions.REFUSED_REGISTRATION_BACKOFF, refusedRegistrationPause.toString());
 
 				highAvailabilityServices.setJobMasterLeaderRetriever(
 					HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -407,8 +407,8 @@ public class TaskManagerRegistrationTest extends TestLogger {
 				long maxDelay = 30000;
 
 				Configuration tmConfig = new Configuration(config);
-				tmConfig.setString(TaskManagerOptions.REFUSED_REGISTRATION_PAUSE, refusedRegistrationPause + " ms");
-				tmConfig.setString(TaskManagerOptions.INITIAL_REGISTRATION_PAUSE, initialRegistrationPause + " ms");
+				tmConfig.setString(TaskManagerOptions.REFUSED_REGISTRATION_BACKOFF, refusedRegistrationPause + " ms");
+				tmConfig.setString(TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF, initialRegistrationPause + " ms");
 
 				// we make the test actor (the test kit) the JobManager to intercept
 				// the messages

http://git-wip-us.apache.org/repos/asf/flink/blob/8b0f590c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index 1ae34b3..d1c5b72 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -51,7 +51,7 @@ public class InputProcessorUtil {
 					+ " must be positive or -1 (infinite)");
 			}
 
-			if (taskManagerConfig.getBoolean(TaskManagerOptions.NETWORK_CREDIT_BASED_FLOW_CONTROL_ENABLED)) {
+			if (taskManagerConfig.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL)) {
 				barrierHandler = new BarrierBuffer(inputGate, new CachedBufferBlocker(inputGate.getPageSize()), maxAlign);
 			} else {
 				barrierHandler = new BarrierBuffer(inputGate, new BufferSpiller(ioManager, inputGate.getPageSize()), maxAlign);

http://git-wip-us.apache.org/repos/asf/flink/blob/8b0f590c/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index 1a3419b..5fe7b1d 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -136,8 +136,8 @@ class JobManagerFailsITCase(_system: ActorSystem)
     config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskmanagers)
     config.setInteger(JobManagerOptions.PORT, 0)
-    config.setString(TaskManagerOptions.INITIAL_REGISTRATION_PAUSE, "50 ms")
-    config.setString(TaskManagerOptions.MAX_REGISTRATION_PAUSE, "100 ms")
+    config.setString(TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF, "50 ms")
+    config.setString(TaskManagerOptions.REGISTRATION_MAX_BACKOFF, "100 ms")
 
     val cluster = new TestingCluster(config, singleActorSystem = false)
 

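Editor's note: for completeness, a compact Java equivalent of the Scala test
configuration above, collecting the four renamed registration options in one place.
The values are illustrative; each key accepts a time unit specifier (ms/s/min/h/d):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.TaskManagerOptions;

    public class FastRegistrationConfig {

        // Builds a configuration that registers TaskManagers aggressively,
        // useful for tests that should not wait on the default backoffs.
        public static Configuration create() {
            Configuration config = new Configuration();
            config.setString(TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF, "50 ms");
            config.setString(TaskManagerOptions.REGISTRATION_MAX_BACKOFF, "100 ms");
            config.setString(TaskManagerOptions.REFUSED_REGISTRATION_BACKOFF, "200 ms");
            config.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, "30 s");
            return config;
        }
    }
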

[7/7] flink git commit: [hotfix][docs] Update maven version in building flink docs

Posted by ch...@apache.org.
[hotfix][docs] Update maven version in building flink docs

This closes #5760.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/022cd628
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/022cd628
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/022cd628

Branch: refs/heads/master
Commit: 022cd628cea67ce8eba1d56987301d20f192f480
Parents: 47ac368
Author: Bowen Li <bl...@Bowens-MacBook-Pro.local>
Authored: Sat Mar 24 23:11:16 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Fri Apr 6 15:23:02 2018 +0200

----------------------------------------------------------------------
 README.md              | 2 +-
 docs/start/building.md | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/022cd628/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 1e00e65..095f652a 100644
--- a/README.md
+++ b/README.md
@@ -69,7 +69,7 @@ Prerequisites for building Flink:
 
 * Unix-like environment (We use Linux, Mac OS X, Cygwin)
 * git
-* Maven (we recommend version 3.0.4)
+* Maven (we recommend version 3.2.5)
 * Java 8
 
 ```

http://git-wip-us.apache.org/repos/asf/flink/blob/022cd628/docs/start/building.md
----------------------------------------------------------------------
diff --git a/docs/start/building.md b/docs/start/building.md
index cbf1d1f..568c4b8 100644
--- a/docs/start/building.md
+++ b/docs/start/building.md
@@ -33,7 +33,7 @@ In order to build Flink you need the source code. Either [download the source of
 
 In addition you need **Maven 3** and a **JDK** (Java Development Kit). Flink requires **at least Java 8** to build.
 
-*NOTE: Maven 3.3.x can build Flink, but will not properly shade away certain dependencies. Maven 3.0.3 creates the libraries properly.
+*NOTE: Maven 3.3.x can build Flink, but will not properly shade away certain dependencies. Maven 3.2.5 creates the libraries properly.
 To build unit tests use Java 8u51 or above to prevent failures in unit tests that use the PowerMock runner.*
 
 To clone from git, enter:


[2/7] flink git commit: [hotfix][tests] Remove unused methods in KafkaConsumerTestBase

Posted by ch...@apache.org.
[hotfix][tests] Remove unused methods in KafkaConsumerTestBase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4c337dc9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4c337dc9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4c337dc9

Branch: refs/heads/master
Commit: 4c337dc9a89dfb549a3c2cb62893d74129582143
Parents: cead9a9
Author: zentol <ch...@apache.org>
Authored: Tue Apr 3 11:42:43 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Apr 6 15:23:01 2018 +0200

----------------------------------------------------------------------
 .../connectors/kafka/KafkaConsumerTestBase.java | 26 --------------------
 1 file changed, 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4c337dc9/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index f9d745e..959d6f1 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -2121,32 +2121,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		return result;
 	}
 
-	private static void printTopic(String topicName, ConsumerConfig config,
-								DeserializationSchema<?> deserializationSchema,
-								int stopAfter) throws IOException {
-
-		List<MessageAndMetadata<byte[], byte[]>> contents = readTopicToList(topicName, config, stopAfter);
-		LOG.info("Printing contents of topic {} in consumer grouo {}", topicName, config.groupId());
-
-		for (MessageAndMetadata<byte[], byte[]> message: contents) {
-			Object out = deserializationSchema.deserialize(message.message());
-			LOG.info("Message: partition: {} offset: {} msg: {}", message.partition(), message.offset(), out.toString());
-		}
-	}
-
-	private static void printTopic(String topicName, int elements, DeserializationSchema<?> deserializer)
-			throws IOException {
-		// write the sequence to log for debugging purposes
-		Properties newProps = new Properties(standardProps);
-		newProps.setProperty("group.id", "topic-printer" + UUID.randomUUID().toString());
-		newProps.setProperty("auto.offset.reset", "smallest");
-		newProps.setProperty("zookeeper.connect", standardProps.getProperty("zookeeper.connect"));
-		newProps.putAll(secureProps);
-
-		ConsumerConfig printerConfig = new ConsumerConfig(newProps);
-		printTopic(topicName, printerConfig, deserializer, elements);
-	}
-
 	private static class BrokerKillingMapper<T> extends RichMapFunction<T, T>
 			implements ListCheckpointed<Integer>, CheckpointListener {