You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/07/13 18:54:10 UTC
[7/7] flink git commit: [FLINK-2329] [runtime] Introduces InstanceGateway as an abstraction to communicate with the TaskManager.
[FLINK-2329] [runtime] Introduces InstanceGateway as an abstraction to communicate with the TaskManager.
Replaces AkkaUtils.globalExecutionContext with an instance-dependent ExecutionContext.
This closes #893
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2ccb5fdb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2ccb5fdb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2ccb5fdb
Branch: refs/heads/master
Commit: 2ccb5fdb47aa0e3766fd7fbd17a41feaca29fcbc
Parents: aa5e5b3
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jul 7 11:41:44 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jul 13 17:54:31 2015 +0200
----------------------------------------------------------------------
.../flink/runtime/executiongraph/Execution.java | 89 ++-
.../runtime/executiongraph/ExecutionGraph.java | 55 +-
.../runtime/executiongraph/ExecutionVertex.java | 40 +-
.../runtime/instance/AkkaInstanceGateway.java | 111 ++++
.../apache/flink/runtime/instance/Instance.java | 34 +-
.../flink/runtime/instance/InstanceGateway.java | 82 +++
.../flink/runtime/instance/InstanceManager.java | 3 +-
.../runtime/io/network/NetworkEnvironment.java | 34 +-
.../runtime/jobmanager/scheduler/Scheduler.java | 11 +-
.../jobmanager/web/SetupInfoServlet.java | 3 +-
.../web-docs-infoserver/js/taskmanager.js | 2 +-
.../apache/flink/runtime/akka/AkkaUtils.scala | 2 -
.../flink/runtime/jobmanager/JobManager.scala | 95 +++-
.../runtime/minicluster/FlinkMiniCluster.scala | 20 +-
.../flink/runtime/taskmanager/TaskManager.scala | 34 +-
.../executiongraph/AllVerticesIteratorTest.java | 2 +
.../ExecutionGraphConstructionTest.java | 54 +-
.../ExecutionGraphDeploymentTest.java | 59 +-
.../executiongraph/ExecutionGraphTestUtils.java | 121 ++--
.../ExecutionStateProgressTest.java | 31 +-
.../ExecutionVertexCancelTest.java | 563 ++++++++-----------
.../ExecutionVertexDeploymentTest.java | 115 +---
.../ExecutionVertexSchedulingTest.java | 35 +-
.../executiongraph/LocalInputSplitsTest.java | 53 +-
.../executiongraph/PointwisePatternTest.java | 50 +-
.../TerminalStateDeadlockTest.java | 10 +-
.../VertexLocationConstraintTest.java | 91 +--
.../executiongraph/VertexSlotSharingTest.java | 7 +-
.../instance/BaseTestingInstanceGateway.java | 94 ++++
.../runtime/instance/DummyInstanceGateway.java | 57 ++
.../flink/runtime/instance/InstanceTest.java | 7 +-
.../flink/runtime/instance/SimpleSlotTest.java | 4 +-
.../io/network/NetworkEnvironmentTest.java | 6 +-
.../ScheduleWithCoLocationHintTest.java | 37 +-
.../scheduler/SchedulerIsolatedTasksTest.java | 31 +-
.../scheduler/SchedulerSlotSharingTest.java | 51 +-
.../scheduler/SchedulerTestUtils.java | 4 +-
...askManagerComponentsStartupShutdownTest.java | 6 +-
.../ExecutionGraphRestartTest.scala | 40 +-
.../TaskManagerLossFailsTasksTest.scala | 33 +-
.../runtime/jobmanager/RecoveryITCase.scala | 8 +-
.../runtime/testingUtils/TestingCluster.scala | 30 +-
.../testingUtils/TestingJobManager.scala | 15 +-
.../TestingJobManagerMessages.scala | 3 +-
.../runtime/testingUtils/TestingUtils.scala | 62 +-
.../apache/flink/test/util/TestBaseUtils.java | 6 +-
.../test/util/ForkableFlinkMiniCluster.scala | 36 +-
.../taskmanager/TaskManagerFailsITCase.scala | 12 +-
.../apache/flink/yarn/ApplicationMaster.scala | 30 +-
.../flink/yarn/ApplicationMasterActor.scala | 2 +-
50 files changed, 1399 insertions(+), 981 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 76a58e8..af67c3f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -18,13 +18,9 @@
package org.apache.flink.runtime.executiongraph;
-import akka.actor.ActorRef;
import akka.dispatch.OnComplete;
import akka.dispatch.OnFailure;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
@@ -32,6 +28,7 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.InstanceGateway;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -50,6 +47,7 @@ import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.util.SerializedValue;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
+import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
@@ -131,9 +129,20 @@ public class Execution implements Serializable {
private SerializedValue<StateHandle<?>> operatorState;
+ /** The execution context which is used to execute futures. */
+ @SuppressWarnings("NonSerializableFieldInSerializableClass")
+ private ExecutionContext executionContext;
+
// --------------------------------------------------------------------------------------------
- public Execution(ExecutionVertex vertex, int attemptNumber, long startTimestamp, FiniteDuration timeout) {
+ public Execution(
+ ExecutionContext executionContext,
+ ExecutionVertex vertex,
+ int attemptNumber,
+ long startTimestamp,
+ FiniteDuration timeout) {
+ this.executionContext = checkNotNull(executionContext);
+
this.vertex = checkNotNull(vertex);
this.attemptId = new ExecutionAttemptID();
@@ -200,6 +209,8 @@ public class Execution implements Serializable {
}
assignedResource = null;
+ executionContext = null;
+
partialInputChannelDeploymentDescriptors.clear();
partialInputChannelDeploymentDescriptors = null;
}
@@ -338,8 +349,9 @@ public class Execution implements Serializable {
vertex.getExecutionGraph().registerExecution(this);
final Instance instance = slot.getInstance();
- final Future<Object> deployAction = Patterns.ask(instance.getTaskManager(),
- new SubmitTask(deployment), new Timeout(timeout));
+ final InstanceGateway gateway = instance.getInstanceGateway();
+
+ final Future<Object> deployAction = gateway.ask(new SubmitTask(deployment), timeout);
deployAction.onComplete(new OnComplete<Object>(){
@@ -366,7 +378,7 @@ public class Execution implements Serializable {
}
}
}
- }, AkkaUtils.globalExecutionContext());
+ }, executionContext);
}
catch (Throwable t) {
markFailed(t);
@@ -402,7 +414,7 @@ public class Execution implements Serializable {
else if (current == FINISHED || current == FAILED) {
// nothing to do any more. finished failed before it could be cancelled.
// in any case, the task is removed from the TaskManager already
- sendFailIntermediateResultPartitionsRPCCall();
+ sendFailIntermediateResultPartitionsRpcCall();
return;
}
@@ -485,7 +497,7 @@ public class Execution implements Serializable {
return true;
}
- }, AkkaUtils.globalExecutionContext());
+ }, executionContext);
// double check to resolve race conditions
if(consumerVertex.getExecutionState() == RUNNING){
@@ -533,7 +545,7 @@ public class Execution implements Serializable {
final UpdatePartitionInfo updateTaskMessage = new UpdateTaskSinglePartitionInfo(
consumer.getAttemptId(), partition.getIntermediateResult().getId(), descriptor);
- sendUpdateTaskRpcCall(consumerSlot, updateTaskMessage);
+ sendUpdatePartitionInfoRpcCall(consumerSlot, updateTaskMessage);
}
// ----------------------------------------------------------------
// Consumer is scheduled or deploying => cache input channel
@@ -689,11 +701,12 @@ public class Execution implements Serializable {
inputChannelDeploymentDescriptors.add(partialInputChannelDeploymentDescriptor.createInputChannelDeploymentDescriptor(this));
}
- UpdatePartitionInfo updateTaskMessage =
- createUpdateTaskMultiplePartitionInfos(attemptId, resultIDs,
- inputChannelDeploymentDescriptors);
+ UpdatePartitionInfo updateTaskMessage = createUpdateTaskMultiplePartitionInfos(
+ attemptId,
+ resultIDs,
+ inputChannelDeploymentDescriptors);
- sendUpdateTaskRpcCall(assignedResource, updateTaskMessage);
+ sendUpdatePartitionInfoRpcCall(assignedResource, updateTaskMessage);
}
}
@@ -804,14 +817,23 @@ public class Execution implements Serializable {
}
}
+ /**
+ * This method sends a CancelTask message to the instance of the assigned slot.
+ *
+ * The sending is tried up to NUM_CANCEL_CALL_TRIES times.
+ */
private void sendCancelRpcCall() {
final SimpleSlot slot = this.assignedResource;
if (slot != null) {
- Future<Object> cancelResult = AkkaUtils.retry(slot.getInstance().getTaskManager(), new
- CancelTask(attemptId), NUM_CANCEL_CALL_TRIES,
- AkkaUtils.globalExecutionContext(), timeout);
+ final InstanceGateway gateway = slot.getInstance().getInstanceGateway();
+
+ Future<Object> cancelResult = gateway.retry(
+ new CancelTask(attemptId),
+ NUM_CANCEL_CALL_TRIES,
+ timeout,
+ executionContext);
cancelResult.onComplete(new OnComplete<Object>() {
@@ -827,35 +849,40 @@ public class Execution implements Serializable {
}
}
}
- }, AkkaUtils.globalExecutionContext());
+ }, executionContext);
}
}
- private void sendFailIntermediateResultPartitionsRPCCall() {
+ private void sendFailIntermediateResultPartitionsRpcCall() {
final SimpleSlot slot = this.assignedResource;
if (slot != null) {
final Instance instance = slot.getInstance();
if (instance.isAlive()) {
- try {
- // TODO For some tests this could be a problem when querying too early if all resources were released
- instance.getTaskManager().tell(new FailIntermediateResultPartitions(attemptId), ActorRef.noSender());
- } catch (Throwable t) {
- fail(new Exception("Intermediate result partition could not be failed.", t));
- }
+ final InstanceGateway gateway = instance.getInstanceGateway();
+
+ // TODO For some tests this could be a problem when querying too early if all resources were released
+ gateway.tell(new FailIntermediateResultPartitions(attemptId));
}
}
}
- private void sendUpdateTaskRpcCall(final SimpleSlot consumerSlot,
- final UpdatePartitionInfo updateTaskMsg) {
+ /**
+ * Sends an UpdatePartitionInfo message to the instance of the consumerSlot.
+ *
+ * @param consumerSlot Slot to whose instance the message will be sent
+ * @param updatePartitionInfo UpdatePartitionInfo message
+ */
+ private void sendUpdatePartitionInfoRpcCall(
+ final SimpleSlot consumerSlot,
+ final UpdatePartitionInfo updatePartitionInfo) {
if (consumerSlot != null) {
final Instance instance = consumerSlot.getInstance();
+ final InstanceGateway gateway = instance.getInstanceGateway();
- Future<Object> futureUpdate = Patterns.ask(instance.getTaskManager(), updateTaskMsg,
- new Timeout(timeout));
+ Future<Object> futureUpdate = gateway.ask(updatePartitionInfo, timeout);
futureUpdate.onFailure(new OnFailure() {
@Override
@@ -863,7 +890,7 @@ public class Execution implements Serializable {
fail(new IllegalStateException("Update task on instance " + instance +
" failed due to:", failure));
}
- }, AkkaUtils.globalExecutionContext());
+ }, executionContext);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 84cbab7..47b7ae2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -24,7 +24,6 @@ import akka.actor.ActorSystem;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.execution.ExecutionState;
@@ -45,6 +44,7 @@ import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;
import java.io.Serializable;
@@ -197,6 +197,10 @@ public class ExecutionGraph implements Serializable {
@SuppressWarnings("NonSerializableFieldInSerializableClass")
private CheckpointCoordinator checkpointCoordinator;
+ /** The execution context which is used to execute futures. */
+ @SuppressWarnings("NonSerializableFieldInSerializableClass")
+ private ExecutionContext executionContext;
+
// ------ Fields that are only relevant for archived execution graphs ------------
private ExecutionConfig executionConfig;
@@ -207,17 +211,38 @@ public class ExecutionGraph implements Serializable {
/**
* This constructor is for tests only, because it does not include class loading information.
*/
- ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, FiniteDuration timeout) {
- this(jobId, jobName, jobConfig, timeout, new ArrayList<BlobKey>(), ExecutionGraph.class.getClassLoader());
- }
-
- public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, FiniteDuration timeout,
- List<BlobKey> requiredJarFiles, ClassLoader userClassLoader) {
-
- if (jobId == null || jobName == null || jobConfig == null || userClassLoader == null) {
+ ExecutionGraph(
+ ExecutionContext executionContext,
+ JobID jobId,
+ String jobName,
+ Configuration jobConfig,
+ FiniteDuration timeout) {
+ this(
+ executionContext,
+ jobId,
+ jobName,
+ jobConfig,
+ timeout,
+ new ArrayList<BlobKey>(),
+ ExecutionGraph.class.getClassLoader()
+ );
+ }
+
+ public ExecutionGraph(
+ ExecutionContext executionContext,
+ JobID jobId,
+ String jobName,
+ Configuration jobConfig,
+ FiniteDuration timeout,
+ List<BlobKey> requiredJarFiles,
+ ClassLoader userClassLoader) {
+
+ if (executionContext == null || jobId == null || jobName == null || jobConfig == null || userClassLoader == null) {
throw new NullPointerException();
}
+ this.executionContext = executionContext;
+
this.jobID = jobId;
this.jobName = jobName;
this.jobConfiguration = jobConfig;
@@ -451,6 +476,15 @@ public class ExecutionGraph implements Serializable {
return this.stateTimestamps[status.ordinal()];
}
+ /**
+ * Returns the ExecutionContext associated with this ExecutionGraph.
+ *
+ * @return ExecutionContext associated with this ExecutionGraph
+ */
+ public ExecutionContext getExecutionContext() {
+ return executionContext;
+ }
+
// --------------------------------------------------------------------------------------------
// Actions
// --------------------------------------------------------------------------------------------
@@ -629,6 +663,7 @@ public class ExecutionGraph implements Serializable {
userClassLoader = null;
scheduler = null;
checkpointCoordinator = null;
+ executionContext = null;
for (ExecutionJobVertex vertex : verticesInCreationOrder) {
vertex.prepareForArchiving();
@@ -719,7 +754,7 @@ public class ExecutionGraph implements Serializable {
restart();
return null;
}
- }, AkkaUtils.globalExecutionContext());
+ }, executionContext);
break;
}
else if (numberOfRetriesLeft <= 0 && transitionState(current, JobStatus.FAILED, failureCause)) {
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index a70fa7d..f9001cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.executiongraph;
-import akka.actor.ActorRef;
-
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
@@ -30,6 +28,7 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.InstanceGateway;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -101,14 +100,20 @@ public class ExecutionVertex implements Serializable {
// --------------------------------------------------------------------------------------------
- public ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex,
- IntermediateResult[] producedDataSets, FiniteDuration timeout) {
+ public ExecutionVertex(
+ ExecutionJobVertex jobVertex,
+ int subTaskIndex,
+ IntermediateResult[] producedDataSets,
+ FiniteDuration timeout) {
this(jobVertex, subTaskIndex, producedDataSets, timeout, System.currentTimeMillis());
}
- public ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex,
- IntermediateResult[] producedDataSets, FiniteDuration timeout,
- long createTimestamp) {
+ public ExecutionVertex(
+ ExecutionJobVertex jobVertex,
+ int subTaskIndex,
+ IntermediateResult[] producedDataSets,
+ FiniteDuration timeout,
+ long createTimestamp) {
this.jobVertex = jobVertex;
this.subTaskIndex = subTaskIndex;
@@ -125,7 +130,12 @@ public class ExecutionVertex implements Serializable {
this.priorExecutions = new CopyOnWriteArrayList<Execution>();
- this.currentExecution = new Execution(this, 0, createTimestamp, timeout);
+ this.currentExecution = new Execution(
+ getExecutionGraph().getExecutionContext(),
+ this,
+ 0,
+ createTimestamp,
+ timeout);
// create a co-location scheduling hint, if necessary
CoLocationGroup clg = jobVertex.getCoLocationGroup();
@@ -416,8 +426,12 @@ public class ExecutionVertex implements Serializable {
if (state == FINISHED || state == CANCELED || state == FAILED) {
priorExecutions.add(execution);
- currentExecution = new Execution(this, execution.getAttemptNumber()+1,
- System.currentTimeMillis(), timeout);
+ currentExecution = new Execution(
+ getExecutionGraph().getExecutionContext(),
+ this,
+ execution.getAttemptNumber()+1,
+ System.currentTimeMillis(),
+ timeout);
CoLocationGroup grp = jobVertex.getCoLocationGroup();
if (grp != null) {
@@ -455,9 +469,9 @@ public class ExecutionVertex implements Serializable {
// send only if we actually have a target
if (slot != null) {
- ActorRef taskManager = slot.getInstance().getTaskManager();
- if (taskManager != null) {
- taskManager.tell(message, ActorRef.noSender());
+ InstanceGateway gateway = slot.getInstance().getInstanceGateway();
+ if (gateway != null) {
+ gateway.tell(message);
}
}
else {
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaInstanceGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaInstanceGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaInstanceGateway.java
new file mode 100644
index 0000000..b7d60c5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaInstanceGateway.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import akka.actor.ActorRef;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * InstanceGateway implementation which uses Akka to communicate with remote instances.
+ */
+public class AkkaInstanceGateway implements InstanceGateway {
+
+ /** ActorRef of the remote instance */
+ private final ActorRef taskManager;
+
+ public AkkaInstanceGateway(ActorRef taskManager) {
+ this.taskManager = taskManager;
+ }
+
+ /**
+ * Sends a message asynchronously and returns its response. The response to the message is
+ * returned as a future.
+ *
+ * @param message Message to be sent
+ * @param timeout Timeout until the Future is completed with an AskTimeoutException
+ * @return Future which contains the response to the sent message
+ */
+ @Override
+ public Future<Object> ask(Object message, FiniteDuration timeout) {
+ return Patterns.ask(taskManager, message, new Timeout(timeout));
+ }
+
+ /**
+ * Sends a message asynchronously without a result.
+ *
+ * @param message Message to be sent
+ */
+ @Override
+ public void tell(Object message) {
+ taskManager.tell(message, ActorRef.noSender());
+ }
+
+ /**
+ * Forwards a message. To the receiver of this message it looks as if the given sender had sent
+ * the message.
+ *
+ * @param message Message to be sent
+ * @param sender Sender of the forwarded message
+ */
+ @Override
+ public void forward(Object message, ActorRef sender) {
+ taskManager.tell(message, sender);
+ }
+
+ /**
+ * Asynchronously sends a message, retrying up to numberRetries times. The response to the
+ * message is returned as a future. The message is re-sent only if an exception occurred while
+ * sending it and the number of retries has not yet been exceeded.
+ *
+ * @param message Message to be sent
+ * @param numberRetries Number of times to retry sending the message
+ * @param timeout Timeout for each sending attempt
+ * @param executionContext ExecutionContext which is used to send the message multiple times
+ * @return Future of the response to the sent message
+ */
+ @Override
+ public Future<Object> retry(
+ Object message,
+ int numberRetries,
+ FiniteDuration timeout,
+ ExecutionContext executionContext) {
+
+ return AkkaUtils.retry(
+ taskManager,
+ message,
+ numberRetries,
+ executionContext,
+ timeout);
+ }
+
+ /**
+ * Returns the ActorPath of the remote instance.
+ *
+ * @return ActorPath of the remote instance.
+ */
+ @Override
+ public String path() {
+ return taskManager.path().toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index 39caf08..1c44b5d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -25,8 +25,6 @@ import java.util.List;
import java.util.Queue;
import java.util.Set;
-import akka.actor.ActorRef;
-
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
import org.slf4j.Logger;
@@ -43,8 +41,8 @@ public class Instance {
/** The lock on which to synchronize allocations and failure state changes */
private final Object instanceLock = new Object();
- /** The actor ref to the task manager represented by this taskManager. */
- private final ActorRef taskManager;
+ /** The instance gateway used to communicate with the instance */
+ private final InstanceGateway instanceGateway;
/** The instance connection information for the data transfer. */
private final InstanceConnectionInfo connectionInfo;
@@ -81,15 +79,19 @@ public class Instance {
/**
* Constructs an instance reflecting a registered TaskManager.
*
- * @param taskManager The actor reference of the represented task manager.
+ * @param instanceGateway The instance gateway to communicate with the remote instance
* @param connectionInfo The remote connection where the task manager receives requests.
* @param id The id under which the taskManager is registered.
* @param resources The resources available on the machine.
* @param numberOfSlots The number of task slots offered by this taskManager.
*/
- public Instance(ActorRef taskManager, InstanceConnectionInfo connectionInfo, InstanceID id,
- HardwareDescription resources, int numberOfSlots) {
- this.taskManager = taskManager;
+ public Instance(
+ InstanceGateway instanceGateway,
+ InstanceConnectionInfo connectionInfo,
+ InstanceID id,
+ HardwareDescription resources,
+ int numberOfSlots) {
+ this.instanceGateway = instanceGateway;
this.connectionInfo = connectionInfo;
this.instanceId = id;
this.resources = resources;
@@ -327,12 +329,14 @@ public class Instance {
}
}
- public ActorRef getTaskManager() {
- return taskManager;
- }
-
- public String getPath(){
- return taskManager.path().toString();
+ /**
+ * Returns the InstanceGateway of this Instance. This gateway can be used to communicate with
+ * it.
+ *
+ * @return InstanceGateway associated with this instance
+ */
+ public InstanceGateway getInstanceGateway() {
+ return instanceGateway;
}
public InstanceConnectionInfo getInstanceConnectionInfo() {
@@ -386,6 +390,6 @@ public class Instance {
@Override
public String toString() {
return String.format("%s @ %s - %d slots - URL: %s", instanceId, connectionInfo.getHostname(),
- numberOfSlots, (taskManager != null ? taskManager.path() : "ActorRef.noSender"));
+ numberOfSlots, (instanceGateway != null ? instanceGateway.path() : "No instance gateway"));
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceGateway.java
new file mode 100644
index 0000000..a30b2f6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceGateway.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import akka.actor.ActorRef;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Interface to abstract the communication with an Instance.
+ *
+ * It lets callers avoid interacting directly with an ActorRef.
+ */
+public interface InstanceGateway {
+
+ /**
+ * Sends a message asynchronously and returns its response. The response to the message is
+ * returned as a future.
+ *
+ * @param message Message to be sent
+ * @param timeout Timeout until the Future is completed with an AskTimeoutException
+ * @return Future which contains the response to the sent message
+ */
+ Future<Object> ask(Object message, FiniteDuration timeout);
+
+ /**
+ * Sends a message asynchronously without a result.
+ *
+ * @param message Message to be sent
+ */
+ void tell(Object message);
+
+ /**
+ * Forwards a message. To the receiver of this message it looks as if the given sender had sent
+ * the message.
+ *
+ * @param message Message to be sent
+ * @param sender Sender of the forwarded message
+ */
+ void forward(Object message, ActorRef sender);
+
+ /**
+ * Asynchronously sends a message, retrying up to numberRetries times. The response to the
+ * message is returned as a future. The message is re-sent only if an exception occurred while
+ * sending it and the number of retries has not yet been exceeded.
+ *
+ * @param message Message to be sent
+ * @param numberRetries Number of times to retry sending the message
+ * @param timeout Timeout for each sending attempt
+ * @param executionContext ExecutionContext which is used to send the message multiple times
+ * @return Future of the response to the sent message
+ */
+ Future<Object> retry(
+ Object message,
+ int numberRetries,
+ FiniteDuration timeout,
+ ExecutionContext executionContext);
+
+ /**
+ * Returns the path of the remote instance.
+ *
+ * @return Path of the remote instance.
+ */
+ String path();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index c1800bd..4f6c7a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -149,8 +149,9 @@ public class InstanceManager {
id = new InstanceID();
} while (registeredHostsById.containsKey(id));
+ InstanceGateway instanceGateway = new AkkaInstanceGateway(taskManager);
- Instance host = new Instance(taskManager, connectionInfo, id, resources, numberOfSlots);
+ Instance host = new Instance(instanceGateway, connectionInfo, id, resources, numberOfSlots);
registeredHostsById.put(id, host);
registeredHostsByConnection.put(taskManager, host);
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index c082c6a..0ffc889 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -23,7 +23,6 @@ import akka.dispatch.OnFailure;
import akka.pattern.Patterns;
import akka.util.Timeout;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
@@ -46,6 +45,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Tuple2;
+import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
@@ -89,11 +89,20 @@ public class NetworkEnvironment {
private boolean isShutdown;
/**
+ * {@link ExecutionContext} which is used to execute the failure callbacks of remote calls made by the
+ * {@link JobManagerResultPartitionConsumableNotifier}
+ */
+ private final ExecutionContext executionContext;
+
+ /**
* Initializes all network I/O components.
*/
- public NetworkEnvironment(FiniteDuration jobManagerTimeout,
- NetworkEnvironmentConfiguration config) throws IOException {
+ public NetworkEnvironment(
+ ExecutionContext executionContext,
+ FiniteDuration jobManagerTimeout,
+ NetworkEnvironmentConfiguration config) throws IOException {
+ this.executionContext = executionContext;
this.configuration = checkNotNull(config);
this.jobManagerTimeout = checkNotNull(jobManagerTimeout);
@@ -182,7 +191,10 @@ public class NetworkEnvironment {
this.partitionManager = new ResultPartitionManager();
this.taskEventDispatcher = new TaskEventDispatcher();
this.partitionConsumableNotifier = new JobManagerResultPartitionConsumableNotifier(
- jobManagerRef, taskManagerRef, new Timeout(jobManagerTimeout));
+ executionContext,
+ jobManagerRef,
+ taskManagerRef,
+ new Timeout(jobManagerTimeout));
this.partitionStateChecker = new JobManagerPartitionStateChecker(
jobManagerRef, taskManagerRef);
@@ -414,6 +426,12 @@ public class NetworkEnvironment {
*/
private static class JobManagerResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {
+ /**
+ * {@link ExecutionContext} which is used for the failure handler of {@link ScheduleOrUpdateConsumers}
+ * messages.
+ */
+ private final ExecutionContext executionContext;
+
private final ActorRef jobManager;
private final ActorRef taskManager;
@@ -421,8 +439,12 @@ public class NetworkEnvironment {
private final Timeout jobManagerMessageTimeout;
public JobManagerResultPartitionConsumableNotifier(
- ActorRef jobManager, ActorRef taskManager, Timeout jobManagerMessageTimeout) {
+ ExecutionContext executionContext,
+ ActorRef jobManager,
+ ActorRef taskManager,
+ Timeout jobManagerMessageTimeout) {
+ this.executionContext = executionContext;
this.jobManager = jobManager;
this.taskManager = taskManager;
this.jobManagerMessageTimeout = jobManagerMessageTimeout;
@@ -448,7 +470,7 @@ public class NetworkEnvironment {
taskManager.tell(failMsg, ActorRef.noSender());
}
- }, AkkaUtils.globalExecutionContext());
+ }, executionContext);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 940082e..cb99e52 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -37,7 +37,6 @@ import akka.dispatch.Futures;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.instance.SharedSlot;
@@ -50,6 +49,7 @@ import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.ExecutionContext;
/**
* The scheduler is responsible for distributing the ready-to-run tasks among instances and slots.
@@ -95,12 +95,17 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
/** The number of slot allocations where locality could not be respected */
private int nonLocalizedAssignments;
+ /** The ExecutionContext which is used to execute newSlotAvailable futures. */
+ private final ExecutionContext executionContext;
+
// ------------------------------------------------------------------------
/**
* Creates a new scheduler.
*/
- public Scheduler() {}
+ public Scheduler(ExecutionContext executionContext) {
+ this.executionContext = executionContext;
+ }
/**
* Shuts the scheduler down. After shut down no more tasks can be added to the scheduler.
@@ -519,7 +524,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
handleNewSlot();
return null;
}
- }, AkkaUtils.globalExecutionContext());
+ }, executionContext);
}
private void handleNewSlot() {
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
index 4e028d4..567d15a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
@@ -146,8 +146,7 @@ public class SetupInfoServlet extends HttpServlet {
long time = new Date().getTime() - instance.getLastHeartBeat();
try {
- objInner.put("inetAdress", instance.getInstanceConnectionInfo().getInetAdress());
- objInner.put("ipcPort", instance.getTaskManager().path().address().hostPort());
+ objInner.put("path", instance.getInstanceGateway().path());
objInner.put("dataPort", instance.getInstanceConnectionInfo().dataPort());
objInner.put("timeSinceLastHeartbeat", time / 1000);
objInner.put("slotsNumber", instance.getTotalNumberOfSlots());
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/resources/web-docs-infoserver/js/taskmanager.js
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/resources/web-docs-infoserver/js/taskmanager.js b/flink-runtime/src/main/resources/web-docs-infoserver/js/taskmanager.js
index 1ea9a41..68e4278 100644
--- a/flink-runtime/src/main/resources/web-docs-infoserver/js/taskmanager.js
+++ b/flink-runtime/src/main/resources/web-docs-infoserver/js/taskmanager.js
@@ -224,7 +224,7 @@ function processTMdata(json) {
"</div>";
var content = "<tr id=\""+tmRowIdCssName+"\">" +
- "<td style=\"width:20%\">"+tm.inetAdress+" <br> IPC Port: "+tm.ipcPort+", Data Port: "+tm.dataPort+"</td>" + // first row: TaskManager
+ "<td style=\"width:20%\">"+tm.path+" <br> Data Port: "+tm.dataPort+"</td>" + // first row: TaskManager
"<td id=\""+tmRowIdCssName+"-memory\">"+tmMemoryBox+"</td>" + // second row: memory statistics
"<td id=\""+tmRowIdCssName+"-info\"><i>Loading Information</i></td>" + // Information
"</tr>";
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 7ffaddd..d38e503 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -41,8 +41,6 @@ object AkkaUtils {
val INF_TIMEOUT = 21474835 seconds
- var globalExecutionContext: ExecutionContext = ExecutionContext.global
-
/**
* Creates a local actor system without remoting.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index dc1599a..3b4ce15 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -60,6 +60,7 @@ import org.apache.flink.util.{ExceptionUtils, InstantiationUtil}
import scala.collection.JavaConverters._
import scala.concurrent._
import scala.concurrent.duration._
+import scala.concurrent.forkjoin.ForkJoinPool
import scala.language.postfixOps
/**
@@ -89,16 +90,18 @@ import scala.language.postfixOps
* - [[JobStatusChanged]] indicates that the status of job (RUNNING, CANCELING, FINISHED, etc.) has
* changed. This message is sent by the ExecutionGraph.
*/
-class JobManager(protected val flinkConfiguration: Configuration,
- protected val instanceManager: InstanceManager,
- protected val scheduler: FlinkScheduler,
- protected val libraryCacheManager: BlobLibraryCacheManager,
- protected val archive: ActorRef,
- protected val accumulatorManager: AccumulatorManager,
- protected val defaultExecutionRetries: Int,
- protected val delayBetweenRetries: Long,
- protected val timeout: FiniteDuration,
- protected val mode: StreamingMode)
+class JobManager(
+ protected val flinkConfiguration: Configuration,
+ protected val executionContext: ExecutionContext,
+ protected val instanceManager: InstanceManager,
+ protected val scheduler: FlinkScheduler,
+ protected val libraryCacheManager: BlobLibraryCacheManager,
+ protected val archive: ActorRef,
+ protected val accumulatorManager: AccumulatorManager,
+ protected val defaultExecutionRetries: Int,
+ protected val delayBetweenRetries: Long,
+ protected val timeout: FiniteDuration,
+ protected val mode: StreamingMode)
extends Actor with ActorLogMessages with ActorSynchronousLogging {
/** List of current jobs running jobs */
@@ -117,7 +120,7 @@ class JobManager(protected val flinkConfiguration: Configuration,
// disconnect the registered task managers
instanceManager.getAllRegisteredInstances.asScala.foreach {
- _.getTaskManager ! Disconnect("JobManager is shutting down")
+ _.getInstanceGateway().tell(Disconnect("JobManager is shutting down"))
}
archive ! PoisonPill
@@ -136,7 +139,6 @@ class JobManager(protected val flinkConfiguration: Configuration,
}
log.debug(s"Job manager ${self.path} is completely stopped.")
-
}
/**
@@ -411,8 +413,8 @@ class JobManager(protected val flinkConfiguration: Configuration,
case message: AccumulatorMessage => handleAccumulatorMessage(message)
case RequestStackTrace(instanceID) =>
- val taskManager = instanceManager.getRegisteredInstanceById(instanceID).getTaskManager
- taskManager forward SendStackTrace
+ val gateway = instanceManager.getRegisteredInstanceById(instanceID).getInstanceGateway
+ gateway.forward(SendStackTrace, sender)
case Terminated(taskManager) =>
if (instanceManager.isRegistered(taskManager)) {
@@ -480,10 +482,18 @@ class JobManager(protected val flinkConfiguration: Configuration,
}
// see if there already exists an ExecutionGraph for the corresponding job ID
- executionGraph = currentJobs.getOrElseUpdate(jobGraph.getJobID,
- (new ExecutionGraph(jobGraph.getJobID, jobGraph.getName,
- jobGraph.getJobConfiguration, timeout, jobGraph.getUserJarBlobKeys, userCodeLoader),
- JobInfo(sender(), System.currentTimeMillis())))._1
+ executionGraph = currentJobs.getOrElseUpdate(
+ jobGraph.getJobID,
+ (new ExecutionGraph(
+ executionContext,
+ jobGraph.getJobID,
+ jobGraph.getName,
+ jobGraph.getJobConfiguration,
+ timeout,
+ jobGraph.getUserJarBlobKeys,
+ userCodeLoader),
+ JobInfo(sender(), System.currentTimeMillis()))
+ )._1
// configure the execution graph
val jobNumberRetries = if (jobGraph.getNumberOfExecutionRetries >= 0) {
@@ -1046,8 +1056,8 @@ object JobManager {
* @param configuration The configuration from which to parse the config values.
* @return The members for a default JobManager.
*/
- def createJobManagerComponents(configuration: Configuration) :
- (InstanceManager, FlinkScheduler, BlobLibraryCacheManager,
+ def createJobManagerComponents(configuration: Configuration)
+ : (ExecutionContext, InstanceManager, FlinkScheduler, BlobLibraryCacheManager,
Props, AccumulatorManager, Int, Long, FiniteDuration, Int) = {
val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration)
@@ -1083,6 +1093,8 @@ object JobManager {
val accumulatorManager: AccumulatorManager = new AccumulatorManager(Math.min(1, archiveCount))
+ val executionContext = ExecutionContext.fromExecutor(new ForkJoinPool())
+
var blobServer: BlobServer = null
var instanceManager: InstanceManager = null
var scheduler: FlinkScheduler = null
@@ -1091,7 +1103,7 @@ object JobManager {
try {
blobServer = new BlobServer(configuration)
instanceManager = new InstanceManager()
- scheduler = new FlinkScheduler()
+ scheduler = new FlinkScheduler(executionContext)
libraryCacheManager = new BlobLibraryCacheManager(blobServer, cleanupInterval)
instanceManager.addInstanceListener(scheduler)
@@ -1114,8 +1126,16 @@ object JobManager {
}
}
- (instanceManager, scheduler, libraryCacheManager, archiveProps, accumulatorManager,
- executionRetries, delayBetweenRetries, timeout, archiveCount)
+ (executionContext,
+ instanceManager,
+ scheduler,
+ libraryCacheManager,
+ archiveProps,
+ accumulatorManager,
+ executionRetries,
+ delayBetweenRetries,
+ timeout,
+ archiveCount)
}
/**
@@ -1154,9 +1174,16 @@ object JobManager {
archiverActorName: Option[String],
streamingMode: StreamingMode): (ActorRef, ActorRef) = {
- val (instanceManager, scheduler, libraryCacheManager, archiveProps, accumulatorManager,
- executionRetries, delayBetweenRetries,
- timeout, _) = createJobManagerComponents(configuration)
+ val (executionContext,
+ instanceManager,
+ scheduler,
+ libraryCacheManager,
+ archiveProps,
+ accumulatorManager,
+ executionRetries,
+ delayBetweenRetries,
+ timeout,
+ _) = createJobManagerComponents(configuration)
// start the archiver with the given name, or without (avoid name conflicts)
val archiver: ActorRef = archiverActorName match {
@@ -1164,9 +1191,19 @@ object JobManager {
case None => actorSystem.actorOf(archiveProps)
}
- val jobManagerProps = Props(classOf[JobManager], configuration, instanceManager, scheduler,
- libraryCacheManager, archiver, accumulatorManager, executionRetries,
- delayBetweenRetries, timeout, streamingMode)
+ val jobManagerProps = Props(
+ classOf[JobManager],
+ configuration,
+ executionContext,
+ instanceManager,
+ scheduler,
+ libraryCacheManager,
+ archiver,
+ accumulatorManager,
+ executionRetries,
+ delayBetweenRetries,
+ timeout,
+ streamingMode)
val jobManager: ActorRef = jobMangerActorName match {
case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName)
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 73a37de..49c701e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegistere
import org.slf4j.LoggerFactory
import scala.concurrent.duration.FiniteDuration
-import scala.concurrent.{Future, Await}
+import scala.concurrent.{ExecutionContext, Future, Await}
/**
* Abstract base class for Flink's mini cluster. The mini cluster starts a
@@ -48,9 +48,10 @@ import scala.concurrent.{Future, Await}
* @param streamingMode True, if the system should be started in streaming mode, false if
* in pure batch mode.
*/
-abstract class FlinkMiniCluster(val userConfiguration: Configuration,
- val singleActorSystem: Boolean,
- val streamingMode: StreamingMode) {
+abstract class FlinkMiniCluster(
+ val userConfiguration: Configuration,
+ val singleActorSystem: Boolean,
+ val streamingMode: StreamingMode) {
def this(userConfiguration: Configuration, singleActorSystem: Boolean)
= this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
@@ -157,7 +158,7 @@ abstract class FlinkMiniCluster(val userConfiguration: Configuration,
val future = gracefulStop(jobManagerActor, timeout)
- implicit val executionContext = AkkaUtils.globalExecutionContext
+ implicit val executionContext = ExecutionContext.global
Await.ready(Future.sequence(future +: futures), timeout)
@@ -179,7 +180,7 @@ abstract class FlinkMiniCluster(val userConfiguration: Configuration,
}
def waitForTaskManagersToBeRegistered(): Unit = {
- implicit val executionContext = AkkaUtils.globalExecutionContext
+ implicit val executionContext = ExecutionContext.global
val futures = taskManagerActors map {
taskManager => (taskManager ? NotifyWhenRegisteredAtJobManager)(timeout)
@@ -196,8 +197,11 @@ abstract class FlinkMiniCluster(val userConfiguration: Configuration,
}
@throws(classOf[JobExecutionException])
- def submitJobAndWait(jobGraph: JobGraph, printUpdates: Boolean, timeout: FiniteDuration)
- : SerializedJobExecutionResult = {
+ def submitJobAndWait(
+ jobGraph: JobGraph,
+ printUpdates: Boolean,
+ timeout: FiniteDuration)
+ : SerializedJobExecutionResult = {
val clientActorSystem = if (singleActorSystem) jobManagerActorSystem
else JobClient.startJobClientActorSystem(configuration)
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 520decd..44a0b04 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -65,6 +65,7 @@ import org.apache.flink.runtime.util.{ZooKeeperUtil, MathUtils, EnvironmentInfor
import scala.concurrent._
import scala.concurrent.duration._
+import scala.concurrent.forkjoin.ForkJoinPool
import scala.util.{Failure, Success}
import scala.collection.JavaConverters._
@@ -1363,14 +1364,16 @@ object TaskManager {
@throws(classOf[IllegalConfigurationException])
@throws(classOf[IOException])
@throws(classOf[Exception])
- def startTaskManagerComponentsAndActor(configuration: Configuration,
- actorSystem: ActorSystem,
- taskManagerHostname: String,
- taskManagerActorName: Option[String],
- jobManagerPath: Option[String],
- localTaskManagerCommunication: Boolean,
- streamingMode: StreamingMode,
- taskManagerClass: Class[_ <: TaskManager]): ActorRef = {
+ def startTaskManagerComponentsAndActor(
+ configuration: Configuration,
+ actorSystem: ActorSystem,
+ taskManagerHostname: String,
+ taskManagerActorName: Option[String],
+ jobManagerPath: Option[String],
+ localTaskManagerCommunication: Boolean,
+ streamingMode: StreamingMode,
+ taskManagerClass: Class[_ <: TaskManager])
+ : ActorRef = {
// get and check the JobManager config
val jobManagerAkkaUrl: String = jobManagerPath.getOrElse {
@@ -1380,17 +1383,20 @@ object TaskManager {
}
val (taskManagerConfig : TaskManagerConfiguration,
- netConfig: NetworkEnvironmentConfiguration,
- connectionInfo: InstanceConnectionInfo)
-
- = parseTaskManagerConfiguration(configuration, taskManagerHostname,
- localTaskManagerCommunication)
+ netConfig: NetworkEnvironmentConfiguration,
+ connectionInfo: InstanceConnectionInfo
+ ) = parseTaskManagerConfiguration(
+ configuration,
+ taskManagerHostname,
+ localTaskManagerCommunication)
// pre-start checks
checkTempDirs(taskManagerConfig.tmpDirPaths)
+ val executionContext = ExecutionContext.fromExecutor(new ForkJoinPool())
+
// we start the network first, to make sure it can allocate its buffers first
- val network = new NetworkEnvironment(taskManagerConfig.timeout, netConfig)
+ val network = new NetworkEnvironment(executionContext, taskManagerConfig.timeout, netConfig)
// computing the amount of memory to use depends on how much memory is available
// it strictly needs to happen AFTER the network stack has been initialized
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
index 693e014..1e66d81 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@@ -43,6 +44,7 @@ public class AllVerticesIteratorTest {
v4.setParallelism(2);
ExecutionGraph eg = Mockito.mock(ExecutionGraph.class);
+ Mockito.when(eg.getExecutionContext()).thenReturn(TestingUtils.directExecutionContext());
ExecutionJobVertex ejv1 = new ExecutionJobVertex(eg, v1, 1,
AkkaUtils.getDefaultTimeout());
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
index a4bd03c..a47ea77 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.when;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.Test;
import org.mockito.Matchers;
@@ -99,7 +100,12 @@ public class ExecutionGraphConstructionTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
- ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
+ ExecutionGraph eg = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext(),
+ jobId,
+ jobName,
+ cfg,
+ AkkaUtils.getDefaultTimeout());
try {
eg.attachJobGraph(ordered);
}
@@ -137,7 +143,12 @@ public class ExecutionGraphConstructionTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
- ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
+ ExecutionGraph eg = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext(),
+ jobId,
+ jobName,
+ cfg,
+ AkkaUtils.getDefaultTimeout());
try {
eg.attachJobGraph(ordered);
}
@@ -198,7 +209,12 @@ public class ExecutionGraphConstructionTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
- ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
+ ExecutionGraph eg = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext(),
+ jobId,
+ jobName,
+ cfg,
+ AkkaUtils.getDefaultTimeout());
try {
eg.attachJobGraph(ordered);
}
@@ -446,7 +462,12 @@ public class ExecutionGraphConstructionTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1));
- ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
+ ExecutionGraph eg = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext(),
+ jobId,
+ jobName,
+ cfg,
+ AkkaUtils.getDefaultTimeout());
try {
eg.attachJobGraph(ordered);
}
@@ -496,7 +517,12 @@ public class ExecutionGraphConstructionTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v5, v4));
- ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
+ ExecutionGraph eg = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext(),
+ jobId,
+ jobName,
+ cfg,
+ AkkaUtils.getDefaultTimeout());
try {
eg.attachJobGraph(ordered);
fail("Attached wrong jobgraph");
@@ -551,7 +577,11 @@ public class ExecutionGraphConstructionTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
- ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg,
+ ExecutionGraph eg = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext(),
+ jobId,
+ jobName,
+ cfg,
AkkaUtils.getDefaultTimeout());
try {
eg.attachJobGraph(ordered);
@@ -591,7 +621,11 @@ public class ExecutionGraphConstructionTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
- ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg,
+ ExecutionGraph eg = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext(),
+ jobId,
+ jobName,
+ cfg,
AkkaUtils.getDefaultTimeout());
try {
@@ -657,7 +691,11 @@ public class ExecutionGraphConstructionTest {
JobGraph jg = new JobGraph(jobId, jobName, v1, v2, v3, v4, v5, v6, v7, v8);
- ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg,
+ ExecutionGraph eg = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext(),
+ jobId,
+ jobName,
+ cfg,
AkkaUtils.getDefaultTimeout());
eg.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources());
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 03a41b4..cff7146 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -30,12 +30,6 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
-import akka.actor.Actor;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -51,29 +45,13 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.operators.RegularPactTask;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
import org.junit.Test;
public class ExecutionGraphDeploymentTest {
- private static ActorSystem system;
-
- @BeforeClass
- public static void setup() {
- system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
- }
-
- @AfterClass
- public static void teardown() {
- JavaTestKit.shutdownActorSystem(system);
- system = null;
- }
-
@Test
public void testBuildDeploymentDescriptor() {
try {
- TestingUtils.setCallingThreadDispatcher(system);
final JobID jobId = new JobID();
final JobVertexID jid1 = new JobVertexID();
@@ -100,7 +78,11 @@ public class ExecutionGraphDeploymentTest {
v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
- ExecutionGraph eg = new ExecutionGraph(jobId, "some job", new Configuration(),
+ ExecutionGraph eg = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext(),
+ jobId,
+ "some job",
+ new Configuration(),
AkkaUtils.getDefaultTimeout());
List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4);
@@ -110,15 +92,9 @@ public class ExecutionGraphDeploymentTest {
ExecutionJobVertex ejv = eg.getAllVertices().get(jid2);
ExecutionVertex vertex = ejv.getTaskVertices()[3];
- // create synchronous task manager
- final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system,
- Props.create(ExecutionGraphTestUtils
- .SimpleAcknowledgingTaskManager.class));
-
- ExecutionGraphTestUtils.SimpleAcknowledgingTaskManager tm = (ExecutionGraphTestUtils
- .SimpleAcknowledgingTaskManager) simpleTaskManager.underlyingActor();
+ ExecutionGraphTestUtils.SimpleInstanceGateway instanceGateway = new ExecutionGraphTestUtils.SimpleInstanceGateway(TestingUtils.directExecutionContext());
- final Instance instance = getInstance(simpleTaskManager);
+ final Instance instance = getInstance(instanceGateway);
final SimpleSlot slot = instance.allocateSimpleSlot(jobId);
@@ -128,7 +104,7 @@ public class ExecutionGraphDeploymentTest {
assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
- TaskDeploymentDescriptor descr = tm.lastTDD;
+ TaskDeploymentDescriptor descr = instanceGateway.lastTDD;
assertNotNull(descr);
assertEquals(jobId, descr.getJobID());
@@ -152,9 +128,6 @@ public class ExecutionGraphDeploymentTest {
e.printStackTrace();
fail(e.getMessage());
}
- finally {
- TestingUtils.setGlobalExecutionContext();
- }
}
@Test
@@ -307,19 +280,23 @@ public class ExecutionGraphDeploymentTest {
v2.setInvokableClass(RegularPactTask.class);
// execution graph that executes actions synchronously
- ExecutionGraph eg = new ExecutionGraph(jobId, "some job", new Configuration(),
+ ExecutionGraph eg = new ExecutionGraph(
+ TestingUtils.directExecutionContext(),
+ jobId,
+ "some job",
+ new Configuration(),
AkkaUtils.getDefaultTimeout());
eg.setQueuedSchedulingAllowed(false);
List<JobVertex> ordered = Arrays.asList(v1, v2);
eg.attachJobGraph(ordered);
- // create a mock taskmanager that accepts deployment calls
- ActorRef tm = system.actorOf(Props.create(ExecutionGraphTestUtils.SimpleAcknowledgingTaskManager.class));
-
- Scheduler scheduler = new Scheduler();
+ Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
for (int i = 0; i < dop1 + dop2; i++) {
- scheduler.newInstanceAvailable(ExecutionGraphTestUtils.getInstance(tm));
+ scheduler.newInstanceAvailable(
+ ExecutionGraphTestUtils.getInstance(
+ new ExecutionGraphTestUtils.SimpleInstanceGateway(
+ TestingUtils.directExecutionContext())));
}
assertEquals(dop1 + dop2, scheduler.getNumberOfAvailableSlots());
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index a77a09e..8a63060 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -26,17 +26,16 @@ import java.lang.reflect.Field;
import java.net.InetAddress;
import java.util.LinkedList;
-import akka.actor.ActorRef;
-import akka.actor.Status;
-import akka.actor.UntypedActor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.BaseTestingInstanceGateway;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.InstanceGateway;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -49,9 +48,11 @@ import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
import org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPartitions;
import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.mockito.Matchers;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import scala.concurrent.ExecutionContext;
public class ExecutionGraphTestUtils {
@@ -100,103 +101,95 @@ public class ExecutionGraphTestUtils {
// utility mocking methods
// --------------------------------------------------------------------------------------------
- public static Instance getInstance(final ActorRef taskManager) throws
- Exception {
- return getInstance(taskManager, 1);
+ public static Instance getInstance(final InstanceGateway gateway) throws Exception {
+ return getInstance(gateway, 1);
}
- public static Instance getInstance(final ActorRef taskManager, final int numberOfSlots) throws Exception {
+ public static Instance getInstance(final InstanceGateway gateway, final int numberOfSlots) throws Exception {
HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
InetAddress address = InetAddress.getByName("127.0.0.1");
InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
-
- return new Instance(taskManager, connection, new InstanceID(), hardwareDescription, numberOfSlots);
+
+ return new Instance(gateway, connection, new InstanceID(), hardwareDescription, numberOfSlots);
}
- public static class SimpleAcknowledgingTaskManager extends UntypedActor {
+ public static class SimpleInstanceGateway extends BaseTestingInstanceGateway {
public TaskDeploymentDescriptor lastTDD;
+
+ public SimpleInstanceGateway(ExecutionContext executionContext){
+ super(executionContext);
+ }
+
@Override
- public void onReceive(Object msg) throws Exception {
- if (msg instanceof SubmitTask) {
- SubmitTask submitTask = (SubmitTask) msg;
+ public Object handleMessage(Object message) {
+ Object result = null;
+ if(message instanceof SubmitTask) {
+ SubmitTask submitTask = (SubmitTask) message;
lastTDD = submitTask.tasks();
- getSender().tell(Messages.getAcknowledge(), getSelf());
- } else if (msg instanceof CancelTask) {
- CancelTask cancelTask = (CancelTask) msg;
- getSender().tell(new TaskOperationResult(cancelTask.attemptID(), true), getSelf());
- }
- else if (msg instanceof FailIntermediateResultPartitions) {
- getSender().tell(new Object(), getSelf());
+ result = Messages.getAcknowledge();
+ } else if(message instanceof CancelTask) {
+ CancelTask cancelTask = (CancelTask) message;
+
+ result = new TaskOperationResult(cancelTask.attemptID(), true);
+ } else if(message instanceof FailIntermediateResultPartitions) {
+ result = new Object();
}
+
+ return result;
}
}
- public static final String ERROR_MESSAGE = "test_failure_error_message";
+ public static class SimpleFailingInstanceGateway extends BaseTestingInstanceGateway {
+ public SimpleFailingInstanceGateway(ExecutionContext executionContext) {
+ super(executionContext);
+ }
- public static class SimpleFailingTaskManager extends UntypedActor {
@Override
- public void onReceive(Object msg) throws Exception {
- if (msg instanceof SubmitTask) {
- getSender().tell(new Status.Failure(new Exception(ERROR_MESSAGE)), getSelf());
- } else if (msg instanceof CancelTask) {
- CancelTask cancelTask = (CancelTask) msg;
- getSender().tell(new TaskOperationResult(cancelTask.attemptID(), true), getSelf());
+ public Object handleMessage(Object message) throws Exception {
+ if(message instanceof SubmitTask) {
+ throw new Exception(ERROR_MESSAGE);
+ } else if (message instanceof CancelTask) {
+ CancelTask cancelTask = (CancelTask) message;
+
+ return new TaskOperationResult(cancelTask.attemptID(), true);
+ } else {
+ return null;
}
}
}
-
- public static ExecutionJobVertex getExecutionVertex(JobVertexID id) throws JobException {
+
+ public static final String ERROR_MESSAGE = "test_failure_error_message";
+
+ public static ExecutionJobVertex getExecutionVertex(JobVertexID id, ExecutionContext executionContext) throws JobException {
JobVertex ajv = new JobVertex("TestVertex", id);
ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
-
- ExecutionGraph graph = new ExecutionGraph(new JobID(), "test job", new Configuration(),
+
+ ExecutionGraph graph = new ExecutionGraph(
+ executionContext,
+ new JobID(),
+ "test job",
+ new Configuration(),
AkkaUtils.getDefaultTimeout());
-
+
ExecutionJobVertex ejv = spy(new ExecutionJobVertex(graph, ajv, 1,
AkkaUtils.getDefaultTimeout()));
-
+
Answer<Void> noop = new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) {
return null;
}
};
-
+
doAnswer(noop).when(ejv).vertexCancelled(Matchers.anyInt());
doAnswer(noop).when(ejv).vertexFailed(Matchers.anyInt(), Matchers.any(Throwable.class));
doAnswer(noop).when(ejv).vertexFinished(Matchers.anyInt());
-
+
return ejv;
}
- // --------------------------------------------------------------------------------------------
-
- public static final class ActionQueue {
-
- private final LinkedList<Runnable> runnables = new LinkedList<Runnable>();
-
- public void triggerNextAction() {
- Runnable r = runnables.remove();
- r.run();
- }
-
- public void triggerLatestAction(){
- Runnable r = runnables.removeLast();
- r.run();
- }
-
- public Runnable popNextAction() {
- Runnable r = runnables.remove();
- return r;
- }
-
- public void queueAction(Runnable r) {
- this.runnables.add(r);
- }
-
- public boolean isEmpty(){
- return runnables.isEmpty();
- }
+ public static ExecutionJobVertex getExecutionVertex(JobVertexID id) throws JobException {
+ return getExecutionVertex(id, TestingUtils.defaultExecutionContext());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index 7787ab4..f47e92c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -24,10 +24,6 @@ import static org.mockito.Mockito.mock;
import java.util.Arrays;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.instance.SimpleSlot;
@@ -37,24 +33,10 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
import org.junit.Test;
public class ExecutionStateProgressTest {
- private static ActorSystem system;
-
- @BeforeClass
- public static void setup(){
- system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
- }
-
- @AfterClass
- public static void teardown(){
- JavaTestKit.shutdownActorSystem(system);
- }
-
@Test
public void testAccumulatedStateFinished() {
try {
@@ -65,8 +47,13 @@ public class ExecutionStateProgressTest {
ajv.setParallelism(3);
ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
- ExecutionGraph graph = new ExecutionGraph(jid, "test job", new Configuration(),
+ ExecutionGraph graph = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext(),
+ jid,
+ "test job",
+ new Configuration(),
AkkaUtils.getDefaultTimeout());
+
graph.attachJobGraph(Arrays.asList(ajv));
setGraphStatus(graph, JobStatus.RUNNING);
@@ -74,9 +61,11 @@ public class ExecutionStateProgressTest {
ExecutionJobVertex ejv = graph.getJobVertex(vid);
// mock resources and mock taskmanager
- ActorRef taskManager = system.actorOf(Props.create(SimpleAcknowledgingTaskManager.class));
for (ExecutionVertex ee : ejv.getTaskVertices()) {
- SimpleSlot slot = getInstance(taskManager).allocateSimpleSlot(jid);
+ SimpleSlot slot = getInstance(
+ new SimpleInstanceGateway(
+ TestingUtils.defaultExecutionContext())
+ ).allocateSimpleSlot(jid);
ee.deployToSlot(slot);
}