You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/12/02 19:36:32 UTC
flink git commit: [FLINK-5229] [tm] Discard completed checkpoints if
a subsequent operator's checkpoint fails
Repository: flink
Updated Branches:
refs/heads/release-1.1 388acbca9 -> 020da2ce1
[FLINK-5229] [tm] Discard completed checkpoints if a subsequent operator's checkpoint fails
If any StreamOperator#snapshotOperatorState call fails, all StreamTaskStates created up to
that point are discarded. This ensures that a failing checkpoint operation of a chained
operator does not leave orphaned checkpoint data behind.
This closes #2924.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/020da2ce
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/020da2ce
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/020da2ce
Branch: refs/heads/release-1.1
Commit: 020da2ce1c8be83789252d0db959896a761d7513
Parents: 388acbc
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Dec 2 15:33:06 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Dec 2 19:10:46 2016 +0100
----------------------------------------------------------------------
.../streaming/state/RocksDBStateBackend.java | 214 ++++++++++++-------
.../org/apache/flink/util/ExceptionUtils.java | 50 +++++
.../runtime/checkpoint/PendingCheckpoint.java | 14 +-
.../state/AsynchronousKvStateSnapshot.java | 5 -
.../runtime/state/AsynchronousStateHandle.java | 5 -
.../streaming/runtime/tasks/StreamTask.java | 69 ++++--
.../runtime/tasks/StreamTaskState.java | 28 ++-
.../tasks/StreamTaskAsyncCheckpointTest.java | 5 +
.../streaming/runtime/tasks/StreamTaskTest.java | 105 +++++++++
9 files changed, 377 insertions(+), 118 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 1561afc..3d75bde 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -77,6 +77,7 @@ import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
+import org.rocksdb.Snapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -638,11 +639,15 @@ public class RocksDBStateBackend extends AbstractStateBackend {
*/
private static class SemiAsyncSnapshot extends AsynchronousKvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> {
private static final long serialVersionUID = 1L;
+
+ private final SerializableObject lock = new SerializableObject();
private final File localBackupPath;
private final URI backupUri;
private final List<StateDescriptor> stateDescriptors;
private final long checkpointId;
+ private volatile boolean discarded;
+
private SemiAsyncSnapshot(File localBackupPath,
URI backupUri,
List<StateDescriptor> columnFamilies,
@@ -651,22 +656,45 @@ public class RocksDBStateBackend extends AbstractStateBackend {
this.backupUri = backupUri;
this.stateDescriptors = columnFamilies;
this.checkpointId = checkpointId;
+ this.discarded = false;
}
@Override
public KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> materialize() throws Exception {
- try {
- long startTime = System.currentTimeMillis();
- HDFSCopyFromLocal.copyFromLocal(localBackupPath, backupUri);
- long endTime = System.currentTimeMillis();
- LOG.info("RocksDB materialization from " + localBackupPath + " to " + backupUri + " (asynchronous part) took " + (endTime - startTime) + " ms.");
- return new FinalSemiAsyncSnapshot(backupUri, checkpointId, stateDescriptors);
- } catch (Exception e) {
- FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration());
- fs.delete(new org.apache.hadoop.fs.Path(backupUri), true);
- throw e;
- } finally {
- FileUtils.deleteQuietly(localBackupPath);
+ synchronized (lock) {
+ if (discarded) {
+ throw new Exception("The SemiAsyncSnapshot has already been discarded.");
+ } else {
+ try {
+ long startTime = System.currentTimeMillis();
+ HDFSCopyFromLocal.copyFromLocal(localBackupPath, backupUri);
+ long endTime = System.currentTimeMillis();
+
+ LOG.info("RocksDB materialization from {} to {} (asynchronous part) took {} ms.", localBackupPath, backupUri, (endTime - startTime));
+
+ return new FinalSemiAsyncSnapshot(backupUri, checkpointId, stateDescriptors);
+ } catch (Exception e) {
+ FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration());
+ fs.delete(new org.apache.hadoop.fs.Path(backupUri), true);
+ throw e;
+ } finally {
+ discardState();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void discardState() throws Exception {
+ if (!discarded) {
+ synchronized (lock) {
+ if (!discarded) {
+ discarded = true;
+ if (!FileUtils.deleteQuietly(localBackupPath)) {
+ LOG.warn("Could not delete the local backup file stored at {}.", localBackupPath);
+ }
+ }
+ }
}
}
}
@@ -732,10 +760,13 @@ public class RocksDBStateBackend extends AbstractStateBackend {
private transient org.rocksdb.Snapshot snapshot;
private transient AbstractStateBackend backend;
+ private final SerializableObject lock = new SerializableObject();
private final URI backupUri;
private final Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> columnFamilies;
private final long checkpointId;
+ private volatile boolean discarded;
+
private FullyAsyncSnapshot(org.rocksdb.Snapshot snapshot,
AbstractStateBackend backend,
URI backupUri,
@@ -746,99 +777,122 @@ public class RocksDBStateBackend extends AbstractStateBackend {
this.backupUri = backupUri;
this.columnFamilies = columnFamilies;
this.checkpointId = checkpointId;
+ this.discarded = false;
}
@Override
public KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> materialize() throws Exception {
- long startTime = System.currentTimeMillis();
- CheckpointStateOutputView outputView;
-
- try {
- try {
- outputView = backend.createCheckpointStateOutputView(checkpointId, startTime);
- } catch (Exception e) {
- throw new Exception("Could not create a checkpoint state output view to " +
- "materialize the checkpoint data into.", e);
- }
-
- try {
- outputView.writeInt(columnFamilies.size());
+ synchronized (lock) {
+ if (discarded) {
+ throw new Exception("FullyAsyncSnapshot has already been discarded.");
+ } else {
+ long startTime = System.currentTimeMillis();
+ CheckpointStateOutputView outputView;
- // we don't know how many key/value pairs there are in each column family.
- // We prefix every written element with a byte that signifies to which
- // column family it belongs, this way we can restore the column families
- byte count = 0;
- Map<String, Byte> columnFamilyMapping = new HashMap<>();
- for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column : columnFamilies.entrySet()) {
- columnFamilyMapping.put(column.getKey(), count);
+ try {
+ try {
+ outputView = backend.createCheckpointStateOutputView(checkpointId, startTime);
+ } catch (Exception e) {
+ throw new Exception("Could not create a checkpoint state output view to " +
+ "materialize the checkpoint data into.", e);
+ }
- outputView.writeByte(count);
+ try {
+ outputView.writeInt(columnFamilies.size());
- ObjectOutputStream ooOut = new ObjectOutputStream(outputView);
- ooOut.writeObject(column.getValue().f1);
- ooOut.flush();
+ // we don't know how many key/value pairs there are in each column family.
+ // We prefix every written element with a byte that signifies to which
+ // column family it belongs, this way we can restore the column families
+ byte count = 0;
+ Map<String, Byte> columnFamilyMapping = new HashMap<>();
+ for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column : columnFamilies.entrySet()) {
+ columnFamilyMapping.put(column.getKey(), count);
- count++;
- }
+ outputView.writeByte(count);
- ReadOptions readOptions = new ReadOptions();
- readOptions.setSnapshot(snapshot);
+ ObjectOutputStream ooOut = new ObjectOutputStream(outputView);
+ ooOut.writeObject(column.getValue().f1);
+ ooOut.flush();
- for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column : columnFamilies.entrySet()) {
- byte columnByte = columnFamilyMapping.get(column.getKey());
+ count++;
+ }
- synchronized (dbCleanupLock) {
- if (db == null) {
- throw new RuntimeException("RocksDB instance was disposed. This happens " +
- "when we are in the middle of a checkpoint and the job fails.");
+ ReadOptions readOptions = new ReadOptions();
+ readOptions.setSnapshot(snapshot);
+
+ for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column : columnFamilies.entrySet()) {
+ byte columnByte = columnFamilyMapping.get(column.getKey());
+
+ synchronized (dbCleanupLock) {
+ if (db == null) {
+ throw new RuntimeException("RocksDB instance was disposed. This happens " +
+ "when we are in the middle of a checkpoint and the job fails.");
+ }
+ RocksIterator iterator = db.newIterator(column.getValue().f0, readOptions);
+ iterator.seekToFirst();
+ while (iterator.isValid()) {
+ outputView.writeByte(columnByte);
+ BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.key(),
+ outputView);
+ BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.value(),
+ outputView);
+ iterator.next();
+ }
+ }
}
- RocksIterator iterator = db.newIterator(column.getValue().f0, readOptions);
- iterator.seekToFirst();
- while (iterator.isValid()) {
- outputView.writeByte(columnByte);
- BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.key(),
- outputView);
- BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.value(),
- outputView);
- iterator.next();
+ } catch (Exception e) {
+ try {
+ // closing the output view deletes the underlying data
+ outputView.close();
+ } catch (Exception closingException) {
+ LOG.warn("Could not close the checkpoint state output view. The " +
+ "written data might not be deleted.", closingException);
}
+
+ throw new Exception("Could not write the checkpoint data into the checkpoint " +
+ "state output view.", e);
}
+ } finally {
+ discardState();
}
- } catch (Exception e) {
+
+ StateHandle<DataInputView> stateHandle;
+
try {
- // closing the output view deletes the underlying data
- outputView.close();
- } catch (Exception closingException) {
- LOG.warn("Could not close the checkpoint state output view. The " +
- "written data might not be deleted.", closingException);
+ stateHandle = outputView.closeAndGetHandle();
+ } catch (Exception ioE) {
+ throw new Exception("Could not close the checkpoint state output view and " +
+ "obtain the state handle.", ioE);
}
- throw new Exception("Could not write the checkpoint data into the checkpoint " +
- "state output view.", e);
+ long endTime = System.currentTimeMillis();
+ LOG.info("Fully asynchronous RocksDB materialization to {} (asynchronous part) took {} ms.", backupUri, (endTime - startTime));
+ return new FinalFullyAsyncSnapshot(stateHandle, checkpointId);
+ }
+ }
+ }
+
+ @Override
+ public void discardState() throws Exception {
+ if (!discarded) {
+ final Snapshot snapshotToRelease = snapshot;
+
+ synchronized (lock) {
+ if (discarded) {
+ return;
+ } else {
+ discarded = true;
+ snapshot = null;
+ }
}
- } finally {
+
synchronized (dbCleanupLock) {
if (db != null) {
- db.releaseSnapshot(snapshot);
+ db.releaseSnapshot(snapshotToRelease);
}
}
- snapshot = null;
- }
-
- StateHandle<DataInputView> stateHandle;
-
- try {
- stateHandle = outputView.closeAndGetHandle();
- } catch (Exception ioE) {
- throw new Exception("Could not close the checkpoint state output view and " +
- "obtain the state handle.", ioE);
}
-
- long endTime = System.currentTimeMillis();
- LOG.info("Fully asynchronous RocksDB materialization to {} (asynchronous part) took {} ms.", backupUri, (endTime - startTime));
- return new FinalFullyAsyncSnapshot(stateHandle, checkpointId);
}
-
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 7227006..516cc1d 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -26,10 +26,13 @@ package org.apache.flink.util;
import org.apache.flink.annotation.Internal;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
@Internal
public final class ExceptionUtils {
@@ -142,6 +145,53 @@ public final class ExceptionUtils {
}
/**
+ * Adds a new exception as a {@link Throwable#addSuppressed(Throwable) suppressed exception}
+ * to a prior exception, or returns the new exception, if no prior exception exists.
+ *
+ * <pre>{@code
+ *
+ * public void closeAllThings() throws Exception {
+ * Exception ex = null;
+ * try {
+ * component.shutdown();
+ * } catch (Exception e) {
+ * ex = firstOrSuppressed(e, ex);
+ * }
+ * try {
+ * anotherComponent.stop();
+ * } catch (Exception e) {
+ * ex = firstOrSuppressed(e, ex);
+ * }
+ * try {
+ * lastComponent.shutdown();
+ * } catch (Exception e) {
+ * ex = firstOrSuppressed(e, ex);
+ * }
+ *
+ * if (ex != null) {
+ * throw ex;
+ * }
+ * }
+ * }</pre>
+ *
+ * @param newException The newly occurred exception
+ * @param previous The previously occurred exception, possibly null.
+ *
+ * @return The new exception if there is no previous exception (or the previous one is the same
+ * instance); otherwise the previous exception with the new one added as suppressed exception.
+ */
+ public static <T extends Throwable> T firstOrSuppressed(T newException, @Nullable T previous) {
+ checkNotNull(newException, "newException");
+
+ if (previous == null || previous == newException) { // Throwable#addSuppressed rejects self-suppression
+ return newException;
+ } else {
+ previous.addSuppressed(newException);
+ return previous;
+ }
+ }
+
+ /**
* Private constructor to prevent instantiation.
*/
private ExceptionUtils() {
http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 6f185bd..ca35417 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -237,21 +237,23 @@ public class PendingCheckpoint {
executor.execute(new Runnable() {
@Override
public void run() {
- try {
- for (TaskState taskState: taskStates.values()) {
+ for (TaskState taskState: taskStates.values()) {
+ try {
taskState.discard(userClassLoader);
+ } catch (Exception e) {
+ LOG.warn("Could not properly dispose the task state " +
+ "belonging to vertex {} of checkpoint {} and job {}.",
+ taskState.getJobVertexID(), checkpointId, jobId, e);
}
- } catch (Exception e) {
- LOG.warn("Could not properly dispose the pending checkpoint " +
- "{} of job {}.", checkpointId, jobId, e);
}
+
+ taskStates.clear();
}
});
}
} finally {
discarded = true;
- taskStates.clear();
notYetAcknowledgedTasks.clear();
acknowledgedTasks.clear();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java
index c2fc8a4..db9c273 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java
@@ -52,11 +52,6 @@ public abstract class AsynchronousKvStateSnapshot<K, N, S extends State, SD exte
}
@Override
- public void discardState() throws Exception {
- throw new RuntimeException("This should never be called and probably points to a bug.");
- }
-
- @Override
public long getStateSize() throws Exception {
throw new RuntimeException("This should never be called and probably points to a bug.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousStateHandle.java
index fee1efe..6c77c16 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousStateHandle.java
@@ -35,9 +35,4 @@ public abstract class AsynchronousStateHandle<T> implements StateHandle<T> {
public final T getState(ClassLoader userCodeClassLoader) throws Exception {
throw new UnsupportedOperationException("This must not be called. This is likely an internal bug.");
}
-
- @Override
- public final void discardState() throws Exception {
- throw new UnsupportedOperationException("This must not be called. This is likely an internal bug.");
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 9531974..d7204a9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -642,7 +642,24 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
for (int i = 0; i < states.length; i++) {
StreamOperator<?> operator = allOperators[i];
if (operator != null) {
- StreamTaskState state = operator.snapshotOperatorState(checkpointId, timestamp);
+ StreamTaskState state;
+ try {
+ state = operator.snapshotOperatorState(checkpointId, timestamp);
+ } catch (Exception exception) {
+ for (int j = 0; j < i; j++) {
+ if (states[j] != null) {
+ try {
+ states[j].discardState();
+ } catch (Exception discardException) {
+ LOG.warn("Could not discard " + j + "th operator state.", discardException);
+ }
+ }
+ }
+
+ throw new Exception("Could not perform the checkpoint for " + i +
+ "th operator in chain.", exception);
+ }
+
if (state.getOperatorState() instanceof AsynchronousStateHandle) {
hasAsyncStates = true;
}
@@ -876,7 +893,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
// ------------------------------------------------------------------------
- private static class AsyncCheckpointThread extends Thread implements Closeable {
+ static class AsyncCheckpointThread extends Thread implements Closeable {
private final StreamTask<?, ?> owner;
@@ -900,29 +917,43 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
@Override
public void run() {
try {
- for (StreamTaskState state : states) {
+ for (int i = 0; i < states.length; i++) {
+ StreamTaskState state = states[i];
+
if (state != null) {
- if (state.getFunctionState() instanceof AsynchronousStateHandle) {
- AsynchronousStateHandle<Serializable> asyncState = (AsynchronousStateHandle<Serializable>) state.getFunctionState();
- state.setFunctionState(asyncState.materialize());
- }
- if (state.getOperatorState() instanceof AsynchronousStateHandle) {
- AsynchronousStateHandle<?> asyncState = (AsynchronousStateHandle<?>) state.getOperatorState();
- state.setOperatorState(asyncState.materialize());
- }
- if (state.getKvStates() != null) {
- Set<String> keys = state.getKvStates().keySet();
- HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> kvStates = state.getKvStates();
- for (String key: keys) {
- if (kvStates.get(key) instanceof AsynchronousKvStateSnapshot) {
- AsynchronousKvStateSnapshot<?, ?, ?, ?, ?> asyncHandle = (AsynchronousKvStateSnapshot<?, ?, ?, ?, ?>) kvStates.get(key);
- kvStates.put(key, asyncHandle.materialize());
+ try {
+ if (state.getFunctionState() instanceof AsynchronousStateHandle) {
+ AsynchronousStateHandle<Serializable> asyncState = (AsynchronousStateHandle<Serializable>) state.getFunctionState();
+ state.setFunctionState(asyncState.materialize());
+ }
+ if (state.getOperatorState() instanceof AsynchronousStateHandle) {
+ AsynchronousStateHandle<?> asyncState = (AsynchronousStateHandle<?>) state.getOperatorState();
+ state.setOperatorState(asyncState.materialize());
+ }
+ if (state.getKvStates() != null) {
+ Set<String> keys = state.getKvStates().keySet();
+ HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> kvStates = state.getKvStates();
+ for (String key : keys) {
+ if (kvStates.get(key) instanceof AsynchronousKvStateSnapshot) {
+ AsynchronousKvStateSnapshot<?, ?, ?, ?, ?> asyncHandle = (AsynchronousKvStateSnapshot<?, ?, ?, ?, ?>) kvStates.get(key);
+ kvStates.put(key, asyncHandle.materialize());
+ }
+ }
+ }
+ } catch (Exception exception) {
+ for (int j = 0; j < states.length; j++) {
+ try {
+ if (states[j] != null) { states[j].discardState(); } // entries may be null, as checked in the loop above
+ } catch (Exception discardException) {
+ LOG.warn("Could not discard the {}th operator state.", j, discardException);
}
}
- }
+ throw new Exception("Could not materialize the " + i + "th operator state.", exception);
+ }
}
}
+
StreamTaskStateList allStates = new StreamTaskStateList(states);
owner.lastCheckpointSize = allStates.getStateSize();
owner.getEnvironment().acknowledgeCheckpoint(checkpointId, allStates);
http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
index 925dd8c..2ffb489 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
@@ -102,12 +102,24 @@ public class StreamTaskState implements Serializable, Closeable {
this.operatorState = null;
this.functionState = null;
this.kvStates = null;
+
+ Exception discardException = null;
if (operatorState != null) {
- operatorState.discardState();
+ try {
+ operatorState.discardState();
+ } catch (Exception exception) {
+ discardException = new Exception("Could not discard operator state.", exception);
+ }
}
if (functionState != null) {
- functionState.discardState();
+ try {
+ functionState.discardState();
+ } catch (Exception exception) {
+ Exception newException = new Exception("Could not discard function state.", exception);
+
+ discardException = ExceptionUtils.firstOrSuppressed(newException, discardException); // keep first failure, attach later ones as suppressed
+ }
}
if (kvStates != null) {
while (kvStates.size() > 0) {
@@ -115,7 +127,13 @@ public class StreamTaskState implements Serializable, Closeable {
Iterator<KvStateSnapshot<?, ?, ?, ?, ?>> values = kvStates.values().iterator();
while (values.hasNext()) {
KvStateSnapshot<?, ?, ?, ?, ?> s = values.next();
- s.discardState();
+ try {
+ s.discardState();
+ } catch (Exception exception) {
+ Exception newException = new Exception("Could not discard key value state.", exception);
+
+ discardException = ExceptionUtils.firstOrSuppressed(newException, discardException); // keep first failure, attach later ones as suppressed
+ }
values.remove();
}
}
@@ -124,6 +142,10 @@ public class StreamTaskState implements Serializable, Closeable {
}
}
}
+
+ if (discardException != null) {
+ throw discardException;
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
index cfaeaad..c7c957f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
@@ -194,6 +194,11 @@ public class StreamTaskAsyncCheckpointTest {
}
@Override
+ public void discardState() throws Exception {
+ // noop
+ }
+
+ @Override
public long getStateSize() {
return 0;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index e8315c7..f2dad46 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -43,8 +43,10 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.AsynchronousStateHandle;
import org.apache.flink.runtime.state.StateBackendFactory;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
@@ -53,6 +55,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.ExceptionUtils;
@@ -60,6 +63,7 @@ import org.apache.flink.util.SerializedValue;
import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
@@ -69,12 +73,17 @@ import java.io.IOException;
import java.io.ObjectInputStream;
import java.net.URL;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.junit.Assert.assertEquals;
@@ -191,6 +200,73 @@ public class StreamTaskTest {
assertEquals(ExecutionState.CANCELED, task.getExecutionState());
}
+ /**
+ * Tests that all created StreamTaskStates are properly cleaned up when a snapshotting method
+ * of an operator fails.
+ */
+ @Test
+ public void testStateCleanupWhenFailingCheckpoint() throws Exception {
+ final long checkpointId = 1L;
+ final long timestamp = 42L;
+
+ StreamTask<Integer, StreamOperator<Integer>> streamTask = new TestingStreamTask();
+ streamTask.setEnvironment(new DummyEnvironment("test task", 1, 0));
+
+ OperatorChain<Integer> operatorChain = mock(OperatorChain.class);
+
+ StreamOperator<Integer> firstOperator = mock(StreamOperator.class);
+ StreamTaskState firstStreamTaskState = mock(StreamTaskState.class);
+ StreamOperator<Integer> secondOperator = mock(StreamOperator.class);
+
+ doReturn(firstStreamTaskState).when(firstOperator).snapshotOperatorState(anyLong(), anyLong());
+ doThrow(new Exception("Test Exception")).when(secondOperator).snapshotOperatorState(anyLong(), anyLong());
+
+ doReturn(new StreamOperator<?>[]{firstOperator, secondOperator}).when(operatorChain).getAllOperators();
+
+ Whitebox.setInternalState(streamTask, "operatorChain", operatorChain);
+ Whitebox.setInternalState(streamTask, "isRunning", true);
+
+ try {
+ streamTask.triggerCheckpoint(checkpointId, timestamp);
+ fail("Expected exception here.");
+ } catch (Exception expected) {
+ // expected failing trigger checkpoint here
+ }
+
+ verify(firstStreamTaskState).discardState();
+ }
+
+ /**
+ * Tests that the AsyncCheckpointThread discards the given StreamTaskStates in case a failure
+ * occurs while materializing the asynchronous state handles.
+ */
+ @Test
+ public void testAsyncCheckpointThreadStateCleanup() throws Exception {
+ final long checkpointId = 1L;
+ StreamTaskState firstState = mock(StreamTaskState.class);
+ StreamTaskState secondState = mock(StreamTaskState.class);
+ AsynchronousStateHandle<Integer> functionStateHandle = mock(AsynchronousStateHandle.class);
+
+ doReturn(functionStateHandle).when(firstState).getFunctionState();
+ doThrow(new Exception("Test exception")).when(functionStateHandle).materialize();
+
+ StreamTask<Integer, StreamOperator<Integer>> owner = mock(StreamTask.class);
+ StreamTaskState[] states = {firstState, secondState};
+
+ StreamTask.AsyncCheckpointThread asyncCheckpointThread = new StreamTask.AsyncCheckpointThread(
+ "AsyncCheckpointThread",
+ owner,
+ new HashSet<Closeable>(),
+ states,
+ checkpointId);
+
+ asyncCheckpointThread.run();
+
+ for (StreamTaskState streamTaskState : states) {
+ verify(streamTaskState).discardState();
+ }
+ }
+
// ------------------------------------------------------------------------
// Test Utilities
// ------------------------------------------------------------------------
@@ -489,4 +565,33 @@ public class StreamTaskTest {
interrupt();
}
}
+
+ // ------------------------------------------------------------------------
+ // ------------------------------------------------------------------------
+
+ /**
+ * Testing class for StreamTask methods
+ */
+ private static class TestingStreamTask extends StreamTask<Integer, StreamOperator<Integer>> {
+
+ @Override
+ protected void init() throws Exception {
+
+ }
+
+ @Override
+ protected void run() throws Exception {
+
+ }
+
+ @Override
+ protected void cleanup() throws Exception {
+
+ }
+
+ @Override
+ protected void cancelTask() throws Exception {
+
+ }
+ }
}