You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/25 07:32:15 UTC

[01/10] flink git commit: [FLINK-9240][tests] Harden WebFrontendITCase#testStopYarn()

Repository: flink
Updated Branches:
  refs/heads/master 0321eb35e -> c50b573e9


[FLINK-9240][tests] Harden WebFrontendITCase#testStopYarn()

This closes #5886.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/26eb8bb2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/26eb8bb2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/26eb8bb2

Branch: refs/heads/master
Commit: 26eb8bb29989b9e65dbd0afe9c107c285b98cf87
Parents: 0321eb3
Author: sihuazhou <su...@163.com>
Authored: Sat Apr 21 13:34:21 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Tue Apr 24 13:24:18 2018 +0200

----------------------------------------------------------------------
 .../runtime/webmonitor/WebFrontendITCase.java   | 34 +++++++++++---------
 1 file changed, 19 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/26eb8bb2/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
index f512766..994966e 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -317,6 +317,8 @@ public class WebFrontendITCase extends TestLogger {
 				"\"execution-config\":{\"execution-mode\":\"PIPELINED\",\"restart-strategy\":\"default\"," +
 				"\"job-parallelism\":-1,\"object-reuse-mode\":false,\"user-config\":{}}}", response.getContent());
 		}
+
+		BlockingInvokable.reset();
 	}
 
 	@Test
@@ -347,25 +349,27 @@ public class WebFrontendITCase extends TestLogger {
 		final FiniteDuration testTimeout = new FiniteDuration(2, TimeUnit.MINUTES);
 		final Deadline deadline = testTimeout.fromNow();
 
-		while (!getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
-			try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) {
-				// Request the file from the web server
-				client.sendGetRequest("/jobs/" + jid + "/yarn-stop", deadline.timeLeft());
-
-				HttpTestClient.SimpleHttpResponse response = client
-					.getNextResponse(deadline.timeLeft());
-
-				if (Objects.equals(MiniClusterResource.NEW_CODEBASE, System.getProperty(MiniClusterResource.CODEBASE_KEY))) {
-					assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
-				} else {
-					assertEquals(HttpResponseStatus.OK, response.getStatus());
-				}
-				assertEquals("application/json; charset=UTF-8", response.getType());
-				assertEquals("{}", response.getContent());
+		try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) {
+			// Request the file from the web server
+			client.sendGetRequest("/jobs/" + jid + "/yarn-stop", deadline.timeLeft());
+
+			HttpTestClient.SimpleHttpResponse response = client
+				.getNextResponse(deadline.timeLeft());
+
+			if (Objects.equals(MiniClusterResource.NEW_CODEBASE, System.getProperty(MiniClusterResource.CODEBASE_KEY))) {
+				assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
+			} else {
+				assertEquals(HttpResponseStatus.OK, response.getStatus());
 			}
+			assertEquals("application/json; charset=UTF-8", response.getType());
+			assertEquals("{}", response.getContent());
+		}
 
+		// wait for cancellation to finish
+		while (!getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
 			Thread.sleep(20);
 		}
+
 		BlockingInvokable.reset();
 	}
 


[08/10] flink git commit: [FLINK-9100][core] Hide configured value for "password"/"secret"

Posted by ch...@apache.org.
[FLINK-9100][core] Hide configured value for "password"/"secret"

This closes #9100.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a9675d5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a9675d5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a9675d5

Branch: refs/heads/master
Commit: 1a9675d54fda7c6d7c519935dde05f47eb449401
Parents: aa4bdc0
Author: sihuazhou <su...@163.com>
Authored: Tue Apr 24 13:39:11 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Apr 24 13:41:43 2018 +0200

----------------------------------------------------------------------
 .../configuration/GlobalConfiguration.java      | 24 +++++++++++++++++++-
 .../configuration/GlobalConfigurationTest.java  | 10 ++++++++
 2 files changed, 33 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1a9675d5/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
index 2f2a9cf..fd7d441 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
@@ -19,6 +19,7 @@
 package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,6 +43,11 @@ public final class GlobalConfiguration {
 
 	public static final String FLINK_CONF_FILENAME = "flink-conf.yaml";
 
+	// the keys whose values should be hidden
+	private static final String[] SENSITIVE_KEYS = new String[] {"password", "secret"};
+
+	// the hidden content to be displayed
+	public static final String HIDDEN_CONTENT = "******";
 
 	// --------------------------------------------------------------------------------------------
 
@@ -183,7 +189,7 @@ public final class GlobalConfiguration {
 						continue;
 					}
 
-					LOG.info("Loading configuration property: {}, {}", key, value);
+					LOG.info("Loading configuration property: {}, {}", key, isSensitive(key) ? HIDDEN_CONTENT : value);
 					config.setString(key, value);
 				}
 			}
@@ -194,4 +200,20 @@ public final class GlobalConfiguration {
 		return config;
 	}
 
