You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/06/25 19:21:40 UTC
[07/12] flink git commit: [streaming] Add stateHandle to checkpointed message
[streaming] Add stateHandle to checkpointed message
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f27c3f1d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f27c3f1d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f27c3f1d
Branch: refs/heads/master
Commit: f27c3f1d1cbff7433a23227e75f2f8e30058397a
Parents: ef11e63
Author: Paris Carbone <se...@gmail.com>
Authored: Thu May 28 15:07:49 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Thu Jun 25 16:38:06 2015 +0200
----------------------------------------------------------------------
.../runtime/checkpoint/CheckpointCoordinator.java | 9 ++++++---
.../runtime/checkpoint/SuccessfulCheckpoint.java | 16 +++++++++++++++-
.../tasks/CheckpointCommittingOperator.java | 5 ++++-
.../messages/checkpoint/ConfirmCheckpoint.java | 13 ++++++++++++-
.../org/apache/flink/runtime/taskmanager/Task.java | 5 +++--
.../flink/runtime/taskmanager/TaskManager.scala | 3 ++-
.../checkpoint/CheckpointCoordinatorTest.java | 14 +++++++-------
.../runtime/messages/CheckpointMessagesTest.java | 2 +-
.../api/operators/AbstractUdfStreamOperator.java | 2 +-
.../api/operators/StatefulStreamOperator.java | 2 +-
.../flink/streaming/runtime/tasks/StreamTask.java | 10 +++++++---
11 files changed, 59 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f27c3f1d/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 424a9ed..8a31bee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -315,7 +315,7 @@ public class CheckpointCoordinator {
final long checkpointId = message.getCheckpointId();
SuccessfulCheckpoint completed = null;
-
+ PendingCheckpoint checkpoint;
synchronized (lock) {
// we need to check inside the lock for being shutdown as well, otherwise we
// get races and invalid error log messages
@@ -323,7 +323,8 @@ public class CheckpointCoordinator {
return;
}
- PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);
+ checkpoint = pendingCheckpoints.get(checkpointId);
+
if (checkpoint != null && !checkpoint.isDiscarded()) {
if (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getState())) {
@@ -367,11 +368,13 @@ public class CheckpointCoordinator {
// to be outside the lock scope
if (completed != null) {
final long timestamp = completed.getTimestamp();
+
for (ExecutionVertex ev : tasksToCommitTo) {
Execution ee = ev.getCurrentExecutionAttempt();
if (ee != null) {
ExecutionAttemptID attemptId = ee.getAttemptId();
- ConfirmCheckpoint confirmMessage = new ConfirmCheckpoint(job, attemptId, checkpointId, timestamp);
+ ConfirmCheckpoint confirmMessage = new ConfirmCheckpoint(job, attemptId, checkpointId,
+ timestamp, completed.getState(ev.getJobvertexId()).getState() );
ev.sendMessageToCurrentExecution(confirmMessage, ee.getAttemptId());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f27c3f1d/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
index be0b301..85a3a79 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
@@ -18,11 +18,14 @@
package org.apache.flink.runtime.checkpoint;
+import com.google.common.collect.Maps;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
+import java.util.Map;
/**
* A successful checkpoint describes a checkpoint after all required tasks acknowledged it (with their state)
@@ -38,7 +41,9 @@ public class SuccessfulCheckpoint {
private final long timestamp;
- private final List<StateForTask> states;
+ private final Map<JobVertexID, StateForTask> vertexToState;
+
+ private final List<StateForTask> states;
public SuccessfulCheckpoint(JobID job, long checkpointID, long timestamp, List<StateForTask> states) {
@@ -46,6 +51,10 @@ public class SuccessfulCheckpoint {
this.checkpointID = checkpointID;
this.timestamp = timestamp;
this.states = states;
+ vertexToState = Maps.newHashMap();
+ for(StateForTask state : states){
+ vertexToState.put(state.getOperatorId(), state);
+ }
}
public JobID getJobId() {
@@ -63,6 +72,11 @@ public class SuccessfulCheckpoint {
public List<StateForTask> getStates() {
return states;
}
+
+ public StateForTask getState(JobVertexID jobVertexID)
+ {
+ return vertexToState.get(jobVertexID);
+ }
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f27c3f1d/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java
index be203d2..a6f9851 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java
@@ -18,7 +18,10 @@
package org.apache.flink.runtime.jobgraph.tasks;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.SerializedValue;
+
public interface CheckpointCommittingOperator {
- void confirmCheckpoint(long checkpointId, long timestamp) throws Exception;
+ void confirmCheckpoint(long checkpointId, SerializedValue<StateHandle<?>> state) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f27c3f1d/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java
index d3a4374..43b5d4c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.messages.checkpoint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.SerializedValue;
/**
* This message is sent from the {@link org.apache.flink.runtime.jobmanager.JobManager} to the
@@ -32,10 +34,15 @@ public class ConfirmCheckpoint extends AbstractCheckpointMessage implements java
/** The timestamp associated with the checkpoint */
private final long timestamp;
+
+ /** The stateHandle associated with the checkpoint confirmation message*/
+ private final SerializedValue<StateHandle<?>> state;
- public ConfirmCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, long timestamp) {
+ public ConfirmCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, long timestamp,
+ SerializedValue<StateHandle<?>> state) {
super(job, taskExecutionId, checkpointId);
this.timestamp = timestamp;
+ this.state = state;
}
// --------------------------------------------------------------------------------------------
@@ -46,6 +53,10 @@ public class ConfirmCheckpoint extends AbstractCheckpointMessage implements java
// --------------------------------------------------------------------------------------------
+ public SerializedValue<StateHandle<?>> getState() {
+ return state;
+ }
+
@Override
public int hashCode() {
return super.hashCode() + (int) (timestamp ^ (timestamp >>> 32));
http://git-wip-us.apache.org/repos/asf/flink/blob/f27c3f1d/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 6250837..25ad28d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -882,7 +882,8 @@ public class Task implements Runnable {
}
}
- public void confirmCheckpoint(final long checkpointID, final long checkpointTimestamp) {
+ public void confirmCheckpoint(final long checkpointID,
+ final SerializedValue<StateHandle<?>> state) {
AbstractInvokable invokable = this.invokable;
if (executionState == ExecutionState.RUNNING && invokable != null) {
@@ -897,7 +898,7 @@ public class Task implements Runnable {
@Override
public void run() {
try {
- checkpointer.confirmCheckpoint(checkpointID, checkpointTimestamp);
+ checkpointer.confirmCheckpoint(checkpointID, state);
}
catch (Throwable t) {
logger.error("Error while confirming checkpoint for " + taskName, t);
http://git-wip-us.apache.org/repos/asf/flink/blob/f27c3f1d/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 0b76bda..612d5c0 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -429,12 +429,13 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
val taskExecutionId = message.getTaskExecutionId
val checkpointId = message.getCheckpointId
val timestamp = message.getTimestamp
+ val state = message.getState
log.debug(s"Receiver ConfirmCheckpoint ${checkpointId}@${timestamp} for $taskExecutionId.")
val task = runningTasks.get(taskExecutionId)
if (task != null) {
- task.confirmCheckpoint(checkpointId, timestamp)
+ task.confirmCheckpoint(checkpointId, state)
} else {
log.debug(
s"Taskmanager received a checkpoint confirmation for unknown task $taskExecutionId.")
http://git-wip-us.apache.org/repos/asf/flink/blob/f27c3f1d/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 514b7c7..d56704d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -199,8 +199,8 @@ public class CheckpointCoordinatorTest {
// validate that the relevant tasks got a confirmation message
{
- ConfirmCheckpoint confirmMessage1 = new ConfirmCheckpoint(jid, attemptID1, checkpointId, timestamp);
- ConfirmCheckpoint confirmMessage2 = new ConfirmCheckpoint(jid, attemptID2, checkpointId, timestamp);
+ ConfirmCheckpoint confirmMessage1 = new ConfirmCheckpoint(jid, attemptID1, checkpointId, timestamp, null);
+ ConfirmCheckpoint confirmMessage2 = new ConfirmCheckpoint(jid, attemptID2, checkpointId, timestamp, null);
verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(confirmMessage1), eq(attemptID1));
verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(confirmMessage2), eq(attemptID2));
}
@@ -237,8 +237,8 @@ public class CheckpointCoordinatorTest {
verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(expectedMessage1), eq(attemptID1));
verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(expectedMessage2), eq(attemptID2));
- ConfirmCheckpoint confirmMessage1 = new ConfirmCheckpoint(jid, attemptID1, checkpointIdNew, timestampNew);
- ConfirmCheckpoint confirmMessage2 = new ConfirmCheckpoint(jid, attemptID2, checkpointIdNew, timestampNew);
+ ConfirmCheckpoint confirmMessage1 = new ConfirmCheckpoint(jid, attemptID1, checkpointIdNew, timestampNew, null);
+ ConfirmCheckpoint confirmMessage2 = new ConfirmCheckpoint(jid, attemptID2, checkpointIdNew, timestampNew, null);
verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(confirmMessage1), eq(attemptID1));
verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(confirmMessage2), eq(attemptID2));
}
@@ -343,7 +343,7 @@ public class CheckpointCoordinatorTest {
// the first confirm message should be out
verify(commitVertex, times(1)).sendMessageToCurrentExecution(
- new ConfirmCheckpoint(jid, commitAttemptID, checkpointId1, timestamp1), commitAttemptID);
+ new ConfirmCheckpoint(jid, commitAttemptID, checkpointId1, timestamp1, null), commitAttemptID);
// send the last remaining ack for the second checkpoint
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2));
@@ -355,7 +355,7 @@ public class CheckpointCoordinatorTest {
// the second commit message should be out
verify(commitVertex, times(1)).sendMessageToCurrentExecution(
- new ConfirmCheckpoint(jid, commitAttemptID, checkpointId2, timestamp2), commitAttemptID);
+ new ConfirmCheckpoint(jid, commitAttemptID, checkpointId2, timestamp2, null), commitAttemptID);
// validate the committed checkpoints
List<SuccessfulCheckpoint> scs = coord.getSuccessfulCheckpoints();
@@ -482,7 +482,7 @@ public class CheckpointCoordinatorTest {
// the first confirm message should be out
verify(commitVertex, times(1)).sendMessageToCurrentExecution(
- new ConfirmCheckpoint(jid, commitAttemptID, checkpointId2, timestamp2), commitAttemptID);
+ new ConfirmCheckpoint(jid, commitAttemptID, checkpointId2, timestamp2, null), commitAttemptID);
// send the last remaining ack for the first checkpoint. This should not do anything
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1));
http://git-wip-us.apache.org/repos/asf/flink/blob/f27c3f1d/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
index a204c3d..05255ac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
@@ -38,7 +38,7 @@ public class CheckpointMessagesTest {
@Test
public void testTriggerAndConfirmCheckpoint() {
try {
- ConfirmCheckpoint cc = new ConfirmCheckpoint(new JobID(), new ExecutionAttemptID(), 45287698767345L, 467L);
+ ConfirmCheckpoint cc = new ConfirmCheckpoint(new JobID(), new ExecutionAttemptID(), 45287698767345L, 467L, null);
testSerializabilityEqualsHashCode(cc);
TriggerCheckpoint tc = new TriggerCheckpoint(new JobID(), new ExecutionAttemptID(), 347652734L, 7576752L);
http://git-wip-us.apache.org/repos/asf/flink/blob/f27c3f1d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index da0fdb7..94b4c4c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -97,7 +97,7 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serial
return (Serializable) snapshots;
}
- public void confirmCheckpointCompleted(long checkpointId, long timestamp,
+ public void confirmCheckpointCompleted(long checkpointId,
StateHandle<Serializable> checkpointedState) throws Exception {
if (userFunction instanceof CheckpointCommitter) {
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/f27c3f1d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java
index 343f87d..13012d3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java
@@ -33,5 +33,5 @@ public interface StatefulStreamOperator<OUT> extends StreamOperator<OUT> {
Serializable getStateSnapshotFromFunction(long checkpointId, long timestamp) throws Exception;
- void confirmCheckpointCompleted(long checkpointId, long timestamp, StateHandle<Serializable> checkpointedState) throws Exception;
+ void confirmCheckpointCompleted(long checkpointId, StateHandle<Serializable> checkpointedState) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f27c3f1d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 4ae72d2..d4ec51c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.state.FileStateHandle;
import org.apache.flink.runtime.state.LocalStateHandle;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.runtime.util.SerializedValue;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -296,18 +297,21 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
}
+ @SuppressWarnings({ "unchecked", "rawtypes" })
@Override
- public void confirmCheckpoint(long checkpointId, long timestamp) throws Exception {
+ public void confirmCheckpoint(long checkpointId, SerializedValue<StateHandle<?>> state) throws Exception {
// we do nothing here so far. this should call commit on the source function, for example
synchronized (checkpointLock) {
if (streamOperator instanceof StatefulStreamOperator) {
- ((StatefulStreamOperator<?>) streamOperator).confirmCheckpointCompleted(checkpointId, timestamp, null);
+ ((StatefulStreamOperator) streamOperator).confirmCheckpointCompleted(checkpointId,
+ state.deserializeValue(getUserCodeClassLoader()));
}
if (hasChainedOperators) {
for (OneInputStreamOperator<?, ?> chainedOperator : outputHandler.getChainedOperators()) {
if (chainedOperator instanceof StatefulStreamOperator) {
- ((StatefulStreamOperator<?>) chainedOperator).confirmCheckpointCompleted(checkpointId, timestamp, null);
+ ((StatefulStreamOperator) chainedOperator).confirmCheckpointCompleted(checkpointId,
+ state.deserializeValue(getUserCodeClassLoader()));
}
}
}