You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/05/02 09:29:39 UTC

[01/16] flink git commit: [FLINK-9214] [tests, yarn] YarnClient should be stopped in YARNSessionCapacitySchedulerITCase#testDetachedPerJobYarnClusterInternal

Repository: flink
Updated Branches:
  refs/heads/release-1.5 64b32ecb6 -> b7532cb73


[FLINK-9214] [tests, yarn] YarnClient should be stopped in YARNSessionCapacitySchedulerITCase#testDetachedPerJobYarnClusterInternal

This closes #5892


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f1d536a9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f1d536a9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f1d536a9

Branch: refs/heads/release-1.5
Commit: f1d536a943ee19c9224b65b63d31b9dde72fe707
Parents: 64b32ec
Author: yanghua <ya...@gmail.com>
Authored: Sun Apr 22 16:38:28 2018 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Apr 30 23:24:51 2018 +0200

----------------------------------------------------------------------
 .../apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java  | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f1d536a9/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index 3c442b9..4bbd500 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -590,6 +590,12 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 				LOG.warn("testDetachedPerJobYarnClusterInternal: Exception while deleting the JobManager address file", e);
 			}
 
+			try {
+				LOG.info("testDetachedPerJobYarnClusterInternal: Closing the yarn client");
+				yc.stop();
+			} catch (Exception e) {
+				LOG.warn("testDetachedPerJobYarnClusterInternal: Exception while close the yarn client", e);
+			}
 		}
 	}
 


[07/16] flink git commit: [hotfix] [mesos] Delete unused class FlinkMesosSessionCli.

Posted by se...@apache.org.
[hotfix] [mesos] Delete unused class FlinkMesosSessionCli.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/557bf0a6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/557bf0a6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/557bf0a6

Branch: refs/heads/release-1.5
Commit: 557bf0a66e80ed9ef0713fb2889b5ee79ff1a67b
Parents: 0f34a85
Author: gyao <ga...@data-artisans.com>
Authored: Thu Apr 26 17:38:20 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Apr 30 23:25:10 2018 +0200

----------------------------------------------------------------------
 .../flink/mesos/cli/FlinkMesosSessionCli.java   | 75 --------------------
 1 file changed, 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/557bf0a6/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java b/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java
deleted file mode 100644
index e0df1c9..0000000
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.mesos.cli;
-
-import org.apache.flink.configuration.Configuration;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.io.IOException;
-import java.util.Map;
-
-/**
- * Class handling the command line interface to the Mesos session.
- */
-public class FlinkMesosSessionCli {
-
-	private static final ObjectMapper mapper = new ObjectMapper();
-
-	/**
-	 * Decode encoded dynamic properties.
-	 *
-	 * @param dynamicPropertiesEncoded encoded properties produced by the encoding method.
-	 * @return a configuration instance to be merged with the static configuration.
-	 */
-	public static Configuration decodeDynamicProperties(String dynamicPropertiesEncoded) {
-		try {
-			Configuration configuration = new Configuration();
-			if (dynamicPropertiesEncoded != null) {
-				TypeReference<Map<String, String>> typeRef = new TypeReference<Map<String, String>>() {
-				};
-				Map<String, String> props = mapper.readValue(dynamicPropertiesEncoded, typeRef);
-				for (Map.Entry<String, String> property : props.entrySet()) {
-					configuration.setString(property.getKey(), property.getValue());
-				}
-			}
-			return configuration;
-		} catch (IOException ex) {
-			throw new IllegalArgumentException("unreadable encoded properties", ex);
-		}
-	}
-
-	/**
-	 * Encode dynamic properties as a string to be transported as an environment variable.
-	 *
-	 * @param configuration the dynamic properties to encode.
-	 * @return a string to be decoded later.
-	 */
-	public static String encodeDynamicProperties(Configuration configuration) {
-		try {
-			String dynamicPropertiesEncoded = mapper.writeValueAsString(configuration.toMap());
-			return dynamicPropertiesEncoded;
-		}
-		catch (JsonProcessingException ex) {
-			throw new IllegalArgumentException("unwritable properties", ex);
-		}
-	}
-}


[14/16] flink git commit: [hotfix] [tests] Remove redundant rebalance in SuccessAfterNetworkBuffersFailureITCase

Posted by se...@apache.org.
[hotfix] [tests] Remove redundant rebalance in SuccessAfterNetworkBuffersFailureITCase

This closes #5916


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f1fa5174
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f1fa5174
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f1fa5174

Branch: refs/heads/release-1.5
Commit: f1fa517474920da89f8b67bb1a58844c5234af76
Parents: 514cf81
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Apr 26 10:35:55 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Apr 30 23:25:32 2018 +0200

----------------------------------------------------------------------
 .../flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java    | 1 -
 1 file changed, 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f1fa5174/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
index c5c1882..e149f43 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
@@ -136,7 +136,6 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
 
 		// add some re-partitions to increase network buffer use
 		DataSet<KMeans.Centroid> newCentroids = points
-				.rebalance()
 				// compute closest centroid for each point
 				.map(new KMeans.SelectNearestCenter()).withBroadcastSet(loop, "centroids")
 				.rebalance()


[05/16] flink git commit: [hotfix] Remove unnecessary int cast.

Posted by se...@apache.org.
[hotfix] Remove unnecessary int cast.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/32209099
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/32209099
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/32209099

Branch: refs/heads/release-1.5
Commit: 32209099e5011b4b44632e909a81be42481157be
Parents: 36ce86a
Author: gyao <ga...@data-artisans.com>
Authored: Thu Apr 19 11:04:27 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Apr 30 23:25:03 2018 +0200

----------------------------------------------------------------------
 .../src/main/java/org/apache/flink/yarn/YarnResourceManager.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/32209099/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 0ea66b8..a25e4c4 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -286,7 +286,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 		// Priority for worker containers - priorities are intra-application
 		//TODO: set priority according to the resource allocated
 		Priority priority = Priority.newInstance(generatePriority(resourceProfile));
-		int mem = resourceProfile.getMemoryInMB() < 0 ? defaultTaskManagerMemoryMB : (int) resourceProfile.getMemoryInMB();
+		int mem = resourceProfile.getMemoryInMB() < 0 ? defaultTaskManagerMemoryMB : resourceProfile.getMemoryInMB();
 		int vcore = resourceProfile.getCpuCores() < 1 ? defaultCpus : (int) resourceProfile.getCpuCores();
 		Resource capability = Resource.newInstance(mem, vcore);
 		requestYarnContainer(capability, priority);


[06/16] flink git commit: [hotfix] Fix raw types warning.

Posted by se...@apache.org.
[hotfix] Fix raw types warning.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0f34a851
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0f34a851
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0f34a851

Branch: refs/heads/release-1.5
Commit: 0f34a851daa4e479a3faa1dfd97e28eff52b89ab
Parents: 3220909
Author: gyao <ga...@data-artisans.com>
Authored: Thu Apr 19 11:05:05 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Apr 30 23:25:06 2018 +0200

----------------------------------------------------------------------
 .../src/main/java/org/apache/flink/yarn/YarnResourceManager.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0f34a851/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index a25e4c4..220f4f2 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -435,7 +435,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 		String[] hostPort = address.split("@")[1].split(":");
 		String host = hostPort[0];
 		String port = hostPort[1].split("/")[0];
-		return new Tuple2(host, Integer.valueOf(port));
+		return new Tuple2<>(host, Integer.valueOf(port));
 	}
 
 	private void requestYarnContainer(Resource resource, Priority priority) {


[13/16] flink git commit: [FLINK-9274] [kafka] Add thread name for partition discovery

Posted by se...@apache.org.
[FLINK-9274] [kafka] Add thread name for partition discovery

This closes #5942


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/514cf81d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/514cf81d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/514cf81d

Branch: refs/heads/release-1.5
Commit: 514cf81d715a0789426c46e1cd05c4f0ebb762bb
Parents: a10f479
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon Apr 30 14:22:40 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Apr 30 23:25:30 2018 +0200

----------------------------------------------------------------------
 .../flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/514cf81d/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 82ac2c3..cfb5b6d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -715,7 +715,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 						}
 					}
 				}
-			});
+			}, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks());
 
 			discoveryLoopThread.start();
 			kafkaFetcher.runFetchLoop();


[10/16] flink git commit: [FLINK-9196] [flip6, yarn] Cleanup application files when deregistering YARN AM

Posted by se...@apache.org.
[FLINK-9196] [flip6, yarn] Cleanup application files when deregistering YARN AM

Enable graceful cluster shutdown via HTTP.
Remove Flink application files from remote file system when the
YarnResourceManager deregisters the YARN ApplicationMaster.

This closes #5938


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d130f878
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d130f878
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d130f878

Branch: refs/heads/release-1.5
Commit: d130f87822818ba6f5c9dbd77ab3a774fdc41e60
Parents: a8425e5
Author: gyao <ga...@data-artisans.com>
Authored: Thu Apr 19 12:07:54 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Apr 30 23:25:19 2018 +0200

