You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/02 21:58:37 UTC

[32/50] [abbrv] flink git commit: [hotfix] Add methods defined in the gateway to the ResourceManager and TaskExecutor

[hotfix] Add methods defined in the gateway to the ResourceManager and TaskExecutor


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/125bda03
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/125bda03
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/125bda03

Branch: refs/heads/flip-6
Commit: 125bda03a1a6e19d1eca34e36f26c0f855e3bcce
Parents: 9bd91a6
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Sep 21 14:14:05 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Oct 2 23:44:45 2016 +0200

----------------------------------------------------------------------
 .../runtime/resourcemanager/ResourceManager.java     |  1 +
 .../resourcemanager/ResourceManagerGateway.java      |  2 +-
 .../flink/runtime/taskexecutor/TaskExecutor.java     | 15 +++++++++++++++
 .../runtime/taskexecutor/TaskExecutorGateway.java    |  6 +++---
 4 files changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/125bda03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 29aba1a..d9a7134 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
+import akka.dispatch.Futures;
 import akka.dispatch.Mapper;
 
 import org.apache.flink.annotation.VisibleForTesting;

http://git-wip-us.apache.org/repos/asf/flink/blob/125bda03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index e5c8b64..c8e3488 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -58,7 +58,7 @@ public interface ResourceManagerGateway extends RpcGateway {
 	 * @param slotRequest Slot request
 	 * @return Future slot assignment
 	 */
-	Future<SlotRequestRegistered> requestSlot(SlotRequest slotRequest);
+	Future<SlotRequestReply> requestSlot(SlotRequest slotRequest);
 
 	/**
 	 * 

http://git-wip-us.apache.org/repos/asf/flink/blob/125bda03/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index a455fe2..fadae5f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.taskexecutor;
 import akka.actor.ActorSystem;
 import akka.util.Timeout;
 import com.typesafe.config.Config;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.LocalConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
@@ -30,6 +31,8 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
 import org.apache.flink.runtime.query.netty.KvStateServer;
+import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
+import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
@@ -201,6 +204,18 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	}
 
 	/**
+	 * Requests a slot from the TaskManager
+	 *
+	 * @param allocationID id for the request
+	 * @param resourceManagerLeaderID current leader id of the ResourceManager
+	 * @return answer to the slot request
+	 */
+	@RpcMethod
+	public SlotRequestReply requestSlot(AllocationID allocationID, UUID resourceManagerLeaderID) {
+		return new SlotRequestRegistered(allocationID);
+	}
+
+	/**
 	 * Starts and runs the TaskManager.
 	 * <p/>
 	 * This method first tries to select the network interface to use for the TaskManager

http://git-wip-us.apache.org/repos/asf/flink/blob/125bda03/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 7257436..65323a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -39,12 +39,12 @@ public interface TaskExecutorGateway extends RpcGateway {
 	void notifyOfNewResourceManagerLeader(String address, UUID resourceManagerLeaderId);
 
 	/**
-	 * Send by the ResourceManager to the TaskExecutor
+	 * Requests a slot from the TaskManager
+	 *
 	 * @param allocationID id for the request
 	 * @param resourceManagerLeaderID current leader id of the ResourceManager
-	 * @return SlotRequestReply Answer to the request
+	 * @return answer to the slot request
 	 */
-
 	Future<SlotRequestReply> requestSlot(
 		AllocationID allocationID,
 		UUID resourceManagerLeaderID,