You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/05 01:59:52 UTC
[03/10] flink git commit: [FLINK-5238] [minicluster] MiniCluster starts local communication if only one TaskManager is used
[FLINK-5238] [minicluster] MiniCluster starts local communication if only one TaskManager is used
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/62e8e33f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/62e8e33f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/62e8e33f
Branch: refs/heads/flip-6
Commit: 62e8e33f341e95b70e090a6d0f7d5e75b9c4d4c9
Parents: 6b3283e
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Dec 2 17:00:25 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 5 02:49:43 2016 +0100
----------------------------------------------------------------------
.../flink/runtime/minicluster/MiniCluster.java | 4 +++-
.../runtime/taskexecutor/TaskManagerRunner.java | 22 +++++++++++++++-----
2 files changed, 20 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/62e8e33f/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 1b9f265..29a6e59 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -525,6 +525,7 @@ public class MiniCluster {
RpcService[] taskManagerRpcServices) throws Exception {
final TaskManagerRunner[] taskManagerRunners = new TaskManagerRunner[numTaskManagers];
+ final boolean localCommunication = numTaskManagers == 1;
for (int i = 0; i < numTaskManagers; i++) {
taskManagerRunners[i] = new TaskManagerRunner(
@@ -532,7 +533,8 @@ public class MiniCluster {
new ResourceID(UUID.randomUUID().toString()),
taskManagerRpcServices[i],
haServices,
- metricRegistry);
+ metricRegistry,
+ localCommunication);
taskManagerRunners[i].start();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/62e8e33f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index a18ff40..1145a46 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -66,11 +66,22 @@ public class TaskManagerRunner implements FatalErrorHandler {
private final TaskExecutor taskManager;
public TaskManagerRunner(
+ Configuration configuration,
+ ResourceID resourceID,
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ MetricRegistry metricRegistry) throws Exception {
+
+ this(configuration, resourceID, rpcService, highAvailabilityServices, metricRegistry, false);
+ }
+
+ public TaskManagerRunner(
Configuration configuration,
ResourceID resourceID,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
- MetricRegistry metricRegistry) throws Exception {
+ MetricRegistry metricRegistry,
+ boolean localCommunicationOnly) throws Exception {
this.configuration = Preconditions.checkNotNull(configuration);
this.resourceID = Preconditions.checkNotNull(resourceID);
@@ -80,10 +91,11 @@ public class TaskManagerRunner implements FatalErrorHandler {
InetAddress remoteAddress = InetAddress.getByName(rpcService.getAddress());
- TaskManagerServicesConfiguration taskManagerServicesConfiguration = TaskManagerServicesConfiguration.fromConfiguration(
- configuration,
- remoteAddress,
- false);
+ TaskManagerServicesConfiguration taskManagerServicesConfiguration =
+ TaskManagerServicesConfiguration.fromConfiguration(
+ configuration,
+ remoteAddress,
+ localCommunicationOnly);
TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
taskManagerServicesConfiguration,