----------------------------------------------------------------------
 .../apache/flink/client/cli/CliFrontend.java    |  4 +-
 .../client/deployment/ClusterDescriptor.java    |  2 +-
 .../LegacyStandaloneClusterDescriptor.java      |  2 +-
 .../deployment/StandaloneClusterDescriptor.java |  2 +-
 .../flink/client/program/ClusterClient.java     |  3 +
 .../client/program/rest/RestClusterClient.java  | 16 +++++
 .../client/cli/util/DummyClusterDescriptor.java |  2 +-
 .../flink/runtime/dispatcher/Dispatcher.java    |  6 ++
 .../rest/handler/cluster/ShutdownHandler.java   | 58 ++++++++++++++++
 .../rest/messages/cluster/ShutdownHeaders.java  | 69 ++++++++++++++++++++
 .../runtime/webmonitor/RestfulGateway.java      |  4 ++
 .../runtime/webmonitor/WebMonitorEndpoint.java  | 11 ++++
 .../flink/yarn/YarnConfigurationITCase.java     |  2 +-
 .../util/NonDeployingYarnClusterDescriptor.java |  2 +-
 .../yarn/AbstractYarnClusterDescriptor.java     | 19 +++++-
 .../main/java/org/apache/flink/yarn/Utils.java  | 24 +++++++
 .../apache/flink/yarn/YarnClusterClient.java    | 15 +++++
 .../apache/flink/yarn/YarnResourceManager.java  |  2 +
 .../flink/yarn/cli/FlinkYarnSessionCli.java     | 10 +--
 .../org/apache/flink/yarn/YarnJobManager.scala  |  4 +-
 .../java/org/apache/flink/yarn/UtilsTest.java   | 52 +++++++++++++++
 .../flink/yarn/YarnResourceManagerTest.java     | 26 +++++++-
 22 files changed, 312 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 7745ca0..95a9949 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -277,8 +277,8 @@ public class CliFrontend {
 					if (clusterId == null && !client.isDetached()) {
 						// terminate the cluster only if we have started it before and if it's not detached
 						try {
-							clusterDescriptor.terminateCluster(client.getClusterId());
-						} catch (FlinkException e) {
+							client.shutDownCluster();
+						} catch (final Exception e) {
 							LOG.info("Could not properly terminate the Flink cluster.", e);
 						}
 					}

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
index f9f5d4b..e6b3922 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
@@ -72,5 +72,5 @@ public interface ClusterDescriptor<T> extends AutoCloseable {
 	 * @param clusterId identifying the cluster to shut down
 	 * @throws FlinkException if the cluster could not be terminated
 	 */
-	void terminateCluster(T clusterId) throws FlinkException;
+	void killCluster(T clusterId) throws FlinkException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-clients/src/main/java/org/apache/flink/client/deployment/LegacyStandaloneClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/LegacyStandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/LegacyStandaloneClusterDescriptor.java
index 21e020c..b448970 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/LegacyStandaloneClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/LegacyStandaloneClusterDescriptor.java
@@ -65,7 +65,7 @@ public class LegacyStandaloneClusterDescriptor implements ClusterDescriptor<Stan
 	}
 
 	@Override
-	public void terminateCluster(StandaloneClusterId clusterId) throws FlinkException {
+	public void killCluster(StandaloneClusterId clusterId) throws FlinkException {
 		throw new UnsupportedOperationException("Cannot terminate standalone clusters.");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
index c4bcde6..bdf8faf 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
@@ -66,7 +66,7 @@ public class StandaloneClusterDescriptor implements ClusterDescriptor<Standalone
 	}
 
 	@Override
-	public void terminateCluster(StandaloneClusterId clusterId) throws FlinkException {
+	public void killCluster(StandaloneClusterId clusterId) throws FlinkException {
 		throw new UnsupportedOperationException("Cannot terminate a standalone cluster.");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 2ef0b2e..ac779a7 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -1036,4 +1036,7 @@ public abstract class ClusterClient<T> {
 		throw new UnsupportedOperationException("The " + getClass().getSimpleName() + " does not support rescaling.");
 	}
 
+	public void shutDownCluster() {
+		throw new UnsupportedOperationException("The " + getClass().getSimpleName() + " does not support shutDownCluster.");
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 3d50e93..5c04be7 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -67,6 +67,7 @@ import org.apache.flink.runtime.rest.messages.RequestBody;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
 import org.apache.flink.runtime.rest.messages.TriggerId;
+import org.apache.flink.runtime.rest.messages.cluster.ShutdownHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
 import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
@@ -570,6 +571,21 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster
 			});
 	}
 
+	@Override
+	public void shutDownCluster() {
+		try {
+			sendRetryableRequest(
+				ShutdownHeaders.getInstance(),
+				EmptyMessageParameters.getInstance(),
+				EmptyRequestBody.getInstance(),
+				isConnectionProblemException()).get();
+		} catch (InterruptedException e) {
+			Thread.currentThread().interrupt();
+		} catch (ExecutionException e) {
+			log.error("Error while shutting down cluster", e);
+		}
+	}
+
 	/**
 	 * Creates a {@code CompletableFuture} that polls a {@code AsynchronouslyCreatedResource} until
 	 * its {@link AsynchronouslyCreatedResource#queueStatus() QueueStatus} becomes

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
index df2f3f7..7620ae2 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
@@ -60,7 +60,7 @@ public class DummyClusterDescriptor<T> implements ClusterDescriptor<T> {
 	}
 
 	@Override
-	public void terminateCluster(T clusterId) throws FlinkException {
+	public void killCluster(T clusterId) throws FlinkException {
 		throw new UnsupportedOperationException("Cannot terminate a dummy cluster.");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 41efaab..c05255b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -518,6 +518,12 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 				jobMasterGateway.triggerSavepoint(targetDirectory, cancelJob, timeout));
 	}
 
+	@Override
+	public CompletableFuture<Acknowledge> shutDownCluster() {
+		shutDown();
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
+
 	/**
 	 * Cleans up the job related data from the dispatcher. If cleanupHA is true, then
 	 * the data will also be removed from HA.

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/ShutdownHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/ShutdownHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/ShutdownHandler.java
new file mode 100644
index 0000000..73fca58
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/ShutdownHandler.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.cluster;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * REST handler which allows shutting down the cluster.
+ */
+public class ShutdownHandler extends
+		AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+
+	public ShutdownHandler(
+			final CompletableFuture<String> localRestAddress,
+			final GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			final Time timeout,
+			final Map<String, String> responseHeaders,
+			final MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> messageHeaders) {
+		super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
+	}
+
+	@Override
+	protected CompletableFuture<EmptyResponseBody> handleRequest(
+			@Nonnull final HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request,
+			@Nonnull final RestfulGateway gateway) throws RestHandlerException {
+		return gateway.shutDownCluster().thenApply(ignored -> EmptyResponseBody.getInstance());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/cluster/ShutdownHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/cluster/ShutdownHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/cluster/ShutdownHeaders.java
new file mode 100644
index 0000000..75a1e99
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/cluster/ShutdownHeaders.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.cluster;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for {@link org.apache.flink.runtime.rest.handler.cluster.ShutdownHandler}.
+ */
+public class ShutdownHeaders implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+
+	private static final ShutdownHeaders INSTANCE = new ShutdownHeaders();
+
+	@Override
+	public Class<EmptyResponseBody> getResponseClass() {
+		return EmptyResponseBody.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public EmptyMessageParameters getUnresolvedMessageParameters() {
+		return EmptyMessageParameters.getInstance();
+	}
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.DELETE;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return "/cluster";
+	}
+
+	public static ShutdownHeaders getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index 6bb088c..6a6c34b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -200,4 +200,8 @@ public interface RestfulGateway extends RpcGateway {
 			@RpcTimeout Time timeout) {
 		throw new UnsupportedOperationException();
 	}
+
+	default CompletableFuture<Acknowledge> shutDownCluster() {
+		throw new UnsupportedOperationException();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 1a67d92..ef6721b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.cluster.ClusterConfigHandler;
 import org.apache.flink.runtime.rest.handler.cluster.ClusterOverviewHandler;
 import org.apache.flink.runtime.rest.handler.cluster.DashboardConfigHandler;
+import org.apache.flink.runtime.rest.handler.cluster.ShutdownHandler;
 import org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
 import org.apache.flink.runtime.rest.handler.job.JobDetailsHandler;
@@ -104,6 +105,7 @@ import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeader
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsHeaders;
+import org.apache.flink.runtime.rest.messages.cluster.ShutdownHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
 import org.apache.flink.runtime.rest.messages.job.SubtaskCurrentAttemptDetailsHeaders;
@@ -558,6 +560,13 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 			timeout,
 			responseHeaders);
 
+		final ShutdownHandler shutdownHandler = new ShutdownHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			ShutdownHeaders.getInstance());
+
 		final File webUiDir = restConfiguration.getWebUiDir();
 
 		Optional<StaticFileServerHandler<T>> optWebContent;
@@ -619,6 +628,8 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 		handlers.add(Tuple2.of(YarnCancelJobTerminationHeaders.getInstance(), jobCancelTerminationHandler));
 		handlers.add(Tuple2.of(YarnStopJobTerminationHeaders.getInstance(), jobStopTerminationHandler));
 
+		handlers.add(Tuple2.of(ShutdownHeaders.getInstance(), shutdownHandler));
+
 		optWebContent.ifPresent(
 			webContent -> {
 				handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), webContent));

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index 3a2f957..a5f7c10 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -184,7 +184,7 @@ public class YarnConfigurationITCase extends YarnTestBase {
 				clusterClient.shutdown();
 			}
 
-			clusterDescriptor.terminateCluster(clusterId);
+			clusterDescriptor.killCluster(clusterId);
 
 		} finally {
 			clusterDescriptor.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/NonDeployingYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/NonDeployingYarnClusterDescriptor.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/NonDeployingYarnClusterDescriptor.java
index 4916b73..de0e1d7 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/NonDeployingYarnClusterDescriptor.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/NonDeployingYarnClusterDescriptor.java
@@ -83,7 +83,7 @@ public class NonDeployingYarnClusterDescriptor extends AbstractYarnClusterDescri
 	}
 
 	@Override
