You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/01 14:51:33 UTC
[1/2] flink git commit: [FLINK-5216] [checkpoints] 'Min Time Between
Checkpoints' references timestamp after checkpoint
Repository: flink
Updated Branches:
refs/heads/release-1.1 7b5d769ad -> 59f61bf6c
[FLINK-5216] [checkpoints] 'Min Time Between Checkpoints' references timestamp after checkpoint
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e475eb2d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e475eb2d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e475eb2d
Branch: refs/heads/release-1.1
Commit: e475eb2d9705b8948ce862f25adf91e25a4948b0
Parents: 7b5d769
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 30 20:31:07 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Dec 1 14:01:40 2016 +0100
----------------------------------------------------------------------
.../checkpoint/CheckpointCoordinator.java | 43 ++++--
.../checkpoint/CheckpointCoordinatorTest.java | 133 +++++++++----------
2 files changed, 97 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e475eb2d/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 24cc3cb..0d09922 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -117,7 +117,7 @@ public class CheckpointCoordinator {
/** The min time(in ms) to delay after a checkpoint could be triggered. Allows to
* enforce minimum processing time between checkpoint attempts */
- private final long minPauseBetweenCheckpoints;
+ private final long minPauseBetweenCheckpointsNanos;
/** The maximum number of checkpoints that may be in progress at the same time */
private final int maxConcurrentCheckpointAttempts;
@@ -133,7 +133,8 @@ public class CheckpointCoordinator {
private ScheduledTrigger currentPeriodicTrigger;
- private long lastTriggeredCheckpoint;
+ /** The timestamp (via {@link System#nanoTime()}) when the last checkpoint completed */
+ private long lastCheckpointCompletionNanos;
/** Flag whether a triggered checkpoint should immediately schedule the next checkpoint.
* Non-volatile, because only accessed in synchronized scope */
@@ -201,10 +202,16 @@ public class CheckpointCoordinator {
checkArgument(minPauseBetweenCheckpoints >= 0, "minPauseBetweenCheckpoints must be >= 0");
checkArgument(maxConcurrentCheckpointAttempts >= 1, "maxConcurrentCheckpointAttempts must be >= 1");
+ // max "in between duration" can be one year - this is to prevent numeric overflows
+ if (minPauseBetweenCheckpoints > 365L * 24 * 60 * 60 * 1_000) {
+ minPauseBetweenCheckpoints = 365L * 24 * 60 * 60 * 1_000;
+ LOG.warn("Reducing minimum pause between checkpoints to " + minPauseBetweenCheckpoints + " ms (1 year)");
+ }
+
this.job = checkNotNull(job);
this.baseInterval = baseInterval;
this.checkpointTimeout = checkpointTimeout;
- this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+ this.minPauseBetweenCheckpointsNanos = minPauseBetweenCheckpoints * 1_000_000;
this.maxConcurrentCheckpointAttempts = maxConcurrentCheckpointAttempts;
this.tasksToTrigger = checkNotNull(tasksToTrigger);
this.tasksToWaitFor = checkNotNull(tasksToWaitFor);
@@ -417,13 +424,16 @@ public class CheckpointCoordinator {
return false;
}
- //make sure the minimum interval between checkpoints has passed
- if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp && baseInterval != Long.MAX_VALUE) {
+ // make sure the minimum interval between checkpoints has passed
+ final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
+ final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;
+
+ if (durationTillNextMillis > 0 && baseInterval != Long.MAX_VALUE) {
if (currentPeriodicTrigger != null) {
currentPeriodicTrigger.cancel();
}
currentPeriodicTrigger = new ScheduledTrigger();
- timer.scheduleAtFixedRate(currentPeriodicTrigger, minPauseBetweenCheckpoints, baseInterval);
+ timer.scheduleAtFixedRate(currentPeriodicTrigger, durationTillNextMillis, baseInterval);
return false;
}
}
@@ -458,8 +468,6 @@ public class CheckpointCoordinator {
}
// we will actually trigger this checkpoint!
-
- lastTriggeredCheckpoint = timestamp;
final long checkpointID;
if (nextCheckpointId < 0) {
try {
@@ -532,6 +540,19 @@ public class CheckpointCoordinator {
return false;
}
+ // make sure the minimum interval between checkpoints has passed
+ final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
+ final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;
+
+ if (durationTillNextMillis > 0 && baseInterval != Long.MAX_VALUE) {
+ if (currentPeriodicTrigger != null) {
+ currentPeriodicTrigger.cancel();
+ }
+ currentPeriodicTrigger = new ScheduledTrigger();
+ timer.scheduleAtFixedRate(currentPeriodicTrigger, durationTillNextMillis, baseInterval);
+ return false;
+ }
+
pendingCheckpoints.put(checkpointID, checkpoint);
timer.schedule(canceller, checkpointTimeout);
}
@@ -682,8 +703,10 @@ public class CheckpointCoordinator {
switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getState(), message.getStateSize(), null)) {
case SUCCESS:
- // TODO: Give KV-state to the acknowledgeTask method
+
if (checkpoint.isFullyAcknowledged()) {
+
+ lastCheckpointCompletionNanos = System.nanoTime();
completed = checkpoint.finalizeCheckpoint();
completedCheckpointStore.addCheckpoint(completed);
@@ -805,6 +828,8 @@ public class CheckpointCoordinator {
* <p>NOTE: The caller of this method must hold the lock when invoking the method!
*/
private void triggerQueuedRequests() {
+ assert Thread.holdsLock(lock);
+
if (triggerRequestQueued) {
triggerRequestQueued = false;
http://git-wip-us.apache.org/repos/asf/flink/blob/e475eb2d/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 35cce85..9159711 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -41,6 +41,8 @@ import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -50,6 +52,7 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
@@ -1193,102 +1196,92 @@ public class CheckpointCoordinatorTest {
* another is triggered.
*/
@Test
- public void testMinInterval() {
- try {
- final JobID jid = new JobID();
+ public void testMinTimeBetweenCheckpointsInterval() throws Exception {
+ final JobID jid = new JobID();
- // create some mock execution vertices and trigger some checkpoint
- final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
- ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+ // create some mock execution vertices and trigger some checkpoint
+ final ExecutionAttemptID attemptID = new ExecutionAttemptID();
+ ExecutionVertex vertex = mockExecutionVertex(attemptID);
- final AtomicInteger numCalls = new AtomicInteger();
+ final BlockingQueue<TriggerCheckpoint> calls = new LinkedBlockingQueue<>();
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- if (invocation.getArguments()[0] instanceof TriggerCheckpoint) {
- numCalls.incrementAndGet();
- }
- return null;
- }
- }).when(vertex1).sendMessageToCurrentExecution(any(Serializable.class), any(ExecutionAttemptID.class));
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ calls.add((TriggerCheckpoint) invocation.getArguments()[0]);
+ return null;
+ }
+ }).when(vertex).sendMessageToCurrentExecution(isA(TriggerCheckpoint.class), eq(attemptID));
- CheckpointCoordinator coord = new CheckpointCoordinator(
- jid,
- 10, // periodic interval is 10 ms
- 200000, // timeout is very long (200 s)
- 500, // 500ms delay between checkpoints
- 10,
- 42,
- new ExecutionVertex[] { vertex1 },
- new ExecutionVertex[] { vertex1 },
- new ExecutionVertex[] { vertex1 },
- cl,
- new StandaloneCheckpointIDCounter(),
- new StandaloneCompletedCheckpointStore(2, cl),
- RecoveryMode.STANDALONE,
- new DisabledCheckpointStatsTracker(),
- TestExecutors.directExecutor());
+ final long delay = 50;
+ final CheckpointCoordinator coord = new CheckpointCoordinator(
+ jid,
+ 2, // periodic interval is 2 ms
+ 200_000, // timeout is very long (200 s)
+ delay, // 50ms delay between checkpoints
+ 1,
+ 42,
+ new ExecutionVertex[] { vertex },
+ new ExecutionVertex[] { vertex },
+ new ExecutionVertex[] { vertex },
+ cl,
+ new StandaloneCheckpointIDCounter(),
+ new StandaloneCompletedCheckpointStore(2, cl),
+ RecoveryMode.STANDALONE,
+ new DisabledCheckpointStatsTracker(),
+ TestExecutors.directExecutor());
+
+ try {
coord.startCheckpointScheduler();
- //wait until the first checkpoint was triggered
- for (int x=0; x<20; x++) {
- Thread.sleep(100);
- if (numCalls.get() > 0) {
- break;
- }
- }
+ // wait until the first checkpoint was triggered
+ TriggerCheckpoint call = calls.take();
+ assertEquals(1L, call.getCheckpointId());
+ assertEquals(jid, call.getJob());
+ assertEquals(attemptID, call.getTaskExecutionId());
- if (numCalls.get() == 0) {
- fail("No checkpoint was triggered within the first 2000 ms.");
- }
-
- long start = System.currentTimeMillis();
-
- for (int x = 0; x < 20; x++) {
- Thread.sleep(100);
- int triggeredCheckpoints = numCalls.get();
- long curT = System.currentTimeMillis();
-
- /**
- * Within a given time-frame T only T/500 checkpoints may be triggered due to the configured minimum
- * interval between checkpoints. This value however does not not take the first triggered checkpoint
- * into account (=> +1). Furthermore we have to account for the mis-alignment between checkpoints
- * being triggered and our time measurement (=> +1); for T=1200 a total of 3-4 checkpoints may have been
- * triggered depending on whether the end of the minimum interval for the first checkpoints ends before
- * or after T=200.
- */
- long maxAllowedCheckpoints = (curT - start) / 500 + 2;
- assertTrue(maxAllowedCheckpoints >= triggeredCheckpoints);
- }
+ // tell the coordinator that the checkpoint is done
+ final long ackTime = System.nanoTime();
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID, 1L));
- coord.stopCheckpointScheduler();
+ // wait until the next checkpoint is triggered
+ TriggerCheckpoint nextCall = calls.take();
+ final long nextCheckpointTime = System.nanoTime();
+ assertEquals(2L, nextCall.getCheckpointId());
+ assertEquals(jid, nextCall.getJob());
+ assertEquals(attemptID, nextCall.getTaskExecutionId());
+
+ final long delayMillis = (nextCheckpointTime - ackTime) / 1_000_000;
+
+ // we need to add one ms here to account for rounding errors
+ if (delayMillis + 1 < delay) {
+ fail("checkpoint came too early: delay was " + delayMillis + " but should have been at least " + delay);
+ }
+ }
+ finally {
+ coord.stopCheckpointScheduler();
coord.shutdown();
}
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
}
@Test
public void testMaxConcurrentAttempts1() {
- testMaxConcurrentAttemps(1);
+ testMaxConcurrentAttempts(1);
}
@Test
public void testMaxConcurrentAttempts2() {
- testMaxConcurrentAttemps(2);
+ testMaxConcurrentAttempts(2);
}
@Test
public void testMaxConcurrentAttempts5() {
- testMaxConcurrentAttemps(5);
+ testMaxConcurrentAttempts(5);
}
- private void testMaxConcurrentAttemps(int maxConcurrentAttempts) {
+ private void testMaxConcurrentAttempts(int maxConcurrentAttempts) {
try {
final JobID jid = new JobID();
[2/2] flink git commit: [FLINK-5218] [state backends] Eagerly close
pending FsCheckpointStateOutputStream on task cancellation
Posted by se...@apache.org.
[FLINK-5218] [state backends] Eagerly close pending FsCheckpointStateOutputStream on task cancellation
This fix contains two modifications:
1. State backends implement 'Closeable' and register themselves with the 'cancelables'.
2. The FsStateBackend tracks all its unclosed FsCheckpointStateOutputStreams and closes them on 'close()'.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/59f61bf6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/59f61bf6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/59f61bf6
Branch: refs/heads/release-1.1
Commit: 59f61bf6cb8351cec9369e2de39c6eeffbda10ea
Parents: e475eb2
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 30 22:38:23 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Dec 1 15:09:43 2016 +0100
----------------------------------------------------------------------
.../streaming/state/RocksDBStateBackend.java | 86 ++++--
.../runtime/state/AbstractStateBackend.java | 58 +++-
.../state/filesystem/FsStateBackend.java | 159 ++++++++--
.../state/memory/MemoryStateBackend.java | 2 +-
.../FsCheckpointStateOutputStreamTest.java | 132 ++++++++-
.../state/FsStateBackendClosingTest.java | 65 +++++
.../streaming/runtime/tasks/StreamTask.java | 7 +
.../runtime/tasks/BlockingCheckpointsTest.java | 290 +++++++++++++++++++
.../streaming/runtime/StateBackendITCase.java | 2 +-
9 files changed, 731 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index deba9f9..0412a4a 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -312,50 +312,74 @@ public class RocksDBStateBackend extends AbstractStateBackend {
}
@Override
- public void dispose() {
- super.dispose();
- nonPartitionedStateBackend.dispose();
+ public void dispose() throws Exception {
+ Throwable exception = null;
- // we have to lock because we might have an asynchronous checkpoint going on
- synchronized (dbCleanupLock) {
- if (db != null) {
- if (this.dbOptions != null) {
- this.dbOptions.dispose();
- this.dbOptions = null;
- }
-
- for (Tuple2<ColumnFamilyHandle, StateDescriptor> column : kvStateInformation.values()) {
- column.f0.dispose();
- }
+ // make sure the individual states are disposed
+ try {
+ super.dispose();
+ }
+ catch (Throwable t) {
+ exception = t;
+ }
- db.dispose();
- db = null;
+ try {
+ nonPartitionedStateBackend.dispose();
+ }
+ catch (Throwable t) {
+ if (exception == null) {
+ exception = t;
+ } else {
+ exception.addSuppressed(t);
}
}
- }
-
- @Override
- public void close() throws Exception {
- nonPartitionedStateBackend.close();
// we have to lock because we might have an asynchronous checkpoint going on
- synchronized (dbCleanupLock) {
- if (db != null) {
- if (this.dbOptions != null) {
- this.dbOptions.dispose();
- this.dbOptions = null;
- }
+ // this must also happen in any case, regardless of earlier exceptions
+ try {
+ synchronized (dbCleanupLock) {
+ if (db != null) {
+ if (this.dbOptions != null) {
+ this.dbOptions.dispose();
+ this.dbOptions = null;
+ }
+
+ for (Tuple2<ColumnFamilyHandle, StateDescriptor> column : kvStateInformation.values()) {
+ column.f0.dispose();
+ }
- for (Tuple2<ColumnFamilyHandle, StateDescriptor> column : kvStateInformation.values()) {
- column.f0.dispose();
+ db.dispose();
+ db = null;
}
+ }
+ }
+ catch (Throwable t) {
+ if (exception == null) {
+ exception = t;
+ } else {
+ exception.addSuppressed(t);
+ }
+ }
- db.dispose();
- db = null;
+ if (exception != null) {
+ if (exception instanceof Exception) {
+ throw (Exception) exception;
+ } else if (exception instanceof Error) {
+ throw (Error) exception;
+ } else {
+ throw new Exception(exception.getMessage(), exception);
}
}
}
+ @Override
+ public void close() throws IOException {
+ // we only close all I/O streams here and do not yet dispose of the native resources
+ // otherwise this can lead to SEGFAULT problems
+ // native resources must only be released in the 'dispose()' method.
+ nonPartitionedStateBackend.close();
+ }
+
private File getDbPath(String stateName) {
return new File(new File(new File(getNextStoragePath(), jobId.toString()), operatorIdentifier), stateName);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index b86688b..ab9854c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -39,6 +39,7 @@ import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.execution.Environment;
+import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
@@ -51,8 +52,8 @@ import java.util.Map;
/**
* A state backend defines how state is stored and snapshotted during checkpoints.
*/
-public abstract class AbstractStateBackend implements java.io.Serializable {
-
+public abstract class AbstractStateBackend implements java.io.Serializable, Closeable {
+
private static final long serialVersionUID = 4620413814639220247L;
protected transient TypeSerializer<?> keySerializer;
@@ -102,23 +103,61 @@ public abstract class AbstractStateBackend implements java.io.Serializable {
public abstract void disposeAllStateForCurrentJob() throws Exception;
/**
- * Closes the state backend, releasing all internal resources, but does not delete any persistent
- * checkpoint data.
+ * Closes the state backend, dropping and aborting all I/O operations that are currently
+ * pending.
*
- * @throws Exception Exceptions can be forwarded and will be logged by the system
+ * @throws IOException Exceptions can be forwarded and will be logged by the system
*/
- public abstract void close() throws Exception;
+ public abstract void close() throws IOException;
+
+ /**
+ * Releases all resources held by this state backend.
+ *
+ * <p>This method must make sure that all resources are disposed, even if an exception happens
+ * on the way.
+ *
+ * @throws Exception This method should report exceptions that occur.
+ */
+ public void dispose() throws Exception {
+ Throwable exception = null;
+
+ // make sure things are closed
+ try {
+ close();
+ }
+ catch (Throwable t) {
+ exception = t;
+ }
- public void dispose() {
+ // now actually dispose things
lastName = null;
lastState = null;
if (keyValueStates != null) {
for (KvState<?, ?, ?, ?, ?> state : keyValueStates) {
- state.dispose();
+ try {
+ state.dispose();
+ }
+ catch (Throwable t) {
+ if (exception == null) {
+ exception = t;
+ } else {
+ exception.addSuppressed(t);
+ }
+ }
}
}
keyValueStates = null;
keyValueStatesByName = null;
+
+ if (exception != null) {
+ if (exception instanceof Exception) {
+ throw (Exception) exception;
+ } else if (exception instanceof Error) {
+ throw (Error) exception;
+ } else {
+ throw new Exception(exception.getMessage(), exception);
+ }
+ }
}
// ------------------------------------------------------------------------
@@ -444,6 +483,9 @@ public abstract class AbstractStateBackend implements java.io.Serializable {
* @throws IOException Thrown, if the stream cannot be closed.
*/
public StateHandle<DataInputView> closeAndGetHandle() throws IOException {
+ // we do not flush() here, because that forces the 'CheckpointStateOutputStream' to files,
+ // even when it could stay in a 'small chunk' memory handle.
+ // the 'DataOutputViewStreamWrapper' does not buffer data anyways
return new DataInputViewHandle(out.closeAndGetHandle());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 8a8a26d..446f3ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -33,9 +33,10 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.AbstractStateBackend;
-
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.util.ExceptionUtils;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,9 +45,13 @@ import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.UUID;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* The file state backend is a state backend that stores the state of streaming jobs in a file system.
*
@@ -86,6 +91,10 @@ public class FsStateBackend extends AbstractStateBackend {
/** Cached handle to the file system for file operations */
private transient FileSystem filesystem;
+ /** Set of currently open streams */
+ private transient HashSet<FsCheckpointStateOutputStream> openStreams;
+
+ private transient volatile boolean closed;
/**
* Creates a new state backend that stores its checkpoint data in the file system and location
@@ -236,9 +245,11 @@ public class FsStateBackend extends AbstractStateBackend {
// ------------------------------------------------------------------------
@Override
- public void initializeForJob(Environment env,
- String operatorIdentifier,
- TypeSerializer<?> keySerializer) throws Exception {
+ public void initializeForJob(
+ Environment env,
+ String operatorIdentifier,
+ TypeSerializer<?> keySerializer) throws Exception {
+
super.initializeForJob(env, operatorIdentifier, keySerializer);
Path dir = new Path(basePath, env.getJobID().toString());
@@ -249,6 +260,7 @@ public class FsStateBackend extends AbstractStateBackend {
filesystem.mkdirs(dir);
checkpointDirectory = dir;
+ openStreams = new HashSet<>();
}
@Override
@@ -267,7 +279,42 @@ public class FsStateBackend extends AbstractStateBackend {
}
@Override
- public void close() throws Exception {}
+ public void close() throws IOException {
+ closed = true;
+
+ // cache a copy on the heap for safety
+ final HashSet<FsCheckpointStateOutputStream> openStreams = this.openStreams;
+ if (openStreams != null) {
+
+ // we need to draw a copy of the set, since the closing concurrently modifies the set
+ final ArrayList<FsCheckpointStateOutputStream> streams;
+
+ //noinspection SynchronizationOnLocalVariableOrMethodParameter
+ synchronized (openStreams) {
+ streams = new ArrayList<>(openStreams);
+ openStreams.clear();
+ }
+
+ // close all the streams, collect exceptions and record all but the first as suppressed
+ Throwable exception = null;
+ for (FsCheckpointStateOutputStream stream : streams) {
+ try {
+ stream.close();
+ }
+ catch (Throwable t) {
+ if (exception == null) {
+ exception = t;
+ } else {
+ exception.addSuppressed(t);
+ }
+ }
+ }
+
+ if (exception != null) {
+ ExceptionUtils.rethrowIOException(exception);
+ }
+ }
+ }
// ------------------------------------------------------------------------
// state backend operations
@@ -299,15 +346,22 @@ public class FsStateBackend extends AbstractStateBackend {
S state, long checkpointID, long timestamp) throws Exception
{
checkFileSystemInitialized();
-
+
Path checkpointDir = createCheckpointDirPath(checkpointID);
int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold);
- FsCheckpointStateOutputStream stream =
- new FsCheckpointStateOutputStream(checkpointDir, filesystem, bufferSize, fileStateThreshold);
-
- try (ObjectOutputStream os = new ObjectOutputStream(stream)) {
+ try (FsCheckpointStateOutputStream stream = new FsCheckpointStateOutputStream(
+ checkpointDir, filesystem, openStreams, bufferSize, fileStateThreshold))
+ {
+ // perform the closing double-check AFTER! the creation of the stream
+ if (closed) {
+ throw new IOException("closed");
+ }
+
+ ObjectOutputStream os = new ObjectOutputStream(stream);
os.writeObject(state);
+ os.flush();
+
return stream.closeAndGetHandle().toSerializableHandle();
}
}
@@ -318,7 +372,16 @@ public class FsStateBackend extends AbstractStateBackend {
Path checkpointDir = createCheckpointDirPath(checkpointID);
int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold);
- return new FsCheckpointStateOutputStream(checkpointDir, filesystem, bufferSize, fileStateThreshold);
+ FsCheckpointStateOutputStream stream = new FsCheckpointStateOutputStream(
+ checkpointDir, filesystem, openStreams, bufferSize, fileStateThreshold);
+
+ // perform the closing double-check AFTER! the creation of the stream
+ if (closed) {
+ stream.close();
+ throw new IOException("closed");
+ }
+
+ return stream;
}
// ------------------------------------------------------------------------
@@ -426,29 +489,40 @@ public class FsStateBackend extends AbstractStateBackend {
private int pos;
private FSDataOutputStream outStream;
-
+
private final int localStateThreshold;
private final Path basePath;
private final FileSystem fs;
-
+
+ private final HashSet<FsCheckpointStateOutputStream> openStreams;
+
private Path statePath;
-
- private boolean closed;
+
+ private volatile boolean closed;
public FsCheckpointStateOutputStream(
- Path basePath, FileSystem fs,
- int bufferSize, int localStateThreshold)
+ Path basePath,
+ FileSystem fs,
+ HashSet<FsCheckpointStateOutputStream> openStreams,
+ int bufferSize,
+ int localStateThreshold)
{
if (bufferSize < localStateThreshold) {
throw new IllegalArgumentException();
}
-
+
this.basePath = basePath;
this.fs = fs;
+ this.openStreams = checkNotNull(openStreams);
this.writeBuffer = new byte[bufferSize];
this.localStateThreshold = localStateThreshold;
+
+ //noinspection SynchronizationOnLocalVariableOrMethodParameter
+ synchronized (openStreams) {
+ openStreams.add(this);
+ }
}
@@ -519,6 +593,9 @@ public class FsStateBackend extends AbstractStateBackend {
pos = 0;
}
}
+ else {
+ throw new IOException("stream is closed");
+ }
}
/**
@@ -530,6 +607,16 @@ public class FsStateBackend extends AbstractStateBackend {
public void close() {
if (!closed) {
closed = true;
+
+ // make sure that the write() methods cannot succeed any more
+ pos = writeBuffer.length;
+
+ // remove the stream from the open streams set
+ synchronized (openStreams) {
+ openStreams.remove(this);
+ }
+
+ // close all resources
if (outStream != null) {
try {
outStream.close();
@@ -551,15 +638,24 @@ public class FsStateBackend extends AbstractStateBackend {
public StreamStateHandle closeAndGetHandle() throws IOException {
synchronized (this) {
if (!closed) {
+
+ // remove the stream from the open streams set
+ synchronized (openStreams) {
+ openStreams.remove(this);
+ }
+
+ // close all resources
if (outStream == null && pos <= localStateThreshold) {
closed = true;
byte[] bytes = Arrays.copyOf(writeBuffer, pos);
+ pos = writeBuffer.length;
return new ByteStreamStateHandle(bytes);
}
else {
flush();
outStream.close();
closed = true;
+ pos = writeBuffer.length;
return new FileStreamStateHandle(statePath);
}
}
@@ -577,9 +673,17 @@ public class FsStateBackend extends AbstractStateBackend {
public Path closeAndGetPath() throws IOException {
synchronized (this) {
if (!closed) {
- closed = true;
+
+ // remove the stream from the open streams set
+ synchronized (openStreams) {
+ openStreams.remove(this);
+ }
+
+ // close all resources
flush();
outStream.close();
+ closed = true;
+ pos = writeBuffer.length;
return statePath;
}
else {
@@ -587,5 +691,22 @@ public class FsStateBackend extends AbstractStateBackend {
}
}
}
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ // we need referential identity on the streams for the closing set to work
+ // properly, so we implement that via final methods here
+
+ @Override
+ public final int hashCode() {
+ return super.hashCode();
+ }
+
+ @Override
+ public final boolean equals(Object obj) {
+ return this == obj;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 7b9d21b..b155244 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -78,7 +78,7 @@ public class MemoryStateBackend extends AbstractStateBackend {
}
@Override
- public void close() throws Exception {}
+ public void close() {}
// ------------------------------------------------------------------------
// State backend operations
http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
index 5964b72..3aba9e1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
@@ -22,34 +22,36 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend.FsCheckpointStateOutputStream;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.junit.Test;
import java.io.File;
+import java.io.IOException;
import java.io.InputStream;
+import java.util.HashSet;
import java.util.Random;
import static org.junit.Assert.*;
public class FsCheckpointStateOutputStreamTest {
-
+
/** The temp dir, obtained in a platform neutral way */
private static final Path TEMP_DIR_PATH = new Path(new File(System.getProperty("java.io.tmpdir")).toURI());
-
-
+
+
@Test(expected = IllegalArgumentException.class)
public void testWrongParameters() {
// this should fail
new FsStateBackend.FsCheckpointStateOutputStream(
- TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 4000, 5000);
+ TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), new HashSet<FsCheckpointStateOutputStream>(), 4000, 5000);
}
-
@Test
public void testEmptyState() throws Exception {
AbstractStateBackend.CheckpointStateOutputStream stream = new FsStateBackend.FsCheckpointStateOutputStream(
- TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 1024, 512);
+ TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), new HashSet<FsCheckpointStateOutputStream>(), 1024, 512);
StreamStateHandle handle = stream.closeAndGetHandle();
assertTrue(handle instanceof ByteStreamStateHandle);
@@ -57,7 +59,101 @@ public class FsCheckpointStateOutputStreamTest {
InputStream inStream = handle.getState(ClassLoader.getSystemClassLoader());
assertEquals(-1, inStream.read());
}
-
+
+ @Test
+ public void testCloseAndGetPath() throws Exception {
+ FsCheckpointStateOutputStream stream = new FsCheckpointStateOutputStream(
+ TEMP_DIR_PATH,
+ FileSystem.getLocalFileSystem(),
+ new HashSet<FsCheckpointStateOutputStream>(),
+ 1024,
+ 512);
+
+ stream.write(1);
+
+ Path path = stream.closeAndGetPath();
+ assertNotNull(path);
+
+ // cleanup
+ FileSystem.getLocalFileSystem().delete(path, false);
+ }
+
+ @Test
+ public void testWriteFailsFastWhenClosed() throws Exception {
+ final HashSet<FsCheckpointStateOutputStream> openStreams = new HashSet<>();
+
+ FsCheckpointStateOutputStream stream1 = new FsCheckpointStateOutputStream(
+ TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+
+ FsCheckpointStateOutputStream stream2 = new FsCheckpointStateOutputStream(
+ TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+
+ FsCheckpointStateOutputStream stream3 = new FsCheckpointStateOutputStream(
+ TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+
+ assertFalse(stream1.isClosed());
+ assertFalse(stream2.isClosed());
+ assertFalse(stream3.isClosed());
+
+ // simple close
+ stream1.close();
+
+ // close with handle
+ StreamStateHandle handle = stream2.closeAndGetHandle();
+
+ // close with path
+ Path path = stream3.closeAndGetPath();
+
+ assertTrue(stream1.isClosed());
+ assertTrue(stream2.isClosed());
+ assertTrue(stream3.isClosed());
+
+ validateStreamNotWritable(stream1);
+ validateStreamNotWritable(stream2);
+ validateStreamNotWritable(stream3);
+
+ // clean up
+ handle.discardState();
+ FileSystem.getLocalFileSystem().delete(path, false);
+ }
+
+ @Test
+ public void testAddAndRemoveFromOpenStreamsSet() throws Exception {
+ final HashSet<FsCheckpointStateOutputStream> openStreams = new HashSet<>();
+
+ FsCheckpointStateOutputStream stream1 = new FsCheckpointStateOutputStream(
+ TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+
+ FsCheckpointStateOutputStream stream2 = new FsCheckpointStateOutputStream(
+ TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+
+ FsCheckpointStateOutputStream stream3 = new FsCheckpointStateOutputStream(
+ TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+
+ assertTrue(openStreams.contains(stream1));
+ assertTrue(openStreams.contains(stream2));
+ assertTrue(openStreams.contains(stream3));
+ assertEquals(3, openStreams.size());
+
+ // simple close
+ stream1.close();
+
+ // close with handle
+ StreamStateHandle handle = stream2.closeAndGetHandle();
+
+ // close with path
+ Path path = stream3.closeAndGetPath();
+
+ assertFalse(openStreams.contains(stream1));
+ assertFalse(openStreams.contains(stream2));
+ assertFalse(openStreams.contains(stream3));
+ assertEquals(0, openStreams.size());
+
+ // clean up
+ handle.discardState();
+ FileSystem.getLocalFileSystem().delete(path, false);
+ }
+
@Test
public void testStateBlowMemThreshold() throws Exception {
runTest(222, 999, 512, false);
@@ -72,16 +168,17 @@ public class FsCheckpointStateOutputStreamTest {
public void testStateAboveMemThreshold() throws Exception {
runTest(576446, 259, 17, true);
}
-
+
@Test
public void testZeroThreshold() throws Exception {
runTest(16678, 4096, 0, true);
}
-
+
private void runTest(int numBytes, int bufferSize, int threshold, boolean expectFile) throws Exception {
AbstractStateBackend.CheckpointStateOutputStream stream =
new FsStateBackend.FsCheckpointStateOutputStream(
- TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), bufferSize, threshold);
+ TEMP_DIR_PATH, FileSystem.getLocalFileSystem(),
+ new HashSet<FsCheckpointStateOutputStream>(), bufferSize, threshold);
Random rnd = new Random();
byte[] original = new byte[numBytes];
@@ -125,4 +222,19 @@ public class FsCheckpointStateOutputStreamTest {
handle.discardState();
}
+
+ private void validateStreamNotWritable(FsCheckpointStateOutputStream stream) {
+ try {
+ stream.write(1);
+ fail();
+ } catch (IOException e) {
+ // expected
+ }
+ try {
+ stream.write(new byte[4], 1, 2);
+ fail();
+ } catch (IOException e) {
+ // expected
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsStateBackendClosingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsStateBackendClosingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsStateBackendClosingTest.java
new file mode 100644
index 0000000..6df488d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsStateBackendClosingTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend.FsCheckpointStateOutputStream;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class FsStateBackendClosingTest {
+
+ @Test
+ public void testStateBackendClosesStreams() throws Exception {
+ final URI tempFolder = new File(EnvironmentInformation.getTemporaryFileDirectory()).toURI();
+ final FsStateBackend backend = new FsStateBackend(tempFolder);
+
+ backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
+
+ FsCheckpointStateOutputStream stream = backend.createCheckpointStateOutputStream(17L, 12345L);
+
+ // stream is open, this should succeed
+ assertFalse(stream.isClosed());
+ stream.write(1);
+
+ // close the backend - that should close the stream
+ backend.close();
+
+ assertTrue(stream.isClosed());
+
+ try {
+ stream.write(2);
+ fail("stream is closed, 'write(int)' should fail with an exception");
+ }
+ catch (IOException e) {
+ // expected
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index aaaead0..99df060 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -772,7 +772,14 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
}
}
}
+
stateBackend.initializeForJob(getEnvironment(), operatorIdentifier, keySerializer);
+
+ // make sure the state backend is closed eagerly in case of cancellation
+ synchronized (cancelables) {
+ cancelables.add(stateBackend);
+ }
+
return stateBackend;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
new file mode 100644
index 0000000..ed993c7
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.FallbackLibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamFilter;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URL;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+
+/**
+ * This test checks that task checkpoints that block and do not react to thread interrupts
+ * are unblocked on cancellation, because canceling the task closes the state backend's stream.
+ */
+public class BlockingCheckpointsTest {
+
+ private static final OneShotLatch IN_CHECKPOINT_LATCH = new OneShotLatch();
+
+ @Test
+ public void testBlockingNonInterruptibleCheckpoint() throws Exception {
+
+ Configuration taskConfig = new Configuration();
+ StreamConfig cfg = new StreamConfig(taskConfig);
+ cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+ cfg.setStreamOperator(new TestOperator());
+ cfg.setStateBackend(new LockingStreamStateBackend());
+
+ Task task = createTask(taskConfig);
+
+ // start the task and wait until it is inside the checkpoint
+ task.startTaskThread();
+ IN_CHECKPOINT_LATCH.await();
+
+ // cancel the task and wait. unless cancellation properly closes
+ // the streams, this will never terminate
+ task.cancelExecution();
+ task.getExecutingThread().join();
+
+ assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+ assertNull(task.getFailureCause());
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ private static Task createTask(Configuration taskConfig) throws IOException {
+
+ JobInformation jobInformation = new JobInformation(
+ new JobID(),
+ "test job name",
+ new SerializedValue<>(new ExecutionConfig()),
+ new Configuration(),
+ Collections.<BlobKey>emptyList(),
+ Collections.<URL>emptyList());
+
+ TaskInformation taskInformation = new TaskInformation(
+ new JobVertexID(),
+ "test task name",
+ 1,
+ TestStreamTask.class.getName(),
+ taskConfig);
+
+ return new Task(
+ jobInformation,
+ taskInformation,
+ new ExecutionAttemptID(),
+ 0,
+ 0,
+ Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+ Collections.<InputGateDeploymentDescriptor>emptyList(),
+ 0,
+ null,
+ mock(MemoryManager.class),
+ mock(IOManager.class),
+ mock(NetworkEnvironment.class),
+ mock(BroadcastVariableManager.class),
+ mock(ActorGateway.class),
+ mock(ActorGateway.class),
+ new FiniteDuration(10, TimeUnit.SECONDS),
+ new FallbackLibraryCacheManager(),
+ new FileCache(new Configuration()),
+ new TaskManagerRuntimeInfo(
+ "localhost", new Configuration(), EnvironmentInformation.getTemporaryFileDirectory()),
+ new UnregisteredTaskMetricsGroup());
+
+ }
+
+ // ------------------------------------------------------------------------
+ // state backend with locking output stream
+ // ------------------------------------------------------------------------
+
+ private static class LockingStreamStateBackend extends AbstractStateBackend {
+
+ private static final long serialVersionUID = 1L;
+
+ private final LockingOutputStream out = new LockingOutputStream();
+
+ @Override
+ public void disposeAllStateForCurrentJob() {}
+
+ @Override
+ public void close() throws IOException {
+ out.close();
+ }
+
+ @Override
+ public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
+ return out;
+ }
+
+ @Override
+ protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> stateDesc) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <S extends Serializable> StateHandle<S> checkpointStateSerializable(S state, long checkpointID, long timestamp) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private static final class LockingOutputStream extends CheckpointStateOutputStream implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final SerializableObject lock = new SerializableObject();
+ private volatile boolean closed;
+
+ @Override
+ public StreamStateHandle closeAndGetHandle() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ // this needs to not react to interrupts until the handle is closed
+ synchronized (lock) {
+ while (!closed) {
+ try {
+ lock.wait();
+ }
+ catch (InterruptedException ignored) {}
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ synchronized (lock) {
+ closed = true;
+ lock.notifyAll();
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // test filter operator that calls into the locking checkpoint output stream
+ // ------------------------------------------------------------------------
+
+ @SuppressWarnings("serial")
+ private static final class TestOperator extends StreamFilter<Object> {
+ private static final long serialVersionUID = 1L;
+
+ public TestOperator() {
+ super(new FilterFunction<Object>() {
+ @Override
+ public boolean filter(Object value) {
+ return false;
+ }
+ });
+ }
+
+ @Override
+ public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
+ AbstractStateBackend stateBackend = getStateBackend();
+ CheckpointStateOutputStream outStream = stateBackend.createCheckpointStateOutputStream(checkpointId, timestamp);
+
+ IN_CHECKPOINT_LATCH.trigger();
+
+ // this should block until the stream is closed
+ outStream.write(1);
+
+ // this should be unreachable
+ return null;
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // stream task that simply triggers a checkpoint
+ // ------------------------------------------------------------------------
+
+ public static final class TestStreamTask extends OneInputStreamTask<Object, Object> {
+
+ @Override
+ public void init() {}
+
+ @Override
+ protected void run() throws Exception {
+ triggerCheckpointOnBarrier(11L, System.currentTimeMillis());
+ }
+
+ @Override
+ protected void cleanup() {}
+
+ @Override
+ protected void cancelTask() {}
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
index 6288946..4eb8b4a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
@@ -104,7 +104,7 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
public void disposeAllStateForCurrentJob() throws Exception {}
@Override
- public void close() throws Exception {}
+ public void close() {}
@Override
protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> stateDesc) throws Exception {