You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/08/24 21:12:54 UTC
[5/6] flink git commit: [FLINK-4457] Make ExecutionGraph independent
of actors.
[FLINK-4457] Make ExecutionGraph independent of actors.
This introduces the JobStatusListener and ExecutionStatusListener interfaces,
which replace the ActorRefs and ActorGateways previously registered as listeners.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/635c8693
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/635c8693
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/635c8693
Branch: refs/heads/master
Commit: 635c869326cc77e4199e4d8ee597aed69ed16cd2
Parents: 4e9d177
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Aug 24 19:12:07 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Aug 24 21:19:04 2016 +0200
----------------------------------------------------------------------
.../checkpoint/CheckpointCoordinator.java | 34 +++-----
.../CheckpointCoordinatorDeActivator.java | 45 ++++-------
.../runtime/executiongraph/ExecutionGraph.java | 81 ++++++++++----------
.../executiongraph/ExecutionStatusListener.java | 54 +++++++++++++
.../executiongraph/JobStatusListener.java | 39 ++++++++++
.../executiongraph/StatusListenerMessenger.java | 70 +++++++++++++++++
.../flink/runtime/jobmanager/JobManager.scala | 12 ++-
...ExecutionGraphCheckpointCoordinatorTest.java | 3 -
.../LeaderChangeJobRecoveryTest.java | 73 ++++--------------
.../flink/core/testutils/OneShotLatch.java | 55 +++++++++++++
.../flink/core/testutils/OneShotLatchTest.java | 55 +++++++++++++
11 files changed, 360 insertions(+), 161 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/635c8693/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index b710324..3619f48 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -18,9 +18,6 @@
package org.apache.flink.runtime.checkpoint;
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
import akka.dispatch.Futures;
import org.apache.flink.api.common.JobID;
@@ -31,8 +28,7 @@ import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
@@ -57,7 +53,6 @@ import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
-import java.util.UUID;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -137,7 +132,7 @@ public class CheckpointCoordinator {
private final Timer timer;
/** Actor that receives status updates from the execution graph this coordinator works for */
- private ActorGateway jobStatusListener;
+ private JobStatusListener jobStatusListener;
/** The number of consecutive failed trigger attempts */
private int numUnsuccessfulCheckpointsTriggers;
@@ -266,12 +261,6 @@ public class CheckpointCoordinator {
// shut down the thread that handles the timeouts and pending triggers
timer.cancel();
- // make sure that the actor does not linger
- if (jobStatusListener != null) {
- jobStatusListener.tell(PoisonPill.getInstance());
- jobStatusListener = null;
- }
-
// clear and discard all pending checkpoints
for (PendingCheckpoint pending : pendingCheckpoints.values()) {
pending.abortError(new Exception("Checkpoint Coordinator is shutting down"));
@@ -903,7 +892,7 @@ public class CheckpointCoordinator {
// Periodic scheduling of checkpoints
// --------------------------------------------------------------------------------------------
- public void startCheckpointScheduler() throws Exception {
+ public void startCheckpointScheduler() {
synchronized (lock) {
if (shutdown) {
throw new IllegalArgumentException("Checkpoint coordinator is shut down");
@@ -918,7 +907,7 @@ public class CheckpointCoordinator {
}
}
- public void stopCheckpointScheduler() throws Exception {
+ public void stopCheckpointScheduler() {
synchronized (lock) {
triggerRequestQueued = false;
periodicScheduling = false;
@@ -929,10 +918,14 @@ public class CheckpointCoordinator {
}
for (PendingCheckpoint p : pendingCheckpoints.values()) {
- p.abortError(new Exception("Checkpoint Coordinator is suspending."));
+ try {
+ p.abortError(new Exception("Checkpoint Coordinator is suspending."));
+ } catch (Throwable t) {
+ LOG.error("Error while disposing pending checkpoint", t);
+ }
}
- pendingCheckpoints.clear();
+ pendingCheckpoints.clear();
numUnsuccessfulCheckpointsTriggers = 0;
}
}
@@ -941,17 +934,14 @@ public class CheckpointCoordinator {
// job status listener that schedules / cancels periodic checkpoints
// ------------------------------------------------------------------------
- public ActorGateway createActivatorDeactivator(ActorSystem actorSystem, UUID leaderSessionID) {
+ public JobStatusListener createActivatorDeactivator() {
synchronized (lock) {
if (shutdown) {
throw new IllegalArgumentException("Checkpoint coordinator is shut down");
}
if (jobStatusListener == null) {
- Props props = Props.create(CheckpointCoordinatorDeActivator.class, this, leaderSessionID);
-
- // wrap the ActorRef in a AkkaActorGateway to support message decoration
- jobStatusListener = new AkkaActorGateway(actorSystem.actorOf(props), leaderSessionID);
+ jobStatusListener = new CheckpointCoordinatorDeActivator(this);
}
return jobStatusListener;
http://git-wip-us.apache.org/repos/asf/flink/blob/635c8693/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
index 7e26f71..2e23d6a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
@@ -18,51 +18,32 @@
package org.apache.flink.runtime.checkpoint;
-import org.apache.flink.runtime.akka.FlinkUntypedActor;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.messages.ExecutionGraphMessages;
-import org.apache.flink.util.Preconditions;
-import java.util.UUID;
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* This actor listens to changes in the JobStatus and activates or deactivates the periodic
* checkpoint scheduler.
*/
-public class CheckpointCoordinatorDeActivator extends FlinkUntypedActor {
+public class CheckpointCoordinatorDeActivator implements JobStatusListener {
private final CheckpointCoordinator coordinator;
- private final UUID leaderSessionID;
-
- public CheckpointCoordinatorDeActivator(
- CheckpointCoordinator coordinator,
- UUID leaderSessionID) {
- LOG.info("Create CheckpointCoordinatorDeActivator");
-
- this.coordinator = Preconditions.checkNotNull(coordinator, "The checkpointCoordinator must not be null.");
- this.leaderSessionID = leaderSessionID;
+ public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator) {
+ this.coordinator = checkNotNull(coordinator);
}
@Override
- public void handleMessage(Object message) throws Exception {
- if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
- JobStatus status = ((ExecutionGraphMessages.JobStatusChanged) message).newJobStatus();
-
- if (status == JobStatus.RUNNING) {
- // start the checkpoint scheduler
- coordinator.startCheckpointScheduler();
- } else {
- // anything else should stop the trigger for now
- coordinator.stopCheckpointScheduler();
- }
+ public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
+ if (newJobStatus == JobStatus.RUNNING) {
+ // start the checkpoint scheduler
+ coordinator.startCheckpointScheduler();
+ } else {
+ // anything else should stop the trigger for now
+ coordinator.stopCheckpointScheduler();
}
-
- // we ignore all other messages
- }
-
- @Override
- public UUID getLeaderSessionID() {
- return leaderSessionID;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/635c8693/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 12d8e66..7a94c0f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.executiongraph;
-import akka.actor.ActorSystem;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
@@ -41,7 +40,6 @@ import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
-import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -50,15 +48,16 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.messages.ExecutionGraphMessages;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.runtime.util.SerializedThrowable;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;
@@ -75,12 +74,12 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
-import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* The execution graph is the central data structure that coordinates the distributed
* execution of a data flow. It keeps representations of each parallel task, each
@@ -151,12 +150,12 @@ public class ExecutionGraph {
* accessible on all nodes in the cluster. */
private final List<URL> requiredClasspaths;
- /** Listeners that receive messages when the entire job switches it status (such as from
- * RUNNING to FINISHED) */
- private final List<ActorGateway> jobStatusListenerActors;
+ /** Listeners that receive messages when the entire job switches it status
+ * (such as from RUNNING to FINISHED) */
+ private final List<JobStatusListener> jobStatusListeners;
/** Listeners that receive messages whenever a single task execution changes its status */
- private final List<ActorGateway> executionListenerActors;
+ private final List<ExecutionStatusListener> executionListeners;
/** Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()} when
* the execution graph transitioned into a certain state. The index into this array is the
@@ -284,8 +283,8 @@ public class ExecutionGraph {
this.verticesInCreationOrder = new ArrayList<ExecutionJobVertex>();
this.currentExecutions = new ConcurrentHashMap<ExecutionAttemptID, Execution>();
- this.jobStatusListenerActors = new CopyOnWriteArrayList<ActorGateway>();
- this.executionListenerActors = new CopyOnWriteArrayList<ActorGateway>();
+ this.jobStatusListeners = new CopyOnWriteArrayList<>();
+ this.executionListeners = new CopyOnWriteArrayList<>();
this.stateTimestamps = new long[JobStatus.values().length];
this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();
@@ -345,8 +344,6 @@ public class ExecutionGraph {
List<ExecutionJobVertex> verticesToTrigger,
List<ExecutionJobVertex> verticesToWaitFor,
List<ExecutionJobVertex> verticesToCommitTo,
- ActorSystem actorSystem,
- UUID leaderSessionID,
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore checkpointStore,
SavepointStore savepointStore,
@@ -388,8 +385,7 @@ public class ExecutionGraph {
// the periodic checkpoint scheduler is activated and deactivated as a result of
// job status changes (running -> on, all other states -> off)
- registerJobStatusListener(
- checkpointCoordinator.createActivatorDeactivator(actorSystem, leaderSessionID));
+ registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
}
/**
@@ -935,8 +931,8 @@ public class ExecutionGraph {
intermediateResults.clear();
currentExecutions.clear();
requiredJarFiles.clear();
- jobStatusListenerActors.clear();
- executionListenerActors.clear();
+ jobStatusListeners.clear();
+ executionListeners.clear();
isArchived = true;
}
@@ -1173,45 +1169,52 @@ public class ExecutionGraph {
// Listeners & Observers
// --------------------------------------------------------------------------------------------
- public void registerJobStatusListener(ActorGateway listener) {
+ public void registerJobStatusListener(JobStatusListener listener) {
if (listener != null) {
- this.jobStatusListenerActors.add(listener);
+ jobStatusListeners.add(listener);
}
}
- public void registerExecutionListener(ActorGateway listener) {
+ public void registerExecutionListener(ExecutionStatusListener listener) {
if (listener != null) {
- this.executionListenerActors.add(listener);
+ executionListeners.add(listener);
}
}
private void notifyJobStatusChange(JobStatus newState, Throwable error) {
- if (jobStatusListenerActors.size() > 0) {
- ExecutionGraphMessages.JobStatusChanged message =
- new ExecutionGraphMessages.JobStatusChanged(jobID, newState, System.currentTimeMillis(),
- error == null ? null : new SerializedThrowable(error));
-
- for (ActorGateway listener: jobStatusListenerActors) {
- listener.tell(message);
+ if (jobStatusListeners.size() > 0) {
+ final long timestamp = System.currentTimeMillis();
+ final Throwable serializedError = error == null ? null : new SerializedThrowable(error);
+
+ for (JobStatusListener listener : jobStatusListeners) {
+ try {
+ listener.jobStatusChanges(jobID, newState, timestamp, serializedError);
+ } catch (Throwable t) {
+ LOG.warn("Error while notifying JobStatusListener", t);
+ }
}
}
}
- void notifyExecutionChange(JobVertexID vertexId, int subtask, ExecutionAttemptID executionID, ExecutionState
- newExecutionState, Throwable error)
+ void notifyExecutionChange(
+ JobVertexID vertexId, int subtask, ExecutionAttemptID executionID,
+ ExecutionState newExecutionState, Throwable error)
{
ExecutionJobVertex vertex = getJobVertex(vertexId);
- if (executionListenerActors.size() > 0) {
- String message = error == null ? null : ExceptionUtils.stringifyException(error);
- ExecutionGraphMessages.ExecutionStateChanged actorMessage =
- new ExecutionGraphMessages.ExecutionStateChanged(jobID, vertexId, vertex.getJobVertex().getName(),
- vertex.getParallelism(), subtask,
- executionID, newExecutionState,
- System.currentTimeMillis(), message);
-
- for (ActorGateway listener : executionListenerActors) {
- listener.tell(actorMessage);
+ if (executionListeners.size() > 0) {
+ final String message = error == null ? null : ExceptionUtils.stringifyException(error);
+ final long timestamp = System.currentTimeMillis();
+
+ for (ExecutionStatusListener listener : executionListeners) {
+ try {
+ listener.executionStatusChanged(
+ jobID, vertexId, vertex.getJobVertex().getName(),
+ vertex.getParallelism(), subtask, executionID, newExecutionState,
+ timestamp, message);
+ } catch (Throwable t) {
+ LOG.warn("Error while notifying ExecutionStatusListener", t);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/635c8693/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionStatusListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionStatusListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionStatusListener.java
new file mode 100644
index 0000000..6fb5a1a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionStatusListener.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+/**
+ * Interface for observers that monitor the status of individual task executions.
+ */
+public interface ExecutionStatusListener {
+
+ /**
+ * Called whenever the execution status of a task changes.
+ *
+ * @param jobID The ID of the job
+ * @param vertexID The ID of the task vertex
+ * @param taskName The name of the task
+ * @param totalNumberOfSubTasks The parallelism of the task
+ * @param subtaskIndex The subtask's parallel index
+ * @param executionID The ID of the execution attempt
+ * @param newExecutionState The status to which the task switched
+ * @param timestamp The timestamp when the change occurred. Informational only.
+ * @param optionalMessage An optional message attached to the status change, like an
+ * exception message.
+ */
+ void executionStatusChanged(
+ JobID jobID,
+ JobVertexID vertexID,
+ String taskName,
+ int totalNumberOfSubTasks,
+ int subtaskIndex,
+ ExecutionAttemptID executionID,
+ ExecutionState newExecutionState,
+ long timestamp,
+ String optionalMessage);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/635c8693/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java
new file mode 100644
index 0000000..1d97a5c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+
+/**
+ * Interface for observers that monitor the status of a job.
+ */
+public interface JobStatusListener {
+
+ /**
+ * This method is called whenever the status of the job changes.
+ *
+ * @param jobId The ID of the job.
+ * @param newJobStatus The status the job switched to.
+ * @param timestamp The timestamp when the status transition occurred.
+ * @param error In case the job status switches to a failure state, this is the
+ * exception that caused the failure.
+ */
+ void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/635c8693/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/StatusListenerMessenger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/StatusListenerMessenger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/StatusListenerMessenger.java
new file mode 100644
index 0000000..01f1e75
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/StatusListenerMessenger.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import akka.actor.ActorRef;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.ExecutionGraphMessages;
+import org.apache.flink.runtime.util.SerializedThrowable;
+
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@code JobStatusListener} and {@code ExecutionStatusListener} that sends an actor message
+ * for each status change.
+ */
+public class StatusListenerMessenger implements JobStatusListener, ExecutionStatusListener {
+
+ private final AkkaActorGateway target;
+
+ public StatusListenerMessenger(ActorRef target, UUID leaderSessionId) {
+ this.target = new AkkaActorGateway(checkNotNull(target), leaderSessionId);
+ }
+
+ @Override
+ public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
+ ExecutionGraphMessages.JobStatusChanged message =
+ new ExecutionGraphMessages.JobStatusChanged(jobId, newJobStatus, timestamp,
+ error == null ? null : new SerializedThrowable(error));
+
+ target.tell(message);
+ }
+
+ @Override
+ public void executionStatusChanged(
+ JobID jobID, JobVertexID vertexID,
+ String taskName, int taskParallelism, int subtaskIndex,
+ ExecutionAttemptID executionID, ExecutionState newExecutionState,
+ long timestamp, String optionalMessage) {
+
+ ExecutionGraphMessages.ExecutionStateChanged message =
+ new ExecutionGraphMessages.ExecutionStateChanged(
+ jobID, vertexID, taskName, taskParallelism, subtaskIndex,
+ executionID, newExecutionState, timestamp, optionalMessage);
+
+ target.tell(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/635c8693/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 34fed3f..0587987 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -51,7 +51,7 @@ import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceMa
import org.apache.flink.runtime.clusterframework.types.ResourceID
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
-import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex}
+import org.apache.flink.runtime.executiongraph.{StatusListenerMessenger, ExecutionGraph, ExecutionJobVertex}
import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager}
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator
import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID}
@@ -1249,8 +1249,6 @@ class JobManager(
triggerVertices,
ackVertices,
confirmVertices,
- context.system,
- leaderSessionID.orNull,
checkpointIdCounter,
completedCheckpoints,
savepointStore,
@@ -1259,14 +1257,14 @@ class JobManager(
// get notified about job status changes
executionGraph.registerJobStatusListener(
- new AkkaActorGateway(self, leaderSessionID.orNull))
+ new StatusListenerMessenger(self, leaderSessionID.orNull))
if (jobInfo.listeningBehaviour == ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) {
// the sender wants to be notified about state changes
- val gateway = new AkkaActorGateway(jobInfo.client, leaderSessionID.orNull)
+ val listener = new StatusListenerMessenger(jobInfo.client, leaderSessionID.orNull)
- executionGraph.registerExecutionListener(gateway)
- executionGraph.registerJobStatusListener(gateway)
+ executionGraph.registerExecutionListener(listener)
+ executionGraph.registerJobStatusListener(listener)
}
} catch {
case t: Throwable =>
http://git-wip-us.apache.org/repos/asf/flink/blob/635c8693/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 7b05fd7..49a9449 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -40,7 +40,6 @@ import scala.concurrent.duration.FiniteDuration;
import java.net.URL;
import java.util.Collections;
-import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.mockito.Mockito.mock;
@@ -117,8 +116,6 @@ public class ExecutionGraphCheckpointCoordinatorTest {
Collections.<ExecutionJobVertex>emptyList(),
Collections.<ExecutionJobVertex>emptyList(),
Collections.<ExecutionJobVertex>emptyList(),
- system,
- UUID.randomUUID(),
counter,
store,
new HeapSavepointStore(),
http://git-wip-us.apache.org/repos/asf/flink/blob/635c8693/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
index 57de2cd..450f9fb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
@@ -18,28 +18,28 @@
package org.apache.flink.runtime.leaderelection;
-import akka.actor.ActorRef;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.messages.ExecutionGraphMessages;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.util.TestLogger;
+
import org.junit.Before;
import org.junit.Test;
+
import scala.concurrent.Await;
-import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
-import scala.concurrent.Promise;
import scala.concurrent.duration.FiniteDuration;
import java.util.UUID;
@@ -113,15 +113,12 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
ExecutionGraph executionGraph = ((TestingJobManagerMessages.ExecutionGraphFound) responseExecutionGraph).executionGraph();
- TestActorGateway testActorGateway = new TestActorGateway();
-
- executionGraph.registerJobStatusListener(testActorGateway);
+ TestJobStatusListener testListener = new TestJobStatusListener();
+ executionGraph.registerJobStatusListener(testListener);
cluster.revokeLeadership();
- Future<Boolean> hasReachedTerminalState = testActorGateway.hasReachedTerminalState();
-
- assertTrue("The job should have reached a terminal state.", Await.result(hasReachedTerminalState, timeout));
+ testListener.waitForTerminalState(30000);
}
public JobGraph createBlockingJob(int parallelism) {
@@ -150,59 +147,19 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
return jobGraph;
}
- public static class TestActorGateway implements ActorGateway {
-
- private static final long serialVersionUID = -736146686160538227L;
- private transient Promise<Boolean> terminalState = new scala.concurrent.impl.Promise.DefaultPromise<>();
-
- public Future<Boolean> hasReachedTerminalState() {
- return terminalState.future();
- }
+ public static class TestJobStatusListener implements JobStatusListener {
- @Override
- public Future<Object> ask(Object message, FiniteDuration timeout) {
- return null;
- }
+ private final OneShotLatch terminalStateLatch = new OneShotLatch();
- @Override
- public void tell(Object message) {
- this.tell(message, new AkkaActorGateway(ActorRef.noSender(), null));
+ public void waitForTerminalState(long timeoutMillis) throws InterruptedException, TimeoutException {
+ terminalStateLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
}
@Override
- public void tell(Object message, ActorGateway sender) {
- if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
- ExecutionGraphMessages.JobStatusChanged jobStatusChanged = (ExecutionGraphMessages.JobStatusChanged) message;
-
- if (jobStatusChanged.newJobStatus().isGloballyTerminalState() || jobStatusChanged.newJobStatus() == JobStatus.SUSPENDED) {
- terminalState.success(true);
- }
+ public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
+ if (newJobStatus.isGloballyTerminalState() || newJobStatus == JobStatus.SUSPENDED) {
+ terminalStateLatch.trigger();
}
}
-
- @Override
- public void forward(Object message, ActorGateway sender) {
-
- }
-
- @Override
- public Future<Object> retry(Object message, int numberRetries, FiniteDuration timeout, ExecutionContext executionContext) {
- return null;
- }
-
- @Override
- public String path() {
- return null;
- }
-
- @Override
- public ActorRef actor() {
- return null;
- }
-
- @Override
- public UUID leaderSessionID() {
- return null;
- }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/635c8693/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
index 54ac110..0418bf5 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
@@ -18,6 +18,9 @@
package org.apache.flink.core.testutils;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
/**
* Latch for synchronizing parts of code in tests. Once the latch has fired once calls to
* {@link #await()} will return immediately in the future.
@@ -44,6 +47,8 @@ public final class OneShotLatch {
/**
* Waits until {@link #trigger())} is called. Once {@code #trigger()} has been called this
* call will always return immediately.
+ *
+ * @throws InterruptedException Thrown if the thread is interrupted while waiting.
*/
public void await() throws InterruptedException {
synchronized (lock) {
@@ -52,4 +57,54 @@ public final class OneShotLatch {
}
}
}
+
+ /**
+ * Waits until {@link #trigger()} is called. Once {@code #trigger()} has been called this
+ * call will always return immediately.
+ *
+ * <p>If the latch is not triggered within the given timeout, a {@code TimeoutException}
+ * will be thrown after the timeout.
+ *
+ * <p>A timeout value of zero means an infinite timeout and makes this call equivalent to {@link #await()}.
+ *
+ * @param timeout The value of the timeout, a value of zero indicating infinite timeout.
+ * @param timeUnit The unit of the timeout
+ *
+ * @throws InterruptedException Thrown if the thread is interrupted while waiting.
+ * @throws TimeoutException Thrown, if the latch is not triggered within the timeout time.
+ */
+ public void await(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
+ if (timeout < 0) {
+ throw new IllegalArgumentException("time may not be negative");
+ }
+ if (timeUnit == null) {
+ throw new NullPointerException("timeUnit");
+ }
+
+ if (timeout == 0) {
+ await();
+ } else {
+ final long deadline = System.nanoTime() + timeUnit.toNanos(timeout);
+ long millisToWait;
+
+ synchronized (lock) {
+ while (!triggered && (millisToWait = (deadline - System.nanoTime()) / 1_000_000) > 0) {
+ lock.wait(millisToWait);
+ }
+
+ if (!triggered) {
+ throw new TimeoutException();
+ }
+ }
+ }
+ }
+
+ /**
+ * Checks if the latch was triggered.
+ *
+ * @return True, if the latch was triggered, false if not.
+ */
+ public boolean isTriggered() {
+ return triggered;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/635c8693/flink-test-utils-parent/flink-test-utils-junit/src/test/java/org/apache/flink/core/testutils/OneShotLatchTest.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/test/java/org/apache/flink/core/testutils/OneShotLatchTest.java b/flink-test-utils-parent/flink-test-utils-junit/src/test/java/org/apache/flink/core/testutils/OneShotLatchTest.java
new file mode 100644
index 0000000..575c84c
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/test/java/org/apache/flink/core/testutils/OneShotLatchTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class OneShotLatchTest {
+
+ @Test
+ public void testAwaitWithTimeout() throws Exception {
+ OneShotLatch latch = new OneShotLatch();
+ assertFalse(latch.isTriggered());
+
+ try {
+ latch.await(1, TimeUnit.MILLISECONDS);
+ fail("should fail with a TimeoutException");
+ } catch (TimeoutException e) {
+ // expected
+ }
+
+ assertFalse(latch.isTriggered());
+
+ latch.trigger();
+ assertTrue(latch.isTriggered());
+
+ latch.await(100, TimeUnit.DAYS);
+ assertTrue(latch.isTriggered());
+
+ latch.await(0, TimeUnit.MILLISECONDS);
+ assertTrue(latch.isTriggered());
+ }
+}