-	public void terminateCluster(ApplicationId clusterId) {
+	public void killCluster(ApplicationId clusterId) {
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index aec5fdb..1948d0f 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -412,9 +412,12 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	}
 
 	@Override
-	public void terminateCluster(ApplicationId applicationId) throws FlinkException {
+	public void killCluster(ApplicationId applicationId) throws FlinkException {
 		try {
 			yarnClient.killApplication(applicationId);
+			Utils.deleteApplicationFiles(Collections.singletonMap(
+				YarnConfigKeys.FLINK_YARN_FILES,
+				getYarnFilesDir(applicationId).toUri().toString()));
 		} catch (YarnException | IOException e) {
 			throw new FlinkException("Could not kill the Yarn Flink cluster with id " + applicationId + '.', e);
 		}
@@ -897,8 +900,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			}
 		}
 
-		Path yarnFilesDir = new Path(homeDir, ".flink/" + appId + '/');
-
+		final Path yarnFilesDir = getYarnFilesDir(appId);
 		FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
 		fs.setPermission(yarnFilesDir, permission); // set permission for path.
 
@@ -1086,6 +1088,17 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	}
 
 	/**
+	 * Returns the Path where the YARN application files should be uploaded to.
+	 *
+	 * @param appId YARN application id
+	 */
+	private Path getYarnFilesDir(final ApplicationId appId) throws IOException {
+		final FileSystem fileSystem = FileSystem.get(yarnConfiguration);
+		final Path homeDir = fileSystem.getHomeDirectory();
+		return new Path(homeDir, ".flink/" + appId + '/');
+	}
+
+	/**
 	 * Uploads and registers a single resource and adds it to <tt>localResources</tt>.
 	 *
 	 * @param key

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index fb9a478..20e02e1 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.util.HadoopUtils;
 import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.StringUtils;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -173,6 +174,29 @@ public final class Utils {
 	}
 
 	/**
+	 * Deletes the YARN application files, e.g., Flink binaries, libraries, etc., from the remote
+	 * filesystem.
+	 *
+	 * @param env The environment variables.
+	 */
+	public static void deleteApplicationFiles(final Map<String, String> env) {
+		final String applicationFilesDir = env.get(YarnConfigKeys.FLINK_YARN_FILES);
+		if (!StringUtils.isNullOrWhitespaceOnly(applicationFilesDir)) {
+			final org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(applicationFilesDir);
+			try {
+				final org.apache.flink.core.fs.FileSystem fileSystem = path.getFileSystem();
+				if (!fileSystem.delete(path, true)) {
+					LOG.error("Deleting yarn application files under {} was unsuccessful.", applicationFilesDir);
+				}
+			} catch (final IOException e) {
+				LOG.error("Could not properly delete yarn application files directory {}.", applicationFilesDir, e);
+			}
+		} else {
+			LOG.debug("No yarn application files directory set. Therefore, cannot clean up the data.");
+		}
+	}
+
+	/**
 	 * Creates a YARN resource for the remote object at the given location.
 	 *
 	 * @param remoteRsrcPath	remote location of the resource

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index 3edd6e4..0d7546e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -24,6 +24,7 @@ import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatus;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
@@ -263,6 +264,20 @@ public class YarnClusterClient extends ClusterClient<ApplicationId> {
 		}
 	}
 
+	@Override
+	public void shutDownCluster() {
+		LOG.info("Sending shutdown request to the Application Master");
+		try {
+			final Future<Object> response = Patterns.ask(applicationClient.get(),
+				new YarnMessages.LocalStopYarnSession(ApplicationStatus.SUCCEEDED,
+					"Flink YARN Client requested shutdown"),
+				new Timeout(akkaDuration));
+			Await.ready(response, akkaDuration);
+		} catch (final Exception e) {
+			LOG.warn("Error while stopping YARN cluster.", e);
+		}
+	}
+
 	public ApplicationId getApplicationId() {
 		return appId;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 220f4f2..22d0164 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -279,6 +279,8 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 		} catch (Throwable t) {
 			log.error("Could not unregister the application master.", t);
 		}
+
+		Utils.deleteApplicationFiles(env);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 7596d68..698d48d 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -611,7 +611,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 						}
 
 						try {
-							yarnClusterDescriptor.terminateCluster(yarnApplicationId);
+							yarnClusterDescriptor.killCluster(yarnApplicationId);
 						} catch (FlinkException fe) {
 							LOG.info("Could not properly terminate the Flink cluster.", fe);
 						}
@@ -644,18 +644,14 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 							LOG.info("Could not properly close the Yarn application status monitor.", e);
 						}
 
+						clusterClient.shutDownCluster();
+
 						try {
 							clusterClient.shutdown();
 						} catch (Exception e) {
 							LOG.info("Could not properly shutdown cluster client.", e);
 						}
 
-						try {
-							yarnClusterDescriptor.terminateCluster(yarnApplicationId);
-						} catch (FlinkException e) {
-							LOG.info("Could not properly terminate the Flink cluster.", e);
-						}
-
 						// shut down the scheduled executor service
 						ExecutorUtils.gracefulShutdown(
 							1000L,

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index 6b439bd..7d17325 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -109,8 +109,6 @@ class YarnJobManager(
 
   private def handleYarnShutdown: Receive = {
     case msg: StopCluster =>
-      super.handleMessage(msg)
-
       // do global cleanup if the yarn files path has been set
       yarnFilesPath match {
         case Some(filePath) =>
@@ -135,5 +133,7 @@ class YarnJobManager(
           log.debug("No yarn application files directory set. Therefore, cannot clean up " +
             "the data.")
       }
+
+      super.handleMessage(msg)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
new file mode 100644
index 0000000..5fc3567
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link Utils}.
+ */
+public class UtilsTest {
+
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	@Test
+	public void testDeleteApplicationFiles() throws Exception {
+		final Path applicationFilesDir = temporaryFolder.newFolder(".flink").toPath();
+		Files.createFile(applicationFilesDir.resolve("flink.jar"));
+		assertThat(Files.list(temporaryFolder.getRoot().toPath()).count(), equalTo(1L));
+		assertThat(Files.list(applicationFilesDir).count(), equalTo(1L));
+
+		Utils.deleteApplicationFiles(Collections.singletonMap(
+			YarnConfigKeys.FLINK_YARN_FILES,
+			applicationFilesDir.toString()));
+		assertThat(Files.list(temporaryFolder.getRoot().toPath()).count(), equalTo(0L));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index 0d37b8e..03f3ef2 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -76,6 +77,7 @@ import org.junit.rules.TemporaryFolder;
 import javax.annotation.Nullable;
 
 import java.io.File;
+import java.nio.file.Files;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -90,7 +92,9 @@ import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES;
 import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
 import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME;
 import static org.apache.flink.yarn.YarnConfigKeys.FLINK_JAR_PATH;
+import static org.apache.flink.yarn.YarnConfigKeys.FLINK_YARN_FILES;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
@@ -105,9 +109,9 @@ public class YarnResourceManagerTest extends TestLogger {
 
 	private static final Time TIMEOUT = Time.seconds(10L);
 
-	private static Configuration flinkConfig = new Configuration();
+	private Configuration flinkConfig = new Configuration();
 
-	private static Map<String, String> env = new HashMap<>();
+	private Map<String, String> env = new HashMap<>();
 
 	@Rule
 	public TemporaryFolder folder = new TemporaryFolder();
@@ -200,7 +204,7 @@ public class YarnResourceManagerTest extends TestLogger {
 		}
 	}
 
-	static class Context {
+	class Context {
 
 		// services
 		final TestingRpcService rpcService;
@@ -388,4 +392,20 @@ public class YarnResourceManagerTest extends TestLogger {
 			assertTrue(resourceManager.getNumberOfRegisteredTaskManagers().get() == 0);
 		}};
 	}
+
+	/**
+	 * Tests that application files are deleted when the YARN application master is de-registered.
+	 */
+	@Test
+	public void testDeleteApplicationFiles() throws Exception {
+		new Context() {{
+			final File applicationDir = folder.newFolder(".flink");
+			env.put(FLINK_YARN_FILES, applicationDir.getCanonicalPath());
+
+			startResourceManager();
+
+			resourceManager.deregisterApplication(ApplicationStatus.SUCCEEDED, null);
+			assertFalse("YARN application directory was not removed", Files.exists(applicationDir.toPath()));
+		}};
+	}
 }


[09/16] flink git commit: [hotfix] [tests] Rename UtilsTest to YarnFlinkResourceManagerTest.

Posted by se...@apache.org.
[hotfix] [tests] Rename UtilsTest to YarnFlinkResourceManagerTest.

Test was misnamed.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a8425e5b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a8425e5b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a8425e5b

Branch: refs/heads/release-1.5
Commit: a8425e5ba6d36f6a8408e47da7f71cf7a0c6eb73
Parents: 4409ba2
Author: gyao <ga...@data-artisans.com>
Authored: Thu Apr 19 11:18:32 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Apr 30 23:25:15 2018 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/yarn/UtilsTest.java   | 244 -------------------
 .../yarn/YarnFlinkResourceManagerTest.java      | 244 +++++++++++++++++++
 2 files changed, 244 insertions(+), 244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a8425e5b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
deleted file mode 100644
index b7a38b0..0000000
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
-import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
-import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
-import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.TestLogger;
-import org.apache.flink.yarn.messages.NotifyWhenResourcesRegistered;
-import org.apache.flink.yarn.messages.RequestNumberOfRegisteredResources;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.NMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for {@link Utils}.
- */
-public class UtilsTest extends TestLogger {
-
-	private static ActorSystem system;
-
-	@BeforeClass
-	public static void setup() {
-		system = AkkaUtils.createLocalActorSystem(new Configuration());
-	}
-
-	@AfterClass
-	public static void teardown() {
-		JavaTestKit.shutdownActorSystem(system);
-	}
-
-	@Test
-	public void testYarnFlinkResourceManagerJobManagerLostLeadership() throws Exception {
-		new JavaTestKit(system) {{
-
-			final Deadline deadline = new FiniteDuration(3, TimeUnit.MINUTES).fromNow();
-
-			Configuration flinkConfig = new Configuration();
-			YarnConfiguration yarnConfig = new YarnConfiguration();
-			SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService(
-				null,
-				null);
-			String applicationMasterHostName = "localhost";
-			String webInterfaceURL = "foobar";
-			ContaineredTaskManagerParameters taskManagerParameters = new ContaineredTaskManagerParameters(
-				1L, 1L, 1L, 1, new HashMap<String, String>());
-			ContainerLaunchContext taskManagerLaunchContext = mock(ContainerLaunchContext.class);
-			int yarnHeartbeatIntervalMillis = 1000;
-			int maxFailedContainers = 10;
-			int numInitialTaskManagers = 5;
-			final YarnResourceManagerCallbackHandler callbackHandler = new YarnResourceManagerCallbackHandler();
-			AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient = mock(AMRMClientAsync.class);
-			NMClient nodeManagerClient = mock(NMClient.class);
-			UUID leaderSessionID = UUID.randomUUID();
-
-			final List<Container> containerList = new ArrayList<>();
-
-			for (int i = 0; i < numInitialTaskManagers; i++) {
-				Container mockContainer = mock(Container.class);
-				when(mockContainer.getId()).thenReturn(
-					ContainerId.newInstance(
-						ApplicationAttemptId.newInstance(
-							ApplicationId.newInstance(System.currentTimeMillis(), 1),
-							1),
-						i));
-				when(mockContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234));
-				containerList.add(mockContainer);
-			}
-
-			doAnswer(new Answer() {
-				int counter = 0;
-				@Override
-				public Object answer(InvocationOnMock invocation) throws Throwable {
-					if (counter < containerList.size()) {
-						callbackHandler.onContainersAllocated(
-							Collections.singletonList(
-								containerList.get(counter++)
-							));
-					}
-					return null;
-				}
-			}).when(resourceManagerClient).addContainerRequest(Matchers.any(AMRMClient.ContainerRequest.class));
-
-			final CompletableFuture<AkkaActorGateway> resourceManagerFuture = new CompletableFuture<>();
-			final CompletableFuture<AkkaActorGateway> leaderGatewayFuture = new CompletableFuture<>();
-
-			doAnswer(
-				(InvocationOnMock invocation) -> {
-					Container container = (Container) invocation.getArguments()[0];
-					resourceManagerFuture.thenCombine(leaderGatewayFuture,
-						(resourceManagerGateway, leaderGateway) -> {
-						resourceManagerGateway.tell(
-							new NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)),
-							leaderGateway);
-						return null;
-						});
-					return null;
-				})
-				.when(nodeManagerClient)
-				.startContainer(
-					Matchers.any(Container.class),
-					Matchers.any(ContainerLaunchContext.class));
-
-			ActorRef resourceManager = null;
-			ActorRef leader1;
-
-			try {
-				leader1 = system.actorOf(
-					Props.create(
-						TestingUtils.ForwardingActor.class,
-						getRef(),
-						Option.apply(leaderSessionID)
-					));
-
-				resourceManager = system.actorOf(
-					Props.create(
-						TestingYarnFlinkResourceManager.class,
-						flinkConfig,
-						yarnConfig,
-						leaderRetrievalService,
-						applicationMasterHostName,
-						webInterfaceURL,
-						taskManagerParameters,
-						taskManagerLaunchContext,
-						yarnHeartbeatIntervalMillis,
-						maxFailedContainers,
-						numInitialTaskManagers,
-						callbackHandler,
-						resourceManagerClient,
-						nodeManagerClient
-					));
-
-				leaderRetrievalService.notifyListener(leader1.path().toString(), leaderSessionID);
-
-				final AkkaActorGateway leader1Gateway = new AkkaActorGateway(leader1, leaderSessionID);
-				final AkkaActorGateway resourceManagerGateway = new AkkaActorGateway(resourceManager, leaderSessionID);
-
-				leaderGatewayFuture.complete(leader1Gateway);
-				resourceManagerFuture.complete(resourceManagerGateway);
-
-				expectMsgClass(deadline.timeLeft(), RegisterResourceManager.class);
-
-				resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, Collections.emptyList()));
-
-				for (int i = 0; i < containerList.size(); i++) {
-					expectMsgClass(deadline.timeLeft(), Acknowledge.class);
-				}
-
-				Future<Object> taskManagerRegisteredFuture = resourceManagerGateway.ask(new NotifyWhenResourcesRegistered(numInitialTaskManagers), deadline.timeLeft());
-
-				Await.ready(taskManagerRegisteredFuture, deadline.timeLeft());
-
-				leaderRetrievalService.notifyListener(null, null);
-
-				leaderRetrievalService.notifyListener(leader1.path().toString(), leaderSessionID);
-
-				expectMsgClass(deadline.timeLeft(), RegisterResourceManager.class);
-
-				resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, Collections.emptyList()));
-
-				for (Container container: containerList) {
-					resourceManagerGateway.tell(
-						new NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)),
-						leader1Gateway);
-				}
-
-				for (int i = 0; i < containerList.size(); i++) {
-					expectMsgClass(deadline.timeLeft(), Acknowledge.class);
-				}
-
-				Future<Object> numberOfRegisteredResourcesFuture = resourceManagerGateway.ask(RequestNumberOfRegisteredResources.INSTANCE, deadline.timeLeft());
-
-				int numberOfRegisteredResources = (Integer) Await.result(numberOfRegisteredResourcesFuture, deadline.timeLeft());
-
-				assertEquals(numInitialTaskManagers, numberOfRegisteredResources);
-			} finally {
-				if (resourceManager != null) {
-					resourceManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
-				}
-			}
-		}};
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a8425e5b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
new file mode 100644
index 0000000..10b2ce9
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.yarn.messages.NotifyWhenResourcesRegistered;
+import org.apache.flink.yarn.messages.RequestNumberOfRegisteredResources;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link YarnFlinkResourceManager}.
+ */
+public class YarnFlinkResourceManagerTest extends TestLogger {
+
+	private static ActorSystem system;
+
+	@BeforeClass
+	public static void setup() {
+		system = AkkaUtils.createLocalActorSystem(new Configuration());
+	}
+
+	@AfterClass
+	public static void teardown() {
+		JavaTestKit.shutdownActorSystem(system);
+	}
+
+	@Test
+	public void testYarnFlinkResourceManagerJobManagerLostLeadership() throws Exception {
+		new JavaTestKit(system) {{
+
+			final Deadline deadline = new FiniteDuration(3, TimeUnit.MINUTES).fromNow();
+
+			Configuration flinkConfig = new Configuration();
+			YarnConfiguration yarnConfig = new YarnConfiguration();
+			SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService(
+				null,
+				null);
+			String applicationMasterHostName = "localhost";
+			String webInterfaceURL = "foobar";
+			ContaineredTaskManagerParameters taskManagerParameters = new ContaineredTaskManagerParameters(
+				1L, 1L, 1L, 1, new HashMap<String, String>());
+			ContainerLaunchContext taskManagerLaunchContext = mock(ContainerLaunchContext.class);
+			int yarnHeartbeatIntervalMillis = 1000;
+			int maxFailedContainers = 10;
+			int numInitialTaskManagers = 5;
+			final YarnResourceManagerCallbackHandler callbackHandler = new YarnResourceManagerCallbackHandler();
+			AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient = mock(AMRMClientAsync.class);
+			NMClient nodeManagerClient = mock(NMClient.class);
+			UUID leaderSessionID = UUID.randomUUID();
+
+			final List<Container> containerList = new ArrayList<>();
+
+			for (int i = 0; i < numInitialTaskManagers; i++) {
+				Container mockContainer = mock(Container.class);
+				when(mockContainer.getId()).thenReturn(
+					ContainerId.newInstance(
+						ApplicationAttemptId.newInstance(
+							ApplicationId.newInstance(System.currentTimeMillis(), 1),
+							1),
+						i));
+				when(mockContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234));
+				containerList.add(mockContainer);
+			}
+
+			doAnswer(new Answer() {
+				int counter = 0;
+				@Override
+				public Object answer(InvocationOnMock invocation) throws Throwable {
+					if (counter < containerList.size()) {
+						callbackHandler.onContainersAllocated(
+							Collections.singletonList(
+								containerList.get(counter++)
+							));
+					}
+					return null;
+				}
+			}).when(resourceManagerClient).addContainerRequest(Matchers.any(AMRMClient.ContainerRequest.class));
+
+			final CompletableFuture<AkkaActorGateway> resourceManagerFuture = new CompletableFuture<>();
+			final CompletableFuture<AkkaActorGateway> leaderGatewayFuture = new CompletableFuture<>();
+
+			doAnswer(
+				(InvocationOnMock invocation) -> {
+					Container container = (Container) invocation.getArguments()[0];
+					resourceManagerFuture.thenCombine(leaderGatewayFuture,
+						(resourceManagerGateway, leaderGateway) -> {
+						resourceManagerGateway.tell(
+							new NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)),
+							leaderGateway);
+						return null;
+						});
+					return null;
+				})
+				.when(nodeManagerClient)
+				.startContainer(
+					Matchers.any(Container.class),
+					Matchers.any(ContainerLaunchContext.class));
+
+			ActorRef resourceManager = null;
+			ActorRef leader1;
+
+			try {
+				leader1 = system.actorOf(
+					Props.create(
+						TestingUtils.ForwardingActor.class,
+						getRef(),
+						Option.apply(leaderSessionID)
+					));
+
+				resourceManager = system.actorOf(
+					Props.create(
+						TestingYarnFlinkResourceManager.class,
+						flinkConfig,
+						yarnConfig,
+						leaderRetrievalService,
+						applicationMasterHostName,
+						webInterfaceURL,
+						taskManagerParameters,
+						taskManagerLaunchContext,
+						yarnHeartbeatIntervalMillis,
+						maxFailedContainers,
+						numInitialTaskManagers,
+						callbackHandler,
+						resourceManagerClient,
+						nodeManagerClient
+					));
+
+				leaderRetrievalService.notifyListener(leader1.path().toString(), leaderSessionID);
+
+				final AkkaActorGateway leader1Gateway = new AkkaActorGateway(leader1, leaderSessionID);
+				final AkkaActorGateway resourceManagerGateway = new AkkaActorGateway(resourceManager, leaderSessionID);
+
+				leaderGatewayFuture.complete(leader1Gateway);
+				resourceManagerFuture.complete(resourceManagerGateway);
+
+				expectMsgClass(deadline.timeLeft(), RegisterResourceManager.class);
+
+				resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, Collections.emptyList()));
+
+				for (int i = 0; i < containerList.size(); i++) {
+					expectMsgClass(deadline.timeLeft(), Acknowledge.class);
+				}
+
+				Future<Object> taskManagerRegisteredFuture = resourceManagerGateway.ask(new NotifyWhenResourcesRegistered(numInitialTaskManagers), deadline.timeLeft());
+
+				Await.ready(taskManagerRegisteredFuture, deadline.timeLeft());
+
+				leaderRetrievalService.notifyListener(null, null);
+
+				leaderRetrievalService.notifyListener(leader1.path().toString(), leaderSessionID);
+
+				expectMsgClass(deadline.timeLeft(), RegisterResourceManager.class);
+
+				resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, Collections.emptyList()));
+
+				for (Container container: containerList) {
+					resourceManagerGateway.tell(
+						new NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)),
+						leader1Gateway);
+				}
+
+				for (int i = 0; i < containerList.size(); i++) {
+					expectMsgClass(deadline.timeLeft(), Acknowledge.class);
+				}
+
+				Future<Object> numberOfRegisteredResourcesFuture = resourceManagerGateway.ask(RequestNumberOfRegisteredResources.INSTANCE, deadline.timeLeft());
+
+				int numberOfRegisteredResources = (Integer) Await.result(numberOfRegisteredResourcesFuture, deadline.timeLeft());
+
+				assertEquals(numInitialTaskManagers, numberOfRegisteredResources);
+			} finally {
+				if (resourceManager != null) {
+					resourceManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
+				}
+			}
+		}};
+	}
+}


