You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/11/01 08:37:38 UTC
[3/4] flink git commit: [FLINK-4887] [execution graph] Introduce
TaskManagerGateway to encapsulate communication logic
http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 788dee4..c0ce799 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -18,49 +18,49 @@
package org.apache.flink.runtime.executiongraph;
-import akka.dispatch.OnComplete;
-import akka.dispatch.OnFailure;
import org.apache.flink.api.common.Archiveable;
import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
import org.apache.flink.runtime.concurrent.BiFunction;
-import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.messages.Messages;
-import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.StackTraceSampleResponse;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.duration.FiniteDuration;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import static akka.dispatch.Futures.future;
import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
import static org.apache.flink.runtime.execution.ExecutionState.CANCELING;
import static org.apache.flink.runtime.execution.ExecutionState.CREATED;
@@ -69,13 +69,6 @@ import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
import static org.apache.flink.runtime.execution.ExecutionState.RUNNING;
import static org.apache.flink.runtime.execution.ExecutionState.SCHEDULED;
-import static org.apache.flink.runtime.messages.TaskMessages.CancelTask;
-import static org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPartitions;
-import static org.apache.flink.runtime.messages.TaskMessages.StopTask;
-import static org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
-import static org.apache.flink.runtime.messages.TaskMessages.UpdatePartitionInfo;
-import static org.apache.flink.runtime.messages.TaskMessages.UpdateTaskSinglePartitionInfo;
-import static org.apache.flink.runtime.messages.TaskMessages.createUpdateTaskMultiplePartitionInfos;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -117,7 +110,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
private final int attemptNumber;
- private final FiniteDuration timeout;
+ private final Time timeout;
private ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> partialInputChannelDeploymentDescriptors;
@@ -131,8 +124,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
private TaskStateHandles taskStateHandles;
- /** The execution context which is used to execute futures. */
- private ExecutionContext executionContext;
+ /** The executor which is used to execute futures. */
+ private Executor executor;
// ------------------------- Accumulators ---------------------------------
@@ -147,12 +140,12 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
// --------------------------------------------------------------------------------------------
public Execution(
- ExecutionContext executionContext,
+ Executor executor,
ExecutionVertex vertex,
int attemptNumber,
long startTimestamp,
- FiniteDuration timeout) {
- this.executionContext = checkNotNull(executionContext);
+ Time timeout) {
+ this.executor = checkNotNull(executor);
this.vertex = checkNotNull(vertex);
this.attemptId = new ExecutionAttemptID();
@@ -162,9 +155,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
this.stateTimestamps = new long[ExecutionState.values().length];
markTimestamp(ExecutionState.CREATED, startTimestamp);
- this.timeout = timeout;
+ this.timeout = checkNotNull(timeout);
- this.partialInputChannelDeploymentDescriptors = new ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor>();
+ this.partialInputChannelDeploymentDescriptors = new ConcurrentLinkedQueue<>();
}
// --------------------------------------------------------------------------------------------
@@ -280,9 +273,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
// in all cases where the deployment failed. we use many try {} finally {} clauses to assure that
final Future<SimpleSlot> slotAllocationFuture = slotProvider.allocateSlot(toSchedule, queued);
- // IMPORTANT: We have to use the direct executor here so that we directly deploy the tasks
- // if the slot allocation future is completed. This is necessary for immediate deployment
- final Future<Void> deploymentFuture = slotAllocationFuture.handleAsync(new BiFunction<SimpleSlot, Throwable, Void>() {
+ // IMPORTANT: We have to use the synchronous handle operation (direct executor) here so
+ // that we directly deploy the tasks if the slot allocation future is completed. This is
+ // necessary for immediate deployment.
+ final Future<Void> deploymentFuture = slotAllocationFuture.handle(new BiFunction<SimpleSlot, Throwable, Void>() {
@Override
public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
if (simpleSlot != null) {
@@ -301,7 +295,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
}
return null;
}
- }, Executors.directExecutor());
+ });
// if tasks have to scheduled immediately check that the task has been deployed
if (!queued) {
@@ -370,34 +364,26 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
// register this execution at the execution graph, to receive call backs
vertex.getExecutionGraph().registerExecution(this);
- final ActorGateway gateway = slot.getTaskManagerActorGateway();
+ final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
- final scala.concurrent.Future<Object> deployAction = gateway.ask(new SubmitTask(deployment), timeout);
-
- deployAction.onComplete(new OnComplete<Object>(){
+ final Future<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, timeout);
+ submitResultFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
@Override
- public void onComplete(Throwable failure, Object success) throws Throwable {
- if (failure != null) {
- if (failure instanceof TimeoutException) {
- String taskname = deployment.getTaskInfo().getTaskNameWithSubtasks() + " (" + attemptId + ')';
-
- markFailed(new Exception(
- "Cannot deploy task " + taskname + " - TaskManager (" + assignedResourceLocation
- + ") not responding after a timeout of " + timeout, failure));
- }
- else {
- markFailed(failure);
- }
+ public Void apply(Throwable failure) {
+ if (failure instanceof TimeoutException) {
+ String taskname = deployment.getTaskInfo().getTaskNameWithSubtasks() + " (" + attemptId + ')';
+
+ markFailed(new Exception(
+ "Cannot deploy task " + taskname + " - TaskManager (" + assignedResourceLocation
+ + ") not responding after a timeout of " + timeout, failure));
}
else {
- if (!(success.equals(Messages.getAcknowledge()))) {
- markFailed(new Exception("Failed to deploy the task to slot. Response was not of type 'Acknowledge', but was " + success
- + "\nSlot Details: " + slot));
- }
+ markFailed(failure);
}
+ return null;
}
- }, executionContext);
+ }, executor);
}
catch (Throwable t) {
markFailed(t);
@@ -409,31 +395,29 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
* Sends stop RPC call.
*/
public void stop() {
- final SimpleSlot slot = this.assignedResource;
+ final SimpleSlot slot = assignedResource;
if (slot != null) {
- final ActorGateway gateway = slot.getTaskManagerActorGateway();
+ final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
- scala.concurrent.Future<Object> stopResult = gateway.retry(
- new StopTask(attemptId),
+ Future<Acknowledge> stopResultFuture = FutureUtils.retry(
+ new Callable<Future<Acknowledge>>() {
+
+ @Override
+ public Future<Acknowledge> call() throws Exception {
+ return taskManagerGateway.stopTask(attemptId, timeout);
+ }
+ },
NUM_STOP_CALL_TRIES,
- timeout,
- executionContext);
+ executor);
- stopResult.onComplete(new OnComplete<Object>() {
+ stopResultFuture.exceptionally(new ApplyFunction<Throwable, Void>() {
@Override
- public void onComplete(Throwable failure, Object success) throws Throwable {
- if (failure != null) {
- fail(new Exception("Task could not be stopped.", failure));
- } else {
- TaskOperationResult result = (TaskOperationResult) success;
- if (!result.success()) {
- LOG.info("Stopping task was not successful. Description: {}",
- result.description());
- }
- }
+ public Void apply(Throwable failure) {
+ LOG.info("Stopping task was not successful.", failure);
+ return null;
}
- }, executionContext);
+ });
}
}
@@ -532,9 +516,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
// TODO The current approach may send many update messages even though the consuming
// task has already been deployed with all necessary information. We have to check
// whether this is a problem and fix it, if it is.
- future(new Callable<Boolean>(){
+ FlinkFuture.supplyAsync(new Callable<Void>(){
@Override
- public Boolean call() throws Exception {
+ public Void call() throws Exception {
try {
consumerVertex.scheduleForExecution(
consumerVertex.getExecutionGraph().getSlotProvider(),
@@ -544,9 +528,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
"vertex " + consumerVertex, t));
}
- return true;
+ return null;
}
- }, executionContext);
+ }, executor);
// double check to resolve race conditions
if(consumerVertex.getExecutionState() == RUNNING){
@@ -592,12 +576,11 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
final InputChannelDeploymentDescriptor descriptor = new InputChannelDeploymentDescriptor(
partitionId, partitionLocation);
- final UpdatePartitionInfo updateTaskMessage = new UpdateTaskSinglePartitionInfo(
- consumer.getAttemptId(),
- partition.getIntermediateResult().getId(),
- descriptor);
-
- sendUpdatePartitionInfoRpcCall(consumerSlot, updateTaskMessage);
+ consumer.sendUpdatePartitionInfoRpcCall(
+ Collections.singleton(
+ new PartitionInfo(
+ partition.getIntermediateResult().getId(),
+ descriptor)));
}
// ----------------------------------------------------------------
// Consumer is scheduled or deploying => cache input channel
@@ -629,6 +612,78 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
processFail(t, false);
}
+ /**
+ * Request a stack trace sample from the task of this execution.
+ *
+ * @param sampleId of the stack trace sample
+ * @param numSamples the sample should contain
+ * @param delayBetweenSamples to wait
+ * @param maxStrackTraceDepth of the samples
+ * @param timeout until the request times out
+ * @return Future stack trace sample response
+ */
+ public Future<StackTraceSampleResponse> requestStackTraceSample(
+ int sampleId,
+ int numSamples,
+ Time delayBetweenSamples,
+ int maxStrackTraceDepth,
+ Time timeout) {
+
+ final SimpleSlot slot = assignedResource;
+
+ if (slot != null) {
+ final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
+
+ return taskManagerGateway.requestStackTraceSample(
+ attemptId,
+ sampleId,
+ numSamples,
+ delayBetweenSamples,
+ maxStrackTraceDepth,
+ timeout);
+ } else {
+ return FlinkCompletableFuture.completedExceptionally(new Exception("The execution has no slot assigned."));
+ }
+ }
+
+ /**
+ * Notify the task of this execution about a completed checkpoint.
+ *
+ * @param checkpointId of the completed checkpoint
+ * @param timestamp of the completed checkpoint
+ */
+ public void notifyCheckpointComplete(long checkpointId, long timestamp) {
+ final SimpleSlot slot = assignedResource;
+
+ if (slot != null) {
+ final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
+
+ taskManagerGateway.notifyCheckpointComplete(attemptId, getVertex().getJobId(), checkpointId, timestamp);
+ } else {
+ LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
+ "no longer running.");
+ }
+ }
+
+ /**
+ * Trigger a new checkpoint on the task of this execution.
+ *
+ * @param checkpointId of the checkpoint to trigger
+ * @param timestamp of the checkpoint to trigger
+ */
+ public void triggerCheckpoint(long checkpointId, long timestamp) {
+ final SimpleSlot slot = assignedResource;
+
+ if (slot != null) {
+ final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
+
+ taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp);
+ } else {
+ LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
+ "no longer running.");
+ }
+ }
+
// --------------------------------------------------------------------------------------------
// Callbacks
// --------------------------------------------------------------------------------------------
@@ -754,20 +809,16 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
PartialInputChannelDeploymentDescriptor partialInputChannelDeploymentDescriptor;
- List<IntermediateDataSetID> resultIDs = new ArrayList<IntermediateDataSetID>();
- List<InputChannelDeploymentDescriptor> inputChannelDeploymentDescriptors = new ArrayList<InputChannelDeploymentDescriptor>();
+ List<PartitionInfo> partitionInfos = new ArrayList<>(partialInputChannelDeploymentDescriptors.size());
while ((partialInputChannelDeploymentDescriptor = partialInputChannelDeploymentDescriptors.poll()) != null) {
- resultIDs.add(partialInputChannelDeploymentDescriptor.getResultId());
- inputChannelDeploymentDescriptors.add(partialInputChannelDeploymentDescriptor.createInputChannelDeploymentDescriptor(this));
+ partitionInfos.add(
+ new PartitionInfo(
+ partialInputChannelDeploymentDescriptor.getResultId(),
+ partialInputChannelDeploymentDescriptor.createInputChannelDeploymentDescriptor(this)));
}
- UpdatePartitionInfo updateTaskMessage = createUpdateTaskMultiplePartitionInfos(
- attemptId,
- resultIDs,
- inputChannelDeploymentDescriptors);
-
- sendUpdatePartitionInfoRpcCall(assignedResource, updateTaskMessage);
+ sendUpdatePartitionInfoRpcCall(partitionInfos);
}
}
@@ -888,70 +939,66 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
* The sending is tried up to NUM_CANCEL_CALL_TRIES times.
*/
private void sendCancelRpcCall() {
- final SimpleSlot slot = this.assignedResource;
+ final SimpleSlot slot = assignedResource;
if (slot != null) {
+ final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
- final ActorGateway gateway = slot.getTaskManagerActorGateway();
-
- scala.concurrent.Future<Object> cancelResult = gateway.retry(
- new CancelTask(attemptId),
+ Future<Acknowledge> cancelResultFuture = FutureUtils.retry(
+ new Callable<Future<Acknowledge>>() {
+ @Override
+ public Future<Acknowledge> call() throws Exception {
+ return taskManagerGateway.cancelTask(attemptId, timeout);
+ }
+ },
NUM_CANCEL_CALL_TRIES,
- timeout,
- executionContext);
-
- cancelResult.onComplete(new OnComplete<Object>() {
+ executor);
+ cancelResultFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
@Override
- public void onComplete(Throwable failure, Object success) throws Throwable {
- if (failure != null) {
- fail(new Exception("Task could not be canceled.", failure));
- } else {
- TaskOperationResult result = (TaskOperationResult) success;
- if (!result.success()) {
- LOG.debug("Cancel task call did not find task. Probably akka message call" +
- " race.");
- }
- }
+ public Void apply(Throwable failure) {
+ fail(new Exception("Task could not be canceled.", failure));
+ return null;
}
- }, executionContext);
+ }, executor);
}
}
private void sendFailIntermediateResultPartitionsRpcCall() {
- final SimpleSlot slot = this.assignedResource;
+ final SimpleSlot slot = assignedResource;
if (slot != null) {
- final ActorGateway gateway = slot.getTaskManagerActorGateway();
+ final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
// TODO For some tests this could be a problem when querying too early if all resources were released
- gateway.tell(new FailIntermediateResultPartitions(attemptId));
+ taskManagerGateway.failPartition(attemptId);
}
}
/**
- * Sends an UpdatePartitionInfo message to the instance of the consumerSlot.
+ * Update the partition infos on the assigned resource.
*
- * @param consumerSlot Slot to whose instance the message will be sent
- * @param updatePartitionInfo UpdatePartitionInfo message
+ * @param partitionInfos for the remote task
*/
private void sendUpdatePartitionInfoRpcCall(
- final SimpleSlot consumerSlot,
- final UpdatePartitionInfo updatePartitionInfo) {
+ final Iterable<PartitionInfo> partitionInfos) {
- if (consumerSlot != null) {
- final ActorGateway gateway = consumerSlot.getTaskManagerActorGateway();
- final TaskManagerLocation taskManagerLocation = consumerSlot.getTaskManagerLocation();
+ final SimpleSlot slot = assignedResource;
- scala.concurrent.Future<Object> futureUpdate = gateway.ask(updatePartitionInfo, timeout);
+ if (slot != null) {
+ final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
+ final TaskManagerLocation taskManagerLocation = slot.getTaskManagerLocation();
+
+ Future<Acknowledge> updatePartitionsResultFuture = taskManagerGateway.updatePartitions(attemptId, partitionInfos, timeout);
- futureUpdate.onFailure(new OnFailure() {
+ updatePartitionsResultFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
@Override
- public void onFailure(Throwable failure) throws Throwable {
+ public Void apply(Throwable failure) {
fail(new IllegalStateException("Update task on TaskManager " + taskManagerLocation +
- " failed due to:", failure));
+ " failed due to:", failure));
+ return null;
}
- }, executionContext);
+ }, executor);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index e2701da..244a113 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
@@ -61,8 +62,6 @@ import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.duration.FiniteDuration;
import scala.Option;
import java.io.IOException;
@@ -80,6 +79,7 @@ import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -168,7 +168,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
private final long[] stateTimestamps;
/** The timeout for all messages that require a response/acknowledgement */
- private final FiniteDuration timeout;
+ private final Time timeout;
// ------ Configuration of the Execution -------
@@ -214,8 +214,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
* available after archiving. */
private CheckpointStatsTracker checkpointStatsTracker;
- /** The execution context which is used to execute futures. */
- private ExecutionContext executionContext;
+ /** The executor which is used to execute futures. */
+ private Executor executor;
/** Registered KvState instances reported by the TaskManagers. */
private KvStateLocationRegistry kvStateLocationRegistry;
@@ -231,15 +231,15 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
* This constructor is for tests only, because it does not include class loading information.
*/
ExecutionGraph(
- ExecutionContext executionContext,
+ Executor executor,
JobID jobId,
String jobName,
Configuration jobConfig,
SerializedValue<ExecutionConfig> serializedConfig,
- FiniteDuration timeout,
+ Time timeout,
RestartStrategy restartStrategy) {
this(
- executionContext,
+ executor,
jobId,
jobName,
jobConfig,
@@ -254,25 +254,25 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
public ExecutionGraph(
- ExecutionContext executionContext,
+ Executor executor,
JobID jobId,
String jobName,
Configuration jobConfig,
SerializedValue<ExecutionConfig> serializedConfig,
- FiniteDuration timeout,
+ Time timeout,
RestartStrategy restartStrategy,
List<BlobKey> requiredJarFiles,
List<URL> requiredClasspaths,
ClassLoader userClassLoader,
MetricGroup metricGroup) {
- checkNotNull(executionContext);
+ checkNotNull(executor);
checkNotNull(jobId);
checkNotNull(jobName);
checkNotNull(jobConfig);
checkNotNull(userClassLoader);
- this.executionContext = executionContext;
+ this.executor = executor;
this.jobID = jobId;
this.jobName = jobName;
@@ -594,8 +594,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
*
* @return ExecutionContext associated with this ExecutionGraph
*/
- public ExecutionContext getExecutionContext() {
- return executionContext;
+ public Executor getExecutor() {
+ return executor;
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index dcd6a5d..756464d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -40,9 +40,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.slf4j.Logger;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.ExecutionContext$;
-import scala.concurrent.duration.FiniteDuration;
import javax.annotation.Nullable;
import java.util.ArrayList;
@@ -55,8 +52,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* Utility class to encapsulate the logic of building an {@link ExecutionGraph} from a {@link JobGraph}.
*/
public class ExecutionGraphBuilder {
-
- /**
+ /**
* Builds the ExecutionGraph from the JobGraph.
* If a prior execution graph exists, the JobGraph will be attached. If no prior execution
* graph exists, then the JobGraph will become attach to a new emoty execution graph.
@@ -73,32 +69,6 @@ public class ExecutionGraphBuilder {
MetricGroup metrics,
int parallelismForAutoMax,
Logger log)
- throws JobExecutionException, JobException
- {
- final ExecutionContext executionContext = ExecutionContext$.MODULE$.fromExecutor(executor);
-
- return buildGraph(prior, jobGraph, jobManagerConfig, executionContext,
- classLoader, recoveryFactory, timeout, restartStrategy,
- metrics, parallelismForAutoMax, log);
- }
-
- /**
- * Builds the ExecutionGraph from the JobGraph.
- * If a prior execution graph exists, the JobGraph will be attached. If no prior execution
- * graph exists, then the JobGraph will become attach to a new emoty execution graph.
- */
- public static ExecutionGraph buildGraph(
- @Nullable ExecutionGraph prior,
- JobGraph jobGraph,
- Configuration jobManagerConfig,
- ExecutionContext executionContext,
- ClassLoader classLoader,
- CheckpointRecoveryFactory recoveryFactory,
- Time timeout,
- RestartStrategy restartStrategy,
- MetricGroup metrics,
- int parallelismForAutoMax,
- Logger log)
throws JobExecutionException, JobException
{
checkNotNull(jobGraph, "job graph cannot be null");
@@ -109,12 +79,12 @@ public class ExecutionGraphBuilder {
// create a new execution graph, if none exists so far
final ExecutionGraph executionGraph = (prior != null) ? prior :
new ExecutionGraph(
- executionContext,
+ executor,
jobId,
jobName,
jobGraph.getJobConfiguration(),
jobGraph.getSerializedExecutionConfig(),
- new FiniteDuration(timeout.getSize(), timeout.getUnit()),
+ timeout,
restartStrategy,
jobGraph.getUserJarBlobKeys(),
jobGraph.getClasspaths(),
http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 2d9ec88..2bb63d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
@@ -45,7 +46,6 @@ import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import scala.Option;
-import scala.concurrent.duration.FiniteDuration;
import java.util.ArrayList;
import java.util.Collections;
@@ -90,7 +90,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
ExecutionGraph graph,
JobVertex jobVertex,
int defaultParallelism,
- FiniteDuration timeout) throws JobException {
+ Time timeout) throws JobException {
this(graph, jobVertex, defaultParallelism, timeout, System.currentTimeMillis());
}
@@ -99,7 +99,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
ExecutionGraph graph,
JobVertex jobVertex,
int defaultParallelism,
- FiniteDuration timeout,
+ Time timeout,
long createTimestamp) throws JobException {
if (graph == null || jobVertex == null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index b647385..8979d7c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.Archiveable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
@@ -29,7 +30,6 @@ import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescript
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -46,9 +46,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
-import scala.concurrent.duration.FiniteDuration;
-import java.io.Serializable;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
@@ -86,7 +84,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
private final List<Execution> priorExecutions;
- private final FiniteDuration timeout;
+ private final Time timeout;
/** The name in the format "myTask (2/7)", cached to avoid frequent string concatenations */
private final String taskNameWithSubtask;
@@ -103,7 +101,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
ExecutionJobVertex jobVertex,
int subTaskIndex,
IntermediateResult[] producedDataSets,
- FiniteDuration timeout) {
+ Time timeout) {
this(jobVertex, subTaskIndex, producedDataSets, timeout, System.currentTimeMillis());
}
@@ -111,7 +109,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
ExecutionJobVertex jobVertex,
int subTaskIndex,
IntermediateResult[] producedDataSets,
- FiniteDuration timeout,
+ Time timeout,
long createTimestamp) {
this.jobVertex = jobVertex;
@@ -133,7 +131,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
this.priorExecutions = new CopyOnWriteArrayList<Execution>();
this.currentExecution = new Execution(
- getExecutionGraph().getExecutionContext(),
+ getExecutionGraph().getExecutor(),
this,
0,
createTimestamp,
@@ -440,7 +438,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
if (state == FINISHED || state == CANCELED || state == FAILED) {
priorExecutions.add(execution);
currentExecution = new Execution(
- getExecutionGraph().getExecutionContext(),
+ getExecutionGraph().getExecutor(),
this,
execution.getAttemptNumber()+1,
System.currentTimeMillis(),
@@ -477,50 +475,6 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
this.currentExecution.fail(t);
}
- public boolean sendMessageToCurrentExecution(
- Serializable message,
- ExecutionAttemptID attemptID) {
-
- return sendMessageToCurrentExecution(message, attemptID, null);
- }
-
- public boolean sendMessageToCurrentExecution(
- Serializable message,
- ExecutionAttemptID attemptID,
- ActorGateway sender) {
- Execution exec = getCurrentExecutionAttempt();
-
- // check that this is for the correct execution attempt
- if (exec != null && exec.getAttemptId().equals(attemptID)) {
- SimpleSlot slot = exec.getAssignedResource();
-
- // send only if we actually have a target
- if (slot != null) {
- ActorGateway gateway = slot.getTaskManagerActorGateway();
- if (gateway != null) {
- if (sender == null) {
- gateway.tell(message);
- } else {
- gateway.tell(message, sender);
- }
-
- return true;
- } else {
- return false;
- }
- }
- else {
- LOG.debug("Skipping message to undeployed task execution {}/{}", getSimpleName(), attemptID);
- return false;
- }
- }
- else {
- LOG.debug("Skipping message to {}/{} because it does not match the current execution",
- getSimpleName(), attemptID);
- return false;
- }
- }
-
/**
* Schedules or updates the consumer tasks of the result partition with the given ID.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/PartitionInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/PartitionInfo.java
new file mode 100644
index 0000000..478550c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/PartitionInfo.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * Contains the information where to find a partition of an intermediate data set.
+ *
+ * <p>The data set the partition belongs to is identified by its {@link IntermediateDataSetID},
+ * and the location of the partition is specified by an {@link InputChannelDeploymentDescriptor}.
+ * Both values are mandatory and are fixed at construction time.
+ */
+public class PartitionInfo implements Serializable {
+
+	private static final long serialVersionUID = 1724490660830968430L;
+
+	/** ID of the intermediate data set that the partition belongs to. */
+	private final IntermediateDataSetID intermediateDataSetID;
+
+	/** Descriptor specifying where the partition is located. */
+	private final InputChannelDeploymentDescriptor inputChannelDeploymentDescriptor;
+
+	/**
+	 * Creates a new {@code PartitionInfo}.
+	 *
+	 * @param intermediateDataSetID ID of the intermediate data set that the partition belongs to
+	 * @param inputChannelDeploymentDescriptor descriptor specifying where the partition is located
+	 * @throws NullPointerException if either argument is {@code null}
+	 */
+	public PartitionInfo(
+			IntermediateDataSetID intermediateDataSetID,
+			InputChannelDeploymentDescriptor inputChannelDeploymentDescriptor) {
+		this.intermediateDataSetID = Preconditions.checkNotNull(
+			intermediateDataSetID, "intermediateDataSetID must not be null");
+		this.inputChannelDeploymentDescriptor = Preconditions.checkNotNull(
+			inputChannelDeploymentDescriptor, "inputChannelDeploymentDescriptor must not be null");
+	}
+
+	/** Returns the ID of the intermediate data set that the partition belongs to. */
+	public IntermediateDataSetID getIntermediateDataSetID() {
+		return intermediateDataSetID;
+	}
+
+	/** Returns the descriptor specifying where the partition is located. */
+	public InputChannelDeploymentDescriptor getInputChannelDeploymentDescriptor() {
+		return inputChannelDeploymentDescriptor;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
index 528eacb..3962e91 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph.restart;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.util.Preconditions;
import scala.concurrent.duration.Duration;
@@ -28,8 +29,6 @@ import scala.concurrent.duration.Duration;
import java.util.ArrayDeque;
import java.util.concurrent.TimeUnit;
-import static akka.dispatch.Futures.future;
-
/**
* Restart strategy which tries to restart the given {@link ExecutionGraph} when failure rate exceeded
* with a fixed time delay in between.
@@ -71,7 +70,7 @@ public class FailureRateRestartStrategy implements RestartStrategy {
restartTimestampsDeque.remove();
}
restartTimestampsDeque.add(System.currentTimeMillis());
- future(ExecutionGraphRestarter.restartWithDelay(executionGraph, delayInterval.toMilliseconds()), executionGraph.getExecutionContext());
+ FlinkFuture.supplyAsync(ExecutionGraphRestarter.restartWithDelay(executionGraph, delayInterval.toMilliseconds()), executionGraph.getExecutor());
}
private boolean isRestartTimestampsQueueFull() {
http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
index 8053e95..5337c6a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
@@ -20,12 +20,11 @@ package org.apache.flink.runtime.executiongraph.restart;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.util.Preconditions;
import scala.concurrent.duration.Duration;
-import static akka.dispatch.Futures.future;
-
/**
* Restart strategy which tries to restart the given {@link ExecutionGraph} a fixed number of times
* with a fixed time delay in between.
@@ -59,7 +58,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
@Override
public void restart(final ExecutionGraph executionGraph) {
currentRestartAttempt++;
- future(ExecutionGraphRestarter.restartWithDelay(executionGraph, delayBetweenRestartAttempts), executionGraph.getExecutionContext());
+ FlinkFuture.supplyAsync(ExecutionGraphRestarter.restartWithDelay(executionGraph, delayBetweenRestartAttempts), executionGraph.getExecutor());
}
/**
@@ -125,4 +124,4 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
return new FixedDelayRestartStrategy(maxAttempts, delay);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index d63d475..b5c6f23 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -29,8 +29,10 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,7 +51,7 @@ public class Instance implements SlotOwner {
private final Object instanceLock = new Object();
/** The instance gateway to communicate with the instance */
- private final ActorGateway actorGateway;
+ private final TaskManagerGateway taskManagerGateway;
/** The instance connection information for the data transfer. */
private final TaskManagerLocation location;
@@ -84,25 +86,25 @@ public class Instance implements SlotOwner {
/**
* Constructs an instance reflecting a registered TaskManager.
*
- * @param actorGateway The actor gateway to communicate with the remote instance
+ * @param taskManagerGateway The gateway to communicate with the remote instance
* @param location The remote connection where the task manager receives requests.
* @param id The id under which the taskManager is registered.
* @param resources The resources available on the machine.
* @param numberOfSlots The number of task slots offered by this taskManager.
*/
public Instance(
- ActorGateway actorGateway,
+ TaskManagerGateway taskManagerGateway,
TaskManagerLocation location,
InstanceID id,
HardwareDescription resources,
int numberOfSlots) {
- this.actorGateway = actorGateway;
- this.location = location;
- this.instanceId = id;
- this.resources = resources;
+ this.taskManagerGateway = Preconditions.checkNotNull(taskManagerGateway);
+ this.location = Preconditions.checkNotNull(location);
+ this.instanceId = Preconditions.checkNotNull(id);
+ this.resources = Preconditions.checkNotNull(resources);
this.numberOfSlots = numberOfSlots;
- this.availableSlots = new ArrayDeque<Integer>(numberOfSlots);
+ this.availableSlots = new ArrayDeque<>(numberOfSlots);
for (int i = 0; i < numberOfSlots; i++) {
this.availableSlots.add(i);
}
@@ -230,7 +232,7 @@ public class Instance implements SlotOwner {
return null;
}
else {
- SimpleSlot slot = new SimpleSlot(jobID, this, location, nextSlot, actorGateway);
+ SimpleSlot slot = new SimpleSlot(jobID, this, location, nextSlot, taskManagerGateway);
allocatedSlots.add(slot);
return slot;
}
@@ -268,7 +270,7 @@ public class Instance implements SlotOwner {
}
else {
SharedSlot slot = new SharedSlot(
- jobID, this, location, nextSlot, actorGateway, sharingGroupAssignment);
+ jobID, this, location, nextSlot, taskManagerGateway, sharingGroupAssignment);
allocatedSlots.add(slot);
return slot;
}
@@ -335,8 +337,8 @@ public class Instance implements SlotOwner {
*
* @return InstanceGateway associated with this instance
*/
- public ActorGateway getActorGateway() {
- return actorGateway;
+ public TaskManagerGateway getTaskManagerGateway() {
+ return taskManagerGateway;
}
public TaskManagerLocation getTaskManagerLocation() {
@@ -390,6 +392,6 @@ public class Instance implements SlotOwner {
@Override
public String toString() {
return String.format("%s @ %s - %d slots - URL: %s", instanceId, location.getHostname(),
- numberOfSlots, (actorGateway != null ? actorGateway.path() : "No instance gateway"));
+ numberOfSlots, (taskManagerGateway != null ? taskManagerGateway.getAddress() : "No instance gateway"));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index 132ee6f..65909db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -25,10 +25,9 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
-import akka.actor.ActorRef;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -131,19 +130,17 @@ public class InstanceManager {
* Registers a task manager. Registration of a task manager makes it available to be used
* for the job execution.
*
- * @param taskManager ActorRef to the TaskManager which wants to be registered
+ * @param taskManagerGateway gateway to the task manager
* @param taskManagerLocation Location info of the TaskManager
* @param resources Hardware description of the TaskManager
* @param numberOfSlots Number of available slots on the TaskManager
- * @param leaderSessionID The current leader session ID of the JobManager
* @return The assigned InstanceID of the registered task manager
*/
public InstanceID registerTaskManager(
- ActorRef taskManager,
+ TaskManagerGateway taskManagerGateway,
TaskManagerLocation taskManagerLocation,
HardwareDescription resources,
- int numberOfSlots,
- UUID leaderSessionID) {
+ int numberOfSlots) {
synchronized (this.lock) {
if (this.isShutdown) {
@@ -163,11 +160,14 @@ public class InstanceManager {
" which was marked as dead earlier because of a heart-beat timeout.");
}
- ActorGateway actorGateway = new AkkaActorGateway(taskManager, leaderSessionID);
-
InstanceID instanceID = new InstanceID();
- Instance host = new Instance(actorGateway, taskManagerLocation, instanceID, resources, numberOfSlots);
+ Instance host = new Instance(
+ taskManagerGateway,
+ taskManagerLocation,
+ instanceID,
+ resources,
+ numberOfSlots);
registeredHostsById.put(instanceID, host);
registeredHostsByResource.put(taskManagerLocation.getResourceID(), host);
@@ -179,7 +179,7 @@ public class InstanceManager {
"Current number of registered hosts is %d. " +
"Current number of alive task slots is %d.",
taskManagerLocation.getHostname(),
- taskManager.path(),
+ taskManagerGateway.getAddress(),
instanceID,
registeredHostsById.size(),
totalNumberOfAliveTaskSlots));
@@ -195,7 +195,7 @@ public class InstanceManager {
}
/**
- * Unregisters the TaskManager with the given {@link ActorRef}. Unregistering means to mark
+ * Unregisters the TaskManager with the given instance id. Unregistering means to mark
* the given instance as dead and notify {@link InstanceListener} about the dead instance.
*
* @param instanceId TaskManager which is about to be marked dead.
@@ -204,8 +204,6 @@ public class InstanceManager {
Instance instance = registeredHostsById.get(instanceId);
if (instance != null){
- ActorRef host = instance.getActorGateway().actor();
-
registeredHostsById.remove(instance.getId());
registeredHostsByResource.remove(instance.getTaskManagerID());
@@ -219,9 +217,10 @@ public class InstanceManager {
notifyDeadInstance(instance);
- LOG.info("Unregistered task manager " + host.path() + ". Number of " +
- "registered task managers " + getNumberOfRegisteredTaskManagers() + ". Number" +
- " of available slots " + getTotalNumberOfSlots() + ".");
+ LOG.info(
+ "Unregistered task manager " + instance.getTaskManagerLocation().addressString() +
+ ". Number of registered task managers " + getNumberOfRegisteredTaskManagers() +
+ ". Number of available slots " + getTotalNumberOfSlots() + ".");
} else {
LOG.warn("Tried to unregister instance {} but it is not registered.", instanceId);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
index 97385b1..106a8ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.instance;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.AbstractID;
import org.apache.flink.api.common.JobID;
@@ -63,15 +64,15 @@ public class SharedSlot extends Slot {
* @param owner The component from which this slot is allocated.
* @param location The location info of the TaskManager where the slot was allocated from
* @param slotNumber The number of the slot.
- * @param taskManagerActorGateway The actor gateway to communicate with the TaskManager
+ * @param taskManagerGateway The gateway to communicate with the TaskManager
* @param assignmentGroup The assignment group that this shared slot belongs to.
*/
public SharedSlot(
JobID jobID, SlotOwner owner, TaskManagerLocation location, int slotNumber,
- ActorGateway taskManagerActorGateway,
+ TaskManagerGateway taskManagerGateway,
SlotSharingGroupAssignment assignmentGroup) {
- this(jobID, owner, location, slotNumber, taskManagerActorGateway, assignmentGroup, null, null);
+ this(jobID, owner, location, slotNumber, taskManagerGateway, assignmentGroup, null, null);
}
/**
@@ -82,18 +83,22 @@ public class SharedSlot extends Slot {
* @param owner The component from which this slot is allocated.
* @param location The location info of the TaskManager where the slot was allocated from
* @param slotNumber The number of the slot.
- * @param taskManagerActorGateway The actor gateway to communicate with the TaskManager
+ * @param taskManagerGateway The gateway to communicate with the TaskManager
* @param assignmentGroup The assignment group that this shared slot belongs to.
* @param parent The parent slot of this slot.
* @param groupId The assignment group of this slot.
*/
public SharedSlot(
- JobID jobID, SlotOwner owner, TaskManagerLocation location, int slotNumber,
- ActorGateway taskManagerActorGateway,
+ JobID jobID,
+ SlotOwner owner,
+ TaskManagerLocation location,
+ int slotNumber,
+ TaskManagerGateway taskManagerGateway,
SlotSharingGroupAssignment assignmentGroup,
- @Nullable SharedSlot parent, @Nullable AbstractID groupId) {
+ @Nullable SharedSlot parent,
+ @Nullable AbstractID groupId) {
- super(jobID, owner, location, slotNumber, taskManagerActorGateway, parent, groupId);
+ super(jobID, owner, location, slotNumber, taskManagerGateway, parent, groupId);
this.assignmentGroup = checkNotNull(assignmentGroup);
this.subSlots = new HashSet<Slot>();
@@ -218,7 +223,7 @@ public class SharedSlot extends Slot {
if (isAlive()) {
SimpleSlot slot = new SimpleSlot(
getJobID(), getOwner(), getTaskManagerLocation(), subSlots.size(),
- getTaskManagerActorGateway(), this, groupId);
+ getTaskManagerGateway(), this, groupId);
subSlots.add(slot);
return slot;
}
@@ -240,7 +245,7 @@ public class SharedSlot extends Slot {
if (isAlive()) {
SharedSlot slot = new SharedSlot(
getJobID(), getOwner(), getTaskManagerLocation(), subSlots.size(),
- getTaskManagerActorGateway(), assignmentGroup, this, groupId);
+ getTaskManagerGateway(), assignmentGroup, this, groupId);
subSlots.add(slot);
return slot;
}
@@ -290,4 +295,4 @@ public class SharedSlot extends Slot {
public String toString() {
return "Shared " + super.toString();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
index 479fa29..8c7ec01 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.AbstractID;
@@ -61,12 +62,12 @@ public class SimpleSlot extends Slot {
* @param owner The component from which this slot is allocated.
* @param location The location info of the TaskManager where the slot was allocated from
* @param slotNumber The number of the task slot on the instance.
- * @param taskManagerActorGateway The actor gateway to communicate with the TaskManager of this slot
+ * @param taskManagerGateway The gateway to communicate with the TaskManager of this slot
*/
public SimpleSlot(
JobID jobID, SlotOwner owner, TaskManagerLocation location, int slotNumber,
- ActorGateway taskManagerActorGateway) {
- this(jobID, owner, location, slotNumber, taskManagerActorGateway, null, null);
+ TaskManagerGateway taskManagerGateway) {
+ this(jobID, owner, location, slotNumber, taskManagerGateway, null, null);
}
/**
@@ -77,18 +78,19 @@ public class SimpleSlot extends Slot {
* @param owner The component from which this slot is allocated.
* @param location The location info of the TaskManager where the slot was allocated from
* @param slotNumber The number of the simple slot in its parent shared slot.
+ * @param taskManagerGateway The gateway to communicate with the associated TaskManager.
* @param parent The parent shared slot.
* @param groupID The ID that identifies the group that the slot belongs to.
*/
public SimpleSlot(
JobID jobID, SlotOwner owner, TaskManagerLocation location, int slotNumber,
- ActorGateway taskManagerActorGateway,
+ TaskManagerGateway taskManagerGateway,
@Nullable SharedSlot parent, @Nullable AbstractID groupID) {
super(parent != null ?
parent.getAllocatedSlot() :
new AllocatedSlot(NO_ALLOCATION_ID, jobID, location, slotNumber,
- ResourceProfile.UNKNOWN, taskManagerActorGateway),
+ ResourceProfile.UNKNOWN, taskManagerGateway),
owner, slotNumber, parent, groupID);
}
@@ -237,4 +239,4 @@ public class SimpleSlot extends Slot {
public String toString() {
return "SimpleSlot " + super.toString();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
index b840b0c..8f8b897 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.AbstractID;
import org.apache.flink.api.common.JobID;
@@ -95,20 +96,29 @@ public abstract class Slot {
* @param owner The component from which this slot is allocated.
* @param location The location info of the TaskManager where the slot was allocated from
* @param slotNumber The number of this slot.
- * @param taskManagerActorGateway The actor gateway to communicate with the TaskManager
+ * @param taskManagerGateway The gateway to communicate with the TaskManager
* @param parent The parent slot that contains this slot. May be null, if this slot is the root.
* @param groupID The ID that identifies the task group for which this slot is allocated. May be null
* if the slot does not belong to any task group.
*/
protected Slot(
- JobID jobID, SlotOwner owner, TaskManagerLocation location, int slotNumber,
- ActorGateway taskManagerActorGateway,
- @Nullable SharedSlot parent, @Nullable AbstractID groupID) {
+ JobID jobID,
+ SlotOwner owner,
+ TaskManagerLocation location,
+ int slotNumber,
+ TaskManagerGateway taskManagerGateway,
+ @Nullable SharedSlot parent,
+ @Nullable AbstractID groupID) {
checkArgument(slotNumber >= 0);
this.allocatedSlot = new AllocatedSlot(
- NO_ALLOCATION_ID, jobID, location, slotNumber, ResourceProfile.UNKNOWN, taskManagerActorGateway);
+ NO_ALLOCATION_ID,
+ jobID,
+ location,
+ slotNumber,
+ ResourceProfile.UNKNOWN,
+ taskManagerGateway);
this.owner = checkNotNull(owner);
this.parent = parent; // may be null
@@ -184,8 +194,8 @@ public abstract class Slot {
*
* @return The actor gateway that can be used to send messages to the TaskManager.
*/
- public ActorGateway getTaskManagerActorGateway() {
- return allocatedSlot.getTaskManagerActorGateway();
+ public TaskManagerGateway getTaskManagerGateway() {
+ return allocatedSlot.getTaskManagerGateway();
}
/**
@@ -361,4 +371,4 @@ public abstract class Slot {
return "(unknown)";
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
new file mode 100644
index 0000000..fe4ecfb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.slots;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.messages.StackTrace;
+import org.apache.flink.runtime.messages.StackTraceSampleMessages;
+import org.apache.flink.runtime.messages.StackTraceSampleResponse;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.messages.TaskMessages;
+import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
+import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
+import org.apache.flink.util.Preconditions;
+import scala.concurrent.duration.FiniteDuration;
+import scala.reflect.ClassTag$;
+
+/**
+ * Implementation of the {@link TaskManagerGateway} for {@link ActorGateway}.
+ */
+public class ActorTaskManagerGateway implements TaskManagerGateway {
+ private final ActorGateway actorGateway;
+
+ public ActorTaskManagerGateway(ActorGateway actorGateway) {
+ this.actorGateway = Preconditions.checkNotNull(actorGateway);
+ }
+
+ /** Returns the actor gateway to which all calls of this task manager gateway are sent. */
+ public ActorGateway getActorGateway() {
+ return actorGateway;
+ }
+
+ //-------------------------------------------------------------------------------
+ // Task manager rpc methods
+ //-------------------------------------------------------------------------------
+
+ @Override
+ public String getAddress() {
+ return actorGateway.path();
+ }
+
+ @Override
+ public void disconnectFromJobManager(InstanceID instanceId, Exception cause) {
+ actorGateway.tell(new Messages.Disconnect(instanceId, cause));
+ }
+
+ @Override
+ public void stopCluster(final ApplicationStatus applicationStatus, final String message) {
+ actorGateway.tell(new StopCluster(applicationStatus, message));
+ }
+
+ @Override
+ public Future<StackTrace> requestStackTrace(final Time timeout) {
+ Preconditions.checkNotNull(timeout);
+
+ scala.concurrent.Future<StackTrace> stackTraceFuture = actorGateway.ask(
+ TaskManagerMessages.SendStackTrace$.MODULE$.get(),
+ new FiniteDuration(timeout.getSize(), timeout.getUnit()))
+ .mapTo(ClassTag$.MODULE$.<StackTrace>apply(StackTrace.class));
+
+ return new FlinkFuture<>(stackTraceFuture);
+ }
+
+ @Override
+ public Future<StackTraceSampleResponse> requestStackTraceSample(
+ ExecutionAttemptID executionAttemptID,
+ int sampleId,
+ int numSamples,
+ Time delayBetweenSamples,
+ int maxStackTraceDepth,
+ Time timeout) {
+ Preconditions.checkNotNull(executionAttemptID);
+ Preconditions.checkArgument(numSamples > 0, "The number of samples must be greater than 0.");
+ Preconditions.checkNotNull(delayBetweenSamples);
+ Preconditions.checkArgument(maxStackTraceDepth >= 0, "The max stack trace depth must be greater or equal than 0.");
+ Preconditions.checkNotNull(timeout);
+
+ scala.concurrent.Future<StackTraceSampleResponse> stackTraceSampleResponseFuture = actorGateway.ask(
+ new StackTraceSampleMessages.TriggerStackTraceSample(
+ sampleId,
+ executionAttemptID,
+ numSamples,
+ delayBetweenSamples,
+ maxStackTraceDepth),
+ new FiniteDuration(timeout.getSize(), timeout.getUnit()))
+ .mapTo(ClassTag$.MODULE$.<StackTraceSampleResponse>apply(StackTraceSampleResponse.class));
+
+ return new FlinkFuture<>(stackTraceSampleResponseFuture);
+ }
+
+ @Override
+ public Future<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
+ Preconditions.checkNotNull(tdd);
+ Preconditions.checkNotNull(timeout);
+
+ scala.concurrent.Future<Acknowledge> submitResult = actorGateway.ask(
+ new TaskMessages.SubmitTask(tdd),
+ new FiniteDuration(timeout.getSize(), timeout.getUnit()))
+ .mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
+
+ return new FlinkFuture<>(submitResult);
+ }
+
+ @Override
+ public Future<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+ Preconditions.checkNotNull(executionAttemptID);
+ Preconditions.checkNotNull(timeout);
+
+ scala.concurrent.Future<Acknowledge> stopResult = actorGateway.ask(
+ new TaskMessages.StopTask(executionAttemptID),
+ new FiniteDuration(timeout.getSize(), timeout.getUnit()))
+ .mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
+
+ return new FlinkFuture<>(stopResult);
+ }
+
+ @Override
+ public Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+ Preconditions.checkNotNull(executionAttemptID);
+ Preconditions.checkNotNull(timeout);
+
+ scala.concurrent.Future<Acknowledge> cancelResult = actorGateway.ask(
+ new TaskMessages.CancelTask(executionAttemptID),
+ new FiniteDuration(timeout.getSize(), timeout.getUnit()))
+ .mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
+
+ return new FlinkFuture<>(cancelResult);
+ }
+
+ @Override
+ public Future<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos, Time timeout) {
+ Preconditions.checkNotNull(executionAttemptID);
+ Preconditions.checkNotNull(partitionInfos);
+ Preconditions.checkNotNull(timeout);
+
+ TaskMessages.UpdatePartitionInfo updatePartitionInfoMessage = new TaskMessages.UpdateTaskMultiplePartitionInfos(
+ executionAttemptID,
+ partitionInfos);
+
+ scala.concurrent.Future<Acknowledge> updatePartitionsResult = actorGateway.ask(
+ updatePartitionInfoMessage,
+ new FiniteDuration(timeout.getSize(), timeout.getUnit()))
+ .mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
+
+ return new FlinkFuture<>(updatePartitionsResult);
+ }
+
+ @Override
+ public void failPartition(ExecutionAttemptID executionAttemptID) {
+ Preconditions.checkNotNull(executionAttemptID);
+
+ actorGateway.tell(new TaskMessages.FailIntermediateResultPartitions(executionAttemptID));
+ }
+
+ @Override
+ public void notifyCheckpointComplete(
+ ExecutionAttemptID executionAttemptID,
+ JobID jobId,
+ long checkpointId,
+ long timestamp) {
+ Preconditions.checkNotNull(executionAttemptID);
+ Preconditions.checkNotNull(jobId);
+
+ actorGateway.tell(new NotifyCheckpointComplete(jobId, executionAttemptID, checkpointId, timestamp));
+ }
+
+ @Override
+ public void triggerCheckpoint(
+ ExecutionAttemptID executionAttemptID,
+ JobID jobId,
+ long checkpointId,
+ long timestamp) {
+ Preconditions.checkNotNull(executionAttemptID);
+ Preconditions.checkNotNull(jobId);
+
+ actorGateway.tell(new TriggerCheckpoint(jobId, executionAttemptID, checkpointId, timestamp));
+ }
+
+ @Override
+ public Future<BlobKey> requestTaskManagerLog(Time timeout) {
+ return requestTaskManagerLog((TaskManagerMessages.RequestTaskManagerLog) TaskManagerMessages.getRequestTaskManagerLog(), timeout);
+ }
+
+ @Override
+ public Future<BlobKey> requestTaskManagerStdout(Time timeout) {
+ return requestTaskManagerLog((TaskManagerMessages.RequestTaskManagerLog) TaskManagerMessages.getRequestTaskManagerStdout(), timeout);
+ }
+
+ private Future<BlobKey> requestTaskManagerLog(TaskManagerMessages.RequestTaskManagerLog request, Time timeout) {
+ Preconditions.checkNotNull(request);
+ Preconditions.checkNotNull(timeout);
+
+ scala.concurrent.Future<BlobKey> blobKeyFuture = actorGateway
+ .ask(
+ request,
+ new FiniteDuration(timeout.getSize(), timeout.getUnit()))
+ .mapTo(ClassTag$.MODULE$.<BlobKey>apply(BlobKey.class));
+
+ return new FlinkFuture<>(blobKeyFuture);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
index 355524c..9419ab4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.jobmanager.slots;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -53,7 +52,7 @@ public class AllocatedSlot {
private final ResourceProfile resourceProfile;
/** TEMP until the new RPC is in place: The actor gateway to communicate with the TaskManager */
- private final ActorGateway taskManagerActorGateway;
+ private final TaskManagerGateway taskManagerGateway;
/** The number of the slot on the TaskManager to which slot belongs. Purely informational. */
private final int slotNumber;
@@ -66,14 +65,14 @@ public class AllocatedSlot {
TaskManagerLocation location,
int slotNumber,
ResourceProfile resourceProfile,
- ActorGateway actorGateway)
+ TaskManagerGateway taskManagerGateway)
{
this.slotAllocationId = checkNotNull(slotAllocationId);
this.jobID = checkNotNull(jobID);
this.taskManagerLocation = checkNotNull(location);
this.slotNumber = slotNumber;
this.resourceProfile = checkNotNull(resourceProfile);
- this.taskManagerActorGateway = checkNotNull(actorGateway);
+ this.taskManagerGateway = checkNotNull(taskManagerGateway);
}
public AllocatedSlot(AllocatedSlot other) {
@@ -82,7 +81,7 @@ public class AllocatedSlot {
this.taskManagerLocation = other.taskManagerLocation;
this.slotNumber = other.slotNumber;
this.resourceProfile = other.resourceProfile;
- this.taskManagerActorGateway = other.taskManagerActorGateway;
+ this.taskManagerGateway = other.taskManagerGateway;
}
// ------------------------------------------------------------------------
@@ -139,8 +138,8 @@ public class AllocatedSlot {
*
* @return The actor gateway that can be used to send messages to the TaskManager.
*/
- public ActorGateway getTaskManagerActorGateway() {
- return taskManagerActorGateway;
+ public TaskManagerGateway getTaskManagerGateway() {
+ return taskManagerGateway;
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
new file mode 100644
index 0000000..db0a3bf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.slots;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.StackTrace;
+import org.apache.flink.runtime.messages.StackTraceSampleResponse;
+
+/**
+ * Task manager gateway interface to communicate with the task manager.
+ */
+public interface TaskManagerGateway {
+
+ /**
+ * Returns the address of the task manager with which this gateway is associated.
+ *
+ * @return Address of the task manager with which this gateway is associated.
+ */
+ String getAddress();
+
+ /**
+ * Disconnect the task manager from the job manager.
+ *
+ * @param instanceId identifying the task manager
+ * @param cause of the disconnection
+ */
+ void disconnectFromJobManager(InstanceID instanceId, Exception cause);
+
+ /**
+ * Stop the cluster.
+ *
+ * @param applicationStatus to stop the cluster with
+ * @param message to deliver
+ */
+ void stopCluster(ApplicationStatus applicationStatus, String message);
+
+ /**
+ * Request the stack trace from the task manager.
+ *
+ * @param timeout for the stack trace request
+ * @return Future for a stack trace
+ */
+ Future<StackTrace> requestStackTrace(Time timeout);
+
+ /**
+ * Request a stack trace sample from the given task.
+ *
+ * @param executionAttemptID identifying the task to sample
+ * @param sampleId of the sample
+ * @param numSamples to take from the given task
+ * @param delayBetweenSamples to wait between two consecutive samples
+ * @param maxStackTraceDepth of the returned sample
+ * @param timeout of the request
+ * @return Future of stack trace sample response
+ */
+ Future<StackTraceSampleResponse> requestStackTraceSample(
+ ExecutionAttemptID executionAttemptID,
+ int sampleId,
+ int numSamples,
+ Time delayBetweenSamples,
+ int maxStackTraceDepth,
+ Time timeout);
+
+ /**
+ * Submit a task to the task manager.
+ *
+ * @param tdd describing the task to submit
+ * @param timeout of the submit operation
+ * @return Future acknowledge of the successful operation
+ */
+ Future<Acknowledge> submitTask(
+ TaskDeploymentDescriptor tdd,
+ Time timeout);
+
+ /**
+ * Stop the given task.
+ *
+ * @param executionAttemptID identifying the task
+ * @param timeout of the stop operation
+ * @return Future acknowledge if the task is successfully stopped
+ */
+ Future<Acknowledge> stopTask(
+ ExecutionAttemptID executionAttemptID,
+ Time timeout);
+
+ /**
+ * Cancel the given task.
+ *
+ * @param executionAttemptID identifying the task
+ * @param timeout of the cancel operation
+ * @return Future acknowledge if the task is successfully canceled
+ */
+ Future<Acknowledge> cancelTask(
+ ExecutionAttemptID executionAttemptID,
+ Time timeout);
+
+ /**
+ * Update the task where the given partitions can be found.
+ *
+ * @param executionAttemptID identifying the task
+ * @param partitionInfos telling where the partition can be retrieved from
+ * @param timeout of the update operation
+ * @return Future acknowledge if the partitions have been successfully updated
+ */
+ Future<Acknowledge> updatePartitions(
+ ExecutionAttemptID executionAttemptID,
+ Iterable<PartitionInfo> partitionInfos,
+ Time timeout);
+
+ /**
+ * Fail all intermediate result partitions of the given task.
+ *
+ * @param executionAttemptID identifying the task
+ */
+ void failPartition(ExecutionAttemptID executionAttemptID);
+
+ /**
+ * Notify the given task about a completed checkpoint.
+ *
+ * @param executionAttemptID identifying the task
+ * @param jobId identifying the job to which the task belongs
+ * @param checkpointId of the completed checkpoint
+ * @param timestamp of the completed checkpoint
+ */
+ void notifyCheckpointComplete(
+ ExecutionAttemptID executionAttemptID,
+ JobID jobId,
+ long checkpointId,
+ long timestamp);
+
+ /**
+ * Trigger a checkpoint for the given task.
+ *
+ * @param executionAttemptID identifying the task
+ * @param jobId identifying the job to which the task belongs
+ * @param checkpointId of the checkpoint to trigger
+ * @param timestamp of the checkpoint to trigger
+ */
+ void triggerCheckpoint(
+ ExecutionAttemptID executionAttemptID,
+ JobID jobId,
+ long checkpointId,
+ long timestamp);
+
+ /**
+ * Request the task manager log from the task manager.
+ *
+ * @param timeout for the request
+ * @return Future blob key under which the task manager log has been stored
+ */
+ Future<BlobKey> requestTaskManagerLog(Time timeout);
+
+ /**
+ * Request the task manager stdout from the task manager.
+ *
+ * @param timeout for the request
+ * @return Future blob key under which the task manager stdout file has been stored
+ */
+ Future<BlobKey> requestTaskManagerStdout(Time timeout);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/messages/Acknowledge.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/Acknowledge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/Acknowledge.java
index 4df4a16..4bbc50a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/Acknowledge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/Acknowledge.java
@@ -18,10 +18,12 @@
package org.apache.flink.runtime.messages;
+import java.io.Serializable;
+
/**
* A generic acknowledgement message.
*/
-public class Acknowledge implements RequiresLeaderSessionID, java.io.Serializable {
+public class Acknowledge implements Serializable {
private static final long serialVersionUID = 7808628311617273755L;
http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/messages/StackTrace.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/StackTrace.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/StackTrace.java
new file mode 100644
index 0000000..c041655
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/StackTrace.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/** Response to a task manager stack trace request: the reporting instance and its stack trace. */
+public class StackTrace implements Serializable {
+ private static final long serialVersionUID = -899464298250067416L;
+
+ private final InstanceID instanceId; // instance of the task manager which reported the stack trace
+ private final String stackTrace; // the reported stack trace, as a string
+
+ public StackTrace(InstanceID instanceId, String stackTrace) {
+ this.instanceId = Preconditions.checkNotNull(instanceId);
+ this.stackTrace = Preconditions.checkNotNull(stackTrace);
+ }
+
+ public InstanceID getInstanceId() {
+ return instanceId;
+ }
+
+ public String getStackTrace() {
+ return stackTrace;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/messages/StackTraceSampleResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/StackTraceSampleResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/StackTraceSampleResponse.java
new file mode 100644
index 0000000..52dbe77
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/StackTraceSampleResponse.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Response to the TriggerStackTraceSample message.
+ */
+public class StackTraceSampleResponse implements Serializable {
+
+ private static final long serialVersionUID = -4786454630050578031L;
+
+ private final int sampleId;
+
+ private final ExecutionAttemptID executionAttemptID;
+
+ private final List<StackTraceElement[]> samples;
+
+ public StackTraceSampleResponse(
+ int sampleId,
+ ExecutionAttemptID executionAttemptID,
+ List<StackTraceElement[]> samples) {
+ this.sampleId = sampleId;
+ this.executionAttemptID = Preconditions.checkNotNull(executionAttemptID);
+ this.samples = Preconditions.checkNotNull(samples);
+ }
+
+ public int getSampleId() {
+ return sampleId;
+ }
+
+ public ExecutionAttemptID getExecutionAttemptID() {
+ return executionAttemptID;
+ }
+
+ public List<StackTraceElement[]> getSamples() {
+ return samples;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 80bdb73..27100fd 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -20,16 +20,18 @@ package org.apache.flink.runtime.akka
import java.io.IOException
import java.net._
-import java.util.concurrent.{TimeUnit, Callable}
+import java.util.concurrent.{Callable, TimeUnit}
import akka.actor._
import akka.pattern.{ask => akkaAsk}
-import com.typesafe.config.{ConfigValueFactory, ConfigParseOptions, Config, ConfigFactory}
+import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions, ConfigValueFactory}
+import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.net.SSLUtils
import org.apache.flink.util.NetUtils
-import org.jboss.netty.logging.{Slf4JLoggerFactory, InternalLoggerFactory}
+import org.jboss.netty.logging.{InternalLoggerFactory, Slf4JLoggerFactory}
import org.slf4j.LoggerFactory
+
import scala.concurrent._
import scala.concurrent.duration._
import scala.language.postfixOps
@@ -545,10 +547,16 @@ object AkkaUtils {
new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS)
}
- def getDefaultTimeout: FiniteDuration = {
+ def getDefaultTimeout: Time = {
val duration = Duration(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)
- new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS)
+ Time.milliseconds(duration.toMillis)
+ }
+
+ /** The default ask timeout of [[getDefaultTimeout]], converted to a [[FiniteDuration]]. */
+ def getDefaultTimeoutAsFiniteDuration: FiniteDuration = {
+ val timeoutMillis = getDefaultTimeout.toMilliseconds
+ new FiniteDuration(timeoutMillis, TimeUnit.MILLISECONDS)
}
def getLookupTimeout(config: Configuration): FiniteDuration = {