Posted to commits@flink.apache.org by se...@apache.org on 2015/04/06 17:49:37 UTC
[4/7] flink git commit: [FLINK-1580] [taskmanager] Improve TaskManager startup robustness
[FLINK-1580] [taskmanager] Improve TaskManager startup robustness
- Initialize network buffer pool and memory manager before the asynchronous actor start
- Split messages into more topic-related groups
- Split message handling logic on TaskManager into topic-related functions
- Simplify registration logic
- Add a lot of tests that validate
- task manager configuration checking
- task manager registration
- task manager re-registration on failures/disconnects
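
The registration rework summarized above replaces the old fire-and-forget handshake with explicit request/response messages that carry the TaskManager's ActorRef. The following sketch is illustrative only and not part of the commit: it shows how the new messages could be exercised with Akka's ask pattern. The helper name registerOnce, the println bodies and the surrounding wiring are assumptions.

import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._
import org.apache.flink.runtime.instance.{HardwareDescription, InstanceConnectionInfo}
import org.apache.flink.runtime.messages.RegistrationMessages._

object RegistrationSketch {

  // Illustrative helper: one registration attempt against a running JobManager actor.
  def registerOnce(jobManager: ActorRef,
                   taskManager: ActorRef,
                   connectionInfo: InstanceConnectionInfo,
                   resources: HardwareDescription,
                   numberOfSlots: Int): Unit = {
    implicit val timeout: Timeout = Timeout(10.seconds)

    // The JobManager answers the ask future (sender()), not the TaskManager actor itself.
    val response = jobManager ? RegisterTaskManager(
      taskManager, connectionInfo, resources, numberOfSlots)

    Await.result(response, timeout.duration) match {
      case AcknowledgeRegistration(jm, instanceId, blobPort) =>
        println(s"registered as $instanceId, BLOB service on port $blobPort")
      case AlreadyRegistered(jm, instanceId, blobPort) =>
        println(s"an earlier attempt already registered this TaskManager as $instanceId")
      case RefuseRegistration(reason) =>
        println(s"registration refused: $reason; retry via TriggerTaskManagerRegistration")
    }
  }
}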
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e74521c1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e74521c1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e74521c1
Branch: refs/heads/master
Commit: e74521c1fe5f5842185be5c454d27adc3bd254d5
Parents: 1da4b64
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Apr 6 14:49:09 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Apr 6 15:30:44 2015 +0200
----------------------------------------------------------------------
.../IllegalConfigurationException.java | 35 +-
.../flink/runtime/execution/ExecutionState.java | 29 +-
.../flink/runtime/executiongraph/Execution.java | 21 +-
.../instance/InstanceConnectionInfo.java | 3 +-
.../org/apache/flink/runtime/instance/Slot.java | 2 +-
.../runtime/io/network/NetworkEnvironment.java | 6 +-
.../memorymanager/DefaultMemoryManager.java | 6 +
.../runtime/memorymanager/MemoryManager.java | 9 +-
.../apache/flink/runtime/taskmanager/Task.java | 8 +-
.../taskmanager/TaskInputSplitProvider.java | 16 +-
.../apache/flink/runtime/ActorLogMessages.scala | 4 +-
.../apache/flink/runtime/akka/AkkaUtils.scala | 11 +
.../flink/runtime/jobmanager/JobManager.scala | 119 +-
.../StreamCheckpointCoordinator.scala | 51 +-
.../messages/CheckpointingMessages.scala | 52 +
.../messages/ExecutionGraphMessages.scala | 2 +-
.../runtime/messages/JobManagerMessages.scala | 17 +-
.../flink/runtime/messages/Messages.scala | 25 +-
.../runtime/messages/RegistrationMessages.scala | 59 +-
.../runtime/messages/TaskManagerMessages.scala | 177 +-
.../flink/runtime/messages/TaskMessages.scala | 171 ++
.../minicluster/LocalFlinkMiniCluster.scala | 14 +-
.../flink/runtime/taskmanager/TaskManager.scala | 1768 +++++++++++-------
.../taskmanager/TaskManagerConfiguration.scala | 11 +-
.../executiongraph/ExecutionGraphTestUtils.java | 24 +-
.../ExecutionVertexCancelTest.java | 10 +-
.../ExecutionVertexDeploymentTest.java | 2 +-
.../runtime/taskmanager/RegistrationTest.java | 379 ++++
...askManagerComponentsStartupShutdownTest.java | 141 ++
.../TaskManagerConfigurationTest.java | 140 ++
.../TaskManagerProcessReapingTest.java | 2 +-
.../runtime/taskmanager/TaskManagerTest.java | 447 +++--
.../flink/runtime/taskmanager/TaskTest.java | 1 -
.../jobmanager/JobManagerConnectionTest.scala | 14 +-
.../TaskManagerRegistrationITCase.scala | 132 --
.../TaskManagerRegistrationTest.scala | 139 ++
.../runtime/testingUtils/TestingCluster.scala | 14 +-
.../testingUtils/TestingTaskManager.scala | 33 +-
.../runtime/testingUtils/TestingUtils.scala | 37 +-
.../api/streamvertex/StreamVertex.java | 7 +-
.../test/util/ForkableFlinkMiniCluster.scala | 10 +-
.../AbstractProcessFailureRecoveryTest.java | 6 +-
.../yarn/appMaster/YarnTaskManagerRunner.java | 2 +-
.../org/apache/flink/yarn/YarnTaskManager.scala | 17 +-
44 files changed, 2842 insertions(+), 1331 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-core/src/main/java/org/apache/flink/configuration/IllegalConfigurationException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/IllegalConfigurationException.java b/flink-core/src/main/java/org/apache/flink/configuration/IllegalConfigurationException.java
index 1e71fc7..e6a2022 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/IllegalConfigurationException.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/IllegalConfigurationException.java
@@ -16,42 +16,35 @@
* limitations under the License.
*/
-
package org.apache.flink.configuration;
/**
- * An <code>IllegalConfigurationException</code> is thrown when the user
- * has configured job vertices in a way that either conflicts
- * with the expected usage of the respective task of the configuration
- * of the Nephele framework.
+ * An {@code IllegalConfigurationException} is thrown when
+ * the values in a given {@link Configuration} are not valid. This may refer
+ * to the Flink configuration with which the framework is started,
+ * or a Configuration passed internally between components.
*/
public class IllegalConfigurationException extends RuntimeException {
- /**
- * Generated serial UID.
- */
private static final long serialVersionUID = 695506964810499989L;
/**
- * Constructs an new illegal configuration exception with the given error message.
+ * Constructs a new IllegalConfigurationException with the given error message.
*
- * @param errorMsg
- * the error message to be included in the exception
+ * @param message The error message for the exception.
*/
- public IllegalConfigurationException(final String errorMsg) {
- super(errorMsg);
+ public IllegalConfigurationException(String message) {
+ super(message);
}
/**
- * Constructs an new illegal configuration exception with the given error message
+ * Constructs a new IllegalConfigurationException with the given error message
* and a given cause.
- *
- * @param errorMsg
- * The error message to be included in the exception.
- * @param cause
- * The exception that caused this exception.
+ *
+ * @param message The error message for the exception.
+ * @param cause The exception that caused this exception.
*/
- public IllegalConfigurationException(final String errorMsg, final Throwable cause) {
- super(errorMsg, cause);
+ public IllegalConfigurationException(String message, Throwable cause) {
+ super(message, cause);
}
}
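
A hypothetical validation snippet illustrating the documented purpose of the reworded exception (invalid values in a given Configuration). The configuration key, default value and lower bound are assumptions and not taken from this commit.

import org.apache.flink.configuration.{Configuration, IllegalConfigurationException}

object ConfigCheckSketch {

  // Fails fast with an IllegalConfigurationException if a configured value is not usable.
  def checkNumSlots(config: Configuration): Int = {
    val slots = config.getInteger("taskmanager.numberOfTaskSlots", 1)
    if (slots < 1) {
      throw new IllegalConfigurationException(
        s"Invalid number of task slots: $slots (must be at least 1)")
    }
    slots
  }
}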
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java
index e5235e3..2fcaea1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java
@@ -18,6 +18,28 @@
package org.apache.flink.runtime.execution;
+/**
+ * An enumeration of all states that a task can be in during its execution.
+ * Tasks usually start in the state {@code CREATED} and switch states according to
+ * this diagram:
+ * <pre>
+ *
+ * CREATED -> SCHEDULED -> DEPLOYING -> RUNNING -> FINISHED
+ * | | |
+ * | | +------+
+ * | V V
+ * | CANCELLING -----+----> CANCELED
+ * | |
+ * +-------------------------+
+ *
+ * ... -> FAILED
+ * </pre>
+ *
+ * It is possible to enter the {@code FAILED} state from any other state.
+ *
+ * The states {@code FINISHED}, {@code CANCELED}, and {@code FAILED} are
+ * considered terminal states.
+ */
public enum ExecutionState {
CREATED,
@@ -34,5 +56,10 @@ public enum ExecutionState {
CANCELED,
- FAILED
+ FAILED;
+
+
+ public boolean isTerminal() {
+ return this == FINISHED || this == CANCELED || this == FAILED;
+ }
}
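
A minimal illustration, not from the commit, of the new isTerminal() helper; the needsTracking function is hypothetical and only shows how callers might use the terminal-state check.

import org.apache.flink.runtime.execution.ExecutionState

object ExecutionStateSketch {

  // A task attempt only needs further tracking while it is not in a terminal state.
  def needsTracking(state: ExecutionState): Boolean =
    !state.isTerminal() // FINISHED, CANCELED and FAILED are terminal

  def main(args: Array[String]): Unit = {
    println(needsTracking(ExecutionState.RUNNING))  // true
    println(needsTracking(ExecutionState.FINISHED)) // false
  }
}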
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index a9ee105..3ba378c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -43,7 +43,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
+import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
@@ -68,12 +68,13 @@ import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
import static org.apache.flink.runtime.execution.ExecutionState.RUNNING;
import static org.apache.flink.runtime.execution.ExecutionState.SCHEDULED;
-import static org.apache.flink.runtime.messages.TaskManagerMessages.CancelTask;
-import static org.apache.flink.runtime.messages.TaskManagerMessages.FailIntermediateResultPartitions;
-import static org.apache.flink.runtime.messages.TaskManagerMessages.SubmitTask;
-import static org.apache.flink.runtime.messages.TaskManagerMessages.UpdateTask;
-import static org.apache.flink.runtime.messages.TaskManagerMessages.UpdateTaskSinglePartitionInfo;
-import static org.apache.flink.runtime.messages.TaskManagerMessages.createUpdateTaskMultiplePartitionInfos;
+
+import static org.apache.flink.runtime.messages.TaskMessages.CancelTask;
+import static org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPartitions;
+import static org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
+import static org.apache.flink.runtime.messages.TaskMessages.UpdatePartitionInfo;
+import static org.apache.flink.runtime.messages.TaskMessages.UpdateTaskSinglePartitionInfo;
+import static org.apache.flink.runtime.messages.TaskMessages.createUpdateTaskMultiplePartitionInfos;
/**
* A single execution of a vertex. While an {@link ExecutionVertex} can be executed multiple times (for recovery,
@@ -526,7 +527,7 @@ public class Execution implements Serializable {
final InputChannelDeploymentDescriptor descriptor = new InputChannelDeploymentDescriptor(
partitionId, partitionLocation);
- final UpdateTask updateTaskMessage = new UpdateTaskSinglePartitionInfo(
+ final UpdatePartitionInfo updateTaskMessage = new UpdateTaskSinglePartitionInfo(
consumer.getAttemptId(), partition.getIntermediateResult().getId(), descriptor);
sendUpdateTaskRpcCall(consumerSlot, updateTaskMessage);
@@ -685,7 +686,7 @@ public class Execution implements Serializable {
inputChannelDeploymentDescriptors.add(partialInputChannelDeploymentDescriptor.createInputChannelDeploymentDescriptor(this));
}
- UpdateTask updateTaskMessage =
+ UpdatePartitionInfo updateTaskMessage =
createUpdateTaskMultiplePartitionInfos(attemptId, resultIDs,
inputChannelDeploymentDescriptors);
@@ -845,7 +846,7 @@ public class Execution implements Serializable {
}
private void sendUpdateTaskRpcCall(final SimpleSlot consumerSlot,
- final UpdateTask updateTaskMsg) {
+ final UpdatePartitionInfo updateTaskMsg) {
if (consumerSlot != null) {
final Instance instance = consumerSlot.getInstance();
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
index ee79c23..eb87292 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
@@ -101,7 +101,8 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
// take IP textual representation
this.hostName = this.fqdnHostName;
LOG.warn("No hostname could be resolved for the IP address {}, using IP address as host name. "
- + "Local input split assignment (such as for HDFS files) may be impacted.");
+ + "Local input split assignment (such as for HDFS files) may be impacted.",
+ this.inetAddress.getHostAddress());
}
else {
this.hostName = NetUtils.getHostnameFromFQDN(this.fqdnHostName);
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
index bf8464c..082fbf2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
@@ -184,7 +184,7 @@ public abstract class Slot {
return "(" + slotNumber + ")" + (getParent() != null ? getParent().hierarchy() : "");
}
- private static final String getStateName(int state) {
+ private static String getStateName(int state) {
switch (state) {
case ALLOCATED_AND_ALIVE:
return "ALLOCATED/ALIVE";
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 55b89b4..dbf1586 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -49,7 +49,7 @@ import java.io.IOException;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
-import static org.apache.flink.runtime.messages.TaskManagerMessages.FailTask;
+import static org.apache.flink.runtime.messages.TaskMessages.FailTask;
/**
* Network I/O components of each {@link TaskManager} instance. The network environment contains
@@ -165,6 +165,7 @@ public class NetworkEnvironment {
{
// good, not currently associated. start the individual components
+ LOG.debug("Starting result partition manager and network connection manager");
this.partitionManager = new ResultPartitionManager();
this.taskEventDispatcher = new TaskEventDispatcher();
this.partitionConsumableNotifier = new JobManagerResultPartitionConsumableNotifier(
@@ -176,6 +177,7 @@ public class NetworkEnvironment {
: new LocalConnectionManager();
try {
+ LOG.debug("Starting network connection manager");
connectionManager.start(partitionManager, taskEventDispatcher, networkBufferPool);
}
catch (Throwable t) {
@@ -312,7 +314,7 @@ public class NetworkEnvironment {
}
public void unregisterTask(Task task) {
- LOG.debug("Unregistering task {} ({}) from network environment (state: {}).",
+ LOG.debug("Unregister task {} from network environment (state: {}).",
task.getTaskNameWithSubtasks(), task.getExecutionState());
final ExecutionAttemptID executionId = task.getExecutionId();
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
index cd677ac..28ebe13 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
@@ -165,6 +165,12 @@ public class DefaultMemoryManager implements MemoryManager {
// -------------------- END CRITICAL SECTION -------------------
}
+ @Override
+ public boolean isShutdown() {
+ return this.isShutDown;
+ }
+
+ @Override
public boolean verifyEmpty() {
synchronized (this.lock) {
return this.freeSegments.size() == this.totalNumPages;
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/MemoryManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/MemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/MemoryManager.java
index 1ab6931..631f0b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/MemoryManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/MemoryManager.java
@@ -117,7 +117,14 @@ public interface MemoryManager {
* code that allocated them from the memory manager.
*/
void shutdown();
-
+
+ /**
+ * Checks whether the MemoryManager has been shut down.
+ *
+ * @return True, if the memory manager is shut down, false otherwise.
+ */
+ boolean isShutdown();
+
/**
* Checks if the memory manager all memory available.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 3d1419a..f6eb907 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -30,8 +30,8 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.messages.ExecutionGraphMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.TaskManagerMessages.UnregisterTask;
+import org.apache.flink.runtime.messages.TaskMessages;
+import org.apache.flink.runtime.messages.TaskMessages.UnregisterTask;
import org.apache.flink.runtime.profiling.TaskManagerProfiler;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
@@ -304,7 +304,7 @@ public class Task {
}
}
else {
- throw new RuntimeException("unexpected state for cancelling: " + current);
+ throw new RuntimeException("unexpected state for failing the task: " + current);
}
}
}
@@ -361,7 +361,7 @@ public class Task {
Throwable optionalError) {
LOG.info("Update execution state of {} ({}) to {}.", this.getTaskName(),
this.getExecutionId(), executionState);
- taskManager.tell(new JobManagerMessages.UpdateTaskExecutionState(
+ taskManager.tell(new TaskMessages.UpdateTaskExecutionState(
new TaskExecutionState(jobId, executionId, executionState, optionalError)),
ActorRef.noSender());
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
index 6e446de..1bdc346 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
@@ -28,12 +28,10 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.util.InstantiationUtil;
import scala.concurrent.Await;
import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
public class TaskInputSplitProvider implements InputSplitProvider {
@@ -47,11 +45,11 @@ public class TaskInputSplitProvider implements InputSplitProvider {
private final ClassLoader usercodeClassLoader;
- private final FiniteDuration timeout;
+ private final Timeout timeout;
public TaskInputSplitProvider(ActorRef jobManager, JobID jobId, JobVertexID vertexId,
ExecutionAttemptID executionID, ClassLoader userCodeClassLoader,
- FiniteDuration timeout)
+ Timeout timeout)
{
this.jobManager = jobManager;
this.jobId = jobId;
@@ -66,20 +64,20 @@ public class TaskInputSplitProvider implements InputSplitProvider {
try {
final Future<Object> response = Patterns.ask(jobManager,
new JobManagerMessages.RequestNextInputSplit(jobId, vertexId, executionID),
- new Timeout(timeout));
+ timeout);
- final Object result = Await.result(response, timeout);
+ final Object result = Await.result(response, timeout.duration());
if (result == null) {
return null;
}
- if(!(result instanceof TaskManagerMessages.NextInputSplit)){
+ if(!(result instanceof JobManagerMessages.NextInputSplit)){
throw new RuntimeException("RequestNextInputSplit requires a response of type " +
"NextInputSplit. Instead response is of type " + result.getClass() + ".");
} else {
- final TaskManagerMessages.NextInputSplit nextInputSplit =
- (TaskManagerMessages.NextInputSplit) result;
+ final JobManagerMessages.NextInputSplit nextInputSplit =
+ (JobManagerMessages.NextInputSplit) result;
byte[] serializedData = nextInputSplit.splitData();
Object deserialized = InstantiationUtil.deserializeObject(serializedData,
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
index 5d4f89c..acd4346 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
@@ -37,14 +37,14 @@ trait ActorLogMessages {
_receiveWithLogMessages(x)
}
else {
- log.debug(s"Received message $x at ${that.self.path} from ${that.sender}.")
+ log.debug(s"Received message $x at ${that.self.path} from ${that.sender()}.")
val start = System.nanoTime()
_receiveWithLogMessages(x)
val duration = (System.nanoTime() - start) / 1000000
- log.debug(s"Handled message $x in $duration ms from ${that.sender}.")
+ log.debug(s"Handled message $x in $duration ms from ${that.sender()}.")
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index e898f44..5b33017 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -44,6 +44,17 @@ object AkkaUtils {
var globalExecutionContext: ExecutionContext = ExecutionContext.global
/**
+ * Creates a local actor system without remoting.
+ *
+ * @param configuration instance containing the user provided configuration values
+ * @return The created actor system
+ */
+ def createLocalActorSystem(configuration: Configuration): ActorSystem = {
+ val akkaConfig = getAkkaConfig(configuration, None)
+ createActorSystem(akkaConfig)
+ }
+
+ /**
* Creates an actor system. If a listening address is specified, then the actor system will listen
* on that address for messages from a remote actor system. If not, then a local actor system
* will be instantiated.
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 4b0a55b..49bb1d5 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -30,8 +30,10 @@ import org.apache.flink.runtime.client.{JobStatusMessage, JobSubmissionException
import org.apache.flink.runtime.executiongraph.{ExecutionJobVertex, ExecutionGraph}
import org.apache.flink.runtime.jobmanager.web.WebInfoServer
import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
+import org.apache.flink.runtime.messages.CheckpointingMessages.{StateBarrierAck, BarrierAck}
import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
import org.apache.flink.runtime.messages.Messages.{Disconnect, Acknowledge}
+import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState
import org.apache.flink.runtime.process.ProcessReaper
import org.apache.flink.runtime.security.SecurityUtils
import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
@@ -46,9 +48,9 @@ import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager
import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
import org.apache.flink.runtime.messages.JobManagerMessages._
import org.apache.flink.runtime.messages.RegistrationMessages._
-import org.apache.flink.runtime.messages.TaskManagerMessages.{SendStackTrace, NextInputSplit, Heartbeat}
+import org.apache.flink.runtime.messages.TaskManagerMessages.{SendStackTrace, Heartbeat}
import org.apache.flink.runtime.profiling.ProfilingUtils
-import org.apache.flink.util.InstantiationUtil
+import org.apache.flink.util.{ExceptionUtils, InstantiationUtil}
import org.slf4j.LoggerFactory
@@ -76,7 +78,7 @@ import scala.collection.JavaConverters._
* is indicated by [[CancellationSuccess]] and a failure by [[CancellationFailure]]
*
* - [[UpdateTaskExecutionState]] is sent by a TaskManager to update the state of an
- * [[org.apache.flink.runtime.executiongraph.ExecutionVertex]] contained in the [[ExecutionGraph]].
+ ExecutionVertex contained in the [[ExecutionGraph]].
* A successful update is acknowledged by true and otherwise false.
*
* - [[RequestNextInputSplit]] requests the next input split for a running task on a
@@ -103,17 +105,17 @@ class JobManager(val flinkConfiguration: Configuration,
/** List of current jobs running jobs */
val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]()
-
+
/**
* Run when the job manager is started. Simply logs an informational message.
*/
override def preStart(): Unit = {
- LOG.info(s"Starting JobManager at ${self.path}.")
+ LOG.info(s"Starting JobManager at ${self.path.toSerializationFormat}.")
}
override def postStop(): Unit = {
- log.info(s"Stopping job manager ${self.path}.")
+ log.info(s"Stopping JobManager ${self.path.toSerializationFormat}.")
// disconnect the registered task managers
instanceManager.getAllRegisteredInstances.asScala.foreach {
@@ -148,29 +150,38 @@ class JobManager(val flinkConfiguration: Configuration,
*/
override def receiveWithLogMessages: Receive = {
- case RegisterTaskManager(connectionInfo, hardwareInformation, numberOfSlots) =>
- val taskManager = sender
+ case RegisterTaskManager(taskManager, connectionInfo, hardwareInformation, numberOfSlots) =>
if (instanceManager.isRegistered(taskManager)) {
val instanceID = instanceManager.getRegisteredInstance(taskManager).getId
- taskManager ! AlreadyRegistered(instanceID, libraryCacheManager.getBlobServerPort, profiler)
- } else {
- val instanceID = try {
- instanceManager.registerTaskManager(taskManager, connectionInfo,
+ // IMPORTANT: Send the response to the "sender", which is not the
+ // TaskManager actor, but the ask future!
+ sender() ! AlreadyRegistered(self, instanceID, libraryCacheManager.getBlobServerPort)
+ }
+ else {
+ try {
+ val instanceID = instanceManager.registerTaskManager(taskManager, connectionInfo,
hardwareInformation, numberOfSlots)
- } catch {
+
+ // IMPORTANT: Send the response to the "sender", which is not the
+ // TaskManager actor, but the ask future!
+ sender() ! AcknowledgeRegistration(self, instanceID,
+ libraryCacheManager.getBlobServerPort)
+
+ // to be notified when the taskManager is no longer reachable
+ context.watch(taskManager)
+ }
+ catch {
// registerTaskManager throws an IllegalStateException if it is already shut down
// let the actor crash and restart itself in this case
- case ex: Exception => throw new RuntimeException(s"Could not register the task manager " +
- s"${taskManager.path} at the instance manager.", ex)
- }
-
- // to be notified when the taskManager is no longer reachable
- context.watch(taskManager)
+ case e: Exception =>
+ log.error(e, "Failed to register TaskManager at instance manager")
- taskManager ! AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort,
- profiler)
+ // IMPORTANT: Send the response to the "sender", which is not the
+ // TaskManager actor, but the ask future!
+ sender() ! RefuseRegistration(ExceptionUtils.stringifyException(e))
+ }
}
case RequestNumberRegisteredTaskManager =>
@@ -205,7 +216,7 @@ class JobManager(val flinkConfiguration: Configuration,
} else {
currentJobs.get(taskExecutionState.getJobID) match {
case Some((executionGraph, _)) =>
- val originalSender = sender
+ val originalSender = sender()
Future {
val result = executionGraph.updateState(taskExecutionState)
@@ -302,7 +313,7 @@ class JobManager(val flinkConfiguration: Configuration,
}
removeJob(jobID)
-
+
}
case None =>
removeJob(jobID)
@@ -320,7 +331,7 @@ class JobManager(val flinkConfiguration: Configuration,
jobExecution._1.getStateCheckpointerActor forward msg
case None =>
}
-
+
case ScheduleOrUpdateConsumers(jobId, partitionId) =>
currentJobs.get(jobId) match {
case Some((executionGraph, _)) =>
@@ -343,7 +354,7 @@ class JobManager(val flinkConfiguration: Configuration,
} catch {
case t: Throwable =>
log.error(t, "Could not process accumulator event of job {} received from {}.",
- accumulatorEvent.getJobID, sender.path)
+ accumulatorEvent.getJobID, sender().path)
}
case RequestAccumulatorResults(jobID) =>
@@ -396,9 +407,10 @@ class JobManager(val flinkConfiguration: Configuration,
case Heartbeat(instanceID, metricsReport) =>
try {
+ log.debug("Received hearbeat message from {}", instanceID)
instanceManager.reportHeartBeat(instanceID, metricsReport)
} catch {
- case t: Throwable => log.error(t, "Could not report heart beat from {}.", sender.path)
+ case t: Throwable => log.error(t, "Could not report heart beat from {}.", sender().path)
}
case RequestStackTrace(instanceID) =>
@@ -414,10 +426,10 @@ class JobManager(val flinkConfiguration: Configuration,
}
case RequestJobManagerStatus =>
- sender ! JobManagerStatusAlive
+ sender() ! JobManagerStatusAlive
case Disconnect(msg) =>
- val taskManager = sender
+ val taskManager = sender()
if (instanceManager.isRegistered(taskManager)) {
log.info("Task manager {} wants to disconnect, because {}.", taskManager.path, msg)
@@ -474,7 +486,7 @@ class JobManager(val flinkConfiguration: Configuration,
executionGraph = currentJobs.getOrElseUpdate(jobGraph.getJobID,
(new ExecutionGraph(jobGraph.getJobID, jobGraph.getName,
jobGraph.getJobConfiguration, timeout, jobGraph.getUserJarBlobKeys, userCodeLoader),
- JobInfo(sender, System.currentTimeMillis())))._1
+ JobInfo(sender(), System.currentTimeMillis())))._1
// configure the execution graph
val jobNumberRetries = if (jobGraph.getNumberOfExecutionRetries >= 0) {
@@ -486,7 +498,7 @@ class JobManager(val flinkConfiguration: Configuration,
executionGraph.setDelayBeforeRetrying(delayBetweenRetries)
executionGraph.setScheduleMode(jobGraph.getScheduleMode)
executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling)
-
+
executionGraph.setCheckpointingEnabled(jobGraph.isCheckpointingEnabled)
executionGraph.setCheckpointingInterval(jobGraph.getCheckpointingInterval)
@@ -532,15 +544,15 @@ class JobManager(val flinkConfiguration: Configuration,
}
// give an actorContext
- executionGraph.setParentContext(context);
-
+ executionGraph.setParentContext(context)
+
// get notified about job status changes
executionGraph.registerJobStatusListener(self)
if (listenToEvents) {
// the sender wants to be notified about state changes
- executionGraph.registerExecutionListener(sender)
- executionGraph.registerJobStatusListener(sender)
+ executionGraph.registerExecutionListener(sender())
+ executionGraph.registerJobStatusListener(sender())
}
// done with submitting the job
@@ -769,9 +781,12 @@ object JobManager {
if (executionMode == JobManagerMode.LOCAL) {
LOG.info("Starting embedded TaskManager for JobManager's LOCAL execution mode")
- val taskManagerActor = TaskManager.startTaskManagerActor(
- configuration, jobManagerSystem, listeningAddress,
- TaskManager.TASK_MANAGER_NAME, true, true, classOf[TaskManager])
+ val taskManagerActor = TaskManager.startTaskManagerComponentsAndActor(
+ configuration, jobManagerSystem,
+ listeningAddress,
+ Some(TaskManager.TASK_MANAGER_NAME),
+ Some(jobManager.path.toString),
+ true, classOf[TaskManager])
LOG.debug("Starting TaskManager process reaper")
jobManagerSystem.actorOf(
@@ -965,6 +980,25 @@ object JobManager {
def startJobManagerActors(configuration: Configuration,
actorSystem: ActorSystem): (ActorRef, ActorRef) = {
+ startJobManagerActors(configuration,actorSystem, Some(JOB_MANAGER_NAME), Some(ARCHIVE_NAME))
+ }
+ /**
+ * Starts the JobManager and job archiver based on the given configuration, in the
+ * given actor system.
+ *
+ * @param configuration The configuration for the JobManager
+ * @param actorSystem The actor system running the JobManager
+ * @param jobMangerActorName Optionally the name of the JobManager actor. If none is given,
+ * the actor will have the name generated by the actor system.
+ * @param archiverActorName Optionally the name of the archive actor. If none is given,
+ * the actor will have the name generated by the actor system.
+ * @return A tuple of references (JobManager Ref, Archiver Ref)
+ */
+ def startJobManagerActors(configuration: Configuration,
+ actorSystem: ActorSystem,
+ jobMangerActorName: Option[String],
+ archiverActorName: Option[String]): (ActorRef, ActorRef) = {
+
val (instanceManager, scheduler, libraryCacheManager, archiveProps, accumulatorManager,
profilerProps, executionRetries, delayBetweenRetries,
timeout, _) = createJobManagerComponents(configuration)
@@ -972,13 +1006,20 @@ object JobManager {
val profiler: Option[ActorRef] =
profilerProps.map( props => actorSystem.actorOf(props, PROFILER_NAME) )
- val archiver: ActorRef = actorSystem.actorOf(archiveProps, JobManager.ARCHIVE_NAME)
+ // start the archiver either with the given name, or without (avoid name conflicts)
+ val archiver: ActorRef = archiverActorName match {
+ case Some(actorName) => actorSystem.actorOf(archiveProps, actorName)
+ case None => actorSystem.actorOf(archiveProps)
+ }
val jobManagerProps = Props(classOf[JobManager], configuration, instanceManager, scheduler,
libraryCacheManager, archiver, accumulatorManager, profiler, executionRetries,
delayBetweenRetries, timeout)
- val jobManager = startActor(jobManagerProps, actorSystem)
+ val jobManager: ActorRef = jobMangerActorName match {
+ case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName)
+ case None => actorSystem.actorOf(jobManagerProps)
+ }
(jobManager, archiver)
}
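
A hedged usage sketch of the new startJobManagerActors overload and of AkkaUtils.createLocalActorSystem introduced in this commit. It is not part of the diff; a real setup would pass a fully populated configuration rather than an empty one.

import akka.actor.{ActorRef, ActorSystem}
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.jobmanager.JobManager

object StartJobManagerSketch {
  def main(args: Array[String]): Unit = {
    val config = new Configuration()
    val actorSystem: ActorSystem = AkkaUtils.createLocalActorSystem(config)

    // Named actors, as in a standalone setup:
    val (jobManager, archiver): (ActorRef, ActorRef) =
      JobManager.startJobManagerActors(
        config, actorSystem, Some(JobManager.JOB_MANAGER_NAME), Some(JobManager.ARCHIVE_NAME))

    // System-generated actor names, e.g. in tests, to avoid name clashes:
    val (jobManager2, archiver2) =
      JobManager.startJobManagerActors(config, actorSystem, None, None)
  }
}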
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
index f42d08ab..48266e2 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
@@ -18,19 +18,19 @@
package org.apache.flink.runtime.jobmanager
-import java.lang.Long
+import java.lang.{Long => JLong}
import akka.actor._
-import org.apache.flink.api.common.JobID
import org.apache.flink.runtime.ActorLogMessages
-import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph, ExecutionVertex}
+import org.apache.flink.runtime.execution.ExecutionState
+import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionVertex}
import org.apache.flink.runtime.jobgraph.JobStatus._
import org.apache.flink.runtime.jobgraph.JobVertexID
+import org.apache.flink.runtime.messages.CheckpointingMessages._
import org.apache.flink.runtime.state.StateHandle
import scala.collection.JavaConversions._
import scala.collection.immutable.TreeMap
-import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.{FiniteDuration, _}
/**
@@ -63,15 +63,18 @@ import scala.concurrent.duration.{FiniteDuration, _}
*/
class StreamCheckpointCoordinator(val executionGraph: ExecutionGraph,
- val vertices: Iterable[ExecutionVertex],
- var acks: Map[(JobVertexID,Int),List[Long]],
- var states: Map[(JobVertexID, Integer, Long),
- StateHandle],
- val interval: FiniteDuration,var curId: Long,var ackId: Long)
- extends Actor with ActorLogMessages with ActorLogging {
-
+ val vertices: Iterable[ExecutionVertex],
+ var acks: Map[(JobVertexID,Int),List[JLong]],
+ var states: Map[(JobVertexID, Integer, JLong), StateHandle],
+ val interval: FiniteDuration,
+ var curId: JLong,
+ var ackId: JLong)
+extends Actor with ActorLogMessages with ActorLogging {
+
+ implicit private val executor = context.dispatcher
+
override def receiveWithLogMessages: Receive = {
-
+
case InitBarrierScheduler =>
context.system.scheduler.schedule(interval,interval,self,BarrierTimeout)
context.system.scheduler.schedule(2 * interval,2 * interval,self,CompactAndUpdate)
@@ -87,7 +90,7 @@ class StreamCheckpointCoordinator(val executionGraph: ExecutionGraph,
curId += 1
log.debug("Sending Barrier to vertices of Job " + executionGraph.getJobName)
vertices.filter(v => v.getJobVertex.getJobVertex.isInputVertex &&
- v.getExecutionState == RUNNING).foreach(vertex
+ v.getExecutionState == ExecutionState.RUNNING).foreach(vertex
=> vertex.getCurrentAssignedResource.getInstance.getTaskManager
! BarrierReq(vertex.getCurrentExecutionAttempt.getAttemptId,curId))
case _ =>
@@ -105,16 +108,17 @@ class StreamCheckpointCoordinator(val executionGraph: ExecutionGraph,
acks += (jobVertexID,instanceID) -> (checkpointID :: acklist)
case None =>
}
- log.debug(acks.toString)
+ log.debug(acks.toString())
case CompactAndUpdate =>
- val barrierCount = acks.values.foldLeft(TreeMap[Long,Int]().withDefaultValue(0))((dict,myList)
+ val barrierCount =
+ acks.values.foldLeft(TreeMap[JLong,Int]().withDefaultValue(0))((dict,myList)
=> myList.foldLeft(dict)((dict2,elem) => dict2.updated(elem,dict2(elem) + 1)))
val keysToKeep = barrierCount.filter(_._2 == acks.size).keys
- ackId = if(!keysToKeep.isEmpty) keysToKeep.max else ackId
+ ackId = if(keysToKeep.nonEmpty) keysToKeep.max else ackId
acks.keys.foreach(x => acks = acks.updated(x,acks(x).filter(_ >= ackId)))
states = states.filterKeys(_._3 >= ackId)
- log.debug("Last global barrier is " + ackId)
+ log.debug("[FT-MONITOR] Last global barrier is " + ackId)
executionGraph.loadOperatorStates(states)
}
@@ -128,7 +132,7 @@ object StreamCheckpointCoordinator {
val vertices: Iterable[ExecutionVertex] = getExecutionVertices(executionGraph)
val monitor = context.system.actorOf(Props(new StreamCheckpointCoordinator(executionGraph,
vertices,vertices.map(x => ((x.getJobVertex.getJobVertexId,x.getParallelSubtaskIndex),
- List.empty[Long])).toMap, Map() ,interval,0L,-1L)))
+ List.empty[JLong])).toMap, Map() ,interval,0L,-1L)))
monitor ! InitBarrierScheduler
monitor
}
@@ -145,14 +149,3 @@ case class BarrierTimeout()
case class InitBarrierScheduler()
case class CompactAndUpdate()
-
-case class BarrierReq(attemptID: ExecutionAttemptID,checkpointID: Long)
-
-case class BarrierAck(jobID: JobID,jobVertexID: JobVertexID,instanceID: Int,checkpointID: Long)
-
-case class StateBarrierAck(jobID: JobID, jobVertexID: JobVertexID, instanceID: Integer,
- checkpointID: Long, states: StateHandle)
-
-
-
-
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/CheckpointingMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/CheckpointingMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/CheckpointingMessages.scala
new file mode 100644
index 0000000..9f6f51a
--- /dev/null
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/CheckpointingMessages.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages
+
+import org.apache.flink.api.common.JobID
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
+import org.apache.flink.runtime.jobgraph.JobVertexID
+import org.apache.flink.runtime.state.StateHandle
+
+/**
+ * Actor messages specific to checkpoints (triggering, acknowledging,
+ * state transfer, ...)
+ */
+object CheckpointingMessages {
+
+ /**
+ * Abstract base trait for all checkpoint messages.
+ */
+ trait CheckpointingMessage
+
+ // --------------------------------------------------------------------------
+
+ case class BarrierReq(attemptID: ExecutionAttemptID,
+ checkpointID: Long) extends CheckpointingMessage
+
+ case class BarrierAck(jobID: JobID,
+ jobVertexID:JobVertexID,
+ instanceID: Int,
+ checkpointID: Long) extends CheckpointingMessage
+
+ case class StateBarrierAck(jobID: JobID,
+ jobVertexID: JobVertexID,
+ instanceID: Integer,
+ checkpointID: java.lang.Long,
+ states: StateHandle) extends CheckpointingMessage
+}
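
A sketch, not taken from the commit, of how a receiver could react to the relocated checkpoint messages; the BarrierHandlerSketch actor and its constructor arguments are assumptions for illustration only.

import akka.actor.{Actor, ActorRef}
import org.apache.flink.api.common.JobID
import org.apache.flink.runtime.jobgraph.JobVertexID
import org.apache.flink.runtime.messages.CheckpointingMessages.{BarrierAck, BarrierReq}

class BarrierHandlerSketch(jobId: JobID,
                           vertexId: JobVertexID,
                           subtaskIndex: Int,
                           coordinator: ActorRef) extends Actor {

  override def receive: Receive = {
    case BarrierReq(attemptId, checkpointId) =>
      // perform the local checkpoint action for this attempt, then acknowledge the barrier
      coordinator ! BarrierAck(jobId, vertexId, subtaskIndex, checkpointId)
  }
}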
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
index 6785c31..6bc59a2 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
@@ -64,7 +64,7 @@ object ExecutionGraphMessages {
/**
* Denotes the job state change of a job.
*
- * @param jobID identifying the correspong job
+ * @param jobID identifying the corresponding job
* @param newJobStatus
* @param timestamp
* @param error
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index dab4671..73e20c0 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGra
import org.apache.flink.runtime.instance.{InstanceID, Instance}
import org.apache.flink.runtime.io.network.partition.ResultPartitionID
import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID}
-import org.apache.flink.runtime.taskmanager.TaskExecutionState
import scala.collection.JavaConverters._
@@ -53,14 +52,6 @@ object JobManagerMessages {
case class CancelJob(jobID: JobID)
/**
- * Denotes a state change of a task at the JobManager. The update success is acknowledged by a
- * boolean value which is sent back to the sender.
- *
- * @param taskExecutionState
- */
- case class UpdateTaskExecutionState(taskExecutionState: TaskExecutionState)
-
- /**
* Requesting next input split for the
* [[org.apache.flink.runtime.executiongraph.ExecutionJobVertex]]
* of the job specified by [[jobID]]. The next input split is sent back to the sender as a
@@ -73,6 +64,14 @@ object JobManagerMessages {
ExecutionAttemptID)
/**
+ * Contains the next input split for a task. This message is a response to
+ * [[org.apache.flink.runtime.messages.JobManagerMessages.RequestNextInputSplit]].
+ *
+ * @param splitData
+ */
+ case class NextInputSplit(splitData: Array[Byte])
+
+ /**
* Notifies the [[org.apache.flink.runtime.jobmanager.JobManager]] about available data for a
* produced partition.
* <p>
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala
index 91c3b2d..ab8b8c2 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala
@@ -18,17 +18,34 @@
package org.apache.flink.runtime.messages
+/**
+ * Generic messages between JobManager, TaskManager, JobClient.
+ */
object Messages {
/**
* Message to signal the successful reception of another message
*/
- case object Acknowledge
+ case object Acknowledge {
+
+ /**
+ * Accessor for the case object instance, to simplify Java interoperability.
+ *
+ * @return The Acknowledge case object instance.
+ */
+ def get(): Acknowledge.type = this
+ }
/**
- * Signals that the JobManager/TaskManager shall disconnect from the sender
- * (TaskManager/JobManager)
- * @param reason
+ * Signals that the receiver (JobManager/TaskManager) shall disconnect the sender.
+ *
+ * The TaskManager may send this on shutdown to let the JobManager realize the TaskManager
+ * loss more quickly.
+ *
+ * The JobManager may send this message to its TaskManagers to let them clean up their
+ * tasks that depend on the JobManager and go into a clean state.
+ *
+ * @param reason The reason for disconnecting, to be displayed in log and error messages.
*/
case class Disconnect(reason: String)
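
A minimal, hypothetical receive fragment showing how an actor could react to the generic messages above; the cleanup callback is an assumption and not something the commit defines.

import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}

object GenericMessageSketch {

  // Returns a partial function that can be composed into an actor's receive behavior.
  def handleGenericMessages(cleanup: String => Unit): PartialFunction[Any, Unit] = {
    case Disconnect(reason) =>
      // drop all state that depends on the remote peer (JobManager or TaskManager)
      cleanup(reason)
    case Acknowledge =>
      () // nothing to do: the previously sent message was received successfully
  }
}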
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
index 1a3479a..3051d00 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
@@ -21,40 +21,67 @@ package org.apache.flink.runtime.messages
import akka.actor.ActorRef
import org.apache.flink.runtime.instance.{InstanceConnectionInfo, InstanceID, HardwareDescription}
+import scala.concurrent.duration.{Deadline, FiniteDuration}
+
+/**
+ * A set of messages exchanged between the TaskManager and JobManager to handle the
+ * registration of the TaskManager at the JobManager.
+ */
object RegistrationMessages {
/**
+ * Marker trait for registration messages.
+ */
+ trait RegistrationMessage
+
+ /**
+ * Triggers the TaskManager to attempt a registration at the JobManager.
+ *
+ * @param jobManagerAkkaURL The actor URL of the JobManager.
+ * @param timeout The timeout for the message. The next retry will double this timeout.
+ * @param deadline Optional deadline until when the registration must be completed.
+ * @param attempt The attempt number, for logging.
+ */
+ case class TriggerTaskManagerRegistration(jobManagerAkkaURL: String,
+ timeout: FiniteDuration,
+ deadline: Option[Deadline],
+ attempt: Int)
+ extends RegistrationMessage
+
+ /**
* Registers a task manager at the job manager. A successful registration is acknowledged by
* [[AcknowledgeRegistration]].
*
- * @param connectionInfo
- * @param hardwareDescription
- * @param numberOfSlots
+ * @param taskManager The TaskManager actor.
+ * @param connectionInfo The TaskManagers connection information.
+ * @param resources The TaskManagers resources.
+ * @param numberOfSlots The number of processing slots offered by the TaskManager.
*/
- case class RegisterTaskManager(connectionInfo: InstanceConnectionInfo,
- hardwareDescription: HardwareDescription,
+ case class RegisterTaskManager(taskManager: ActorRef,
+ connectionInfo: InstanceConnectionInfo,
+ resources: HardwareDescription,
numberOfSlots: Int)
+ extends RegistrationMessage
/**
* Denotes the successful registration of a task manager at the job manager. This is the
* response triggered by the [[RegisterTaskManager]] message.
*
- * @param instanceID
- * @param blobPort
- * @param profilerListener
+ * @param instanceID The instance ID under which the TaskManager is registered at the
+ * JobManager.
+ * @param blobPort The server port where the JobManager's BLOB service runs.
*/
- case class AcknowledgeRegistration(instanceID: InstanceID, blobPort: Int,
- profilerListener: Option[ActorRef])
+ case class AcknowledgeRegistration(jobManager: ActorRef, instanceID: InstanceID, blobPort: Int)
+ extends RegistrationMessage
/**
* Denotes that the TaskManager has already been registered at the JobManager.
*
- * @param instanceID
- * @param blobPort
- * @param profilerListener
+ * @param instanceID The instance ID under which the TaskManager is registered.
+ * @param blobPort The server port where the JobManager's BLOB service runs.
*/
- case class AlreadyRegistered(instanceID: InstanceID, blobPort: Int,
- profilerListener: Option[ActorRef])
+ case class AlreadyRegistered(jobManager: ActorRef, instanceID: InstanceID, blobPort: Int)
+ extends RegistrationMessage
/**
* Denotes the unsuccessful registration of a task manager at the job manager. This is the
@@ -63,5 +90,5 @@ object RegistrationMessages {
* @param reason Reason why the task manager registration was refused
*/
case class RefuseRegistration(reason: String)
-
+ extends RegistrationMessage
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
index d27885b..c81830c 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
@@ -18,116 +18,67 @@
package org.apache.flink.runtime.messages
-import org.apache.flink.core.io.InputSplit
-import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, TaskDeploymentDescriptor}
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
import org.apache.flink.runtime.instance.InstanceID
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
+/**
+ * Miscellaneous actor messages exchanged with the TaskManager.
+ */
object TaskManagerMessages {
/**
- * Cancels the task associated with [[attemptID]]. The result is sent back to the sender as a
- * [[TaskOperationResult]] message.
- *
- * @param attemptID
- */
- case class CancelTask(attemptID: ExecutionAttemptID)
-
- /**
- * Submits a task to the task manager. The submission result is sent back to the sender as a
- * [[TaskOperationResult]] message.
- *
- * @param tasks task deployment descriptor which contains the task relevant information
+ * Tells the task manager to send a heartbeat message to the job manager.
*/
- case class SubmitTask(tasks: TaskDeploymentDescriptor)
+ case object SendHeartbeat {
- /**
- * Contains the next input split for a task. This message is a response to
- * [[org.apache.flink.runtime.messages.JobManagerMessages.RequestNextInputSplit]].
- *
- * @param splitData
- */
- case class NextInputSplit(splitData: Array[Byte])
+ /**
+ * Accessor for the case object instance, to simplify Java interoperability.
+ * @return The SendHeartbeat case object instance.
+ */
+ def get() : SendHeartbeat.type = SendHeartbeat
+ }
/**
- * Unregisters the task identified by [[executionID]] from the task manager.
+ * Reports liveliness of the TaskManager instance with the given instance ID to the
+ * JobManager. This message also reports the TaskManager's
+ * metrics, as a byte array.
*
- * @param executionID
+ * @param instanceID The instance ID of the reporting TaskManager.
+ * @param metricsReport utf-8 encoded JSON metrics report from the metricRegistry.
*/
- case class UnregisterTask(executionID: ExecutionAttemptID)
+ case class Heartbeat(instanceID: InstanceID, metricsReport: Array[Byte])
- /**
- * Updates the reader of the task identified by
- * [[executionID]] from the task manager.
- */
- sealed trait UpdateTask{
- def executionID: ExecutionAttemptID
- }
- case class UpdateTaskSinglePartitionInfo(
- executionID: ExecutionAttemptID,
- resultId: IntermediateDataSetID,
- partitionInfo: InputChannelDeploymentDescriptor)
- extends UpdateTask
-
- case class UpdateTaskMultiplePartitionInfos(
- executionID: ExecutionAttemptID,
- partitionInfos: Seq[(IntermediateDataSetID, InputChannelDeploymentDescriptor)])
- extends UpdateTask
-
- def createUpdateTaskMultiplePartitionInfos(
- executionID: ExecutionAttemptID,
- resultIDs: java.util.List[IntermediateDataSetID],
- partitionInfos: java.util.List[InputChannelDeploymentDescriptor]):
- UpdateTaskMultiplePartitionInfos = {
- require(resultIDs.size() == partitionInfos.size(), "ResultIDs must have the same length as" +
- "partitionInfos.")
-
- import scala.collection.JavaConverters.asScalaBufferConverter
- new UpdateTaskMultiplePartitionInfos(executionID,
- resultIDs.asScala.zip(partitionInfos.asScala))
- }
+ // --------------------------------------------------------------------------
+ // Utility messages used for notifications during TaskManager startup
+ // --------------------------------------------------------------------------
/**
- * Fails all intermediate result partitions identified by [[executionID]] from the task manager.
- *
- * @param executionID
+ * Tells the TaskManager to send a stack trace of all threads to the sender.
+ * The response to this message is the [[StackTrace]] message.
*/
- case class FailIntermediateResultPartitions(executionID: ExecutionAttemptID)
+ case object SendStackTrace {
- /**
- * Reports whether a task manager operation has been successful or not. This message will be
- * sent to the sender as a response to [[SubmitTask]] and [[CancelTask]].
- *
- * @param executionID identifying the respective task
- * @param success indicating whether the operation has been successful
- * @param description
- */
- case class TaskOperationResult(executionID: ExecutionAttemptID, success: Boolean,
- description: String = ""){
- def this(executionID: ExecutionAttemptID, success: Boolean) = this(executionID, success, "")
+ /**
+ * Accessor for the case object instance, to simplify Java interoperability.
+ * @return The SendStackTrace case object instance.
+ */
+ def get() : SendStackTrace.type = SendStackTrace
}
/**
- * Reports liveliness of an instance with [[instanceID]] to the
- * [[org.apache.flink.runtime.instance.InstanceManager]]. This message is sent to the job
- * manager which forwards it to the InstanceManager.
- *
- * @param instanceID
- * @param metricsReport utf-8 encoded JSON report from the metricRegistry.
- */
- case class Heartbeat(instanceID: InstanceID, metricsReport: Array[Byte])
-
- /**
- * Sends StackTrace Message of an instance with [[instanceID]]. This message is a response to
- * [[org.apache.flink.runtime.messages.TaskManagerMessages.SendStackTrace]].
+ * Communicates the stack trace of the TaskManager with the given ID.
+ * This message is the response to [[SendStackTrace]].
*
- * @param instanceID
- * @param stackTrace
+ * @param instanceID The ID of the responding task manager.
+ * @param stackTrace The stack trace, as a string.
*/
case class StackTrace(instanceID: InstanceID, stackTrace: String)
+
+ // --------------------------------------------------------------------------
+ // Utility messages used for notifications during TaskManager startup
+ // --------------------------------------------------------------------------
+
/**
* Requests a notification from the task manager as soon as the task manager has been
* registered at the job manager. Once the task manager is registered at the job manager a
@@ -141,55 +92,23 @@ object TaskManagerMessages {
*/
case object RegisteredAtJobManager
- /**
- * Registers the sender as task manager at the job manager.
- */
- case object RegisterAtJobManager
-
- /**
- * Makes the task manager sending a heartbeat message to the job manager.
- */
- case object SendHeartbeat
- /**
- * Logs the current memory usage as debug level output.
- */
- case object LogMemoryUsage
+ // --------------------------------------------------------------------------
+ // Utility getters for case objects to simplify access from Java
+ // --------------------------------------------------------------------------
/**
- * Makes the task manager sending a stack trace message to the sender.
+ * Accessor for the case object instance, to simplify Java interoperability.
+ * @return The NotifyWhenRegisteredAtJobManager case object instance.
*/
- case object SendStackTrace
+ def getNotifyWhenRegisteredAtJobManagerMessage:
+ NotifyWhenRegisteredAtJobManager.type = NotifyWhenRegisteredAtJobManager
/**
- * Fail the specified task externally
- *
- * @param executionID identifying the task to fail
- * @param cause reason for the external failure
+ * Accessor for the case object instance, to simplify Java interoperability.
+ * @return The RegisteredAtJobManager case object instance.
*/
- case class FailTask(executionID: ExecutionAttemptID, cause: Throwable)
-
- // --------------------------------------------------------------------------
- // Utility methods to allow simpler case object access from Java
- // --------------------------------------------------------------------------
-
- def getNotifyWhenRegisteredAtJobManagerMessage : AnyRef = {
- NotifyWhenRegisteredAtJobManager
- }
-
- def getRegisteredAtJobManagerMessage : AnyRef = {
- RegisteredAtJobManager
- }
-
- def getRegisterAtJobManagerMessage : AnyRef = {
- RegisterAtJobManager
- }
+ def getRegisteredAtJobManagerMessage:
+ RegisteredAtJobManager.type = RegisteredAtJobManager
- def getSendHeartbeatMessage : AnyRef = {
- SendHeartbeat
- }
-
- def getLogMemoryUsageMessage : AnyRef = {
- RegisteredAtJobManager
- }
}
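
The case-object accessors above exist only so that Java code can obtain the singleton message
instances without going through the Scala MODULE$ field. As a rough, hypothetical sketch of how
these messages could be exchanged from Scala (the 'taskManager' ActorRef and the timeout are
assumptions made for illustration, not part of this commit):

    import akka.actor.ActorRef
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.Await
    import scala.concurrent.duration._
    import org.apache.flink.runtime.messages.TaskManagerMessages._

    // 'taskManager' is an assumed ActorRef to a running TaskManager actor.
    def dumpStackTraceOnceRegistered(taskManager: ActorRef): Unit = {
      implicit val timeout: Timeout = Timeout(10.seconds)

      // Wait until the TaskManager reports that it is registered at the JobManager.
      Await.result(taskManager ? NotifyWhenRegisteredAtJobManager, timeout.duration)

      // Ask for a stack trace; the TaskManager answers with a StackTrace message.
      val trace = Await.result((taskManager ? SendStackTrace).mapTo[StackTrace], timeout.duration)
      println(trace.stackTrace)
    }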
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala
new file mode 100644
index 0000000..c8c5726
--- /dev/null
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages
+
+import org.apache.flink.runtime.deployment.{TaskDeploymentDescriptor, InputChannelDeploymentDescriptor}
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
+import org.apache.flink.runtime.taskmanager.TaskExecutionState
+
+/**
+ * A set of messages that control the deployment and the state of Tasks executed
+ * on the TaskManager.
+ */
+object TaskMessages {
+
+ /**
+ * Marker trait for task messages.
+ */
+ trait TaskMessage
+
+ // --------------------------------------------------------------------------
+ // Starting and stopping Tasks
+ // --------------------------------------------------------------------------
+
+ /**
+ * Submits a task to the task manager. The response to this message is a
+ * [[TaskOperationResult]] message.
+ *
+ * @param tasks Descriptor which contains the information to start the task.
+ */
+ case class SubmitTask(tasks: TaskDeploymentDescriptor)
+ extends TaskMessage
+
+ /**
+ * Cancels the task associated with [[attemptID]]. The result is sent back to the sender as a
+ * [[TaskOperationResult]] message.
+ *
+ * @param attemptID The task's execution attempt ID.
+ */
+ case class CancelTask(attemptID: ExecutionAttemptID)
+ extends TaskMessage
+
+ /**
+ * Triggers a failure of the specified task from the outside (as opposed to the task
+ * throwing an exception itself), with the given exception as the cause.
+ *
+ * @param executionID The task's execution attempt ID.
+ * @param cause The reason for the external failure.
+ */
+ case class FailTask(executionID: ExecutionAttemptID, cause: Throwable)
+ extends TaskMessage
+
+ /**
+ * Unregisters the task identified by [[executionID]] from the TaskManager.
+ * Sent to the TaskManager by futures and callbacks.
+ *
+ * @param executionID The task's execution attempt ID.
+ */
+ case class UnregisterTask(executionID: ExecutionAttemptID)
+ extends TaskMessage
+
+
+ // --------------------------------------------------------------------------
+ // Updates to Intermediate Results
+ // --------------------------------------------------------------------------
+
+ /**
+ * Base class for messages that update the information about the location of input partitions.
+ */
+ abstract sealed class UpdatePartitionInfo extends TaskMessage {
+ def executionID: ExecutionAttemptID
+ }
+
+ /**
+ * Updates the task with the location of a single input partition.
+ *
+ * @param executionID The task's execution attempt ID.
+ * @param resultId The input reader to update.
+ * @param partitionInfo The partition info update.
+ */
+ case class UpdateTaskSinglePartitionInfo(executionID: ExecutionAttemptID,
+ resultId: IntermediateDataSetID,
+ partitionInfo: InputChannelDeploymentDescriptor)
+ extends UpdatePartitionInfo
+
+ /**
+ * Updates the task with the locations of multiple input partitions.
+ *
+ * @param executionID The task's execution attempt ID.
+ * @param partitionInfos List of input gates with channel descriptors to update.
+ */
+ case class UpdateTaskMultiplePartitionInfos(
+ executionID: ExecutionAttemptID,
+ partitionInfos: Seq[(IntermediateDataSetID, InputChannelDeploymentDescriptor)])
+ extends UpdatePartitionInfo
+
+ /**
+ * Fails (and releases) all intermediate result partitions produced by the task
+ * identified by [[executionID]] on the task manager.
+ *
+ * @param executionID The task's execution attempt ID.
+ */
+ case class FailIntermediateResultPartitions(executionID: ExecutionAttemptID)
+ extends TaskMessage
+
+
+ // --------------------------------------------------------------------------
+ // Report Messages
+ // --------------------------------------------------------------------------
+
+ /**
+ * Notifies the JobManager about a state change of a task. The success of the update is
+ * acknowledged by a boolean value which is sent back to the sender.
+ *
+ * @param taskExecutionState The changed task state
+ */
+ case class UpdateTaskExecutionState(taskExecutionState: TaskExecutionState)
+ extends TaskMessage
+
+ /**
+ * Reports whether a task operation has been successful. Sent, for example, as a response to
+ *
+ * - [[SubmitTask]]
+ * - [[CancelTask]]
+ *
+ * @param executionID identifying the respective task
+ * @param success indicating whether the operation has been successful
+ * @param description Optional description for unsuccessful results.
+ */
+ case class TaskOperationResult(executionID: ExecutionAttemptID,
+ success: Boolean,
+ description: String)
+ extends TaskMessage
+ {
+ def this(executionID: ExecutionAttemptID, success: Boolean) = this(executionID, success, "")
+ }
+
+
+ // --------------------------------------------------------------------------
+ // Utility Functions
+ // --------------------------------------------------------------------------
+
+ def createUpdateTaskMultiplePartitionInfos(
+ executionID: ExecutionAttemptID,
+ resultIDs: java.util.List[IntermediateDataSetID],
+ partitionInfos: java.util.List[InputChannelDeploymentDescriptor]):
+ UpdateTaskMultiplePartitionInfos = {
+
+ require(resultIDs.size() == partitionInfos.size(),
+ "ResultIDs must have the same length as partitionInfos.")
+
+ import scala.collection.JavaConverters.asScalaBufferConverter
+
+ new UpdateTaskMultiplePartitionInfos(executionID,
+ resultIDs.asScala.zip(partitionInfos.asScala))
+ }
+}
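
For completeness, a small hypothetical sketch of how the report messages in this new file could be
constructed and inspected; the execution attempt ID below is freshly generated purely for
illustration and does not correspond to a real task:

    import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
    import org.apache.flink.runtime.messages.TaskMessages._

    val attempt = new ExecutionAttemptID()

    // The auxiliary constructor defaults the description to the empty string.
    val ok      = new TaskOperationResult(attempt, true)
    val failure = TaskOperationResult(attempt, false, "task was not found")

    Seq(ok, failure).foreach {
      case TaskOperationResult(id, true, _)       => println(s"$id: operation succeeded")
      case TaskOperationResult(id, false, reason) => println(s"$id: operation failed: $reason")
    }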
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index cb79fbc..13e1ccd 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -100,8 +100,18 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem:
TaskManager.TASK_MANAGER_NAME
}
- TaskManager.startTaskManagerActor(config, system, HOSTNAME, taskManagerActorName,
- singleActorSystem, localExecution, classOf[TaskManager])
+ val jobManagerPath: Option[String] = if (singleActorSystem) {
+ Some(jobManagerActor.path.toString)
+ } else {
+ None
+ }
+
+ TaskManager.startTaskManagerComponentsAndActor(config, system,
+ HOSTNAME, // network interface to bind to
+ Some(taskManagerActorName), // actor name
+ jobManagerPath, // job manager akka URL
+ localExecution, // start network stack?
+ classOf[TaskManager])
}
def getJobClient(): ActorRef = {