[04/16] flink git commit: [hotfix] Indent method parameters.

Posted by se...@apache.org.
[hotfix] Indent method parameters.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/36ce86a4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/36ce86a4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/36ce86a4

Branch: refs/heads/release-1.5
Commit: 36ce86a49b493999220beff309882b1b6431e18c
Parents: 57c9ca2
Author: gyao <ga...@data-artisans.com>
Authored: Thu Apr 19 11:03:20 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Apr 30 23:25:01 2018 +0200

----------------------------------------------------------------------
 .../src/main/java/org/apache/flink/yarn/YarnResourceManager.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/36ce86a4/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 4eb4fc9..0ea66b8 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -267,8 +267,8 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 
 	@Override
 	protected void internalDeregisterApplication(
-		ApplicationStatus finalStatus,
-		@Nullable String diagnostics) {
+			ApplicationStatus finalStatus,
+			@Nullable String diagnostics) {
 
 		// first, de-register from YARN
 		FinalApplicationStatus yarnStatus = getYarnStatus(finalStatus);


[11/16] flink git commit: [hotfix] [runtime] Fix a debug statement in StreamTask

Posted by se...@apache.org.
[hotfix] [runtime] Fix a debug statement in StreamTask


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24c33485
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24c33485
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24c33485

Branch: refs/heads/release-1.5
Commit: 24c33485079fb0e60fccf8cf507cb4225cafe8bd
Parents: d130f87
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon Apr 30 15:02:26 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Apr 30 23:25:24 2018 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/streaming/runtime/tasks/StreamTask.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/24c33485/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 182543d..6812871 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -1191,7 +1191,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		@SuppressWarnings("unchecked")
 		StreamPartitioner<OUT> outputPartitioner = (StreamPartitioner<OUT>) edge.getPartitioner();
 
-		LOG.debug("Using partitioner {} for output {} of task ", outputPartitioner, outputIndex, taskName);
+		LOG.debug("Using partitioner {} for output {} of task {}", outputPartitioner, outputIndex, taskName);
 
 		ResultPartitionWriter bufferWriter = environment.getWriter(outputIndex);
 


[12/16] flink git commit: [FLINK-9275] [streaming] Add taskName to the output flusher thread's name

Posted by se...@apache.org.
[FLINK-9275] [streaming] Add taskName to the output flusher thread's name

This closes #5943


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a10f4791
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a10f4791
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a10f4791

Branch: refs/heads/release-1.5
Commit: a10f4791b1831e782fc26fe2d5c70336cfcd808c
Parents: 24c3348
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon Apr 30 15:02:43 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Apr 30 23:25:27 2018 +0200

----------------------------------------------------------------------
 .../org/apache/flink/streaming/runtime/io/StreamRecordWriter.java | 3 ++-
 .../java/org/apache/flink/streaming/runtime/tasks/StreamTask.java | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a10f4791/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
index dad680c..9fedf70 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
@@ -67,7 +67,8 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWrit
 		}
 		else {
 			String threadName = taskName == null ?
-				DEFAULT_OUTPUT_FLUSH_THREAD_NAME : "Output Timeout Flusher - " + taskName;
+				DEFAULT_OUTPUT_FLUSH_THREAD_NAME :
+				DEFAULT_OUTPUT_FLUSH_THREAD_NAME + " for " + taskName;
 
 			outputFlusher = new OutputFlusher(threadName, timeout);
 			outputFlusher.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/a10f4791/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 6812871..6790949 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -1204,7 +1204,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		}
 
 		StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> output =
-			new StreamRecordWriter<>(bufferWriter, outputPartitioner, bufferTimeout);
+			new StreamRecordWriter<>(bufferWriter, outputPartitioner, bufferTimeout, taskName);
 		output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
 		return output;
 	}


[15/16] flink git commit: [FLINK-9256] [network] Fix NPE in SingleInputGate#updateInputChannel() for non-credit based flow control

Posted by se...@apache.org.
[FLINK-9256] [network] Fix NPE in SingleInputGate#updateInputChannel() for non-credit based flow control

This closes #5914


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/56e2b0b5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/56e2b0b5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/56e2b0b5

Branch: refs/heads/release-1.5
Commit: 56e2b0b5d600935eae590a985643a5879f224d04
Parents: f1fa517
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Wed Apr 25 18:28:48 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Apr 30 23:25:38 2018 +0200

