You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/10 15:00:10 UTC
[10/14] flink git commit: [FLINK-1638] [streaming] Add StateHandle
and include javadoc
[FLINK-1638] [streaming] Add StateHandle and include javadoc
This closes #459
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f2b5c21d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f2b5c21d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f2b5c21d
Branch: refs/heads/master
Commit: f2b5c21da6a297f20ffad99e9f26ccb0a9491881
Parents: 490fa70
Author: Paris Carbone <se...@gmail.com>
Authored: Mon Mar 9 14:58:30 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Mar 10 14:58:49 2015 +0100
----------------------------------------------------------------------
.../deployment/TaskDeploymentDescriptor.java | 13 ++--
.../flink/runtime/executiongraph/Execution.java | 13 ++--
.../runtime/executiongraph/ExecutionGraph.java | 6 +-
.../runtime/executiongraph/ExecutionVertex.java | 29 ++++----
.../jobgraph/tasks/BarrierTransceiver.java | 16 ++++-
.../jobgraph/tasks/OperatorStateCarrier.java | 15 ++--
.../flink/runtime/state/LocalStateHandle.java | 41 +++++++++++
.../apache/flink/runtime/state/StateHandle.java | 40 +++++++++++
.../StreamCheckpointCoordinator.scala | 76 +++++++++++++-------
.../flink/runtime/taskmanager/TaskManager.scala | 2 +-
.../api/streamvertex/StreamVertex.java | 9 ++-
11 files changed, 191 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index b2573f7..6993248 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -23,12 +23,11 @@ import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.state.OperatorState;
+import org.apache.flink.runtime.state.StateHandle;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -79,7 +78,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
/** The list of JAR files required to run this task. */
private final List<BlobKey> requiredJarFiles;
- private Map<String, OperatorState<?>> operatorStates;
+ private StateHandle operatorStates;
/**
* Constructs a task deployment descriptor.
@@ -129,13 +128,13 @@ public final class TaskDeploymentDescriptor implements Serializable {
Configuration taskConfiguration, String invokableClassName,
List<PartitionDeploymentDescriptor> producedPartitions,
List<PartitionConsumerDeploymentDescriptor> consumedPartitions,
- List<BlobKey> requiredJarFiles, int targetSlotNumber, Map<String,OperatorState<?>> operatorStates) {
+ List<BlobKey> requiredJarFiles, int targetSlotNumber, StateHandle operatorStates) {
this(jobID, vertexID, executionId, taskName, indexInSubtaskGroup, numberOfSubtasks,
jobConfiguration, taskConfiguration, invokableClassName, producedPartitions,
consumedPartitions, requiredJarFiles, targetSlotNumber);
- setOperatorStates(operatorStates);
+ setOperatorState(operatorStates);
}
/**
@@ -244,11 +243,11 @@ public final class TaskDeploymentDescriptor implements Serializable {
strProducedPartitions, strConsumedPartitions);
}
- public void setOperatorStates(Map<String,OperatorState<?>> operatorStates) {
+ public void setOperatorState(StateHandle operatorStates) {
this.operatorStates = operatorStates;
}
- public Map<String, OperatorState<?>> getOperatorStates() {
+ public StateHandle getOperatorStates() {
return operatorStates;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 93845c7..cf24b20 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -46,7 +46,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
-import org.apache.flink.runtime.state.OperatorState;
+import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
@@ -56,7 +56,6 @@ import scala.concurrent.duration.FiniteDuration;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeoutException;
@@ -124,7 +123,7 @@ public class Execution implements Serializable {
private volatile InstanceConnectionInfo assignedResourceLocation; // for the archived execution
- private Map<String,OperatorState<?>> operatorStates;
+ private StateHandle operatorState;
// --------------------------------------------------------------------------------------------
@@ -858,11 +857,11 @@ public class Execution implements Serializable {
(assignedResource == null ? "(unassigned)" : assignedResource.toString()), state);
}
- public void setOperatorStates(Map<String,OperatorState<?>> operatorStates) {
- this.operatorStates = operatorStates;
+ public void setOperatorState(StateHandle operatorStates) {
+ this.operatorState = operatorStates;
}
- public Map<String,OperatorState<?>> getOperatorStates() {
- return operatorStates;
+ public StateHandle getOperatorState() {
+ return operatorState;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 0c6c3a7..c319a5c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.StreamCheckpointCoordinator;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.messages.ExecutionGraphMessages;
-import org.apache.flink.runtime.state.OperatorState;
+import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
@@ -567,9 +567,9 @@ public class ExecutionGraph implements Serializable {
}
}
- public synchronized void loadOperatorStates(Map<Tuple3<JobVertexID, Integer, Long> , Map<String,OperatorState<?>>> states)
+ public synchronized void loadOperatorStates(Map<Tuple3<JobVertexID, Integer, Long> , StateHandle> states)
{
- for(Map.Entry<Tuple3<JobVertexID, Integer, Long> , Map<String,OperatorState<?>>> state : states.entrySet())
+ for(Map.Entry<Tuple3<JobVertexID, Integer, Long> , StateHandle> state : states.entrySet())
{
tasks.get(state.getKey()._1()).getTaskVertices()[state.getKey()._2()].setOperatorState(state.getValue());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 41d34d5..24bcf21 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -18,17 +18,17 @@
package org.apache.flink.runtime.executiongraph;
-import org.apache.flink.runtime.deployment.PartialPartitionInfo;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
-import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.deployment.PartialPartitionInfo;
import org.apache.flink.runtime.deployment.PartitionConsumerDeploymentDescriptor;
import org.apache.flink.runtime.deployment.PartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.PartitionInfo;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobEdge;
@@ -38,9 +38,8 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.state.OperatorState;
+import org.apache.flink.runtime.state.StateHandle;
import org.slf4j.Logger;
-
import scala.concurrent.duration.FiniteDuration;
import java.io.Serializable;
@@ -48,13 +47,9 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import static com.google.common.base.Preconditions.checkElementIndex;
-import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
-import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
-import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
/**
* The ExecutionVertex is a parallel subtask of the execution. It may be executed once, or several times, each of
@@ -91,7 +86,7 @@ public class ExecutionVertex implements Serializable {
private volatile boolean scheduleLocalOnly;
- private Map<String,OperatorState<?>> operatorState;
+ private StateHandle operatorState;
// --------------------------------------------------------------------------------------------
@@ -199,11 +194,11 @@ public class ExecutionVertex implements Serializable {
return currentExecution.getAssignedResourceLocation();
}
- public void setOperatorState(Map<String,OperatorState<?>> operatorState) {
+ public void setOperatorState(StateHandle operatorState) {
this.operatorState = operatorState;
}
- public Map<String,OperatorState<?>> getOperatorState() {
+ public StateHandle getOperatorState() {
return operatorState;
}
@@ -382,7 +377,8 @@ public class ExecutionVertex implements Serializable {
Execution execution = currentExecution;
ExecutionState state = execution.getState();
- if (state == FINISHED || state == CANCELED || state == FAILED) {
+ if (state == ExecutionState.FINISHED || state == ExecutionState.CANCELED
+ || state == ExecutionState.FAILED) {
priorExecutions.add(execution);
currentExecution = new Execution(this, execution.getAttemptNumber()+1,
System.currentTimeMillis(), timeout);
@@ -394,7 +390,7 @@ public class ExecutionVertex implements Serializable {
if(operatorState!=null)
{
- execution.setOperatorStates(operatorState);
+ execution.setOperatorState(operatorState);
}
}
@@ -440,8 +436,9 @@ public class ExecutionVertex implements Serializable {
ExecutionState state = execution.getState();
// sanity check
- if (!(state == FINISHED || state == CANCELED || state == FAILED)) {
- throw new IllegalStateException("Cannot archive ExecutionVertex that is not in a finished state.");
+ if (!(state == ExecutionState.FINISHED || state == ExecutionState.CANCELED || state == ExecutionState.FAILED)) {
+ throw new IllegalStateException(
+ "Cannot archive ExecutionVertex that is not in a finished state.");
}
// prepare the current execution for archiving
http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java
index c56da62..0a8642e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java
@@ -18,10 +18,24 @@
package org.apache.flink.runtime.jobgraph.tasks;
+/**
+ * A BarrierTransceiver describes an operator's barrier checkpointing behavior used for
+ * fault tolerance. In the most common case {@link #broadcastBarrier(long)} is expected to be called
+ * periodically upon receiving a checkpoint barrier. Furthermore, {@link #confirmBarrier(long)} should
+ * be implemented and used for acknowledging that a specific checkpoint has completed.
+ */
public interface BarrierTransceiver {
+ /**
+ * A callback for notifying an operator of a new checkpoint barrier.
+ * @param barrierID the ID of the checkpoint barrier
+ */
public void broadcastBarrier(long barrierID);
-
+
+ /**
+ * A callback for confirming that a barrier checkpoint is complete.
+ * @param barrierID the ID of the checkpoint barrier being confirmed
+ */
public void confirmBarrier(long barrierID);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
index e8b6d6b..670dc3f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
@@ -18,12 +18,15 @@
package org.apache.flink.runtime.jobgraph.tasks;
-import org.apache.flink.runtime.state.OperatorState;
-
-import java.util.Map;
+import org.apache.flink.runtime.state.StateHandle;
+/**
+ * This is an interface meant to be implemented by any invokable that has to support state recovery.
+ * It is mainly used by the TaskManager to identify operators that support state recovery in order
+ * to inject their initial state upon creation.
+ */
public interface OperatorStateCarrier {
-
- public void injectStates(Map<String, OperatorState<?>> state);
-
+ /** Injects the operator state referenced by {@code stateHandle} into this invokable. */
+ public void injectState(StateHandle stateHandle);
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
new file mode 100644
index 0000000..ac40bf8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+
+import java.util.Map;
+
+/**
+ * A StateHandle that carries the state map itself, so the state travels with the handle when it
+ * is serialized. Recommended when the operator state is lightweight enough to ship over the network.
+ * NOTE(review): the constructor stores the given map by reference; no defensive copy is made.
+ */
+public class LocalStateHandle implements StateHandle{
+
+ private final Map<String, OperatorState<?>> state;
+
+ public LocalStateHandle(Map<String,OperatorState<?>> state) {
+ this.state = state;
+ }
+
+ @Override
+ public Map<String,OperatorState<?>> getState() {
+ return state;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
new file mode 100644
index 0000000..ddc8038
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * StateHandle is a general handle interface meant to abstract operator state fetching.
+ * A StateHandle implementation can, for example, include the state itself in cases where the state
+ * is lightweight, or fetch it lazily from some external storage when the state is too large.
+ * Handles are {@link Serializable} so that they can be shipped between the JobManager and tasks.
+ */
+public interface StateHandle extends Serializable{
+
+ /**
+ * Retrieves and returns the state managed by this handle.
+ *
+ * @return the managed operator states, keyed by String identifier
+ */
+ public Map<String,OperatorState<?>> getState();
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
index 7ab6a6f..fee69b5 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
@@ -26,38 +26,47 @@ import org.apache.flink.runtime.execution.ExecutionState.RUNNING
import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph, ExecutionVertex}
import org.apache.flink.runtime.jobgraph.JobStatus._
import org.apache.flink.runtime.jobgraph.{JobID, JobVertexID}
-import org.apache.flink.runtime.state.OperatorState
+import org.apache.flink.runtime.state.StateHandle
import scala.collection.JavaConversions._
import scala.collection.immutable.TreeMap
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.{FiniteDuration, _}
-object StreamCheckpointCoordinator {
-
- def spawn(context: ActorContext,executionGraph: ExecutionGraph,
- interval: FiniteDuration = 5 seconds): ActorRef = {
-
- val vertices: Iterable[ExecutionVertex] = getExecutionVertices(executionGraph)
- val monitor = context.system.actorOf(Props(new StreamCheckpointCoordinator(executionGraph,
- vertices,vertices.map(x => ((x.getJobVertex.getJobVertexId,x.getParallelSubtaskIndex),
- List.empty[Long])).toMap, Map() ,interval,0L,-1L)))
- monitor ! InitBarrierScheduler
- monitor
- }
-
- private def getExecutionVertices(executionGraph: ExecutionGraph): Iterable[ExecutionVertex] = {
- for((_,execJobVertex) <- executionGraph.getAllVertices;
- execVertex: ExecutionVertex <- execJobVertex.getTaskVertices)
- yield execVertex
- }
-}
+/**
+ * The StreamCheckpointCoordinator is responsible for operator state management and checkpoint
+ * coordination in streaming jobs. It periodically sends checkpoint barriers to the sources of a
+ * running job and constantly collects acknowledgements from operators while the barriers are being
+ * disseminated throughout the execution graph. At regular intervals it finds the last globally
+ * acknowledged checkpoint barrier to be used for a consistent recovery and loads all associated
+ * state handles into the respective execution vertices.
+ *
+ * The following messages describe this actor's expected behavior:
+ *
+ * - [[InitBarrierScheduler]] initiates the actor and schedules the periodic [[BarrierTimeout]]
+ * and [[CompactAndUpdate]] messages that are used for maintaining the state checkpointing logic.
+ *
+ * - [[BarrierTimeout]] is triggered periodically after initiation in order to start a new
+ * checkpoint barrier, which is then disseminated to the source vertices.
+ *
+ * - [[BarrierAck]] is sent by each operator upon the completion of a state checkpoint. All
+ * such acknowledgements are collected and inspected upon [[CompactAndUpdate]] handling in
+ * order to find the last consistent checkpoint.
+ *
+ * - [[StateBarrierAck]] is an acknowledgement like [[BarrierAck]] that additionally carries a
+ * [[StateHandle]] referencing the operator's state.
+ *
+ * - [[CompactAndUpdate]] marks the last globally consistent checkpoint barrier to be used for
+ * recovery purposes and removes all older states and acknowledgements up to that barrier.
+ * Furthermore, it updates the current ExecutionGraph with the current operator state handles.
+ *
+ */
class StreamCheckpointCoordinator(val executionGraph: ExecutionGraph,
val vertices: Iterable[ExecutionVertex],
var acks: Map[(JobVertexID,Int),List[Long]],
var states: Map[(JobVertexID, Integer, Long),
- java.util.Map[String,OperatorState[_]]],
+ StateHandle],
val interval: FiniteDuration,var curId: Long,var ackId: Long)
extends Actor with ActorLogMessages with ActorLogging {
@@ -95,8 +104,6 @@ class StreamCheckpointCoordinator(val executionGraph: ExecutionGraph,
}
log.debug(acks.toString)
-
-
case CompactAndUpdate =>
val barrierCount = acks.values.foldLeft(TreeMap[Long,Int]().withDefaultValue(0))((dict,myList)
=> myList.foldLeft(dict)((dict2,elem) => dict2.updated(elem,dict2(elem) + 1)))
@@ -108,7 +115,26 @@ class StreamCheckpointCoordinator(val executionGraph: ExecutionGraph,
executionGraph.loadOperatorStates(states)
}
-
+}
+
+object StreamCheckpointCoordinator {
+
+ def spawn(context: ActorContext,executionGraph: ExecutionGraph,
+ interval: FiniteDuration = 5 seconds): ActorRef = {
+
+ val vertices: Iterable[ExecutionVertex] = getExecutionVertices(executionGraph)
+ val monitor = context.system.actorOf(Props(new StreamCheckpointCoordinator(executionGraph,
+ vertices,vertices.map(x => ((x.getJobVertex.getJobVertexId,x.getParallelSubtaskIndex),
+ List.empty[Long])).toMap, Map() ,interval,0L,-1L)))
+ monitor ! InitBarrierScheduler
+ monitor
+ }
+
+ private def getExecutionVertices(executionGraph: ExecutionGraph): Iterable[ExecutionVertex] = {
+ for((_,execJobVertex) <- executionGraph.getAllVertices;
+ execVertex: ExecutionVertex <- execJobVertex.getTaskVertices)
+ yield execVertex
+ }
}
case class BarrierTimeout()
@@ -122,7 +148,7 @@ case class BarrierReq(attemptID: ExecutionAttemptID,checkpointID: Long)
case class BarrierAck(jobID: JobID,jobVertexID: JobVertexID,instanceID: Int,checkpointID: Long)
case class StateBarrierAck(jobID: JobID, jobVertexID: JobVertexID, instanceID: Integer,
- checkpointID: Long, states: java.util.Map[String,OperatorState[_]])
+ checkpointID: Long, states: StateHandle)
http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 3de917b..53c45ce 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -443,7 +443,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
{
val vertex = task.getEnvironment.getInvokable match {
case opStateCarrier: OperatorStateCarrier =>
- opStateCarrier.injectStates(tdd.getOperatorStates)
+ opStateCarrier.injectState(tdd.getOperatorStates)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index eb0d6ed..3548712 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -28,6 +28,8 @@ import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver;
import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
import org.apache.flink.runtime.jobmanager.BarrierAck;
import org.apache.flink.runtime.jobmanager.StateBarrierAck;
+import org.apache.flink.runtime.state.LocalStateHandle;
+import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.runtime.state.OperatorState;
import org.apache.flink.streaming.api.StreamConfig;
@@ -112,7 +114,8 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
if (configuration.getStateMonitoring() && !states.isEmpty()) {
getEnvironment().getJobManager().tell(
new StateBarrierAck(getEnvironment().getJobID(), getEnvironment()
- .getJobVertexId(), context.getIndexOfThisSubtask(), barrierID, states),
+ .getJobVertexId(), context.getIndexOfThisSubtask(), barrierID,
+ new LocalStateHandle(states)),
ActorRef.noSender());
} else {
getEnvironment().getJobManager().tell(
@@ -284,8 +287,8 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
* Re-injects the user states into the map
*/
@Override
- public void injectStates(Map<String, OperatorState<?>> states) {
- this.states.putAll(states);
+ public void injectState(StateHandle stateHandle) {
+ this.states.putAll(stateHandle.getState());
}
private class SuperstepEventListener implements EventListener<TaskEvent> {