+	/**
+	 * Check whether the key is a hidden key.
+	 *
+	 * @param key the config key
+	 */
+	public static boolean isSensitive(String key) {
+		Preconditions.checkNotNull(key, "key is null");
+		final String keyInLower = key.toLowerCase();
+		for (String hideKey : SENSITIVE_KEYS) {
+			if (keyInLower.length() >= hideKey.length()
+				&& keyInLower.contains(hideKey)) {
+				return true;
+			}
+		}
+		return false;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1a9675d5/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
index c5d2e62..a42564d 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
@@ -31,7 +31,9 @@ import java.io.PrintWriter;
 import java.util.UUID;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 /**
  * This class contains tests for the global configuration (parsing configuration directory information).
@@ -120,4 +122,12 @@ public class GlobalConfigurationTest extends TestLogger {
 		assertNotNull(GlobalConfiguration.loadConfiguration(tempFolder.getRoot().getAbsolutePath()));
 	}
 
+	@Test
+	public void testHiddenKey() {
+		assertTrue(GlobalConfiguration.isSensitive("password123"));
+		assertTrue(GlobalConfiguration.isSensitive("123pasSword"));
+		assertTrue(GlobalConfiguration.isSensitive("PasSword"));
+		assertTrue(GlobalConfiguration.isSensitive("Secret"));
+		assertFalse(GlobalConfiguration.isSensitive("Hello"));
+	}
 }


[04/10] flink git commit: [hotfix][tests] Enable NetworkStackThroughputITCase

Posted by ch...@apache.org.
[hotfix][tests] Enable NetworkStackThroughputITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f633a80b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f633a80b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f633a80b

Branch: refs/heads/master
Commit: f633a80bb22011ca1e7a0e4fc7a754d79b213e00
Parents: 598cb66
Author: zentol <ch...@apache.org>
Authored: Thu Apr 19 10:28:44 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Apr 24 13:30:07 2018 +0200

----------------------------------------------------------------------
 .../test/runtime/NetworkStackThroughputITCase.java    | 14 +++++---------
 1 file changed, 5 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f633a80b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index 1b46f46..b5c233a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -36,7 +36,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.Ignore;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,7 +46,6 @@ import java.util.concurrent.TimeUnit;
 /**
  * Manually test the throughput of the network stack.
  */
-@Ignore
 public class NetworkStackThroughputITCase extends TestLogger {
 
 	private static final Logger LOG = LoggerFactory.getLogger(NetworkStackThroughputITCase.class);
@@ -84,7 +83,7 @@ public class NetworkStackThroughputITCase extends TestLogger {
 				// Determine the amount of data to send per subtask
 				int dataVolumeGb = getTaskConfiguration().getInteger(NetworkStackThroughputITCase.DATA_VOLUME_GB_CONFIG_KEY, 1);
 
-				long dataMbPerSubtask = (dataVolumeGb * 1024) / getCurrentNumberOfSubtasks();
+				long dataMbPerSubtask = (dataVolumeGb * 10) / getCurrentNumberOfSubtasks();
 				long numRecordsToEmit = (dataMbPerSubtask * 1024 * 1024) / SpeedTestRecord.RECORD_SIZE;
 
 				LOG.info(String.format("%d/%d: Producing %d records (each record: %d bytes, total: %.2f GB)",
@@ -209,6 +208,7 @@ public class NetworkStackThroughputITCase extends TestLogger {
 
 	// ------------------------------------------------------------------------
 
+	@Test
 	public void testThroughput() throws Exception {
 		Object[][] configParams = new Object[][]{
 				new Object[]{1, false, false, false, 4, 2},
@@ -335,13 +335,9 @@ public class NetworkStackThroughputITCase extends TestLogger {
 		return jobGraph;
 	}
 
-	private void runAllTests() throws Exception {
-		testThroughput();
+	public static void main(String[] args) throws Exception {
+		new NetworkStackThroughputITCase().testThroughput();
 
 		System.out.println("Done.");
 	}
-
-	public static void main(String[] args) throws Exception {
-		new NetworkStackThroughputITCase().runAllTests();
-	}
 }


[07/10] flink git commit: [FLINK-8793][REST] Hide configured value for "password"/"secret"

Posted by ch...@apache.org.
[FLINK-8793][REST] Hide configured value for "password"/"secret"


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aa4bdc03
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aa4bdc03
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aa4bdc03

Branch: refs/heads/master
Commit: aa4bdc037caaa2b516864007b90a7271e998702a
Parents: 512083a
Author: sihuazhou <su...@163.com>
Authored: Tue Apr 24 13:39:22 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Apr 24 13:41:31 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/rest/handler/legacy/ClusterConfigHandler.java | 5 +++--
 .../flink/runtime/rest/messages/ClusterConfigurationInfo.java   | 5 +++--
 2 files changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aa4bdc03/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterConfigHandler.java
index 76221b5..ff2f04e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterConfigHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rest.handler.legacy;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoEntry;
 import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders;
@@ -74,8 +75,8 @@ public class ClusterConfigHandler extends AbstractJsonRequestHandler {
 
 				String value = config.getString(key, null);
 				// Mask key values which contain sensitive information
-				if (value != null && key.toLowerCase().contains("password")) {
-					value = "******";
+				if (value != null && GlobalConfiguration.isSensitive(key)) {
+					value = GlobalConfiguration.HIDDEN_CONTENT;
 				}
 				gen.writeStringField(ClusterConfigurationInfoEntry.FIELD_NAME_CONFIG_VALUE, value);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/aa4bdc03/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfo.java
index 627dc4c..06e6765 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfo.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rest.messages;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler;
 
 import java.util.ArrayList;
@@ -45,8 +46,8 @@ public class ClusterConfigurationInfo extends ArrayList<ClusterConfigurationInfo
 			String value = config.getString(key, null);
 
 			// Mask key values which contain sensitive information
-			if (value != null && key.toLowerCase().contains("password")) {
-				value = "******";
+			if (value != null && GlobalConfiguration.isSensitive(key)) {
+				value = GlobalConfiguration.HIDDEN_CONTENT;
 			}
 
 			clusterConfig.add(new ClusterConfigurationInfoEntry(key, value));


[10/10] flink git commit: [FLINK-8704][tests] Port PartialConsumerPipelinedResultTest

Posted by ch...@apache.org.
[FLINK-8704][tests] Port PartialConsumerPipelinedResultTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c50b573e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c50b573e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c50b573e

Branch: refs/heads/master
Commit: c50b573e9712786da22bd2863a51c26a005e0d15
Parents: 73e9f90
Author: zentol <ch...@apache.org>
Authored: Mon Mar 12 13:09:44 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Tue Apr 24 13:47:56 2018 +0200

----------------------------------------------------------------------
 ...LegacyPartialConsumePipelinedResultTest.java | 150 +++++++++++++++++++
 .../PartialConsumePipelinedResultTest.java      |  33 ++--
 2 files changed, 171 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c50b573e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java
new file mode 100644
index 0000000..aecab75
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class LegacyPartialConsumePipelinedResultTest extends TestLogger {
+
+	// Test configuration
+	private final static int NUMBER_OF_TMS = 1;
+	private final static int NUMBER_OF_SLOTS_PER_TM = 1;
+	private final static int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
+
+	private final static int NUMBER_OF_NETWORK_BUFFERS = 128;
+
+	private static TestingCluster flink;
+
+	@BeforeClass
+	public static void setUp() throws Exception {
+		final Configuration config = new Configuration();
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUMBER_OF_TMS);
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUMBER_OF_SLOTS_PER_TM);
+		config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
+		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, NUMBER_OF_NETWORK_BUFFERS);
+
+		flink = new TestingCluster(config, true);
+
+		flink.start();
+	}
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		flink.stop();
+	}
+
+	/**
+	 * Tests a fix for FLINK-1930.
+	 *
+	 * <p> When consuming a pipelined result only partially, is is possible that local channels
+	 * release the buffer pool, which is associated with the result partition, too early.  If the
+	 * producer is still producing data when this happens, it runs into an IllegalStateException,
+	 * because of the destroyed buffer pool.
+	 *
+	 * @see <a href="https://issues.apache.org/jira/browse/FLINK-1930">FLINK-1930</a>
+	 */
+	@Test
+	public void testPartialConsumePipelinedResultReceiver() throws Exception {
+		final JobVertex sender = new JobVertex("Sender");
+		sender.setInvokableClass(SlowBufferSender.class);
+		sender.setParallelism(PARALLELISM);
+
+		final JobVertex receiver = new JobVertex("Receiver");
+		receiver.setInvokableClass(SingleBufferReceiver.class);
+		receiver.setParallelism(PARALLELISM);
+
+		// The partition needs to be pipelined, otherwise the original issue does not occur, because
+		// the sender and receiver are not online at the same time.
+		receiver.connectNewDataSetAsInput(
+			sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+
+		final JobGraph jobGraph = new JobGraph("Partial Consume of Pipelined Result", sender, receiver);
+
+		final SlotSharingGroup slotSharingGroup = new SlotSharingGroup(
+			sender.getID(), receiver.getID());
+
+		sender.setSlotSharingGroup(slotSharingGroup);
+		receiver.setSlotSharingGroup(slotSharingGroup);
+
+		flink.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
+	}
+
+	// ---------------------------------------------------------------------------------------------
+
+	/**
+	 * Sends a fixed number of buffers and sleeps in-between sends.
+	 */
+	public static class SlowBufferSender extends AbstractInvokable {
+
+		public SlowBufferSender(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			final ResultPartitionWriter writer = getEnvironment().getWriter(0);
+
+			for (int i = 0; i < 8; i++) {
+				final BufferBuilder bufferBuilder = writer.getBufferProvider().requestBufferBuilderBlocking();
+				writer.addBufferConsumer(bufferBuilder.createBufferConsumer(), 0);
+				Thread.sleep(50);
+				bufferBuilder.finish();
+			}
+		}
+	}
+
+	/**
+	 * Reads a single buffer and recycles it.
+	 */
+	public static class SingleBufferReceiver extends AbstractInvokable {
+
+		public SingleBufferReceiver(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			InputGate gate = getEnvironment().getInputGate(0);
+			Buffer buffer = gate.getNextBufferOrEvent().orElseThrow(IllegalStateException::new).getBuffer();
+			if (buffer != null) {
+				buffer.recycleBuffer();
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c50b573e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index 8080ca4..7169796 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.execution.Environment;
@@ -32,41 +31,51 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.testutils.category.New;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
+@Category(New.class)
 public class PartialConsumePipelinedResultTest extends TestLogger {
 
 	// Test configuration
-	private final static int NUMBER_OF_TMS = 1;
-	private final static int NUMBER_OF_SLOTS_PER_TM = 1;
-	private final static int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
+	private static final int NUMBER_OF_TMS = 1;
+	private static final int NUMBER_OF_SLOTS_PER_TM = 1;
+	private static final int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
 
-	private final static int NUMBER_OF_NETWORK_BUFFERS = 128;
+	private static final int NUMBER_OF_NETWORK_BUFFERS = 128;
 
-	private static TestingCluster flink;
+	private static MiniCluster flink;
 
 	@BeforeClass
 	public static void setUp() throws Exception {
 		final Configuration config = new Configuration();
-		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUMBER_OF_TMS);
-		config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUMBER_OF_SLOTS_PER_TM);
 		config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
 		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, NUMBER_OF_NETWORK_BUFFERS);
 
-		flink = new TestingCluster(config, true);
+		final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
+			.setConfiguration(config)
+			.setNumTaskManagers(NUMBER_OF_TMS)
+			.setNumSlotsPerTaskManager(NUMBER_OF_SLOTS_PER_TM)
+			.build();
+
+		flink = new MiniCluster(miniClusterConfiguration);
 
 		flink.start();
 	}
 
 	@AfterClass
 	public static void tearDown() throws Exception {
-		flink.stop();
+		if (flink != null) {
+			flink.close();
+		}
 	}
 
 	/**
@@ -102,7 +111,7 @@ public class PartialConsumePipelinedResultTest extends TestLogger {
 		sender.setSlotSharingGroup(slotSharingGroup);
 		receiver.setSlotSharingGroup(slotSharingGroup);
 
-		flink.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
+		flink.executeJobBlocking(jobGraph);
 	}
 
 	// ---------------------------------------------------------------------------------------------


[06/10] flink git commit: [FLINK-9212][REST] Port SubtasksAllAccumulatorsHandler to new REST endpoint

Posted by ch...@apache.org.
[FLINK-9212][REST] Port SubtasksAllAccumulatorsHandler to new REST endpoint

This closes #5893.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/512083a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/512083a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/512083a7

Branch: refs/heads/master
Commit: 512083a7dd8d0574afbfa8e2c7065f16e9a83d12
Parents: 8b98f79
Author: zhouhai02 <zh...@meituan.com>
Authored: Sun Apr 22 18:59:11 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Tue Apr 24 13:32:22 2018 +0200

----------------------------------------------------------------------
 .../job/SubtasksAllAccumulatorsHandler.java     |  82 +++++++++++
 .../SubtasksAllAccumulatorsHandlers.java        |  75 ++++++++++
 .../job/SubtasksAllAccumulatorsInfo.java        | 144 +++++++++++++++++++
 .../runtime/webmonitor/WebMonitorEndpoint.java  |  12 ++
 .../job/SubtasksAllAccumulatorsInfoTest.java    |  58 ++++++++
 5 files changed, 371 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/512083a7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java
new file mode 100644
index 0000000..51efba2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.job.SubtasksAllAccumulatorsInfo;
+import org.apache.flink.runtime.rest.messages.job.UserAccumulator;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler for the subtasks all accumulators.
+ */
+public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexHandler<SubtasksAllAccumulatorsInfo, JobVertexMessageParameters> {
+
+	public SubtasksAllAccumulatorsHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, MessageHeaders<EmptyRequestBody, SubtasksAllAccumulatorsInfo, JobVertexMessageParameters> messageHeaders, ExecutionGraphCache executionGraphCache, Executor executor) {
+		super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor);
+	}
+
+	@Override
+	protected SubtasksAllAccumulatorsInfo handleRequest(HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> request, AccessExecutionJobVertex jobVertex) throws RestHandlerException {
+		JobVertexID jobVertexId = jobVertex.getJobVertexId();
+		int parallelism = jobVertex.getParallelism();
+
+		final List<SubtasksAllAccumulatorsInfo.SubtaskAccumulatorsInfo> subtaskAccumulatorsInfos = new ArrayList<>();
+
+		for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
+			TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
+			String locationString = location == null ? "(unassigned)" : location.getHostname();
+
+			StringifiedAccumulatorResult[] accs = vertex.getCurrentExecutionAttempt().getUserAccumulatorsStringified();
+			List<UserAccumulator> userAccumulators = new ArrayList<>(accs.length);
+			for (StringifiedAccumulatorResult acc : accs) {
+				userAccumulators.add(new UserAccumulator(acc.getName(), acc.getType(), acc.getValue()));
+			}
+
+			subtaskAccumulatorsInfos.add(
+				new SubtasksAllAccumulatorsInfo.SubtaskAccumulatorsInfo(
+					vertex.getCurrentExecutionAttempt().getParallelSubtaskIndex(),
+					vertex.getCurrentExecutionAttempt().getAttemptNumber(),
+					locationString,
+					userAccumulators
+				));
+		}
+
+		return new SubtasksAllAccumulatorsInfo(jobVertexId, parallelism, subtaskAccumulatorsInfos);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/512083a7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksAllAccumulatorsHandlers.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksAllAccumulatorsHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksAllAccumulatorsHandlers.java
new file mode 100644
index 0000000..e178c93
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksAllAccumulatorsHandlers.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.SubtasksAllAccumulatorsHandler;
+import org.apache.flink.runtime.rest.messages.job.SubtasksAllAccumulatorsInfo;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link SubtasksAllAccumulatorsHandler}.
+ */
+public class SubtasksAllAccumulatorsHandlers implements MessageHeaders<EmptyRequestBody, SubtasksAllAccumulatorsInfo, JobVertexMessageParameters> {
+
+	private static final SubtasksAllAccumulatorsHandlers INSTANCE = new SubtasksAllAccumulatorsHandlers();
+
+	public static final String URL = "/jobs" +
+		"/:" + JobIDPathParameter.KEY +
+		"/vertices" +
+		"/:" + JobVertexIdPathParameter.KEY +
+		"/subtasks/accumulators";
+
+	private SubtasksAllAccumulatorsHandlers() {}
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public Class<SubtasksAllAccumulatorsInfo> getResponseClass() {
+		return SubtasksAllAccumulatorsInfo.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+
+	@Override
+	public JobVertexMessageParameters getUnresolvedMessageParameters() {
+		return new JobVertexMessageParameters();
+	}
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.GET;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return URL;
+	}
+
+	public static SubtasksAllAccumulatorsHandlers getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/512083a7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfo.java
new file mode 100644
index 0000000..ee2535f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfo.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.job.SubtasksAllAccumulatorsHandler;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link SubtasksAllAccumulatorsHandler}.
+ */
+public class SubtasksAllAccumulatorsInfo implements ResponseBody {
+
+	public static final String FIELD_NAME_JOB_VERTEX_ID = "id";
+	public static final String FIELD_NAME_PARALLELISM = "parallelism";
+	public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+	@JsonProperty(FIELD_NAME_JOB_VERTEX_ID)
+	@JsonSerialize(using = JobVertexIDSerializer.class)
+	private final JobVertexID jobVertexId;
+
+	@JsonProperty(FIELD_NAME_PARALLELISM)
+	private final int parallelism;
+
+	@JsonProperty(FIELD_NAME_SUBTASKS)
+	private final Collection<SubtaskAccumulatorsInfo> subtaskAccumulatorsInfos;
+
+	@JsonCreator
+	public SubtasksAllAccumulatorsInfo(
+		@JsonDeserialize(using = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_JOB_VERTEX_ID) JobVertexID jobVertexId,
+		@JsonProperty(FIELD_NAME_PARALLELISM) int parallelism,
+		@JsonProperty(FIELD_NAME_SUBTASKS) Collection<SubtaskAccumulatorsInfo> subtaskAccumulatorsInfos) {
+		this.jobVertexId = Preconditions.checkNotNull(jobVertexId);
+		this.parallelism = parallelism;
+		this.subtaskAccumulatorsInfos = Preconditions.checkNotNull(subtaskAccumulatorsInfos);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		SubtasksAllAccumulatorsInfo that = (SubtasksAllAccumulatorsInfo) o;
+		return Objects.equals(jobVertexId, that.jobVertexId) &&
+			parallelism == that.parallelism &&
+			Objects.equals(subtaskAccumulatorsInfos, that.subtaskAccumulatorsInfos);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(jobVertexId, parallelism, subtaskAccumulatorsInfos);
+	}
+
+	// ---------------------------------------------------
+	// Static inner classes
+	// ---------------------------------------------------
+
+	/**
+	 * Detailed information about subtask accumulators.
+	 */
+	public static class SubtaskAccumulatorsInfo {
+		public static final String FIELD_NAME_SUBTASK_INDEX = "subtask";
+		public static final String FIELD_NAME_ATTEMPT_NUM = "attempt";
+		public static final String FIELD_NAME_HOST = "host";
+		public static final String FIELD_NAME_USER_ACCUMULATORS = "user-accumulators";
+
+
+		@JsonProperty(FIELD_NAME_SUBTASK_INDEX)
+		private final int subtaskIndex;
+
+		@JsonProperty(FIELD_NAME_ATTEMPT_NUM)
+		private final int attemptNum;
+
+		@JsonProperty(FIELD_NAME_HOST)
+		private final String host;
+
+		@JsonProperty(FIELD_NAME_USER_ACCUMULATORS)
+		private final Collection<UserAccumulator> userAccumulators;
+
+		@JsonCreator
+		public SubtaskAccumulatorsInfo(
+			@JsonProperty(FIELD_NAME_SUBTASK_INDEX) int subtaskIndex,
+			@JsonProperty(FIELD_NAME_ATTEMPT_NUM) int attemptNum,
+			@JsonProperty(FIELD_NAME_HOST) String host,
+			@JsonProperty(FIELD_NAME_USER_ACCUMULATORS) Collection<UserAccumulator> userAccumulators) {
+
+			this.subtaskIndex = subtaskIndex;
+			this.attemptNum = attemptNum;
+			this.host = Preconditions.checkNotNull(host);
+			this.userAccumulators = Preconditions.checkNotNull(userAccumulators);
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			SubtaskAccumulatorsInfo that = (SubtaskAccumulatorsInfo) o;
+			return subtaskIndex == that.subtaskIndex &&
+				attemptNum == that.attemptNum &&
+				Objects.equals(host, that.host) &&
+				Objects.equals(userAccumulators, that.userAccumulators);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(subtaskIndex, attemptNum, host, userAccumulators);
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/512083a7/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 0ea7550..1a67d92 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -50,6 +50,7 @@ import org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler;
 import org.apache.flink.runtime.rest.handler.job.SubtaskCurrentAttemptDetailsHandler;
 import org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptAccumulatorsHandler;
 import org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler;
+import org.apache.flink.runtime.rest.handler.job.SubtasksAllAccumulatorsHandler;
 import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler;
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler;
@@ -94,6 +95,7 @@ import org.apache.flink.runtime.rest.messages.JobVertexBackPressureHeaders;
 import org.apache.flink.runtime.rest.messages.JobVertexDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.JobVertexTaskManagersHeaders;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.SubtasksAllAccumulatorsHandlers;
 import org.apache.flink.runtime.rest.messages.SubtasksTimesHeaders;
 import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
 import org.apache.flink.runtime.rest.messages.YarnCancelJobTerminationHeaders;
@@ -318,6 +320,15 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 			executionGraphCache,
 			executor);
 
+		SubtasksAllAccumulatorsHandler subtasksAllAccumulatorsHandler = new SubtasksAllAccumulatorsHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			SubtasksAllAccumulatorsHandlers.getInstance(),
+			executionGraphCache,
+			executor);
+
 		TaskManagersHandler taskManagersHandler = new TaskManagersHandler(
 			restAddressFuture,
 			leaderRetriever,
@@ -575,6 +586,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 		handlers.add(Tuple2.of(TaskCheckpointStatisticsHeaders.getInstance(), taskCheckpointStatisticDetailsHandler));
 		handlers.add(Tuple2.of(JobExceptionsHeaders.getInstance(), jobExceptionsHandler));
 		handlers.add(Tuple2.of(JobVertexAccumulatorsHeaders.getInstance(), jobVertexAccumulatorsHandler));
+		handlers.add(Tuple2.of(JobVertexAccumulatorsHeaders.getInstance(), subtasksAllAccumulatorsHandler));
 		handlers.add(Tuple2.of(JobDetailsHeaders.getInstance(), jobDetailsHandler));
 		handlers.add(Tuple2.of(JobAccumulatorsHeaders.getInstance(), jobAccumulatorsHandler));
 		handlers.add(Tuple2.of(TaskManagersHeaders.getInstance(), taskManagersHandler));

http://git-wip-us.apache.org/repos/asf/flink/blob/512083a7/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfoTest.java
new file mode 100644
index 0000000..2a71239
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfoTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests (un)marshalling of the {@link SubtasksAllAccumulatorsInfo}.
+ */
+public class SubtasksAllAccumulatorsInfoTest extends RestResponseMarshallingTestBase<SubtasksAllAccumulatorsInfo> {
+	@Override
+	protected Class<SubtasksAllAccumulatorsInfo> getTestResponseClass() {
+		return SubtasksAllAccumulatorsInfo.class;
+	}
+
+	@Override
+	protected SubtasksAllAccumulatorsInfo getTestResponseInstance() throws Exception {
+		List<SubtasksAllAccumulatorsInfo.SubtaskAccumulatorsInfo> subtaskAccumulatorsInfos = new ArrayList<>(3);
+
+		List<UserAccumulator> userAccumulators = new ArrayList<>(2);
+		userAccumulators.add(new UserAccumulator("test name1", "test type1", "test value1"));
+		userAccumulators.add(new UserAccumulator("test name2", "test type2", "test value2"));
+
+		for (int i = 0; i < 3; ++i) {
+			subtaskAccumulatorsInfos.add(
+				new SubtasksAllAccumulatorsInfo.SubtaskAccumulatorsInfo(
+					i,
+					i,
+					"host-" + String.valueOf(i),
+					userAccumulators
+				));
+
+		}
+		return new SubtasksAllAccumulatorsInfo(new JobVertexID(),
+			4,
+			subtaskAccumulatorsInfos);
+	}
+}


[05/10] flink git commit: [FLINK-9225][core] Minor code comments fix in RuntimeContext

Posted by ch...@apache.org.
[FLINK-9225][core] Minor code comments fix in RuntimeContext

This closes #5883.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8b98f79c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8b98f79c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8b98f79c

Branch: refs/heads/master
Commit: 8b98f79c90c26291755dbccb5b7065a9fba768a2
Parents: f633a80
Author: binlijin <bi...@gmail.com>
Authored: Fri Apr 20 15:02:59 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Tue Apr 24 13:30:08 2018 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/common/functions/RuntimeContext.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8b98f79c/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index 3429143..e221091 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -249,7 +249,7 @@ public interface RuntimeContext {
 	 *
 	 * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
 	 *
-	 *     private ValueState<Long> count;
+	 *     private ValueState<Long> state;
 	 *
 	 *     public void open(Configuration cfg) {
 	 *         state = getRuntimeContext().getState(


[03/10] flink git commit: [hotfix][tests] Properly close writer in NetworkStackThroughputITCase

Posted by ch...@apache.org.
[hotfix][tests] Properly close writer in NetworkStackThroughputITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/598cb668
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/598cb668
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/598cb668

Branch: refs/heads/master
Commit: 598cb6689cbfc54ddf0bce69eaec18a8d29c20e0
Parents: b28e163
Author: zentol <ch...@apache.org>
Authored: Thu Apr 19 10:27:56 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Apr 24 13:30:06 2018 +0200

----------------------------------------------------------------------
 .../apache/flink/test/runtime/NetworkStackThroughputITCase.java    | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/598cb668/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index 3b93ca2..1b46f46 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -104,6 +104,7 @@ public class NetworkStackThroughputITCase extends TestLogger {
 				}
 			}
 			finally {
+				writer.clearBuffers();
 				writer.flushAll();
 			}
 		}
@@ -137,6 +138,7 @@ public class NetworkStackThroughputITCase extends TestLogger {
 			}
 			finally {
 				reader.clearBuffers();
+				writer.clearBuffers();
 				writer.flushAll();
 			}
 		}


[09/10] flink git commit: [FLINK-9033][config] Replace usages of deprecated TASK_MANAGER_NUM_TASK_SLOTS

Posted by ch...@apache.org.
[FLINK-9033][config] Replace usages of deprecated TASK_MANAGER_NUM_TASK_SLOTS

This closes #5731.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/73e9f901
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/73e9f901
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/73e9f901

Branch: refs/heads/master
Commit: 73e9f9013391a4e5be18f1cfc1f9462a78e95ca7
Parents: 1a9675d
Author: zhouhai02 <zh...@meituan.com>
Authored: Wed Mar 21 00:01:54 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Tue Apr 24 13:45:17 2018 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/flink/client/LocalExecutor.java  |  7 ++++---
 .../flink/client/deployment/ClusterSpecification.java     |  3 +--
 .../kinesis/manualtests/ManualExactlyOnceTest.java        |  2 +-
 .../ManualExactlyOnceWithStreamReshardingTest.java        |  2 +-
 .../org/apache/flink/storm/api/FlinkLocalCluster.java     |  3 +--
 .../flink/runtime/clusterframework/BootstrapTools.java    |  2 +-
 .../runtime/minicluster/MiniClusterConfiguration.java     |  3 ++-
 .../runtime/taskexecutor/TaskManagerConfiguration.java    |  2 +-
 .../taskexecutor/TaskManagerServicesConfiguration.java    |  4 ++--
 .../flink/runtime/checkpoint/CoordinatorShutdownTest.java |  5 +++--
 .../partition/PartialConsumePipelinedResultTest.java      |  2 +-
 .../flink/runtime/jobmanager/JobManagerCleanupITCase.java |  3 ++-
 .../runtime/jobmanager/JobManagerHARecoveryTest.java      |  4 ++--
 .../apache/flink/runtime/jobmanager/JobManagerTest.java   |  5 ++---
 .../leaderelection/LeaderChangeJobRecoveryTest.java       |  3 ++-
 .../leaderelection/LeaderChangeStateCleanupTest.java      |  3 ++-
 .../backpressure/BackPressureStatsTrackerImplITCase.java  |  4 ++--
 .../backpressure/StackTraceSampleCoordinatorITCase.java   |  4 ++--
 .../org/apache/flink/runtime/akka/AkkaSslITCase.scala     | 10 +++++-----
 .../apache/flink/runtime/jobmanager/RecoveryITCase.scala  |  4 ++--
 .../apache/flink/runtime/testingUtils/TestingUtils.scala  |  2 +-
 .../org/apache/flink/api/scala/ScalaShellITCase.scala     |  4 ++--
 .../org/apache/flink/test/util/MiniClusterResource.java   |  2 +-
 .../java/org/apache/flink/test/util/TestBaseUtils.java    |  2 +-
 .../checkpointing/utils/SavepointMigrationTestBase.java   |  3 ++-
 .../flink/test/operators/ExecutionEnvironmentITCase.java  |  4 ++--
 .../flink/test/operators/RemoteEnvironmentITCase.java     |  4 ++--
 .../AbstractTaskManagerProcessFailureRecoveryTest.java    |  2 +-
 .../recovery/JobManagerHACheckpointRecoveryITCase.java    |  3 ++-
 .../JobManagerHAProcessFailureBatchRecoveryITCase.java    |  3 +--
 .../test/recovery/TaskManagerFailureRecoveryITCase.java   |  2 +-
 .../leaderelection/ZooKeeperLeaderElectionITCase.java     |  3 ++-
 .../runtime/minicluster/LocalFlinkMiniClusterITCase.java  |  3 ++-
 .../scala/runtime/jobmanager/JobManagerFailsITCase.scala  |  2 +-
 .../runtime/taskmanager/TaskManagerFailsITCase.scala      |  5 ++---
 .../org/apache/flink/yarn/cli/FlinkYarnSessionCli.java    |  3 +--
 36 files changed, 63 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index 01c281f..f837c4f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.dag.DataSinkNode;
@@ -139,7 +140,7 @@ public class LocalExecutor extends PlanExecutor {
 				.setRpcServiceSharing(MiniClusterConfiguration.RpcServiceSharing.SHARED)
 				.setNumSlotsPerTaskManager(
 					configuration.getInteger(
-						ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1))
+						TaskManagerOptions.NUM_TASK_SLOTS, 1))
 				.build();
 
 			final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
@@ -220,7 +221,7 @@ public class LocalExecutor extends PlanExecutor {
 
 			try {
 				// TODO: Set job's default parallelism to max number of slots
-				final int slotsPerTaskManager = jobExecutorServiceConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots);
+				final int slotsPerTaskManager = jobExecutorServiceConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots);
 				final int numTaskManagers = jobExecutorServiceConfiguration.getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
 				plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);
 
@@ -265,7 +266,7 @@ public class LocalExecutor extends PlanExecutor {
 
 	private Configuration createConfiguration() {
 		Configuration newConfiguration = new Configuration();
-		newConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, getTaskManagerNumSlots());
+		newConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, getTaskManagerNumSlots());
 		newConfiguration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, isDefaultOverwriteFiles());
 
 		newConfiguration.addAll(baseConfiguration);

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
index 8650cab..cf2ae4c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.client.deployment;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -66,7 +65,7 @@ public final class ClusterSpecification {
 	}
 
 	public static ClusterSpecification fromConfiguration(Configuration configuration) {
-		int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+		int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
 
 		int jobManagerMemoryMb = configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
 		int taskManagerMemoryMb = configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
index 963002f..40225fb 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
@@ -78,7 +78,7 @@ public class ManualExactlyOnceTest {
 
 		final Configuration flinkConfig = new Configuration();
 		flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+		flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 8);
 		flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16);
 		flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
index 93b9caf..34dcdc0 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
@@ -90,7 +90,7 @@ public class ManualExactlyOnceWithStreamReshardingTest {
 
 		final Configuration flinkConfig = new Configuration();
 		flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+		flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 8);
 		flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16);
 		flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
index bff8c80..6b0b503 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.storm.api;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -90,7 +89,7 @@ public class FlinkLocalCluster {
 			configuration.addAll(jobGraph.getJobConfiguration());
 
 			configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
-			configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
+			configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
 
 			this.flink = new LocalFlinkMiniCluster(configuration, true);
 			this.flink.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 102274d1..7a8403a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -247,7 +247,7 @@ public class BootstrapTools {
 
 		cfg.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, registrationTimeout.toString());
 		if (numSlots != -1){
-			cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
+			cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots);
 		}
 
 		return cfg; 

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index 44a567b..0a0c692 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.util.Preconditions;
@@ -167,7 +168,7 @@ public class MiniClusterConfiguration {
 
 		public MiniClusterConfiguration build() {
 			final Configuration modifiedConfiguration = new Configuration(configuration);
-			modifiedConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
+			modifiedConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlotsPerTaskManager);
 			modifiedConfiguration.setString(
 				RestOptions.ADDRESS,
 				modifiedConfiguration.getString(RestOptions.ADDRESS, "localhost"));

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index 1bf42ee..e8a7ae8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -162,7 +162,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 	// --------------------------------------------------------------------------------------------
 
 	public static TaskManagerConfiguration fromConfiguration(Configuration configuration) {
-		int numberSlots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+		int numberSlots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
 
 		if (numberSlots == -1) {
 			numberSlots = 1;

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index d029bc5..b80320c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -196,7 +196,7 @@ public class TaskManagerServicesConfiguration {
 			boolean localCommunication) throws Exception {
 
 		// we need this because many configs have been written with a "-1" entry
-		int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+		int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
 		if (slots == -1) {
 			slots = 1;
 		}
@@ -290,7 +290,7 @@ public class TaskManagerServicesConfiguration {
 		checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(),
 			"Leave config parameter empty or use 0 to let the system choose a port automatically.");
 
-		checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
+		checkConfigParameter(slots >= 1, slots, TaskManagerOptions.NUM_TASK_SLOTS.key(),
 			"Number of task slots must be at least one.");
 
 		final int pageSize = configuration.getInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE);

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index 1d44444..8a6a9d8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.execution.Environment;
@@ -59,7 +60,7 @@ public class CoordinatorShutdownTest extends TestLogger {
 		try {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+			config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
 			cluster = new LocalFlinkMiniCluster(config, true);
 			cluster.start();
 			
@@ -128,7 +129,7 @@ public class CoordinatorShutdownTest extends TestLogger {
 		try {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+			config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
 			cluster = new LocalFlinkMiniCluster(config, true);
 			cluster.start();
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index ced1a33..8080ca4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -55,7 +55,7 @@ public class PartialConsumePipelinedResultTest extends TestLogger {
 	public static void setUp() throws Exception {
 		final Configuration config = new Configuration();
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUMBER_OF_TMS);
-		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUMBER_OF_SLOTS_PER_TM);
+		config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUMBER_OF_SLOTS_PER_TM);
 		config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
 		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, NUMBER_OF_NETWORK_BUFFERS);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
index 8806dec..33a4a28 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
@@ -146,7 +147,7 @@ public class JobManagerCleanupITCase extends TestLogger {
 
 					try {
 						Configuration config = new Configuration();
-						config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+						config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
 						config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
 						config.setString(AkkaOptions.ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT());
 						config.setString(BlobServerOptions.STORAGE_DIRECTORY, blobBaseDir.getAbsolutePath());

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index bb2dbf7..d991983 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -20,9 +20,9 @@ package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.OneShotLatch;
@@ -174,7 +174,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
 
 		flinkConfiguration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
 		flinkConfiguration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString());
-		flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots);
+		flinkConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, slots);
 		flinkConfiguration.setLong(BlobServerOptions.CLEANUP_INTERVAL, 3_600L);
 
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 417294c..79e6d20 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.jobmanager;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -641,7 +640,7 @@ public class JobManagerTest extends TestLogger {
 
 		Configuration tmConfig = new Configuration();
 		tmConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
-		tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+		tmConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 8);
 
 		ActorRef taskManager = TaskManager.startTaskManagerComponentsAndActor(
 			tmConfig,
@@ -1300,7 +1299,7 @@ public class JobManagerTest extends TestLogger {
 			archiver = new AkkaActorGateway(master._2(), leaderId);
 
 			Configuration tmConfig = new Configuration();
-			tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
+			tmConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 4);
 
 			ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
 				tmConfig,

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
index 942fcf3..2ebaeba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.leaderelection;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -72,7 +73,7 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
 
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1);
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
-		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
+		configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlotsPerTM);
 
 		configuration.setString(ConfigConstants.RESTART_STRATEGY, "fixeddelay");
 		configuration.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 9999);

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
index bbcbbf0..6880d9b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.leaderelection;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingManualHighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -78,7 +79,7 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
 
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
-		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
+		configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlotsPerTM);
 
 		highAvailabilityServices = new TestingManualHighAvailabilityServices();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
index 994d02e..0e837a5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.rest.handler.legacy.backpressure;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClient;
@@ -138,7 +138,7 @@ public class BackPressureStatsTrackerImplITCase extends TestLogger {
 					config,
 					highAvailabilityServices);
 
-				config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
+				config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism);
 
 				taskManager = TestingUtils.createTaskManager(
 					testActorSystem,

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
index 8fa302a..ccf5f60 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.rest.handler.legacy.backpressure;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClient;
@@ -109,7 +109,7 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger {
 					config,
 					highAvailabilityServices);
 
-				config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
+				config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism);
 
 				taskManager = TestingUtils.createTaskManager(
 					testActorSystem,

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
index 72596cd..ebdae31 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
@@ -51,7 +51,7 @@ class AkkaSslITCase(_system: ActorSystem)
       val config = new Configuration()
       config.setString(JobManagerOptions.ADDRESS, "127.0.0.1")
       config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "127.0.0.1")
-      config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+      config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
       config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
 
       config.setBoolean(SecurityOptions.SSL_ENABLED, true)
@@ -78,7 +78,7 @@ class AkkaSslITCase(_system: ActorSystem)
         val config = new Configuration()
         config.setString(JobManagerOptions.ADDRESS, "127.0.0.1")
         config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "127.0.0.1")
-        config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+        config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
         config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
 
         config.setBoolean(SecurityOptions.SSL_ENABLED, true)
@@ -101,7 +101,7 @@ class AkkaSslITCase(_system: ActorSystem)
     "start with akka ssl disabled" in {
 
       val config = new Configuration()
-      config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+      config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
       config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
       config.setBoolean(SecurityOptions.SSL_ENABLED, false)
 
@@ -117,7 +117,7 @@ class AkkaSslITCase(_system: ActorSystem)
       an[Exception] should be thrownBy {
 
         val config = new Configuration()
-        config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+        config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
         config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
         config.setString(AkkaOptions.ASK_TIMEOUT, "2 s")
 
@@ -139,7 +139,7 @@ class AkkaSslITCase(_system: ActorSystem)
       an[Exception] should be thrownBy {
 
         val config = new Configuration()
-        config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+        config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
         config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
         config.setString(AkkaOptions.ASK_TIMEOUT, "2 s")
 

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
index 71d2ee9..a2dffc8 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
@@ -22,7 +22,7 @@ import akka.actor.{ActorSystem, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.restartstrategy.RestartStrategies
-import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration}
+import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration, TaskManagerOptions}
 import org.apache.flink.runtime.akka.ListeningBehaviour
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType
 import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobStatus, JobVertex}
@@ -59,7 +59,7 @@ class RecoveryITCase(_system: ActorSystem)
       heartbeatTimeout: String)
     : TestingCluster = {
     val config = new Configuration()
-    config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
+    config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers)
     config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, heartbeatTimeout)
     new TestingCluster(config)

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 2d8d02d..b89d2a6 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -88,7 +88,7 @@ object TestingUtils {
   def startTestingCluster(numSlots: Int, numTMs: Int = 1,
                           timeout: String = DEFAULT_AKKA_ASK_TIMEOUT): TestingCluster = {
     val config = new Configuration()
-    config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
+    config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs)
     config.setString(AkkaOptions.ASK_TIMEOUT, timeout)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index cb6231a..2e07fb9 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -22,8 +22,8 @@ import java.io._
 
 import akka.actor.ActorRef
 import akka.pattern.Patterns
+import org.apache.flink.configuration.{Configuration, CoreOptions, TaskManagerOptions}
 import org.apache.flink.runtime.minicluster.StandaloneMiniCluster
-import org.apache.flink.configuration.{ConfigConstants, Configuration, CoreOptions, GlobalConfiguration}
 import org.apache.flink.runtime.clusterframework.BootstrapTools
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.util.TestLogger
@@ -321,7 +321,7 @@ object ScalaShellITCase {
 
   @BeforeClass
   def beforeAll(): Unit = {
-    configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism)
+    configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism)
     configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE)
 
     cluster = Option(new StandaloneMiniCluster(configuration))

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index 8c21b37..844de01 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -198,7 +198,7 @@ public class MiniClusterResource extends ExternalResource {
 	private void startLegacyMiniCluster() throws Exception {
 		final Configuration configuration = new Configuration(miniClusterResourceConfiguration.getConfiguration());
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, miniClusterResourceConfiguration.getNumberTaskManagers());
-		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, miniClusterResourceConfiguration.getNumberSlotsPerTaskManager());
+		configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, miniClusterResourceConfiguration.getNumberSlotsPerTaskManager());
 
 		final LocalFlinkMiniCluster flinkMiniCluster = TestBaseUtils.startCluster(
 			configuration,

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index dd255fd..7e9b12e 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -127,7 +127,7 @@ public class TestBaseUtils extends TestLogger {
 		Configuration config = new Configuration();
 
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
-		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots);
+		config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots);
 
 		config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, startWebserver);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index 8f2aaa1..cfa155b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -25,6 +25,7 @@ import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -99,7 +100,7 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
 		final Configuration config = new Configuration();
 
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, DEFAULT_PARALLELISM);
+		config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, DEFAULT_PARALLELISM);
 
 		UUID id = UUID.randomUUID();
 		final File checkpointDir = TEMP_FOLDER.newFolder("checkpoints_" + id).getAbsoluteFile();

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java
index f7f993b..65a21d1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java
@@ -23,8 +23,8 @@ import org.apache.flink.api.common.functions.RichMapPartitionFunction;
 import org.apache.flink.api.common.io.GenericInputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