----------------------------------------------------------------------
 .../runtime/io/network/NetworkEnvironment.java  |   4 +
 .../partition/consumer/SingleInputGate.java     |  20 +-
 .../io/network/NetworkEnvironmentTest.java      |   5 +-
 .../PartitionRequestClientHandlerTest.java      |   3 +-
 .../partition/InputGateConcurrentTest.java      |   9 +-
 .../partition/InputGateFairnessTest.java        |  17 +-
 .../consumer/LocalInputChannelTest.java         |   6 +-
 .../consumer/RemoteInputChannelTest.java        |   3 +-
 .../partition/consumer/SingleInputGateTest.java | 318 +++++++++++++++----
 .../partition/consumer/TestSingleInputGate.java |   3 +-
 .../partition/consumer/UnionInputGateTest.java  |   6 +-
 11 files changed, 301 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/56e2b0b5/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 0a9dc0f..f254756 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -157,6 +157,10 @@ public class NetworkEnvironment {
 		return partitionRequestMaxBackoff;
 	}
 
+	public boolean isCreditBased() {
+		return enableCreditBased;
+	}
+
 	public KvStateRegistry getKvStateRegistry() {
 		return kvStateRegistry;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/56e2b0b5/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index b9091b2..06e80ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -157,9 +157,11 @@ public class SingleInputGate implements InputGate {
 	 */
 	private BufferPool bufferPool;
 
-	/** Global network buffer pool to request and recycle exclusive buffers. */
+	/** Global network buffer pool to request and recycle exclusive buffers (only for credit-based). */
 	private NetworkBufferPool networkBufferPool;
 
+	private final boolean isCreditBased;
+
 	private boolean hasReceivedAllEndOfPartitionEvents;
 
 	/** Flag indicating whether partitions have been requested. */
@@ -189,7 +191,8 @@ public class SingleInputGate implements InputGate {
 		int consumedSubpartitionIndex,
 		int numberOfInputChannels,
 		TaskActions taskActions,
-		TaskIOMetricGroup metrics) {
+		TaskIOMetricGroup metrics,
+		boolean isCreditBased) {
 
 		this.owningTaskName = checkNotNull(owningTaskName);
 		this.jobId = checkNotNull(jobId);
@@ -208,6 +211,7 @@ public class SingleInputGate implements InputGate {
 		this.enqueuedInputChannelsWithData = new BitSet(numberOfInputChannels);
 
 		this.taskActions = checkNotNull(taskActions);
+		this.isCreditBased = isCreditBased;
 	}
 
 	// ------------------------------------------------------------------------
@@ -288,6 +292,7 @@ public class SingleInputGate implements InputGate {
 	 * @param networkBuffersPerChannel The number of exclusive buffers for each channel
 	 */
 	public void assignExclusiveSegments(NetworkBufferPool networkBufferPool, int networkBuffersPerChannel) throws IOException {
+		checkState(this.isCreditBased, "Bug in input gate setup logic: exclusive buffers only exist with credit-based flow control.");
 		checkState(this.networkBufferPool == null, "Bug in input gate setup logic: global buffer pool has" +
 			"already been set for this input gate.");
 
@@ -347,8 +352,13 @@ public class SingleInputGate implements InputGate {
 				}
 				else if (partitionLocation.isRemote()) {
 					newChannel = unknownChannel.toRemoteInputChannel(partitionLocation.getConnectionId());
-					((RemoteInputChannel)newChannel).assignExclusiveSegments(
-						networkBufferPool.requestMemorySegments(networkBuffersPerChannel));
+
+					if (this.isCreditBased) {
+						checkState(this.networkBufferPool != null, "Bug in input gate setup logic: " +
+							"global buffer pool has not been set for this input gate.");
+						((RemoteInputChannel) newChannel).assignExclusiveSegments(
+							networkBufferPool.requestMemorySegments(networkBuffersPerChannel));
+					}
 				}
 				else {
 					throw new IllegalStateException("Tried to update unknown channel with unknown channel.");
@@ -661,7 +671,7 @@ public class SingleInputGate implements InputGate {
 
 		final SingleInputGate inputGate = new SingleInputGate(
 			owningTaskName, jobId, consumedResultId, consumedPartitionType, consumedSubpartitionIndex,
-			icdd.length, taskActions, metrics);
+			icdd.length, taskActions, metrics, networkEnvironment.isCreditBased());
 
 		// Create the input channels. There is one input channel for each consumed partition.
 		final InputChannel[] inputChannels = new InputChannel[icdd.length];

http://git-wip-us.apache.org/repos/asf/flink/blob/56e2b0b5/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index 317a214..f790b5f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -329,7 +329,7 @@ public class NetworkEnvironmentTest {
 	 *
 	 * @return input gate with some fake settings
 	 */
-	private static SingleInputGate createSingleInputGate(
+	private SingleInputGate createSingleInputGate(
 			final ResultPartitionType partitionType, final int channels) {
 		return spy(new SingleInputGate(
 			"Test Task Name",
@@ -339,7 +339,8 @@ public class NetworkEnvironmentTest {
 			0,
 			channels,
 			mock(TaskActions.class),
-			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()));
+			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
+			enableCreditBasedFlowControl));
 	}
 
 	private static void createRemoteInputChannel(

http://git-wip-us.apache.org/repos/asf/flink/blob/56e2b0b5/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
index 13f7510..842aed8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
@@ -221,7 +221,8 @@ public class PartitionRequestClientHandlerTest {
 			0,
 			1,
 			mock(TaskActions.class),
-			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
+			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
+			true);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/56e2b0b5/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
index 289a398..73f3cfb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
@@ -66,7 +66,8 @@ public class InputGateConcurrentTest {
 				new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
 				0, numChannels,
 				mock(TaskActions.class),
-				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
+				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
+				true);
 
 		for (int i = 0; i < numChannels; i++) {
 			LocalInputChannel channel = new LocalInputChannel(gate, i, new ResultPartitionID(),
@@ -102,7 +103,8 @@ public class InputGateConcurrentTest {
 				0,
 				numChannels,
 				mock(TaskActions.class),
-				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
+				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
+				true);
 
 		for (int i = 0; i < numChannels; i++) {
 			RemoteInputChannel channel = new RemoteInputChannel(
@@ -151,7 +153,8 @@ public class InputGateConcurrentTest {
 				0,
 				numChannels,
 				mock(TaskActions.class),
-				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
+				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
+				true);
 
 		for (int i = 0, local = 0; i < numChannels; i++) {
 			if (localOrRemote.get(i)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/56e2b0b5/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
index 45df56f..82a27cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
@@ -93,7 +93,8 @@ public class InputGateFairnessTest {
 				new IntermediateDataSetID(),
 				0, numChannels,
 				mock(TaskActions.class),
-				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
+				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
+				true);
 
 		for (int i = 0; i < numChannels; i++) {
 			LocalInputChannel channel = new LocalInputChannel(gate, i, new ResultPartitionID(),
@@ -146,7 +147,8 @@ public class InputGateFairnessTest {
 				new IntermediateDataSetID(),
 				0, numChannels,
 				mock(TaskActions.class),
-				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
+				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
+				true);
 
 			for (int i = 0; i < numChannels; i++) {
 				LocalInputChannel channel = new LocalInputChannel(gate, i, new ResultPartitionID(),
@@ -196,7 +198,8 @@ public class InputGateFairnessTest {
 				new IntermediateDataSetID(),
 				0, numChannels,
 				mock(TaskActions.class),
-				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
+				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
+				true);
 
 		final ConnectionManager connManager = createDummyConnectionManager();
 
@@ -251,7 +254,8 @@ public class InputGateFairnessTest {
 				new IntermediateDataSetID(),
 				0, numChannels,
 				mock(TaskActions.class),
-				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
+				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
+				true);
 
 		final ConnectionManager connManager = createDummyConnectionManager();
 
@@ -349,11 +353,12 @@ public class InputGateFairnessTest {
 				int consumedSubpartitionIndex,
 				int numberOfInputChannels,
 				TaskActions taskActions,
-				TaskIOMetricGroup metrics) {
+				TaskIOMetricGroup metrics,
+				boolean isCreditBased) {
 
 			super(owningTaskName, jobId, consumedResultId, ResultPartitionType.PIPELINED,
 				consumedSubpartitionIndex,
-					numberOfInputChannels, taskActions, metrics);
+					numberOfInputChannels, taskActions, metrics, isCreditBased);
 
 			try {
 				Field f = SingleInputGate.class.getDeclaredField("inputChannelsWithData");

http://git-wip-us.apache.org/repos/asf/flink/blob/56e2b0b5/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index c78b7b9..1ecb67f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -293,7 +293,8 @@ public class LocalInputChannelTest {
 			0,
 			1,
 			mock(TaskActions.class),
-			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()
+			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
+			true
 		);
 
 		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
@@ -490,7 +491,8 @@ public class LocalInputChannelTest {
 					subpartitionIndex,
 					numberOfInputChannels,
 					mock(TaskActions.class),
-					UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
+					UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
+					true);
 
 			// Set buffer pool
 			inputGate.setBufferPool(bufferPool);

http://git-wip-us.apache.org/repos/asf/flink/blob/56e2b0b5/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 97a5688..802cb93 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -889,7 +889,8 @@ public class RemoteInputChannelTest {
 			0,
 			1,
 			mock(TaskActions.class),
-			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
+			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
+			true);
 	}
 
 	private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate)

http://git-wip-us.apache.org/repos/asf/flink/blob/56e2b0b5/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 8c54c1f..c244668 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -19,13 +19,13 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.LocalConnectionManager;
@@ -45,31 +45,51 @@ import org.apache.flink.runtime.io.network.util.TestTaskEvent;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyListOf;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+/**
+ * Tests for {@link SingleInputGate}.
+ */
+@RunWith(Parameterized.class)
 public class SingleInputGateTest {
 
+	@Parameterized.Parameter
+	public boolean enableCreditBasedFlowControl;
+
+	@Parameterized.Parameters(name = "Credit-based = {0}")
+	public static List<Boolean> parameters() {
+		return Arrays.asList(Boolean.TRUE, Boolean.FALSE);
+	}
+
 	/**
 	 * Tests basic correctness of buffer-or-event interleaving and correct <code>null</code> return
 	 * value after receiving all end-of-partition events.
@@ -324,12 +344,7 @@ public class SingleInputGateTest {
 		int initialBackoff = 137;
 		int maxBackoff = 1001;
 
-		NetworkEnvironment netEnv = mock(NetworkEnvironment.class);
-		when(netEnv.getResultPartitionManager()).thenReturn(new ResultPartitionManager());
-		when(netEnv.getTaskEventDispatcher()).thenReturn(new TaskEventDispatcher());
-		when(netEnv.getPartitionRequestInitialBackoff()).thenReturn(initialBackoff);
-		when(netEnv.getPartitionRequestMaxBackoff()).thenReturn(maxBackoff);
-		when(netEnv.getConnectionManager()).thenReturn(new LocalConnectionManager());
+		final NetworkEnvironment netEnv = createNetworkEnvironment(2, 8, initialBackoff, maxBackoff);
 
 		SingleInputGate gate = SingleInputGate.create(
 			"TestTask",
@@ -340,37 +355,43 @@ public class SingleInputGateTest {
 			mock(TaskActions.class),
 			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
-		assertEquals(gateDesc.getConsumedPartitionType(), gate.getConsumedPartitionType());
+		try {
+			assertEquals(gateDesc.getConsumedPartitionType(), gate.getConsumedPartitionType());
 
-		Map<IntermediateResultPartitionID, InputChannel> channelMap = gate.getInputChannels();
+			Map<IntermediateResultPartitionID, InputChannel> channelMap = gate.getInputChannels();
 
-		assertEquals(3, channelMap.size());
-		InputChannel localChannel = channelMap.get(partitionIds[0].getPartitionId());
-		assertEquals(LocalInputChannel.class, localChannel.getClass());
+			assertEquals(3, channelMap.size());
+			InputChannel localChannel = channelMap.get(partitionIds[0].getPartitionId());
+			assertEquals(LocalInputChannel.class, localChannel.getClass());
 
-		InputChannel remoteChannel = channelMap.get(partitionIds[1].getPartitionId());
-		assertEquals(RemoteInputChannel.class, remoteChannel.getClass());
+			InputChannel remoteChannel = channelMap.get(partitionIds[1].getPartitionId());
+			assertEquals(RemoteInputChannel.class, remoteChannel.getClass());
 
-		InputChannel unknownChannel = channelMap.get(partitionIds[2].getPartitionId());
-		assertEquals(UnknownInputChannel.class, unknownChannel.getClass());
+			InputChannel unknownChannel = channelMap.get(partitionIds[2].getPartitionId());
+			assertEquals(UnknownInputChannel.class, unknownChannel.getClass());
 
-		InputChannel[] channels = new InputChannel[]{localChannel, remoteChannel, unknownChannel};
-		for (InputChannel ch : channels) {
-			assertEquals(0, ch.getCurrentBackoff());
+			InputChannel[] channels =
+				new InputChannel[] {localChannel, remoteChannel, unknownChannel};
+			for (InputChannel ch : channels) {
+				assertEquals(0, ch.getCurrentBackoff());
 
-			assertTrue(ch.increaseBackoff());
-			assertEquals(initialBackoff, ch.getCurrentBackoff());
+				assertTrue(ch.increaseBackoff());
+				assertEquals(initialBackoff, ch.getCurrentBackoff());
 
-			assertTrue(ch.increaseBackoff());
-			assertEquals(initialBackoff * 2, ch.getCurrentBackoff());
+				assertTrue(ch.increaseBackoff());
+				assertEquals(initialBackoff * 2, ch.getCurrentBackoff());
 
-			assertTrue(ch.increaseBackoff());
-			assertEquals(initialBackoff * 2 * 2, ch.getCurrentBackoff());
+				assertTrue(ch.increaseBackoff());
+				assertEquals(initialBackoff * 2 * 2, ch.getCurrentBackoff());
 
-			assertTrue(ch.increaseBackoff());
-			assertEquals(maxBackoff, ch.getCurrentBackoff());
+				assertTrue(ch.increaseBackoff());
+				assertEquals(maxBackoff, ch.getCurrentBackoff());
 
-			assertFalse(ch.increaseBackoff());
+				assertFalse(ch.increaseBackoff());
+			}
+		} finally {
+			gate.releaseAllResources();
+			netEnv.shutdown();
 		}
 	}
 
@@ -379,26 +400,39 @@ public class SingleInputGateTest {
 	 */
 	@Test
 	public void testRequestBuffersWithRemoteInputChannel() throws Exception {
-		final SingleInputGate inputGate = new SingleInputGate(
-			"t1",
-			new JobID(),
-			new IntermediateDataSetID(),
-			ResultPartitionType.PIPELINED_BOUNDED,
-			0,
-			1,
-			mock(TaskActions.class),
-			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
-
-		RemoteInputChannel remote = mock(RemoteInputChannel.class);
-		inputGate.setInputChannel(new IntermediateResultPartitionID(), remote);
-
-		final int buffersPerChannel = 2;
-		NetworkBufferPool network = mock(NetworkBufferPool.class);
-		// Trigger requests of segments from global pool and assign buffers to remote input channel
-		inputGate.assignExclusiveSegments(network, buffersPerChannel);
-
-		verify(network, times(1)).requestMemorySegments(buffersPerChannel);
-		verify(remote, times(1)).assignExclusiveSegments(anyListOf(MemorySegment.class));
+		final SingleInputGate inputGate = createInputGate(1, ResultPartitionType.PIPELINED_BOUNDED);
+		int buffersPerChannel = 2;
+		int extraNetworkBuffersPerGate = 8;
+		final NetworkEnvironment network = createNetworkEnvironment(buffersPerChannel,
+			extraNetworkBuffersPerGate, 0, 0);
+
+		try {
+			final ResultPartitionID resultPartitionId = new ResultPartitionID();
+			final ConnectionID connectionId = new ConnectionID(new InetSocketAddress("localhost", 5000), 0);
+			addRemoteInputChannel(network, inputGate, connectionId, resultPartitionId, 0);
+
+			network.setupInputGate(inputGate);
+
+			NetworkBufferPool bufferPool = network.getNetworkBufferPool();
+			if (enableCreditBasedFlowControl) {
+				verify(bufferPool,
+					times(1)).requestMemorySegments(buffersPerChannel);
+				RemoteInputChannel remote = (RemoteInputChannel) inputGate.getInputChannels()
+					.get(resultPartitionId.getPartitionId());
+				// only the exclusive buffers should be assigned/available now
+				assertEquals(buffersPerChannel, remote.getNumberOfAvailableBuffers());
+
+				assertEquals(bufferPool.getTotalNumberOfMemorySegments() - buffersPerChannel,
+					bufferPool.getNumberOfAvailableMemorySegments());
+				// note: exclusive buffers are not handed out into LocalBufferPool and are thus not counted
+				assertEquals(extraNetworkBuffersPerGate, bufferPool.countBuffers());
+			} else {
+				assertEquals(buffersPerChannel + extraNetworkBuffersPerGate, bufferPool.countBuffers());
+			}
+		} finally {
+			inputGate.releaseAllResources();
+			network.shutdown();
+		}
 	}
 
 	/**
@@ -407,51 +441,195 @@ public class SingleInputGateTest {
 	 */
 	@Test
 	public void testRequestBuffersWithUnknownInputChannel() throws Exception {
-		final SingleInputGate inputGate = createInputGate(1);
+		final SingleInputGate inputGate = createInputGate(1, ResultPartitionType.PIPELINED_BOUNDED);
+		int buffersPerChannel = 2;
+		int extraNetworkBuffersPerGate = 8;
+		final NetworkEnvironment network = createNetworkEnvironment(buffersPerChannel, extraNetworkBuffersPerGate, 0, 0);
 
-		UnknownInputChannel unknown = mock(UnknownInputChannel.class);
-		final ResultPartitionID resultPartitionId = new ResultPartitionID();
-		inputGate.setInputChannel(resultPartitionId.getPartitionId(), unknown);
+		try {
+			final ResultPartitionID resultPartitionId = new ResultPartitionID();
+			addUnknownInputChannel(network, inputGate, resultPartitionId, 0);
 
-		RemoteInputChannel remote = mock(RemoteInputChannel.class);
-		final ConnectionID connectionId = new ConnectionID(new InetSocketAddress("localhost", 5000), 0);
-		when(unknown.toRemoteInputChannel(connectionId)).thenReturn(remote);
+			network.setupInputGate(inputGate);
+			NetworkBufferPool bufferPool = network.getNetworkBufferPool();
 
-		final int buffersPerChannel = 2;
-		NetworkBufferPool network = mock(NetworkBufferPool.class);
-		inputGate.assignExclusiveSegments(network, buffersPerChannel);
+			if (enableCreditBasedFlowControl) {
+				verify(bufferPool, times(0)).requestMemorySegments(buffersPerChannel);
 
-		// Trigger updates to remote input channel from unknown input channel
-		inputGate.updateInputChannel(new InputChannelDeploymentDescriptor(
-			resultPartitionId,
-			ResultPartitionLocation.createRemote(connectionId)));
+				assertEquals(bufferPool.getTotalNumberOfMemorySegments(),
+					bufferPool.getNumberOfAvailableMemorySegments());
+				// note: exclusive buffers are not handed out into LocalBufferPool and are thus not counted
+				assertEquals(extraNetworkBuffersPerGate, bufferPool.countBuffers());
+			} else {
+				assertEquals(buffersPerChannel + extraNetworkBuffersPerGate, bufferPool.countBuffers());
+			}
+
+			// Trigger updates to remote input channel from unknown input channel
+			final ConnectionID connectionId = new ConnectionID(new InetSocketAddress("localhost", 5000), 0);
+			inputGate.updateInputChannel(new InputChannelDeploymentDescriptor(
+				resultPartitionId,
+				ResultPartitionLocation.createRemote(connectionId)));
+
+			if (enableCreditBasedFlowControl) {
+				verify(bufferPool,
+					times(1)).requestMemorySegments(buffersPerChannel);
+				RemoteInputChannel remote = (RemoteInputChannel) inputGate.getInputChannels()
+					.get(resultPartitionId.getPartitionId());
+				// only the exclusive buffers should be assigned/available now
+				assertEquals(buffersPerChannel, remote.getNumberOfAvailableBuffers());
+
+				assertEquals(bufferPool.getTotalNumberOfMemorySegments() - buffersPerChannel,
+					bufferPool.getNumberOfAvailableMemorySegments());
+				// note: exclusive buffers are not handed out into LocalBufferPool and are thus not counted
+				assertEquals(extraNetworkBuffersPerGate, bufferPool.countBuffers());
+			} else {
+				assertEquals(buffersPerChannel + extraNetworkBuffersPerGate, bufferPool.countBuffers());
+			}
+		} finally {
+			inputGate.releaseAllResources();
+			network.shutdown();
+		}
+	}
 
-		verify(network, times(1)).requestMemorySegments(buffersPerChannel);
-		verify(remote, times(1)).assignExclusiveSegments(anyListOf(MemorySegment.class));
+	/**
+	 * Tests that input gate can successfully convert unknown input channels into local and remote
+	 * channels.
+	 */
+	@Test
+	public void testUpdateUnknownInputChannel() throws Exception {
+		final SingleInputGate inputGate = createInputGate(2);
+		int buffersPerChannel = 2;
+		final NetworkEnvironment network = createNetworkEnvironment(buffersPerChannel, 8, 0, 0);
+
+		try {
+			final ResultPartitionID localResultPartitionId = new ResultPartitionID();
+			addUnknownInputChannel(network, inputGate, localResultPartitionId, 0);
+
+			final ResultPartitionID remoteResultPartitionId = new ResultPartitionID();
+			addUnknownInputChannel(network, inputGate, remoteResultPartitionId, 1);
+
+			network.setupInputGate(inputGate);
+
+			assertThat(inputGate.getInputChannels().get(remoteResultPartitionId.getPartitionId()),
+				is(instanceOf((UnknownInputChannel.class))));
+			assertThat(inputGate.getInputChannels().get(localResultPartitionId.getPartitionId()),
+				is(instanceOf((UnknownInputChannel.class))));
+
+			// Trigger updates to remote input channel from unknown input channel
+			final ConnectionID remoteConnectionId = new ConnectionID(new InetSocketAddress("localhost", 5000), 0);
+			inputGate.updateInputChannel(new InputChannelDeploymentDescriptor(
+				remoteResultPartitionId,
+				ResultPartitionLocation.createRemote(remoteConnectionId)));
+
+			assertThat(inputGate.getInputChannels().get(remoteResultPartitionId.getPartitionId()),
+				is(instanceOf((RemoteInputChannel.class))));
+			assertThat(inputGate.getInputChannels().get(localResultPartitionId.getPartitionId()),
+				is(instanceOf((UnknownInputChannel.class))));
+
+			// Trigger updates to local input channel from unknown input channel
+			inputGate.updateInputChannel(new InputChannelDeploymentDescriptor(
+				localResultPartitionId,
+				ResultPartitionLocation.createLocal()));
+
+			assertThat(inputGate.getInputChannels().get(remoteResultPartitionId.getPartitionId()),
+				is(instanceOf((RemoteInputChannel.class))));
+			assertThat(inputGate.getInputChannels().get(localResultPartitionId.getPartitionId()),
+				is(instanceOf((LocalInputChannel.class))));
+		} finally {
+			inputGate.releaseAllResources();
+			network.shutdown();
+		}
 	}
 
 	// ---------------------------------------------------------------------------------------------
 
-	private static SingleInputGate createInputGate() {
+	private NetworkEnvironment createNetworkEnvironment(
+			int buffersPerChannel,
+			int extraNetworkBuffersPerGate,
+			int initialBackoff,
+			int maxBackoff) {
+		return new NetworkEnvironment(
+			spy(new NetworkBufferPool(100, 32)),
+			new LocalConnectionManager(),
+			new ResultPartitionManager(),
+			new TaskEventDispatcher(),
+			new KvStateRegistry(),
+			null,
+			null,
+			IOManager.IOMode.SYNC,
+			initialBackoff,
+			maxBackoff,
+			buffersPerChannel,
+			extraNetworkBuffersPerGate,
+			enableCreditBasedFlowControl);
+	}
+
+	private SingleInputGate createInputGate() {
 		return createInputGate(2);
 	}
 
-	private static SingleInputGate createInputGate(int numberOfInputChannels) {
+	private SingleInputGate createInputGate(int numberOfInputChannels) {
+		return createInputGate(numberOfInputChannels, ResultPartitionType.PIPELINED);
+	}
+
+	private SingleInputGate createInputGate(
+			int numberOfInputChannels, ResultPartitionType partitionType) {
 		SingleInputGate inputGate = new SingleInputGate(
 			"Test Task Name",
 			new JobID(),
 			new IntermediateDataSetID(),
-			ResultPartitionType.PIPELINED,
+			partitionType,
 			0,
 			numberOfInputChannels,
 			mock(TaskActions.class),
-			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
+			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
+			enableCreditBasedFlowControl);
 
-		assertEquals(ResultPartitionType.PIPELINED, inputGate.getConsumedPartitionType());
+		assertEquals(partitionType, inputGate.getConsumedPartitionType());
 
 		return inputGate;
 	}
 
+	private void addUnknownInputChannel(
+			NetworkEnvironment network,
+			SingleInputGate inputGate,
+			ResultPartitionID partitionId,
+			int channelIndex) {
+		UnknownInputChannel unknown =
+			createUnknownInputChannel(network, inputGate, partitionId, channelIndex);
+		inputGate.setInputChannel(partitionId.getPartitionId(), unknown);
+	}
+
+	private UnknownInputChannel createUnknownInputChannel(
+			NetworkEnvironment network,
+			SingleInputGate inputGate,
+			ResultPartitionID partitionId,
+			int channelIndex) {
+		return new UnknownInputChannel(
+			inputGate,
+			channelIndex,
+			partitionId,
+			network.getResultPartitionManager(),
+			network.getTaskEventDispatcher(),
+			network.getConnectionManager(),
+			network.getPartitionRequestInitialBackoff(),
+			network.getPartitionRequestMaxBackoff(),
+			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()
+		);
+	}
+
+	private void addRemoteInputChannel(
+			NetworkEnvironment network,
+			SingleInputGate inputGate,
+			ConnectionID connectionId,
+			ResultPartitionID partitionId,
+			int channelIndex) {
+		RemoteInputChannel remote =
+			createUnknownInputChannel(network, inputGate, partitionId, channelIndex)
+				.toRemoteInputChannel(connectionId);
+		inputGate.setInputChannel(partitionId.getPartitionId(), remote);
+	}
+
 	static void verifyBufferOrEvent(
 			InputGate inputGate,
 			boolean expectedIsBuffer,

http://git-wip-us.apache.org/repos/asf/flink/blob/56e2b0b5/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index 0ae6e74..33dc1ca 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -60,7 +60,8 @@ public class TestSingleInputGate {
 			0,
 			numberOfInputChannels,
 			mock(TaskActions.class),
-			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
+			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
+			true);
 
 		this.inputGate = spy(realGate);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/56e2b0b5/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index 912cd5b..081d97d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -50,13 +50,15 @@ public class UnionInputGateTest {
 			new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
 			0, 3,
 			mock(TaskActions.class),
-			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
+			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
+			true);
 		final SingleInputGate ig2 = new SingleInputGate(
 			testTaskName, new JobID(),
 			new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
 			0, 5,
 			mock(TaskActions.class),
-			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
+			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
+			true);
 
 		final UnionInputGate union = new UnionInputGate(new SingleInputGate[]{ig1, ig2});
 


[08/16] flink git commit: [hotfix] [yarn] Remove unused field appReport in YarnClusterClient.

Posted by se...@apache.org.
[hotfix] [yarn] Remove unused field appReport in YarnClusterClient.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4409ba20
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4409ba20
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4409ba20

Branch: refs/heads/release-1.5
Commit: 4409ba201a54564042dc6690b800b322745942c0
Parents: 557bf0a
Author: gyao <ga...@data-artisans.com>
Authored: Thu Apr 26 17:44:56 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Apr 30 23:25:12 2018 +0200

----------------------------------------------------------------------
 .../src/main/java/org/apache/flink/yarn/YarnClusterClient.java     | 2 --
 1 file changed, 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4409ba20/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index 2ac9664..3edd6e4 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -68,7 +68,6 @@ public class YarnClusterClient extends ClusterClient<ApplicationId> {
 	private final int slotsPerTaskManager;
 	private final LazApplicationClientLoader applicationClient;
 	private final FiniteDuration akkaDuration;
-	private final ApplicationReport appReport;
 	private final ApplicationId appId;
 	private final String trackingURL;
 
@@ -101,7 +100,6 @@ public class YarnClusterClient extends ClusterClient<ApplicationId> {
 		this.clusterDescriptor = clusterDescriptor;
 		this.numberTaskManagers = numberTaskManagers;
 		this.slotsPerTaskManager = slotsPerTaskManager;
-		this.appReport = appReport;
 		this.appId = appReport.getApplicationId();
 		this.trackingURL = appReport.getTrackingUrl();
 		this.newlyCreatedCluster = newlyCreatedCluster;


[03/16] flink git commit: [hotfix] Replace String concatenation with Slf4j placeholders.

Posted by se...@apache.org.
[hotfix] Replace String concatenation with Slf4j placeholders.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/57c9ca27
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/57c9ca27
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/57c9ca27

Branch: refs/heads/release-1.5
Commit: 57c9ca2775ba53a6ba58ac391b29ae1c9cee66d2
Parents: 58e53b0
Author: gyao <ga...@data-artisans.com>
Authored: Thu Apr 19 10:29:43 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Apr 30 23:24:57 2018 +0200

----------------------------------------------------------------------
 flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/57c9ca27/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 2ca4cb6..fb9a478 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -156,7 +156,7 @@ public final class Utils {
 
 		Path dst = new Path(homedir, suffix);
 
-		LOG.debug("Copying from " + localSrcPath + " to " + dst);
+		LOG.debug("Copying from {} to {}", localSrcPath, dst);
 
 		fs.copyFromLocalFile(false, true, localSrcPath, dst);
 


[16/16] flink git commit: [FLINK-6557] [rocksdb] Use File instead of Path for RocksDB local temp directories.

Posted by se...@apache.org.
[FLINK-6557] [rocksdb] Use File instead of Path for RocksDB local temp directories.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7532cb7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7532cb7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7532cb7

Branch: refs/heads/release-1.5
Commit: b7532cb738398aa3b05c0769705c45fe22eb6a04
Parents: 56e2b0b
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Apr 30 22:50:24 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Apr 30 23:25:43 2018 +0200

----------------------------------------------------------------------
 .../streaming/state/RocksDBStateBackend.java    |  69 ++++++---
 .../state/RocksDBStateBackendConfigTest.java    | 139 +++++++++++++++----
 2 files changed, 168 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b7532cb7/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 9389295..81d6265 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -83,6 +83,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 	/** The number of (re)tries for loading the RocksDB JNI library. */
 	private static final int ROCKSDB_LIB_LOADING_ATTEMPTS = 3;
 
+	/** Flag whether the native library has been loaded. */
 	private static boolean rocksDbInitialized = false;
 
 	// ------------------------------------------------------------------------
@@ -96,7 +97,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 	 * Null if not yet set, in which case the configuration values will be used.
 	 * The configuration defaults to the TaskManager's temp directories. */
 	@Nullable
-	private Path[] localRocksDbDirectories;
+	private File[] localRocksDbDirectories;
 
 	/** The pre-configured option settings. */
 	private PredefinedOptions predefinedOptions = PredefinedOptions.DEFAULT;
@@ -169,6 +170,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 	 * @param checkpointDataUri The URI describing the filesystem and path to the checkpoint data directory.
 	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
 	 */
+	@SuppressWarnings("deprecation")
 	public RocksDBStateBackend(URI checkpointDataUri) throws IOException {
 		this(new FsStateBackend(checkpointDataUri));
 	}
@@ -186,6 +188,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 	 * @param enableIncrementalCheckpointing True if incremental checkpointing is enabled.
 	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
 	 */
+	@SuppressWarnings("deprecation")
 	public RocksDBStateBackend(URI checkpointDataUri, boolean enableIncrementalCheckpointing) throws IOException {
 		this(new FsStateBackend(checkpointDataUri), enableIncrementalCheckpointing);
 	}
@@ -326,16 +329,15 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 		}
 		else {
 			List<File> dirs = new ArrayList<>(localRocksDbDirectories.length);
-			String errorMessage = "";
+			StringBuilder errorMessage = new StringBuilder();
 
-			for (Path path : localRocksDbDirectories) {
-				File f = new File(path.toUri().getPath());
+			for (File f : localRocksDbDirectories) {
 				File testDir = new File(f, UUID.randomUUID().toString());
 				if (!testDir.mkdirs()) {
-					String msg = "Local DB files directory '" + path
+					String msg = "Local DB files directory '" + f
 							+ "' does not exist and cannot be created. ";
 					LOG.error(msg);
-					errorMessage += msg;
+					errorMessage.append(msg);
 				} else {
 					dirs.add(f);
 				}
@@ -455,9 +457,13 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 	}
 
 	/**
-	 * Sets the paths across which the local RocksDB database files are distributed on the local
-	 * file system. Setting these paths overrides the default behavior, where the
-	 * files are stored across the configured temp directories.
+	 * Sets the directories in which the local RocksDB database puts its files (like SST and
+	 * metadata files). These directories do not need to be persistent, they can be ephemeral,
+	 * meaning that they are lost on a machine failure, because state in RocksDB is persisted
+	 * in checkpoints.
+	 *
+	 * <p>If nothing is configured, these directories default to the TaskManager's local
+	 * temporary file directories.
 	 *
 	 * <p>Each distinct state will be stored in one path, but when the state backend creates
 	 * multiple states, they will store their files on different paths.
@@ -475,17 +481,41 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 			throw new IllegalArgumentException("empty paths");
 		}
 		else {
-			Path[] pp = new Path[paths.length];
+			File[] pp = new File[paths.length];
 
 			for (int i = 0; i < paths.length; i++) {
-				if (paths[i] == null) {
+				final String rawPath = paths[i];
+				final String path;
+
+				if (rawPath == null) {
 					throw new IllegalArgumentException("null path");
 				}
+				else {
+					// we need this for backwards compatibility, to allow URIs like 'file:///'...
+					URI uri = null;
+					try {
+						uri = new Path(rawPath).toUri();
+					}
+					catch (Exception e) {
+						// cannot parse as a path
+					}
 
-				pp[i] = new Path(paths[i]);
-				String scheme = pp[i].toUri().getScheme();
-				if (scheme != null && !scheme.equalsIgnoreCase("file")) {
-					throw new IllegalArgumentException("Path " + paths[i] + " has a non local scheme");
+					if (uri != null && uri.getScheme() != null) {
+						if ("file".equalsIgnoreCase(uri.getScheme())) {
+							path = uri.getPath();
+						}
+						else {
+							throw new IllegalArgumentException("Path " + rawPath + " has a non-local scheme");
+						}
+					}
+					else {
+						path = rawPath;
+					}
+				}
+
+				pp[i] = new File(path);
+				if (!pp[i].isAbsolute()) {
+					throw new IllegalArgumentException("Relative paths are not supported");
 				}
 			}
 
@@ -494,8 +524,15 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 	}
 
 	/**
+	 * Gets the configured local DB storage paths, or null, if none were configured.
+	 *
+	 * <p>Under these directories on the TaskManager, RocksDB stores its SST files and
+	 * metadata files. These directories do not need to be persistent, they can be ephemeral,
+	 * meaning that they are lost on a machine failure, because state in RocksDB is persisted
+	 * in checkpoints.
 	 *
-	 * @return The configured DB storage paths, or null, if none were configured.
+	 * <p>If nothing is configured, these directories default to the TaskManager's local
+	 * temporary file directories.
 	 */
 	public String[] getDbStoragePaths() {
 		if (localRocksDbDirectories == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b7532cb7/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 65d5b2e..4bc2f9f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -23,14 +23,17 @@ import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
@@ -67,10 +70,10 @@ import static org.mockito.Mockito.when;
 public class RocksDBStateBackendConfigTest {
 
 	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
+	public final TemporaryFolder tempFolder = new TemporaryFolder();
 
 	// ------------------------------------------------------------------------
-	//  RocksDB local file directory
+	//  default values
 	// ------------------------------------------------------------------------
 
 	@Test
@@ -81,39 +84,99 @@ public class RocksDBStateBackendConfigTest {
 		assertEquals(defaultIncremental, backend.isIncrementalCheckpointsEnabled());
 	}
 
+	// ------------------------------------------------------------------------
+	//  RocksDB local file directory
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This test checks the behavior for basic setting of local DB directories.
+	 */
 	@Test
 	public void testSetDbPath() throws Exception {
-		String checkpointPath = tempFolder.newFolder().toURI().toString();
-		File testDir1 = tempFolder.newFolder();
-		File testDir2 = tempFolder.newFolder();
+		final RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI().toString());
 
-		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath);
+		final String testDir1 = tempFolder.newFolder().getAbsolutePath();
+		final String testDir2 = tempFolder.newFolder().getAbsolutePath();
 
 		assertNull(rocksDbBackend.getDbStoragePaths());
 
-		rocksDbBackend.setDbStoragePath(testDir1.getAbsolutePath());
-		assertArrayEquals(new String[] { new Path(testDir1.getAbsolutePath()).toString() }, rocksDbBackend.getDbStoragePaths());
+		rocksDbBackend.setDbStoragePath(testDir1);
+		assertArrayEquals(new String[] { testDir1 }, rocksDbBackend.getDbStoragePaths());
 
 		rocksDbBackend.setDbStoragePath(null);
 		assertNull(rocksDbBackend.getDbStoragePaths());
 
-		rocksDbBackend.setDbStoragePaths(testDir1.getAbsolutePath(), testDir2.getAbsolutePath());
-		assertArrayEquals(new String[] { new Path(testDir1.getAbsolutePath()).toString(), new Path(testDir2.getAbsolutePath()).toString() }, rocksDbBackend.getDbStoragePaths());
+		rocksDbBackend.setDbStoragePaths(testDir1, testDir2);
+		assertArrayEquals(new String[] { testDir1, testDir2 }, rocksDbBackend.getDbStoragePaths());
 
-		Environment env = getMockEnvironment();
-		RocksDBKeyedStateBackend<Integer> keyedBackend = (RocksDBKeyedStateBackend<Integer>) rocksDbBackend.
-				createKeyedStateBackend(
-						env,
-						env.getJobID(),
-						"test_op",
-						IntSerializer.INSTANCE,
-						1,
-						new KeyGroupRange(0, 0),
-						env.getTaskKvStateRegistry());
+		final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend);
+
+		try {
+			File instanceBasePath = keyedBackend.getInstanceBasePath();
+			assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(testDir1), startsWith(testDir2)));
+
+			//noinspection NullArgumentToVariableArgMethod
+			rocksDbBackend.setDbStoragePaths(null);
+			assertNull(rocksDbBackend.getDbStoragePaths());
+		}
+		finally {
+			IOUtils.closeQuietly(keyedBackend);
+			keyedBackend.dispose();
+		}
+	}
+
+	@Test
+	public void testStoragePathWithFilePrefix() throws Exception {
+		final File folder = tempFolder.newFolder();
+		final String dbStoragePath = new Path(folder.toURI().toString()).toString();
+
+		assertTrue(dbStoragePath.startsWith("file:"));
+
+		testLocalDbPaths(dbStoragePath, folder);
+	}
+
+	@Test
+	public void testWithDefaultFsSchemeNoStoragePath() throws Exception {
+		try {
+			// set the default file system scheme
+			Configuration config = new Configuration();
+			config.setString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, "s3://mydomain.com:8020/flink");
+			FileSystem.initialize(config);
+
+			testLocalDbPaths(null, new File(CommonTestUtils.getTempDir()));
+		}
+		finally {
+			FileSystem.initialize(new Configuration());
+		}
+	}
+
+	@Test
+	public void testWithDefaultFsSchemeAbsoluteStoragePath() throws Exception {
+		final File folder = tempFolder.newFolder();
+		final String dbStoragePath = folder.getAbsolutePath();
+
+		try {
+			// set the default file system scheme
+			Configuration config = new Configuration();
+			config.setString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, "s3://mydomain.com:8020/flink");
+			FileSystem.initialize(config);
+
+			testLocalDbPaths(dbStoragePath, folder);
+		}
+		finally {
+			FileSystem.initialize(new Configuration());
+		}
+	}
+
+	private void testLocalDbPaths(String configuredPath, File expectedPath) throws Exception {
+		final RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI().toString());
+		rocksDbBackend.setDbStoragePath(configuredPath);
+
+		RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend);
 
 		try {
 			File instanceBasePath = keyedBackend.getInstanceBasePath();
-			assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(testDir1.getAbsolutePath()), startsWith(testDir2.getAbsolutePath())));
+			assertThat(instanceBasePath.getAbsolutePath(), startsWith(expectedPath.getAbsolutePath()));
 
 			//noinspection NullArgumentToVariableArgMethod
 			rocksDbBackend.setDbStoragePaths(null);
@@ -124,13 +187,19 @@ public class RocksDBStateBackendConfigTest {
 		}
 	}
 
+	/**
+	 * Validates that empty arguments for the local DB path are invalid.
+	 */
 	@Test(expected = IllegalArgumentException.class)
-	public void testSetNullPaths() throws Exception {
+	public void testSetEmptyPaths() throws Exception {
 		String checkpointPath = tempFolder.newFolder().toURI().toString();
 		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath);
 		rocksDbBackend.setDbStoragePaths();
 	}
 
+	/**
+	 * Validates that schemes other than 'file:/' are not allowed.
+	 */
 	@Test(expected = IllegalArgumentException.class)
 	public void testNonFileSchemePath() throws Exception {
 		String checkpointPath = tempFolder.newFolder().toURI().toString();
@@ -138,6 +207,12 @@ public class RocksDBStateBackendConfigTest {
 		rocksDbBackend.setDbStoragePath("hdfs:///some/path/to/perdition");
 	}
 
+	@Test(expected = IllegalArgumentException.class)
+	public void testDbPathRelativePaths() throws Exception {
+		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI().toString());
+		rocksDbBackend.setDbStoragePath("relative/path");
+	}
+
 	// ------------------------------------------------------------------------
 	//  RocksDB local file automatic from temp directories
 	// ------------------------------------------------------------------------
@@ -381,7 +456,7 @@ public class RocksDBStateBackendConfigTest {
 
 	@Test
 	public void testCallsForwardedToNonPartitionedBackend() throws Exception {
-		AbstractStateBackend storageBackend = new MemoryStateBackend();
+		StateBackend storageBackend = new MemoryStateBackend();
 		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(storageBackend);
 		assertEquals(storageBackend, rocksDbBackend.getCheckpointBackend());
 	}
@@ -390,6 +465,22 @@ public class RocksDBStateBackendConfigTest {
 	//  Utilities
 	// ------------------------------------------------------------------------
 
+	static RocksDBKeyedStateBackend<Integer> createKeyedStateBackend(
+			RocksDBStateBackend rocksDbBackend) throws Exception {
+
+		final Environment env = getMockEnvironment();
+
+		return (RocksDBKeyedStateBackend<Integer>) rocksDbBackend.
+				createKeyedStateBackend(
+						env,
+						env.getJobID(),
+						"test_op",
+						IntSerializer.INSTANCE,
+						1,
+						new KeyGroupRange(0, 0),
+						env.getTaskKvStateRegistry());
+	}
+
 	static Environment getMockEnvironment() {
 		return getMockEnvironment(new File[] { new File(System.getProperty("java.io.tmpdir")) });
 	}


[02/16] flink git commit: [hotfix] Update README.md building prerequisites

Posted by se...@apache.org.
[hotfix] Update README.md building prerequisites

This closes #5924


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/58e53b0b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/58e53b0b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/58e53b0b

Branch: refs/heads/release-1.5
Commit: 58e53b0b9bc0f9583be3d77436a99739fe23cf32
Parents: f1d536a
Author: 陈梓立 <wa...@gmail.com>
Authored: Fri Apr 27 09:07:33 2018 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Apr 30 23:24:54 2018 +0200

----------------------------------------------------------------------
 README.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/58e53b0b/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 095f652a..7966ecd 100644
--- a/README.md
+++ b/README.md
@@ -67,10 +67,10 @@ counts.writeAsCsv(outputPath)
 
 Prerequisites for building Flink:
 
-* Unix-like environment (We use Linux, Mac OS X, Cygwin)
+* Unix-like environment (we use Linux, Mac OS X, Cygwin)
 * git
 * Maven (we recommend version 3.2.5)
-* Java 8
+* Java 8 (Java 9 and 10 are not yet supported)
 
 ```
 git clone https://github.com/apache/flink.git