You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2014/12/18 19:45:44 UTC
[48/82] [abbrv] incubator-flink git commit: Removed blocking call in Execution.deploySlot
Removed blocking call in Execution.deploySlot
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e85b82f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e85b82f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e85b82f8
Branch: refs/heads/master
Commit: e85b82f8990a089e023fea4f0e44b7c2ae913f40
Parents: 0924120
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Oct 29 18:23:50 2014 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Dec 18 18:58:30 2014 +0100
----------------------------------------------------------------------
.../org/apache/flink/runtime/executiongraph/Execution.java | 8 +++++++-
.../flink/runtime/jobmanager/web/JobmanagerInfoServlet.java | 8 ++++----
2 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e85b82f8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 139409b..14e247f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -33,6 +33,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -46,6 +47,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
@@ -276,7 +278,11 @@ public class Execution {
@Override
public TaskOperationResult call() throws Exception {
Instance instance = slot.getInstance();
- return instance.submitTask(deployment);
+// return instance.submitTask(deployment);
+
+ //TODO realize as an actor
+ return (TaskOperationResult)Patterns.ask(instance.getTaskManager(), new TaskManagerMessages
+ .SubmitTask(deployment), AkkaUtils.FUTURE_TIMEOUT());
}
}, AkkaUtils.globalExecutionContext());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e85b82f8/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
index 9797808..f52da0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
@@ -125,13 +125,13 @@ public class JobmanagerInfoServlet extends HttpServlet {
}
}
else if("taskmanagers".equals(req.getParameter("get"))) {
- int numberOfTaskManagrs = AkkaUtils.<Integer>ask(jobmanager,
+ int numberOfTaskManagers = AkkaUtils.<Integer>ask(jobmanager,
RequestNumberRegisteredTaskManager$.MODULE$);
- int numberOfRegisteredSltos = AkkaUtils.<Integer>ask(jobmanager,
+ int numberOfRegisteredSlots = AkkaUtils.<Integer>ask(jobmanager,
RequestTotalNumberOfSlots$.MODULE$);
- resp.getWriter().write("{\"taskmanagers\": " + numberOfTaskManagrs +", " +
- "\"slots\": "+numberOfRegisteredSltos+"}");
+ resp.getWriter().write("{\"taskmanagers\": " + numberOfTaskManagers +", " +
+ "\"slots\": "+numberOfRegisteredSlots+"}");
}
else if("cancel".equals(req.getParameter("get"))) {
String jobId = req.getParameter("job");