@@ -50,7 +50,7 @@ public class ExecutionEnvironmentITCase extends TestLogger {
 	@Test
 	public void testLocalEnvironmentWithConfig() throws Exception {
 		Configuration conf = new Configuration();
-		conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
+		conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, PARALLELISM);
 
 		final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
 		env.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
index a3a551c..c2d6341 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
@@ -26,9 +26,9 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -96,7 +96,7 @@ public class RemoteEnvironmentITCase extends TestLogger {
 
 			resource = miniCluster;
 		} else {
-			configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
+			configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, TM_SLOTS);
 			final StandaloneMiniCluster standaloneMiniCluster = new StandaloneMiniCluster(configuration);
 			hostname = standaloneMiniCluster.getHostname();
 			port = standaloneMiniCluster.getPort();

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 29516dc..6764f6f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -406,7 +406,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 				cfg.setInteger(JobManagerOptions.PORT, jobManagerPort);
 				cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
 				cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
-				cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+				cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
 				cfg.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
 
 				TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg,

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index 16ea6d5..ce750d3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
@@ -365,7 +366,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 			config.setInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, retainedCheckpoints);
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
+			config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots);
 			config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, temporaryFolder.newFolder().toString());
 
 			String tmpFolderString = temporaryFolder.newFolder().toString();

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index d217a2a..50404a4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -270,7 +269,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 			// Task manager configuration
 			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
 			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+			config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
 
 			highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
 				config,

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
index 8371230..755b3d3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
@@ -77,7 +77,7 @@ public class TaskManagerFailureRecoveryITCase extends TestLogger {
 		try {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
+			config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism);
 			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
 
 			config.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "500 ms");

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
index f348b8a..399fc10 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -157,7 +158,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
 
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
-		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
+		configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlotsPerTM);
 
 		// we "effectively" disable the automatic RecoverAllJobs message and sent it manually to make
 		// sure that all TMs have registered to the JM prior to issuing the RecoverAllJobs message

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
index 770b88c..b3d92df 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.test.runtime.minicluster;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -76,7 +77,7 @@ public class LocalFlinkMiniClusterITCase extends TestLogger {
 		try {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
+			config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots);
 			miniCluster = new LocalFlinkMiniCluster(config, true);
 
 			miniCluster.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index 5fe7b1d..5c9b1fb 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -133,7 +133,7 @@ class JobManagerFailsITCase(_system: ActorSystem)
 
   def startDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): TestingCluster = {
     val config = new Configuration()
-    config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
+    config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskmanagers)
     config.setInteger(JobManagerOptions.PORT, 0)
     config.setString(TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF, "50 ms")

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index a065e5b..69ad2d7 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -20,8 +20,7 @@ package org.apache.flink.api.scala.runtime.taskmanager
 
 import akka.actor.{ActorSystem, Kill, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.configuration.ConfigConstants
-import org.apache.flink.configuration.Configuration
+import org.apache.flink.configuration.{ConfigConstants, Configuration, TaskManagerOptions}
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.client.JobExecutionException
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType
@@ -243,7 +242,7 @@ class TaskManagerFailsITCase(_system: ActorSystem)
 
   def createDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): TestingCluster = {
     val config = new Configuration()
-    config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
+    config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskmanagers)
 
     new TestingCluster(config, singleActorSystem = false)

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index fe04662..7596d68 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -23,7 +23,6 @@ import org.apache.flink.client.cli.CliArgsException;
 import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -500,7 +499,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 		}
 
 		if (commandLine.hasOption(slots.getOpt())) {
-			effectiveConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, Integer.parseInt(commandLine.getOptionValue(slots.getOpt())));
+			effectiveConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, Integer.parseInt(commandLine.getOptionValue(slots.getOpt())));
 		}
 
 		if (isYarnPropertiesFileMode(commandLine)) {


[02/10] flink git commit: [FLINK-8967][tests] Port NetworkStackThroughputITCase to flip6

Posted by ch...@apache.org.
[FLINK-8967][tests] Port NetworkStackThroughputITCase to flip6

This closes #5870.
This closes #5247.
This closes #5875.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b28e1632
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b28e1632
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b28e1632

Branch: refs/heads/master
Commit: b28e1632552cae9380269125ea4a486b5830f6d8
Parents: 26eb8bb
Author: zentol <ch...@apache.org>
Authored: Tue Apr 17 15:24:22 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Apr 24 13:29:34 2018 +0200

----------------------------------------------------------------------
 .../runtime/NetworkStackThroughputITCase.java   | 40 +++++++++++---------
 1 file changed, 23 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b28e1632/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index e6401c0..3b93ca2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -19,10 +19,11 @@
 package org.apache.flink.test.runtime;
 
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.reader.RecordReader;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
@@ -32,9 +33,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Ignore;
@@ -42,7 +41,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -234,43 +232,51 @@ public class NetworkStackThroughputITCase extends TestLogger {
 
 			final int numTaskManagers = parallelism / numSlotsPerTaskManager;
 
-			final LocalFlinkMiniCluster localFlinkMiniCluster = TestBaseUtils.startCluster(
-				numTaskManagers,
-				numSlotsPerTaskManager,
-				false,
-				false,
-				true);
+			final MiniClusterResource cluster = new MiniClusterResource(
+				new MiniClusterResource.MiniClusterResourceConfiguration(
+					new Configuration(),
+					numTaskManagers,
+					numSlotsPerTaskManager
+				),
+				true
+			);
+			cluster.before();
 
 			try {
-				System.out.println(Arrays.toString(p));
+				System.out.println(String.format("Running test with parameters: dataVolumeGB=%s, useForwarder=%s, isSlowSender=%s, isSlowReceiver=%s, parallelism=%s, numSlotsPerTM=%s",
+					dataVolumeGb, useForwarder, isSlowSender, isSlowReceiver, parallelism, numSlotsPerTaskManager));
 				testProgram(
-					localFlinkMiniCluster,
+					cluster,
 					dataVolumeGb,
 					useForwarder,
 					isSlowSender,
 					isSlowReceiver,
 					parallelism);
 			} finally {
-				TestBaseUtils.stopCluster(localFlinkMiniCluster, FutureUtils.toFiniteDuration(TestingUtils.TIMEOUT()));
+				cluster.after();
 			}
 		}
 	}
 
 	private void testProgram(
-			LocalFlinkMiniCluster localFlinkMiniCluster,
+			final MiniClusterResource cluster,
 			final int dataVolumeGb,
 			final boolean useForwarder,
 			final boolean isSlowSender,
 			final boolean isSlowReceiver,
 			final int parallelism) throws Exception {
-		JobExecutionResult jer = localFlinkMiniCluster.submitJobAndWait(
+		ClusterClient<?> client = cluster.getClusterClient();
+		client.setDetached(false);
+		client.setPrintStatusDuringExecution(false);
+
+		JobExecutionResult jer = (JobExecutionResult) client.submitJob(
 			createJobGraph(
 				dataVolumeGb,
 				useForwarder,
 				isSlowSender,
 				isSlowReceiver,
 				parallelism),
-			false);
+			getClass().getClassLoader());
 
 		long dataVolumeMbit = dataVolumeGb * 8192;
 		long runtimeSecs = jer.getNetRuntime(TimeUnit.SECONDS);