You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/05/12 23:03:16 UTC
[05/10] flink git commit: [streaming] Integrate new checkpointed interface with StreamTask, StreamOperator, and PersistentKafkaSource
[streaming] Integrate new checkpointed interface with StreamTask, StreamOperator, and PersistentKafkaSource
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3b2ee23b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3b2ee23b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3b2ee23b
Branch: refs/heads/master
Commit: 3b2ee23b65544a5f517e77cb55911bacae4117c6
Parents: ededb6b
Author: Stephan Ewen <se...@apache.org>
Authored: Tue May 5 16:08:50 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue May 12 21:35:58 2015 +0200
----------------------------------------------------------------------
.../jobgraph/tasks/BarrierTransceiver.java | 43 ------
.../tasks/CheckpointCommittingOperator.java | 2 +-
.../jobgraph/tasks/CheckpointedOperator.java | 2 +-
.../jobgraph/tasks/OperatorStateCarrier.java | 2 +-
.../flink/runtime/state/LocalStateHandle.java | 14 +-
.../apache/flink/runtime/state/StateUtils.java | 4 +-
.../apache/flink/runtime/taskmanager/Task.java | 19 +--
.../checkpoint/CheckpointStateRestoreTest.java | 7 +-
.../kafka/api/simple/PersistentKafkaSource.java | 27 ++--
.../streaming/api/checkpoint/Checkpointed.java | 20 ++-
.../streaming/api/operators/StreamOperator.java | 53 ++++++-
.../streaming/runtime/tasks/OutputHandler.java | 4 +-
.../streaming/runtime/tasks/StreamTask.java | 150 ++++++++++---------
.../runtime/tasks/StreamingRuntimeContext.java | 60 +-------
.../runtime/tasks/StreamingSuperstep.java | 37 ++++-
.../streaming/runtime/io/BarrierBufferTest.java | 4 +-
.../ProcessFailureStreamingRecoveryITCase.java | 31 ++--
17 files changed, 229 insertions(+), 250 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3b2ee23b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java
deleted file mode 100644
index 1dd6a90..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobgraph.tasks;
-
-
-import java.io.IOException;
-
-/**
- * A BarrierTransceiver describes an operator's barrier checkpointing behavior used for
- * fault tolerance. In the most common case [[broadcastBarrier]] is being expected to be called
- * periodically upon receiving a checkpoint barrier. Furthermore, a [[confirmBarrier]] method should
- * be implemented and used for acknowledging a specific checkpoint checkpoint.
- */
-public interface BarrierTransceiver {
-
- /**
- * A callback for notifying an operator of a new checkpoint barrier.
- * @param barrierID
- */
- public void broadcastBarrierFromSource(long barrierID);
-
- /**
- * A callback for confirming that a barrier checkpoint is complete
- * @param barrierID
- */
- public void confirmBarrier(long barrierID) throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b2ee23b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java
index 69cb1f8..be203d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java
@@ -20,5 +20,5 @@ package org.apache.flink.runtime.jobgraph.tasks;
public interface CheckpointCommittingOperator {
- void confirmCheckpoint(long checkpointId, long timestamp);
+ void confirmCheckpoint(long checkpointId, long timestamp) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b2ee23b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java
index d07b07e..17ba947 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java
@@ -20,5 +20,5 @@ package org.apache.flink.runtime.jobgraph.tasks;
public interface CheckpointedOperator {
- void triggerCheckpoint(long checkpointId, long timestamp);
+ void triggerCheckpoint(long checkpointId, long timestamp) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b2ee23b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
index 576edb6..fb5e63f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
@@ -33,6 +33,6 @@ public interface OperatorStateCarrier<T extends StateHandle<?>> {
*
* @param stateHandle The handle to the state.
*/
- public void setInitialState(T stateHandle);
+ public void setInitialState(T stateHandle) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b2ee23b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
index f47b054..fa0c515 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
@@ -18,23 +18,23 @@
package org.apache.flink.runtime.state;
-import java.util.Map;
+import java.io.Serializable;
/**
* A StateHandle that includes a map of operator states directly.
*/
-public class LocalStateHandle implements StateHandle<Map<String, OperatorState<?>>> {
+public class LocalStateHandle implements StateHandle<Serializable> {
private static final long serialVersionUID = 2093619217898039610L;
- private final Map<String, OperatorState<?>> stateMap;
+ private final Serializable state;
- public LocalStateHandle(Map<String,OperatorState<?>> state) {
- this.stateMap = state;
+ public LocalStateHandle(Serializable state) {
+ this.state = state;
}
@Override
- public Map<String,OperatorState<?>> getState() {
- return stateMap;
+ public Serializable getState() {
+ return state;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b2ee23b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
index 2cdfef3..fbd76ba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
@@ -37,7 +37,9 @@ public class StateUtils {
* @param state The state handle.
* @param <T> Type bound for the
*/
- public static <T extends StateHandle<?>> void setOperatorState(OperatorStateCarrier<?> op, StateHandle<?> state) {
+ public static <T extends StateHandle<?>> void setOperatorState(OperatorStateCarrier<?> op, StateHandle<?> state)
+ throws Exception
+ {
@SuppressWarnings("unchecked")
OperatorStateCarrier<T> typedOp = (OperatorStateCarrier<T>) op;
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/flink/blob/3b2ee23b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 1578e4b..8630edf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -45,7 +45,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCommittingOperator;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator;
import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
@@ -57,6 +56,7 @@ import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.StateUtils;
import org.apache.flink.runtime.util.SerializedValue;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -872,23 +872,6 @@ public class Task implements Runnable {
};
executeAsyncCallRunnable(runnable, "Checkpoint Trigger");
}
- else if (invokable instanceof BarrierTransceiver) {
- final BarrierTransceiver barrierTransceiver = (BarrierTransceiver) invokable;
- final Logger logger = LOG;
-
- Runnable runnable = new Runnable() {
- @Override
- public void run() {
- try {
- barrierTransceiver.broadcastBarrierFromSource(checkpointID);
- }
- catch (Throwable t) {
- logger.error("Error while triggering checkpoint barriers", t);
- }
- }
- };
- executeAsyncCallRunnable(runnable, "Checkpoint Trigger");
- }
else {
LOG.error("Task received a checkpoint request, but is not a checkpointing task - "
+ taskNameWithSubtask);
http://git-wip-us.apache.org/repos/asf/flink/blob/3b2ee23b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 51c4890..e1cf061 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -27,15 +27,14 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.state.LocalStateHandle;
-import org.apache.flink.runtime.state.OperatorState;
import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.runtime.util.SerializedValue;
import org.junit.Test;
import org.mockito.Mockito;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -52,7 +51,7 @@ public class CheckpointStateRestoreTest {
public void testSetState() {
try {
final SerializedValue<StateHandle<?>> serializedState = new SerializedValue<StateHandle<?>>(
- new LocalStateHandle(Collections.<String,OperatorState<?>>emptyMap()));
+ new LocalStateHandle(new SerializableObject()));
final JobID jid = new JobID();
final JobVertexID statefulId = new JobVertexID();
@@ -121,7 +120,7 @@ public class CheckpointStateRestoreTest {
public void testStateOnlyPartiallyAvailable() {
try {
final SerializedValue<StateHandle<?>> serializedState = new SerializedValue<StateHandle<?>>(
- new LocalStateHandle(Collections.<String,OperatorState<?>>emptyMap()));
+ new LocalStateHandle(new SerializableObject()));
final JobID jid = new JobID();
final JobVertexID statefulId = new JobVertexID();
http://git-wip-us.apache.org/repos/asf/flink/blob/3b2ee23b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
index 2af8fb2..a842160 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
@@ -28,6 +28,7 @@ import com.google.common.base.Preconditions;
import kafka.consumer.ConsumerConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.OperatorState;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.ConnectorSource;
import org.apache.flink.streaming.connectors.kafka.api.simple.iterator.KafkaConsumerIterator;
@@ -51,7 +52,7 @@ import org.slf4j.LoggerFactory;
* @param <OUT>
* Type of the messages on the topic.
*/
-public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
+public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> implements Checkpointed<HashMap<Integer, KafkaOffset>> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(PersistentKafkaSource.class);
@@ -202,13 +203,13 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
if (indexOfSubtask >= numberOfPartitions) {
LOG.info("Creating idle consumer because this subtask ({}) is higher than the number partitions ({})", indexOfSubtask + 1, numberOfPartitions);
iterator = new KafkaIdleConsumerIterator();
- } else {
- if (context.containsState("kafka")) {
+ }
+ else {
+ if (partitionOffsets != null) {
+ // we have restored state
LOG.info("Initializing PersistentKafkaSource from existing state.");
- kafkaOffSetOperatorState = (OperatorState<Map<Integer, KafkaOffset>>) context.getState("kafka");
-
- partitionOffsets = kafkaOffSetOperatorState.getState();
- } else {
+ }
+ else {
LOG.info("No existing state found. Creating new");
partitionOffsets = new HashMap<Integer, KafkaOffset>();
@@ -217,8 +218,6 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
}
kafkaOffSetOperatorState = new OperatorState<Map<Integer, KafkaOffset>>(partitionOffsets);
-
- context.registerState("kafka", kafkaOffSetOperatorState);
}
iterator = new KafkaMultiplePartitionsIterator(topicId, partitionOffsets, kafkaTopicUtils, this.consumerConfig);
@@ -272,4 +271,14 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
Properties props = (Properties) in.readObject();
consumerConfig = new ConsumerConfig(props);
}
+
+ @Override
+ public HashMap<Integer, KafkaOffset> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ return this.partitionOffsets == null ? null : new HashMap<Integer, KafkaOffset>(this.partitionOffsets); // no offsets yet (e.g. idle subtask): null lets the task acknowledge without state
+ }
+
+ @Override
+ public void restoreState(HashMap<Integer, KafkaOffset> state) {
+ this.partitionOffsets = state; // a non-null map makes open() take the "existing state" path instead of creating fresh offsets
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b2ee23b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
index f491dd3..2cab7a3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
@@ -18,7 +18,7 @@
package org.apache.flink.streaming.api.checkpoint;
-import org.apache.flink.runtime.state.OperatorState;
+import java.io.Serializable;
/**
* This method must be implemented by functions that have state that needs to be
@@ -31,12 +31,14 @@ import org.apache.flink.runtime.state.OperatorState;
* continue to work and mutate the state, even while the state snapshot is being accessed,
* can implement the {@link org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously}
* interface.</p>
+ *
+ * @param <T> The type of the operator state.
*/
-public interface Checkpointed {
+public interface Checkpointed<T extends Serializable> {
 /**
- * Gets the current operator state as a checkpoint. The state must reflect all operations
- * from all prior operations if this function.
+ * Gets the current state of the function or operator. The state must reflect the result of all
+ * prior invocations of this function.
 *
 * @param checkpointId The ID of the checkpoint.
 * @param checkpointTimestamp The timestamp of the checkpoint, as derived by
@@ -49,5 +51,13 @@ public interface Checkpointed {
 * recovery), or to discard this checkpoint attempt and to continue running
 * and to try again with the next checkpoint attempt.
 */
- OperatorState<?> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;
+ T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;
+
+ /**
+ * Restores the state of the function or operator to that of a previous checkpoint.
+ * This method is invoked when a function is executed as part of a recovery run.
+ *
+ * @param state The state to be restored.
+ */
+ void restoreState(T state);
 }
http://git-wip-us.apache.org/repos/asf/flink/blob/3b2ee23b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index a361b7a..f66e394 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.runtime.io.IndexedReaderIterator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
@@ -107,16 +109,14 @@ public abstract class StreamOperator<IN, OUT> implements Serializable {
return nextRecord;
} catch (IOException e) {
if (isRunning) {
- throw new RuntimeException("Could not read next record due to: "
- + StringUtils.stringifyException(e));
+ throw new RuntimeException("Could not read next record", e);
} else {
// Task already cancelled do nothing
return null;
}
} catch (IllegalStateException e) {
if (isRunning) {
- throw new RuntimeException("Could not read next record due to: "
- + StringUtils.stringifyException(e));
+ throw new RuntimeException("Could not read next record", e);
} else {
// Task already cancelled do nothing
return null;
@@ -215,4 +215,49 @@ public abstract class StreamOperator<IN, OUT> implements Serializable {
public Function getUserFunction() {
return userFunction;
}
+
+ // ------------------------------------------------------------------------
+ // Checkpoints and Checkpoint Confirmations
+ // ------------------------------------------------------------------------
+
+ // NOTE - ALL OF THIS CODE WORKS ONLY FOR THE FIRST OPERATOR IN THE CHAIN
+ // IT NEEDS TO BE EXTENDED TO SUPPORT CHAINS
+
+ public void restoreInitialState(Serializable state) throws Exception { // hands restored state to the user function; only Checkpointed functions accept it
+ if (userFunction instanceof Checkpointed) {
+ setStateOnFunction(state, userFunction);
+ }
+ else {
+ throw new IllegalStateException("Trying to restore state of a non-checkpointed function");
+ }
+ }
+
+ public Serializable getStateSnapshotFromFunction(long checkpointId, long timestamp) throws Exception { // the user function's snapshot, or null if it is not Checkpointed
+ if (userFunction instanceof Checkpointed) {
+ return ((Checkpointed<?>) userFunction).snapshotState(checkpointId, timestamp);
+ }
+ else {
+ return null; // no state: StreamTask then acknowledges the checkpoint without a state handle
+ }
+ }
+
+ public void confirmCheckpointCompleted(long checkpointId, long timestamp) throws Exception { // timestamp is currently unused; only CheckpointCommitter functions are notified
+ if (userFunction instanceof CheckpointCommitter) {
+ try {
+ ((CheckpointCommitter) userFunction).commitCheckpoint(checkpointId);
+ }
+ catch (Exception e) {
+ throw new Exception("Error while confirming checkpoint " + checkpointId + " to the stream function", e);
+ }
+ }
+ }
+
+ private static <T extends Serializable> void setStateOnFunction(Serializable state, Function function) { // unchecked casts: assumes the state was produced by this function's snapshotState
+ @SuppressWarnings("unchecked")
+ T typedState = (T) state;
+ @SuppressWarnings("unchecked")
+ Checkpointed<T> typedFunction = (Checkpointed<T>) function;
+
+ typedFunction.restoreState(typedState);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b2ee23b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
index c579c3a..55c0551 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
@@ -88,8 +88,8 @@ public class OutputHandler<OUT> {
this.outerCollector = createChainedCollector(configuration);
}
- public void broadcastBarrier(long id) throws IOException, InterruptedException {
- StreamingSuperstep barrier = new StreamingSuperstep(id);
+ public void broadcastBarrier(long id, long timestamp) throws IOException, InterruptedException {
+ StreamingSuperstep barrier = new StreamingSuperstep(id, timestamp);
for (StreamOutput<?> streamOutput : outputMap.values()) {
streamOutput.broadcastEvent(barrier);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b2ee23b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index b4a11aa..930c9b4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -18,18 +18,15 @@
package org.apache.flink.streaming.runtime.tasks;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.io.Serializable;
import org.apache.flink.runtime.event.task.TaskEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCommittingOperator;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator;
import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
import org.apache.flink.runtime.state.LocalStateHandle;
-import org.apache.flink.runtime.state.OperatorState;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.ChainableStreamOperator;
@@ -40,16 +37,18 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.StringUtils;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StreamTask<IN, OUT> extends AbstractInvokable implements StreamTaskContext<OUT>,
- OperatorStateCarrier<LocalStateHandle>, CheckpointedOperator, CheckpointCommittingOperator,
- BarrierTransceiver {
+ OperatorStateCarrier<LocalStateHandle>, CheckpointedOperator, CheckpointCommittingOperator {
private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
+ private final Object checkpointLock = new Object();
+
private static int numTasks;
protected StreamConfig configuration;
@@ -62,7 +61,6 @@ public class StreamTask<IN, OUT> extends AbstractInvokable implements StreamTask
protected volatile boolean isRunning = false;
private StreamingRuntimeContext context;
- private Map<String, OperatorState<?>> states;
protected ClassLoader userClassLoader;
@@ -90,32 +88,7 @@ public class StreamTask<IN, OUT> extends AbstractInvokable implements StreamTask
protected void initialize() {
this.userClassLoader = getUserCodeClassLoader();
this.configuration = new StreamConfig(getTaskConfiguration());
- this.states = new HashMap<String, OperatorState<?>>();
- this.context = createRuntimeContext(getEnvironment().getTaskName(), this.states);
- }
-
- @Override
- public void broadcastBarrierFromSource(long id) {
- // Only called at input vertices
- if (LOG.isDebugEnabled()) {
- LOG.debug("Received barrier from jobmanager: " + id);
- }
- actOnBarrier(id);
- }
-
- /**
- * This method is called to confirm that a barrier has been fully processed.
- * It sends an acknowledgment to the jobmanager. In the current version if
- * there is user state it also checkpoints the state to the jobmanager.
- */
- @Override
- public void confirmBarrier(long barrierID) throws IOException {
- if (configuration.getStateMonitoring() && !states.isEmpty()) {
- getEnvironment().acknowledgeCheckpoint(barrierID, new LocalStateHandle(states));
- }
- else {
- getEnvironment().acknowledgeCheckpoint(barrierID);
- }
+ this.context = createRuntimeContext(getEnvironment().getTaskName());
}
public void setInputsOutputs() {
@@ -136,11 +109,10 @@ public class StreamTask<IN, OUT> extends AbstractInvokable implements StreamTask
return instanceID;
}
- public StreamingRuntimeContext createRuntimeContext(String taskName,
- Map<String, OperatorState<?>> states) {
+ public StreamingRuntimeContext createRuntimeContext(String taskName) {
Environment env = getEnvironment();
return new StreamingRuntimeContext(taskName, env, getUserCodeClassLoader(),
- getExecutionConfig(), states);
+ getExecutionConfig());
}
@Override
@@ -272,62 +244,98 @@ public class StreamTask<IN, OUT> extends AbstractInvokable implements StreamTask
return this.superstepListener;
}
+ // ------------------------------------------------------------------------
+ // Checkpoint and Restore
+ // ------------------------------------------------------------------------
+
/**
- * Method to be called when a barrier is received from all the input
- * channels. It should broadcast the barrier to the output operators,
- * checkpoint the state and send an ack.
- *
- * @param id
+ * Restores the user-function state of this task's stream operator from the given state handle.
 */
- private synchronized void actOnBarrier(long id) {
- if (isRunning) {
- try {
- outputHandler.broadcastBarrier(id);
- confirmBarrier(id);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Superstep " + id + " processed: " + StreamTask.this);
- }
- } catch (Exception e) {
- // Only throw any exception if the vertex is still running
- if (isRunning) {
- throw new RuntimeException(e);
- }
- }
- }
- }
-
 @Override
- public String toString() {
- return getEnvironment().getTaskNameWithSubtasks();
+ public void setInitialState(LocalStateHandle stateHandle) throws Exception {
+ // here, we later resolve the state handle into the actual state by
+ // loading the state described by the handle from the backup store
+ Serializable state = stateHandle.getState();
+ streamOperator.restoreInitialState(state);
 }
/**
- * Re-injects the user states into the map
+ * This method is either called directly by the checkpoint coordinator, or called
+ * when all incoming channels have reported a barrier.
+ *
+ * @param checkpointId The ID of the checkpoint to take.
+ * @param timestamp The timestamp of the checkpoint, forwarded with the emitted barriers.
+ * @throws Exception If snapshotting, emitting barriers, or acknowledging fails while the task is running.
 */
 @Override
- public void setInitialState(LocalStateHandle stateHandle) {
- this.states.putAll(stateHandle.getState());
+ public void triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
+
+ synchronized (checkpointLock) {
+ if (isRunning) {
+ try {
+ LOG.info("Starting checkpoint {}", checkpointId);
+
+ // first draw the state that should go into the checkpoint
+ LocalStateHandle state;
+ try {
+ Serializable userState = streamOperator.getStateSnapshotFromFunction(checkpointId, timestamp);
+ state = userState == null ? null : new LocalStateHandle(userState);
+ }
+ catch (Exception e) {
+ throw new Exception("Error while drawing snapshot of the user state.", e);
+ }
+
+ // now emit the checkpoint barriers
+ outputHandler.broadcastBarrier(checkpointId, timestamp);
+
+ // now confirm the checkpoint (with state only if the function produced one)
+ if (state == null) {
+ getEnvironment().acknowledgeCheckpoint(checkpointId);
+ } else {
+ getEnvironment().acknowledgeCheckpoint(checkpointId, state);
+ }
+ }
+ catch (Exception e) {
+ if (isRunning) { // failures after the task stopped running are swallowed
+ throw e;
+ }
+ }
+ }
+ }
 }
+ /**
+ * Notifies the stream operator that the checkpoint with the given ID has completed.
+ * The notification is delivered while holding the checkpoint lock.
+ *
+ * @param checkpointId The ID of the completed checkpoint.
+ * @param timestamp The timestamp of the completed checkpoint.
+ * @throws Exception If the operator fails while handling the notification.
+ */
@Override
- public void triggerCheckpoint(long checkpointId, long timestamp) {
- broadcastBarrierFromSource(checkpointId);
+ public void confirmCheckpoint(long checkpointId, long timestamp) throws Exception {
+ // forward the completion to the operator (presumably so that, e.g., a source
+ // function can commit what the checkpoint covers)
+ synchronized (checkpointLock) {
+ streamOperator.confirmCheckpointCompleted(checkpointId, timestamp);
+ }
}
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ /**
+ * Returns the task name with subtask information, as reported by the environment.
+ */
@Override
- public void confirmCheckpoint(long checkpointId, long timestamp) {
- // we do nothing here so far. this should call commit on the source function, for example
+ public String toString() {
+ return this.getEnvironment().getTaskNameWithSubtasks();
}
-
-
+ // ------------------------------------------------------------------------
+ /**
+ * Reacts to {@link StreamingSuperstep} events by triggering the checkpoint they identify.
+ */
private class SuperstepEventListener implements EventListener<TaskEvent> {
@Override
public void onEvent(TaskEvent event) {
- actOnBarrier(((StreamingSuperstep) event).getId());
+ try {
+ // the cast stays inside the try so a wrong event type is wrapped like any other failure
+ StreamingSuperstep barrier = (StreamingSuperstep) event;
+ long checkpointId = barrier.getId();
+ long checkpointTimestamp = barrier.getTimestamp();
+ triggerCheckpoint(checkpointId, checkpointTimestamp);
+ }
+ catch (Exception cause) {
+ throw new RuntimeException(
+ "Error triggering a checkpoint as the result of receiving checkpoint barrier", cause);
+ }
}
-
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b2ee23b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
index 8ae2ced..6112e03 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
@@ -18,17 +18,13 @@
package org.apache.flink.streaming.runtime.tasks;
-import java.util.Map;
-
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.runtime.state.OperatorState;
/**
* Implementation of the {@link RuntimeContext}, created by runtime stream UDF
@@ -37,65 +33,13 @@ import org.apache.flink.runtime.state.OperatorState;
public class StreamingRuntimeContext extends RuntimeUDFContext {
private final Environment env;
- private final Map<String, OperatorState<?>> operatorStates;
+
+ /**
+ * Creates the runtime context for a streaming user function.
+ *
+ * @param name The name handed to {@link RuntimeUDFContext} (presumably the task name).
+ * @param env The task environment; the number of parallel subtasks, this subtask's
+ *            index and the distributed cache entries are taken from it, and it is
+ *            retained in this context.
+ * @param userCodeClassLoader The class loader for user code.
+ * @param executionConfig The execution configuration passed to the base context.
+ */
public StreamingRuntimeContext(String name, Environment env, ClassLoader userCodeClassLoader,
- ExecutionConfig executionConfig, Map<String, OperatorState<?>> operatorStates) {
+ ExecutionConfig executionConfig) {
super(name, env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(), userCodeClassLoader,
executionConfig, env.getDistributedCacheEntries());
this.env = env;
- this.operatorStates = operatorStates;
- }
-
- /**
- * Returns the operator state registered by the given name for the operator.
- *
- * @param name
- * Name of the operator state to be returned.
- * @return The operator state.
- */
- public OperatorState<?> getState(String name) {
- if (operatorStates == null) {
- throw new RuntimeException("No state has been registered for this operator.");
- } else {
- OperatorState<?> state = operatorStates.get(name);
- if (state != null) {
- return state;
- } else {
- throw new RuntimeException("No state has been registered for the name: " + name);
- }
- }
- }
-
- /**
- * Returns whether there is a state stored by the given name
- */
- public boolean containsState(String name) {
- return operatorStates.containsKey(name);
- }
-
- /**
- * This is a beta feature </br></br> Register an operator state for this
- * operator by the given name. This name can be used to retrieve the state
- * during runtime using {@link StreamingRuntimeContext#getState(String)}. To
- * obtain the {@link StreamingRuntimeContext} from the user-defined function
- * use the {@link RichFunction#getRuntimeContext()} method.
- *
- * @param name
- * The name of the operator state.
- * @param state
- * The state to be registered for this name.
- */
- public void registerState(String name, OperatorState<?> state) {
- if (state == null) {
- throw new RuntimeException("Cannot register null state");
- } else {
- if (operatorStates.containsKey(name)) {
- throw new RuntimeException("State is already registered");
- } else {
- operatorStates.put(name, state);
- }
- }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/3b2ee23b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingSuperstep.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingSuperstep.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingSuperstep.java
index 4b3419b..f749773 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingSuperstep.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingSuperstep.java
@@ -27,34 +27,57 @@ import org.apache.flink.runtime.event.task.TaskEvent;
+/**
+ * Task event marking a streaming superstep (checkpoint barrier), identified by the
+ * checkpoint ID and the checkpoint timestamp.
+ */
public class StreamingSuperstep extends TaskEvent {
protected long id;
+ protected long timestamp;
- public StreamingSuperstep() {
+ /** Nullary constructor; presumably used when the event is deserialized via {@link #read}. */
+ public StreamingSuperstep() {}
+ /**
+ * Creates a superstep event for the given checkpoint.
+ *
+ * @param id The checkpoint ID.
+ * @param timestamp The checkpoint timestamp.
+ */
+ public StreamingSuperstep(long id, long timestamp) {
+ this.id = id;
+ this.timestamp = timestamp;
}
- public StreamingSuperstep(long id) {
- this.id = id;
+ /** Returns the checkpoint ID carried by this superstep. */
+ public long getId() {
+ return id;
}
+ /** Returns the checkpoint timestamp carried by this superstep (not its ID). */
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ // ------------------------------------------------------------------------
+
@Override
public void write(DataOutputView out) throws IOException {
out.writeLong(id);
+ out.writeLong(timestamp);
}
@Override
public void read(DataInputView in) throws IOException {
id = in.readLong();
+ timestamp = in.readLong();
}
+
+ // ------------------------------------------------------------------------
- public long getId() {
- return id;
+ @Override
+ public int hashCode() {
+ return (int) (id ^ (id >>> 32) ^ timestamp ^(timestamp >>> 32));
}
+ @Override
public boolean equals(Object other) {
if (other == null || !(other instanceof StreamingSuperstep)) {
return false;
- } else {
- return ((StreamingSuperstep) other).id == this.id;
}
+ else {
+ // equal only if both the checkpoint ID and the timestamp match
+ StreamingSuperstep that = (StreamingSuperstep) other;
+ return that.id == this.id && that.timestamp == this.timestamp;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.format("StreamingSuperstep %d @ %d", id, timestamp);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b2ee23b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index b8af4ed..0afe8b5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -33,8 +33,8 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.runtime.io.BarrierBuffer;
import org.apache.flink.streaming.runtime.tasks.StreamingSuperstep;
+
import org.junit.Test;
public class BarrierBufferTest {
@@ -201,7 +201,7 @@ public class BarrierBufferTest {
}
+ /**
+ * Creates a superstep event with the given ID, stamped with the current time, wrapped
+ * in a {@link BufferOrEvent} for the given channel.
+ */
protected static BufferOrEvent createSuperstep(long id, int channel) {
- return new BufferOrEvent(new StreamingSuperstep(id), channel);
+ StreamingSuperstep superstep = new StreamingSuperstep(id, System.currentTimeMillis());
+ return new BufferOrEvent(superstep, channel);
}
protected static BufferOrEvent createBuffer(int channel) {
http://git-wip-us.apache.org/repos/asf/flink/blob/3b2ee23b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
index aeaad5c..65e39a2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
@@ -22,7 +22,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.OperatorState;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -145,12 +145,15 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
}
}
- public static class SleepyDurableGenerateSequence extends RichParallelSourceFunction<Long> {
+ public static class SleepyDurableGenerateSequence extends RichParallelSourceFunction<Long>
+ implements Checkpointed<Long> {
private static final long SLEEP_TIME = 50;
private final File coordinateDir;
private final long end;
+
+ private long collected;
public SleepyDurableGenerateSequence(File coordinateDir, long end) {
this.coordinateDir = coordinateDir;
@@ -162,23 +165,10 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
public void run(Collector<Long> collector) throws Exception {
StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
- OperatorState<Long> collectedState;
- if (context.containsState("collected")) {
- collectedState = (OperatorState<Long>) context.getState("collected");
-
-// if (collected == 0) {
-// throw new RuntimeException("The state did not capture a completed checkpoint");
-// }
- }
- else {
- collectedState = new OperatorState<Long>(0L);
- context.registerState("collected", collectedState);
- }
final long stepSize = context.getNumberOfParallelSubtasks();
final long congruence = context.getIndexOfThisSubtask();
final long toCollect = (end % stepSize > congruence) ? (end / stepSize + 1) : (end / stepSize);
- long collected = collectedState.getState();
final File proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE);
boolean checkForProceedFile = true;
@@ -196,13 +186,22 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
}
collector.collect(collected * stepSize + congruence);
- collectedState.update(collected);
collected++;
}
}
@Override
public void cancel() {}
+
+ /**
+ * Returns the current position in the generated sequence (the value of
+ * {@code collected}) as this source's checkpointed state.
+ */
+ @Override
+ public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ return Long.valueOf(collected);
+ }
+
+ /**
+ * Resumes the generated sequence from the restored position.
+ */
+ @Override
+ public void restoreState(Long state) {
+ this.collected = state.longValue();
+ }
}