You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/02 21:58:37 UTC
[32/50] [abbrv] flink git commit: [hotfix] Add methods defined in the gateway to the ResourceManager and TaskExecutor
[hotfix] Add methods defined in the gateway to the ResourceManager and TaskExecutor
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/125bda03
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/125bda03
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/125bda03
Branch: refs/heads/flip-6
Commit: 125bda03a1a6e19d1eca34e36f26c0f855e3bcce
Parents: 9bd91a6
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Sep 21 14:14:05 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Oct 2 23:44:45 2016 +0200
----------------------------------------------------------------------
.../runtime/resourcemanager/ResourceManager.java | 1 +
.../resourcemanager/ResourceManagerGateway.java | 2 +-
.../flink/runtime/taskexecutor/TaskExecutor.java | 15 +++++++++++++++
.../runtime/taskexecutor/TaskExecutorGateway.java | 6 +++---
4 files changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/125bda03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 29aba1a..d9a7134 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.resourcemanager;
+import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import org.apache.flink.annotation.VisibleForTesting;
http://git-wip-us.apache.org/repos/asf/flink/blob/125bda03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index e5c8b64..c8e3488 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -58,7 +58,7 @@ public interface ResourceManagerGateway extends RpcGateway {
* @param slotRequest Slot request
* @return Future slot assignment
*/
- Future<SlotRequestRegistered> requestSlot(SlotRequest slotRequest);
+ Future<SlotRequestReply> requestSlot(SlotRequest slotRequest);
/**
*
http://git-wip-us.apache.org/repos/asf/flink/blob/125bda03/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index a455fe2..fadae5f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.taskexecutor;
import akka.actor.ActorSystem;
import akka.util.Timeout;
import com.typesafe.config.Config;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.LocalConnectionManager;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
@@ -30,6 +31,8 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
import org.apache.flink.runtime.query.netty.KvStateServer;
+import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
+import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
@@ -201,6 +204,18 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
}
/**
+ * Requests a slot from the TaskManager
+ *
+ * @param allocationID id for the request
+ * @param resourceManagerLeaderID current leader id of the ResourceManager
+ * @return answer to the slot request
+ */
+ @RpcMethod
+ public SlotRequestReply requestSlot(AllocationID allocationID, UUID resourceManagerLeaderID) {
+ return new SlotRequestRegistered(allocationID);
+ }
+
+ /**
* Starts and runs the TaskManager.
* <p/>
* This method first tries to select the network interface to use for the TaskManager
http://git-wip-us.apache.org/repos/asf/flink/blob/125bda03/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 7257436..65323a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -39,12 +39,12 @@ public interface TaskExecutorGateway extends RpcGateway {
void notifyOfNewResourceManagerLeader(String address, UUID resourceManagerLeaderId);
/**
- * Send by the ResourceManager to the TaskExecutor
+ * Requests a slot from the TaskManager
+ *
* @param allocationID id for the request
* @param resourceManagerLeaderID current leader id of the ResourceManager
- * @return SlotRequestReply Answer to the request
+ * @return answer to the slot request
*/
-
Future<SlotRequestReply> requestSlot(
AllocationID allocationID,
UUID resourceManagerLeaderID,