You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2014/12/18 19:45:44 UTC

[48/82] [abbrv] incubator-flink git commit: Removed blocking call in Execution.deploySlot

Removed blocking call in Execution.deploySlot


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e85b82f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e85b82f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e85b82f8

Branch: refs/heads/master
Commit: e85b82f8990a089e023fea4f0e44b7c2ae913f40
Parents: 0924120
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Oct 29 18:23:50 2014 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Dec 18 18:58:30 2014 +0100

----------------------------------------------------------------------
 .../org/apache/flink/runtime/executiongraph/Execution.java   | 8 +++++++-
 .../flink/runtime/jobmanager/web/JobmanagerInfoServlet.java  | 8 ++++----
 2 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e85b82f8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 139409b..14e247f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -33,6 +33,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -46,6 +47,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
@@ -276,7 +278,11 @@ public class Execution {
 				@Override
 				public TaskOperationResult call() throws Exception {
 					Instance instance = slot.getInstance();
-					return instance.submitTask(deployment);
+//					return instance.submitTask(deployment);
+
+					//TODO realize as an actor
+					return (TaskOperationResult)Patterns.ask(instance.getTaskManager(), new TaskManagerMessages
+							.SubmitTask(deployment), AkkaUtils.FUTURE_TIMEOUT());
 				}
 			}, AkkaUtils.globalExecutionContext());
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e85b82f8/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
index 9797808..f52da0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
@@ -125,13 +125,13 @@ public class JobmanagerInfoServlet extends HttpServlet {
 				}
 			}
 			else if("taskmanagers".equals(req.getParameter("get"))) {
-				int numberOfTaskManagrs = AkkaUtils.<Integer>ask(jobmanager,
+				int numberOfTaskManagers = AkkaUtils.<Integer>ask(jobmanager,
 						RequestNumberRegisteredTaskManager$.MODULE$);
-				int numberOfRegisteredSltos = AkkaUtils.<Integer>ask(jobmanager,
+				int numberOfRegisteredSlots = AkkaUtils.<Integer>ask(jobmanager,
 						RequestTotalNumberOfSlots$.MODULE$);
 
-				resp.getWriter().write("{\"taskmanagers\": " + numberOfTaskManagrs +", " +
-						"\"slots\": "+numberOfRegisteredSltos+"}");
+				resp.getWriter().write("{\"taskmanagers\": " + numberOfTaskManagers +", " +
+						"\"slots\": "+numberOfRegisteredSlots+"}");
 			}
 			else if("cancel".equals(req.getParameter("get"))) {
 				String jobId = req.getParameter("job");