You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/05 01:59:52 UTC

[03/10] flink git commit: [FLINK-5238] [minicluster] MiniCluster starts local communication if only one TaskManager is used

[FLINK-5238] [minicluster] MiniCluster starts local communication if only one TaskManager is used


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/62e8e33f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/62e8e33f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/62e8e33f

Branch: refs/heads/flip-6
Commit: 62e8e33f341e95b70e090a6d0f7d5e75b9c4d4c9
Parents: 6b3283e
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Dec 2 17:00:25 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 5 02:49:43 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/minicluster/MiniCluster.java  |  4 +++-
 .../runtime/taskexecutor/TaskManagerRunner.java | 22 +++++++++++++++-----
 2 files changed, 20 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/62e8e33f/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 1b9f265..29a6e59 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -525,6 +525,7 @@ public class MiniCluster {
 			RpcService[] taskManagerRpcServices) throws Exception {
 
 		final TaskManagerRunner[] taskManagerRunners = new TaskManagerRunner[numTaskManagers];
+		final boolean localCommunication = numTaskManagers == 1;
 
 		for (int i = 0; i < numTaskManagers; i++) {
 			taskManagerRunners[i] = new TaskManagerRunner(
@@ -532,7 +533,8 @@ public class MiniCluster {
 				new ResourceID(UUID.randomUUID().toString()),
 				taskManagerRpcServices[i],
 				haServices,
-				metricRegistry);
+				metricRegistry,
+				localCommunication);
 
 			taskManagerRunners[i].start();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/62e8e33f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index a18ff40..1145a46 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -66,11 +66,22 @@ public class TaskManagerRunner implements FatalErrorHandler {
 	private final TaskExecutor taskManager;
 
 	public TaskManagerRunner(
+			Configuration configuration,
+			ResourceID resourceID,
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityServices,
+			MetricRegistry metricRegistry) throws Exception {
+
+		this(configuration, resourceID, rpcService, highAvailabilityServices, metricRegistry, false);
+	}
+
+	public TaskManagerRunner(
 		Configuration configuration,
 		ResourceID resourceID,
 		RpcService rpcService,
 		HighAvailabilityServices highAvailabilityServices,
-		MetricRegistry metricRegistry) throws Exception {
+		MetricRegistry metricRegistry,
+		boolean localCommunicationOnly) throws Exception {
 
 		this.configuration = Preconditions.checkNotNull(configuration);
 		this.resourceID = Preconditions.checkNotNull(resourceID);
@@ -80,10 +91,11 @@ public class TaskManagerRunner implements FatalErrorHandler {
 
 		InetAddress remoteAddress = InetAddress.getByName(rpcService.getAddress());
 
-		TaskManagerServicesConfiguration taskManagerServicesConfiguration = TaskManagerServicesConfiguration.fromConfiguration(
-			configuration,
-			remoteAddress,
-			false);
+		TaskManagerServicesConfiguration taskManagerServicesConfiguration = 
+				TaskManagerServicesConfiguration.fromConfiguration(
+						configuration,
+						remoteAddress,
+						localCommunicationOnly);
 
 		TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
 			taskManagerServicesConfiguration,