You are viewing a plain text version of this content. The canonical link for it ("here") was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/06/25 19:21:40 UTC

[07/12] flink git commit: [streaming] Add stateHandle to checkpointed message

[streaming] Add stateHandle to checkpointed message


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f27c3f1d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f27c3f1d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f27c3f1d

Branch: refs/heads/master
Commit: f27c3f1d1cbff7433a23227e75f2f8e30058397a
Parents: ef11e63
Author: Paris Carbone <se...@gmail.com>
Authored: Thu May 28 15:07:49 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Thu Jun 25 16:38:06 2015 +0200

----------------------------------------------------------------------
 .../runtime/checkpoint/CheckpointCoordinator.java   |  9 ++++++---
 .../runtime/checkpoint/SuccessfulCheckpoint.java    | 16 +++++++++++++++-
 .../tasks/CheckpointCommittingOperator.java         |  5 ++++-
 .../messages/checkpoint/ConfirmCheckpoint.java      | 13 ++++++++++++-
 .../org/apache/flink/runtime/taskmanager/Task.java  |  5 +++--
 .../flink/runtime/taskmanager/TaskManager.scala     |  3 ++-
 .../checkpoint/CheckpointCoordinatorTest.java       | 14 +++++++-------
 .../runtime/messages/CheckpointMessagesTest.java    |  2 +-
 .../api/operators/AbstractUdfStreamOperator.java    |  2 +-
 .../api/operators/StatefulStreamOperator.java       |  2 +-
 .../flink/streaming/runtime/tasks/StreamTask.java   | 10 +++++++---
 11 files changed, 59 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f27c3f1d/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 424a9ed..8a31bee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -315,7 +315,7 @@ public class CheckpointCoordinator {
 		final long checkpointId = message.getCheckpointId();
 
 		SuccessfulCheckpoint completed = null;
-		
+		PendingCheckpoint checkpoint;
 		synchronized (lock) {
 			// we need to check inside the lock for being shutdown as well, otherwise we
 			// get races and invalid error log messages
@@ -323,7 +323,8 @@ public class CheckpointCoordinator {
 				return;
 			}
 			
-			PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);
+			checkpoint = pendingCheckpoints.get(checkpointId);
+			
 			if (checkpoint != null && !checkpoint.isDiscarded()) {
 				if (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getState())) {
 					
@@ -367,11 +368,13 @@ public class CheckpointCoordinator {
 		// to be outside the lock scope
 		if (completed != null) {
 			final long timestamp = completed.getTimestamp();
+			
 			for (ExecutionVertex ev : tasksToCommitTo) {
 				Execution ee = ev.getCurrentExecutionAttempt();
 				if (ee != null) {
 					ExecutionAttemptID attemptId = ee.getAttemptId();
-					ConfirmCheckpoint confirmMessage = new ConfirmCheckpoint(job, attemptId, checkpointId, timestamp);
+					StateForTask vertexState = completed.getState(ev.getJobvertexId()); // null if this vertex reported no state
+					ConfirmCheckpoint confirmMessage = new ConfirmCheckpoint(job, attemptId, checkpointId, timestamp, vertexState == null ? null : vertexState.getState());
 					ev.sendMessageToCurrentExecution(confirmMessage, ee.getAttemptId());
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/f27c3f1d/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
index be0b301..85a3a79 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
@@ -18,11 +18,14 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import com.google.common.collect.Maps;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * A successful checkpoint describes a checkpoint after all required tasks acknowledged it (with their state)
@@ -38,7 +41,9 @@ public class SuccessfulCheckpoint {
 	
 	private final long timestamp;
 	
-	private final List<StateForTask> states;
+	private final Map<JobVertexID, StateForTask> vertexToState;
+	
+	private final List<StateForTask> states; 
 
 
 	public SuccessfulCheckpoint(JobID job, long checkpointID, long timestamp, List<StateForTask> states) {
@@ -46,6 +51,10 @@ public class SuccessfulCheckpoint {
 		this.checkpointID = checkpointID;
 		this.timestamp = timestamp;
 		this.states = states;
+		vertexToState = Maps.newHashMap();
+		for(StateForTask state : states){
+			vertexToState.put(state.getOperatorId(), state); // NOTE(review): if several states share one operator id (e.g. parallel subtasks of one vertex), later entries overwrite earlier ones - confirm this is intended
+		}
 	}
 
 	public JobID getJobId() {
@@ -63,6 +72,11 @@ public class SuccessfulCheckpoint {
 	public List<StateForTask> getStates() {
 		return states;
 	}
+	
+	public StateForTask getState(JobVertexID jobVertexID)
+	{
+		return vertexToState.get(jobVertexID);
+	}
 
 	// --------------------------------------------------------------------------------------------
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/f27c3f1d/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java
index be203d2..a6f9851 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.runtime.jobgraph.tasks;
 
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.SerializedValue;
+
 public interface CheckpointCommittingOperator {
 	
-	void confirmCheckpoint(long checkpointId, long timestamp) throws Exception;
+	void confirmCheckpoint(long checkpointId, SerializedValue<StateHandle<?>> state) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f27c3f1d/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java
index d3a4374..43b5d4c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.messages.checkpoint;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.SerializedValue;
 
 /**
  * This message is sent from the {@link org.apache.flink.runtime.jobmanager.JobManager} to the
@@ -32,10 +34,15 @@ public class ConfirmCheckpoint extends AbstractCheckpointMessage implements java
 
 	/** The timestamp associated with the checkpoint */
 	private final long timestamp;
+
+	/** The serialized state handle associated with this checkpoint confirmation (may be {@code null} if no state was recorded) */
+	private final SerializedValue<StateHandle<?>> state;
 	
-	public ConfirmCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, long timestamp) {
+	public ConfirmCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, long timestamp, 
+		SerializedValue<StateHandle<?>> state) {
 		super(job, taskExecutionId, checkpointId);
 		this.timestamp = timestamp;
+		this.state = state;
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -46,6 +53,10 @@ public class ConfirmCheckpoint extends AbstractCheckpointMessage implements java
 
 	// --------------------------------------------------------------------------------------------
 
+	public SerializedValue<StateHandle<?>> getState() {
+		return state;
+	}
+	
 	@Override
 	public int hashCode() {
 		return super.hashCode() + (int) (timestamp ^ (timestamp >>> 32));

http://git-wip-us.apache.org/repos/asf/flink/blob/f27c3f1d/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 6250837..25ad28d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -882,7 +882,8 @@ public class Task implements Runnable {
 		}
 	}
 	
-	public void confirmCheckpoint(final long checkpointID, final long checkpointTimestamp) {
+	public void confirmCheckpoint(final long checkpointID, 
+		final SerializedValue<StateHandle<?>> state) {
 		AbstractInvokable invokable = this.invokable;
 
 		if (executionState == ExecutionState.RUNNING && invokable != null) {
@@ -897,7 +898,7 @@ public class Task implements Runnable {
 					@Override
 					public void run() {
 						try {
-							checkpointer.confirmCheckpoint(checkpointID, checkpointTimestamp);
+							checkpointer.confirmCheckpoint(checkpointID, state);
 						}
 						catch (Throwable t) {
 							logger.error("Error while confirming checkpoint for " + taskName, t);

http://git-wip-us.apache.org/repos/asf/flink/blob/f27c3f1d/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 0b76bda..612d5c0 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -429,12 +429,13 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
         val taskExecutionId = message.getTaskExecutionId
         val checkpointId = message.getCheckpointId
         val timestamp = message.getTimestamp
+        val state = message.getState
 
         log.debug(s"Receiver ConfirmCheckpoint ${checkpointId}@${timestamp} for $taskExecutionId.")
 
         val task = runningTasks.get(taskExecutionId)
         if (task != null) {
-          task.confirmCheckpoint(checkpointId, timestamp)
+          task.confirmCheckpoint(checkpointId, state)
         } else {
           log.debug(
             s"Taskmanager received a checkpoint confirmation for unknown task $taskExecutionId.")

http://git-wip-us.apache.org/repos/asf/flink/blob/f27c3f1d/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 514b7c7..d56704d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -199,8 +199,8 @@ public class CheckpointCoordinatorTest {
 			
 			// validate that the relevant tasks got a confirmation message
 			{
-				ConfirmCheckpoint confirmMessage1 = new ConfirmCheckpoint(jid, attemptID1, checkpointId, timestamp);
-				ConfirmCheckpoint confirmMessage2 = new ConfirmCheckpoint(jid, attemptID2, checkpointId, timestamp);
+				ConfirmCheckpoint confirmMessage1 = new ConfirmCheckpoint(jid, attemptID1, checkpointId, timestamp, null);
+				ConfirmCheckpoint confirmMessage2 = new ConfirmCheckpoint(jid, attemptID2, checkpointId, timestamp, null);
 				verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(confirmMessage1), eq(attemptID1));
 				verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(confirmMessage2), eq(attemptID2));
 			}
@@ -237,8 +237,8 @@ public class CheckpointCoordinatorTest {
 				verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(expectedMessage1), eq(attemptID1));
 				verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(expectedMessage2), eq(attemptID2));
 
-				ConfirmCheckpoint confirmMessage1 = new ConfirmCheckpoint(jid, attemptID1, checkpointIdNew, timestampNew);
-				ConfirmCheckpoint confirmMessage2 = new ConfirmCheckpoint(jid, attemptID2, checkpointIdNew, timestampNew);
+				ConfirmCheckpoint confirmMessage1 = new ConfirmCheckpoint(jid, attemptID1, checkpointIdNew, timestampNew, null);
+				ConfirmCheckpoint confirmMessage2 = new ConfirmCheckpoint(jid, attemptID2, checkpointIdNew, timestampNew, null);
 				verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(confirmMessage1), eq(attemptID1));
 				verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(confirmMessage2), eq(attemptID2));
 			}
@@ -343,7 +343,7 @@ public class CheckpointCoordinatorTest {
 			
 			// the first confirm message should be out
 			verify(commitVertex, times(1)).sendMessageToCurrentExecution(
-					new ConfirmCheckpoint(jid, commitAttemptID, checkpointId1, timestamp1), commitAttemptID);
+					new ConfirmCheckpoint(jid, commitAttemptID, checkpointId1, timestamp1, null), commitAttemptID);
 			
 			// send the last remaining ack for the second checkpoint
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2));
@@ -355,7 +355,7 @@ public class CheckpointCoordinatorTest {
 
 			// the second commit message should be out
 			verify(commitVertex, times(1)).sendMessageToCurrentExecution(
-					new ConfirmCheckpoint(jid, commitAttemptID, checkpointId2, timestamp2), commitAttemptID);
+					new ConfirmCheckpoint(jid, commitAttemptID, checkpointId2, timestamp2, null), commitAttemptID);
 			
 			// validate the committed checkpoints
 			List<SuccessfulCheckpoint> scs = coord.getSuccessfulCheckpoints();
@@ -482,7 +482,7 @@ public class CheckpointCoordinatorTest {
 
 			// the first confirm message should be out
 			verify(commitVertex, times(1)).sendMessageToCurrentExecution(
-					new ConfirmCheckpoint(jid, commitAttemptID, checkpointId2, timestamp2), commitAttemptID);
+					new ConfirmCheckpoint(jid, commitAttemptID, checkpointId2, timestamp2, null), commitAttemptID);
 
 			// send the last remaining ack for the first checkpoint. This should not do anything
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1));

http://git-wip-us.apache.org/repos/asf/flink/blob/f27c3f1d/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
index a204c3d..05255ac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
@@ -38,7 +38,7 @@ public class CheckpointMessagesTest {
 	@Test
 	public void testTriggerAndConfirmCheckpoint() {
 		try {
-			ConfirmCheckpoint cc = new ConfirmCheckpoint(new JobID(), new ExecutionAttemptID(), 45287698767345L, 467L);
+			ConfirmCheckpoint cc = new ConfirmCheckpoint(new JobID(), new ExecutionAttemptID(), 45287698767345L, 467L, null);
 			testSerializabilityEqualsHashCode(cc);
 			
 			TriggerCheckpoint tc = new TriggerCheckpoint(new JobID(), new ExecutionAttemptID(), 347652734L, 7576752L);

http://git-wip-us.apache.org/repos/asf/flink/blob/f27c3f1d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index da0fdb7..94b4c4c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -97,7 +97,7 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serial
 		return (Serializable) snapshots;
 	}
 
-	public void confirmCheckpointCompleted(long checkpointId, long timestamp,
+	public void confirmCheckpointCompleted(long checkpointId,
 			StateHandle<Serializable> checkpointedState) throws Exception {
 		if (userFunction instanceof CheckpointCommitter) {
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/f27c3f1d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java
index 343f87d..13012d3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java
@@ -33,5 +33,5 @@ public interface StatefulStreamOperator<OUT> extends StreamOperator<OUT> {
 
 	Serializable getStateSnapshotFromFunction(long checkpointId, long timestamp) throws Exception;
 
-	void confirmCheckpointCompleted(long checkpointId, long timestamp, StateHandle<Serializable> checkpointedState) throws Exception;
+	void confirmCheckpointCompleted(long checkpointId, StateHandle<Serializable> checkpointedState) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f27c3f1d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 4ae72d2..d4ec51c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.state.FileStateHandle;
 import org.apache.flink.runtime.state.LocalStateHandle;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.runtime.util.SerializedValue;
 import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -296,18 +297,21 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 
 	}
 
+	@SuppressWarnings({ "unchecked", "rawtypes" })
 	@Override
-	public void confirmCheckpoint(long checkpointId, long timestamp) throws Exception {
+	public void confirmCheckpoint(long checkpointId, SerializedValue<StateHandle<?>> state) throws Exception {
 		// we do nothing here so far. this should call commit on the source function, for example
 		synchronized (checkpointLock) {
 			if (streamOperator instanceof StatefulStreamOperator) {
-				((StatefulStreamOperator<?>) streamOperator).confirmCheckpointCompleted(checkpointId, timestamp, null);
+				((StatefulStreamOperator) streamOperator).confirmCheckpointCompleted(checkpointId,
+						state == null ? null : state.deserializeValue(getUserCodeClassLoader()));
 			}
 
 			if (hasChainedOperators) {
 				for (OneInputStreamOperator<?, ?> chainedOperator : outputHandler.getChainedOperators()) {
 					if (chainedOperator instanceof StatefulStreamOperator) {
-						((StatefulStreamOperator<?>) chainedOperator).confirmCheckpointCompleted(checkpointId, timestamp, null);
+						((StatefulStreamOperator) chainedOperator).confirmCheckpointCompleted(checkpointId,
+								state == null ? null : state.deserializeValue(getUserCodeClassLoader()));
 					}
 				}
 			}