You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/23 19:10:31 UTC
[3/7] flink git commit: [FLINK-5763] [checkpoints] Add CheckpointOptions
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
index 46f228a..e407443 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -155,7 +156,9 @@ public class BarrierBufferAlignmentLimitTest {
check(sequence[21], buffer.getNextNonBlocked());
// no call for a completed checkpoint must have happened
- verify(toNotify, times(0)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class),
+ verify(toNotify, times(0)).triggerCheckpointOnBarrier(
+ any(CheckpointMetaData.class),
+ any(CheckpointOptions.class),
any(CheckpointMetrics.class));
assertNull(buffer.getNextNonBlocked());
@@ -242,7 +245,8 @@ public class BarrierBufferAlignmentLimitTest {
// checkpoint 4 completed - check and validate buffered replay
check(sequence[9], buffer.getNextNonBlocked());
validateAlignmentTime(startTs, buffer);
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(4L)), any(CheckpointMetrics.class));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(
+ argThat(new CheckpointMatcher(4L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
check(sequence[10], buffer.getNextNonBlocked());
check(sequence[15], buffer.getNextNonBlocked());
@@ -254,7 +258,8 @@ public class BarrierBufferAlignmentLimitTest {
check(sequence[21], buffer.getNextNonBlocked());
// only checkpoint 4 was successfully completed, not checkpoint 3
- verify(toNotify, times(0)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)), any(CheckpointMetrics.class));
+ verify(toNotify, times(0)).triggerCheckpointOnBarrier(
+ argThat(new CheckpointMatcher(3L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
assertNull(buffer.getNextNonBlocked());
assertNull(buffer.getNextNonBlocked());
@@ -284,7 +289,7 @@ public class BarrierBufferAlignmentLimitTest {
}
private static BufferOrEvent createBarrier(long id, int channel) {
- return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel);
+ return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis(), CheckpointOptions.forFullCheckpoint()), channel);
}
private static void check(BufferOrEvent expected, BufferOrEvent present) {
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index 0cf866a..6e088f6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.runtime.io;
import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -151,7 +152,7 @@ public class BarrierBufferMassiveRandomTest {
if (barrierGens[currentChannel].isNextBarrier()) {
return new BufferOrEvent(
- new CheckpointBarrier(++currentBarriers[currentChannel], System.currentTimeMillis()),
+ new CheckpointBarrier(++currentBarriers[currentChannel], System.currentTimeMillis(), CheckpointOptions.forFullCheckpoint()),
currentChannel);
} else {
Buffer buffer = bufferPools[currentChannel].requestBuffer();
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index 869d1fe..d6056d2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -566,7 +567,7 @@ public class BarrierBufferTest {
// checkpoint done - replay buffered
check(sequence[5], buffer.getNextNonBlocked());
validateAlignmentTime(startTs, buffer);
- verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointMetrics.class));
+ verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
check(sequence[6], buffer.getNextNonBlocked());
check(sequence[9], buffer.getNextNonBlocked());
@@ -1008,14 +1009,14 @@ public class BarrierBufferTest {
check(sequence[0], buffer.getNextNonBlocked());
check(sequence[2], buffer.getNextNonBlocked());
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointMetrics.class));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
assertEquals(0L, buffer.getAlignmentDurationNanos());
check(sequence[6], buffer.getNextNonBlocked());
assertEquals(5L, buffer.getCurrentCheckpointId());
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointMetrics.class));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), any(CheckpointDeclineOnCancellationBarrierException.class));
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointMetrics.class));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
assertEquals(0L, buffer.getAlignmentDurationNanos());
check(sequence[8], buffer.getNextNonBlocked());
@@ -1078,7 +1079,7 @@ public class BarrierBufferTest {
check(sequence[2], buffer.getNextNonBlocked());
startTs = System.nanoTime();
check(sequence[5], buffer.getNextNonBlocked());
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointMetrics.class));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
validateAlignmentTime(startTs, buffer);
check(sequence[6], buffer.getNextNonBlocked());
@@ -1097,7 +1098,7 @@ public class BarrierBufferTest {
check(sequence[16], buffer.getNextNonBlocked());
startTs = System.nanoTime();
check(sequence[20], buffer.getNextNonBlocked());
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)), any(CheckpointMetrics.class));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
validateAlignmentTime(startTs, buffer);
check(sequence[21], buffer.getNextNonBlocked());
@@ -1114,7 +1115,7 @@ public class BarrierBufferTest {
// a simple successful checkpoint
startTs = System.nanoTime();
check(sequence[32], buffer.getNextNonBlocked());
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointMetrics.class));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
validateAlignmentTime(startTs, buffer);
check(sequence[33], buffer.getNextNonBlocked());
@@ -1175,7 +1176,7 @@ public class BarrierBufferTest {
// finished first checkpoint
check(sequence[3], buffer.getNextNonBlocked());
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointMetrics.class));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
validateAlignmentTime(startTs, buffer);
check(sequence[5], buffer.getNextNonBlocked());
@@ -1198,7 +1199,7 @@ public class BarrierBufferTest {
assertEquals(0L, buffer.getAlignmentDurationNanos());
// no further checkpoint (abort) notifications
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointMetrics.class));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointOptions.class), any(CheckpointMetrics.class));
verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(CheckpointDeclineOnCancellationBarrierException.class));
// all done
@@ -1280,7 +1281,7 @@ public class BarrierBufferTest {
// checkpoint done
check(sequence[7], buffer.getNextNonBlocked());
validateAlignmentTime(startTs, buffer);
- verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointMetrics.class));
+ verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
// queued data
check(sequence[10], buffer.getNextNonBlocked());
@@ -1299,7 +1300,7 @@ public class BarrierBufferTest {
checkNoTempFilesRemain();
// check overall notifications
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointMetrics.class));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointOptions.class), any(CheckpointMetrics.class));
verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(Throwable.class));
}
@@ -1364,7 +1365,7 @@ public class BarrierBufferTest {
// checkpoint finished
check(sequence[7], buffer.getNextNonBlocked());
validateAlignmentTime(startTs, buffer);
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointMetrics.class));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
check(sequence[11], buffer.getNextNonBlocked());
// remaining data
@@ -1380,7 +1381,7 @@ public class BarrierBufferTest {
checkNoTempFilesRemain();
// check overall notifications
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointMetrics.class));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointOptions.class), any(CheckpointMetrics.class));
verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(Throwable.class));
}
@@ -1389,7 +1390,7 @@ public class BarrierBufferTest {
// ------------------------------------------------------------------------
private static BufferOrEvent createBarrier(long checkpointId, int channel) {
- return new BufferOrEvent(new CheckpointBarrier(checkpointId, System.currentTimeMillis()), channel);
+ return new BufferOrEvent(new CheckpointBarrier(checkpointId, System.currentTimeMillis(), CheckpointOptions.forFullCheckpoint()), channel);
}
private static BufferOrEvent createCancellationBarrier(long checkpointId, int channel) {
@@ -1487,12 +1488,12 @@ public class BarrierBufferTest {
}
@Override
- public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
+ public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
throw new UnsupportedOperationException("should never be called");
}
@Override
- public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
+ public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
assertTrue("wrong checkpoint id",
nextExpectedCheckpointId == -1L ||
nextExpectedCheckpointId == checkpointMetaData.getCheckpointId());
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index da322f6..05f7da6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -470,7 +471,7 @@ public class BarrierTrackerTest {
// ------------------------------------------------------------------------
private static BufferOrEvent createBarrier(long id, int channel) {
- return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel);
+ return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis(), CheckpointOptions.forFullCheckpoint()), channel);
}
private static BufferOrEvent createCancellationBarrier(long id, int channel) {
@@ -502,12 +503,12 @@ public class BarrierTrackerTest {
}
@Override
- public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
+ public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
throw new UnsupportedOperationException("should never be called");
}
@Override
- public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
+ public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
assertTrue("More checkpoints than expected", i < checkpointIDs.length);
final long expectedId = checkpointIDs[i++];
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
index 5c0f0cf..51294ce 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -174,6 +175,11 @@ public class BlockingCheckpointsTest {
}
@Override
+ public CheckpointStreamFactory createSavepointStreamFactory(JobID jobId, String operatorIdentifier, String targetLocation) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
Environment env, JobID jobID, String operatorIdentifier,
TypeSerializer<K> keySerializer, int numberOfKeyGroups,
@@ -276,7 +282,7 @@ public class BlockingCheckpointsTest {
@Override
protected void run() throws Exception {
- triggerCheckpointOnBarrier(new CheckpointMetaData(11L, System.currentTimeMillis()), new CheckpointMetrics());
+ triggerCheckpointOnBarrier(new CheckpointMetaData(11L, System.currentTimeMillis()), CheckpointOptions.forFullCheckpoint(), new CheckpointMetrics());
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 69c2c88..e22bf86 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -408,7 +409,7 @@ public class OneInputStreamTaskTest extends TestLogger {
testHarness.invoke();
testHarness.waitForTaskRunning();
- testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
+ testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 0);
// These elements should be buffered until we receive barriers from
// all inputs
@@ -427,14 +428,14 @@ public class OneInputStreamTaskTest extends TestLogger {
// we should not yet see the barrier, only the two elements from non-blocked input
TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
- testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
- testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
- testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
+ testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 1);
+ testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 0);
+ testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 1);
testHarness.waitForInputProcessing();
// now we should see the barrier and after that the buffered elements
- expectedOutput.add(new CheckpointBarrier(0, 0));
+ expectedOutput.add(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()));
expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
@@ -467,7 +468,7 @@ public class OneInputStreamTaskTest extends TestLogger {
testHarness.invoke();
testHarness.waitForTaskRunning();
- testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
+ testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 0);
// These elements should be buffered until we receive barriers from
// all inputs
@@ -488,15 +489,15 @@ public class OneInputStreamTaskTest extends TestLogger {
// Now give a later barrier to all inputs, this should unblock the first channel,
// thereby allowing the two blocked elements through
- testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 0);
- testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 1);
- testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 0);
- testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 1);
+ testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 0, 0);
+ testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 0, 1);
+ testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 1, 0);
+ testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 1, 1);
expectedOutput.add(new CancelCheckpointMarker(0));
expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
- expectedOutput.add(new CheckpointBarrier(1, 1));
+ expectedOutput.add(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()));
testHarness.waitForInputProcessing();
@@ -504,9 +505,9 @@ public class OneInputStreamTaskTest extends TestLogger {
// Then give the earlier barrier, these should be ignored
- testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
- testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
- testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
+ testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 1);
+ testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 0);
+ testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 1);
testHarness.waitForInputProcessing();
@@ -557,7 +558,7 @@ public class OneInputStreamTaskTest extends TestLogger {
CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, checkpointTimestamp);
- while(!streamTask.triggerCheckpoint(checkpointMetaData));
+ while(!streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint()));
// since no state was set, there shouldn't be restore calls
assertEquals(0, TestingStreamOperator.numberRestoreCalls);
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 0773699..1a6fa8f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -231,7 +232,7 @@ public class SourceStreamTaskTest {
for (int i = 0; i < numCheckpoints; i++) {
long currentCheckpointId = checkpointId.getAndIncrement();
CheckpointMetaData checkpointMetaData = new CheckpointMetaData(currentCheckpointId, 0L);
- sourceTask.triggerCheckpoint(checkpointMetaData);
+ sourceTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint());
Thread.sleep(checkpointInterval);
}
return true;
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
index c2d4aaa..53f77ca 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
@@ -56,7 +57,8 @@ public class StreamTaskCancellationBarrierTest {
testHarness.invoke();
// tell the task to commence a checkpoint
- boolean result = task.triggerCheckpoint(new CheckpointMetaData(41L, System.currentTimeMillis()));
+ boolean result = task.triggerCheckpoint(new CheckpointMetaData(41L, System.currentTimeMillis()),
+ CheckpointOptions.forFullCheckpoint());
assertFalse("task triggered checkpoint though not ready", result);
// a cancellation barrier should be downstream
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 1e74c3e..3d01fdd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -305,18 +306,18 @@ public class StreamTaskTest extends TestLogger {
final Exception testException = new Exception("Test exception");
- when(streamOperator1.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult1);
- when(streamOperator2.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult2);
- when(streamOperator3.snapshotState(anyLong(), anyLong())).thenThrow(testException);
+ when(streamOperator1.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult1);
+ when(streamOperator2.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult2);
+ when(streamOperator3.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenThrow(testException);
// mock the returned legacy snapshots
StreamStateHandle streamStateHandle1 = mock(StreamStateHandle.class);
StreamStateHandle streamStateHandle2 = mock(StreamStateHandle.class);
StreamStateHandle streamStateHandle3 = mock(StreamStateHandle.class);
- when(streamOperator1.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle1);
- when(streamOperator2.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle2);
- when(streamOperator3.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle3);
+ when(streamOperator1.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle1);
+ when(streamOperator2.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle2);
+ when(streamOperator3.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle3);
// set up the task
@@ -332,7 +333,7 @@ public class StreamTaskTest extends TestLogger {
Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
try {
- streamTask.triggerCheckpoint(checkpointMetaData);
+ streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint());
fail("Expected test exception here.");
} catch (Exception e) {
assertEquals(testException, e.getCause());
@@ -380,18 +381,18 @@ public class StreamTaskTest extends TestLogger {
when(operatorSnapshotResult3.getOperatorStateRawFuture()).thenReturn(failingFuture);
- when(streamOperator1.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult1);
- when(streamOperator2.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult2);
- when(streamOperator3.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult3);
+ when(streamOperator1.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult1);
+ when(streamOperator2.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult2);
+ when(streamOperator3.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult3);
// mock the legacy state snapshot
StreamStateHandle streamStateHandle1 = mock(StreamStateHandle.class);
StreamStateHandle streamStateHandle2 = mock(StreamStateHandle.class);
StreamStateHandle streamStateHandle3 = mock(StreamStateHandle.class);
- when(streamOperator1.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle1);
- when(streamOperator2.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle2);
- when(streamOperator3.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle3);
+ when(streamOperator1.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle1);
+ when(streamOperator2.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle2);
+ when(streamOperator3.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle3);
StreamOperator<?>[] streamOperators = {streamOperator1, streamOperator2, streamOperator3};
@@ -405,7 +406,7 @@ public class StreamTaskTest extends TestLogger {
Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", new DirectExecutorService());
Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
- streamTask.triggerCheckpoint(checkpointMetaData);
+ streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint());
verify(streamTask).handleAsyncException(anyString(), any(Throwable.class));
@@ -468,7 +469,7 @@ public class StreamTaskTest extends TestLogger {
new DoneFuture<>(managedOperatorStateHandle),
new DoneFuture<>(rawOperatorStateHandle));
- when(streamOperator.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult);
+ when(streamOperator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult);
StreamOperator<?>[] streamOperators = {streamOperator};
@@ -495,7 +496,7 @@ public class StreamTaskTest extends TestLogger {
Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
Whitebox.setInternalState(streamTask, "stateBackend", mockStateBackend);
- streamTask.triggerCheckpoint(checkpointMetaData);
+ streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint());
acknowledgeCheckpointLatch.await();
@@ -584,7 +585,7 @@ public class StreamTaskTest extends TestLogger {
new DoneFuture<>(managedOperatorStateHandle),
new DoneFuture<>(rawOperatorStateHandle));
- when(streamOperator.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult);
+ when(streamOperator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult);
StreamOperator<?>[] streamOperators = {streamOperator};
@@ -613,7 +614,7 @@ public class StreamTaskTest extends TestLogger {
Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
Whitebox.setInternalState(streamTask, "stateBackend", mockStateBackend);
- streamTask.triggerCheckpoint(checkpointMetaData);
+ streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint());
createSubtask.await();
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index c0a1638..d465619 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
@@ -225,7 +226,7 @@ public class TwoInputStreamTaskTest {
testHarness.invoke();
testHarness.waitForTaskRunning();
- testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
+ testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 0);
// This element should be buffered since we received a checkpoint barrier on
// this input
@@ -262,16 +263,16 @@ public class TwoInputStreamTaskTest {
expectedOutput,
testHarness.getOutput());
- testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
- testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
- testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
+ testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 1);
+ testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 0);
+ testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 1);
testHarness.waitForInputProcessing();
testHarness.endInput();
testHarness.waitForTaskCompletion();
// now we should see the barrier and after that the buffered elements
- expectedOutput.add(new CheckpointBarrier(0, 0));
+ expectedOutput.add(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()));
expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
TestHarnessUtil.assertOutputEquals("Output was not correct.",
@@ -306,7 +307,7 @@ public class TwoInputStreamTaskTest {
testHarness.invoke();
testHarness.waitForTaskRunning();
- testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
+ testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 0);
// These elements should be buffered until we receive barriers from
// all inputs
@@ -329,15 +330,15 @@ public class TwoInputStreamTaskTest {
// Now give a later barrier to all inputs, this should unblock the first channel,
// thereby allowing the two blocked elements through
- testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 0);
- testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 1);
- testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 0);
- testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 1);
+ testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 0, 0);
+ testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 0, 1);
+ testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 1, 0);
+ testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 1, 1);
expectedOutput.add(new CancelCheckpointMarker(0));
expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
- expectedOutput.add(new CheckpointBarrier(1, 1));
+ expectedOutput.add(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()));
testHarness.waitForInputProcessing();
@@ -347,9 +348,9 @@ public class TwoInputStreamTaskTest {
// Then give the earlier barrier, these should be ignored
- testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
- testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
- testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
+ testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 1);
+ testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 0);
+ testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 1);
testHarness.waitForInputProcessing();
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 01afec6..07424f7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -29,6 +29,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0Serializer;
import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.migration.util.MigrationInstantiationUtil;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner;
import org.apache.flink.runtime.checkpoint.RoundRobinOperatorStateRepartitioner;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
@@ -478,11 +479,16 @@ public class AbstractStreamOperatorTestHarness<OUT> {
}
/**
- * Calls {@link StreamOperator#snapshotState(long, long)}.
+ * Calls {@link StreamOperator#snapshotState(long, long, CheckpointOptions)}.
*/
public OperatorStateHandles snapshot(long checkpointId, long timestamp) throws Exception {
- OperatorSnapshotResult operatorStateResult = operator.snapshotState(checkpointId, timestamp);
+ CheckpointStreamFactory streamFactory = stateBackend.createStreamFactory(new JobID(), "test_op");
+
+ OperatorSnapshotResult operatorStateResult = operator.snapshotState(
+ checkpointId,
+ timestamp,
+ CheckpointOptions.forFullCheckpoint());
KeyGroupsStateHandle keyedManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateManagedFuture());
KeyGroupsStateHandle keyedRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateRawFuture());
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index cde5780..effb44c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -143,9 +144,11 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
}
if (keyedStateBackend != null) {
- RunnableFuture<KeyGroupsStateHandle> keyedSnapshotRunnable = keyedStateBackend.snapshot(checkpointId,
+ RunnableFuture<KeyGroupsStateHandle> keyedSnapshotRunnable = keyedStateBackend.snapshot(
+ checkpointId,
timestamp,
- streamFactory);
+ streamFactory,
+ CheckpointOptions.forFullCheckpoint());
if(!keyedSnapshotRunnable.isDone()) {
Thread runner = new Thread(keyedSnapshotRunnable);
runner.start();
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 128522b..ac37009 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -23,6 +23,7 @@ import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
+import java.io.FileNotFoundException;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.MapFunction;
@@ -160,21 +161,21 @@ public class SavepointITCase extends TestLogger {
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
final File checkpointDir = new File(tmpDir, "checkpoints");
- final File savepointDir = new File(tmpDir, "savepoints");
+ final File savepointRootDir = new File(tmpDir, "savepoints");
- if (!checkpointDir.mkdir() || !savepointDir.mkdirs()) {
+ if (!checkpointDir.mkdir() || !savepointRootDir.mkdirs()) {
fail("Test setup failed: failed to create temporary directories.");
}
LOG.info("Created temporary checkpoint directory: " + checkpointDir + ".");
- LOG.info("Created temporary savepoint directory: " + savepointDir + ".");
+ LOG.info("Created temporary savepoint directory: " + savepointRootDir + ".");
config.setString(CoreOptions.STATE_BACKEND, "filesystem");
config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
checkpointDir.toURI().toString());
config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY,
- savepointDir.toURI().toString());
+ savepointRootDir.toURI().toString());
LOG.info("Flink configuration: " + config + ".");
@@ -217,14 +218,6 @@ public class SavepointITCase extends TestLogger {
.result(savepointPathFuture, deadline.timeLeft())).savepointPath();
LOG.info("Retrieved savepoint path: " + savepointPath + ".");
- // Only one savepoint should exist
- File[] files = savepointDir.listFiles();
- if (files != null) {
- assertEquals("Savepoint not created in expected directory", 1, files.length);
- } else {
- fail("Savepoint not created in expected directory");
- }
-
// Retrieve the savepoint from the testing job manager
LOG.info("Requesting the savepoint.");
Future<Object> savepointFuture = jobManager.ask(new RequestSavepoint(savepointPath), deadline.timeLeft());
@@ -240,15 +233,33 @@ public class SavepointITCase extends TestLogger {
// - Verification START -------------------------------------------
+ // Only one savepoint should exist
+ File[] files = savepointRootDir.listFiles();
+
+ if (files != null) {
+ assertEquals("Savepoint not created in expected directory", 1, files.length);
+ assertTrue("Savepoint did not create self-contained directory", files[0].isDirectory());
+
+ File savepointDir = files[0];
+ File[] savepointFiles = savepointDir.listFiles();
+ assertNotNull(savepointFiles);
+ assertTrue("Did not write savepoint files to directory",savepointFiles.length > 1);
+ } else {
+ fail("Savepoint not created in expected directory");
+ }
+
// The job's checkpoint directory (if it exists) should have been cleaned up
// We currently have the following directory layout: checkpointDir/jobId/chk-ID
- files = checkpointDir.listFiles();
- assertNotNull("Checkpoint directory empty", files);
- assertEquals("Checkpoints directory cleaned up, but needed for savepoint.", 1, files.length);
- assertEquals("No job-specific base directory", jobGraph.getJobID().toString(), files[0].getName());
+ File jobCheckpoints = new File(checkpointDir, jobId.toString());
+
+ if (jobCheckpoints.exists()) {
+ files = jobCheckpoints.listFiles();
+ assertNotNull("Checkpoint directory empty", files);
+ assertEquals("Checkpoints directory not cleaned up: " + Arrays.toString(files), 0, files.length);
+ }
// Only one savepoint should exist
- files = savepointDir.listFiles();
+ files = savepointRootDir.listFiles();
assertNotNull("Savepoint directory empty", files);
assertEquals("No savepoint found in savepoint directory", 1, files.length);
@@ -399,8 +410,8 @@ public class SavepointITCase extends TestLogger {
// All savepoints should have been cleaned up
errMsg = "Savepoints directory not cleaned up properly: " +
- Arrays.toString(savepointDir.listFiles()) + ".";
- assertEquals(errMsg, 0, savepointDir.listFiles().length);
+ Arrays.toString(savepointRootDir.listFiles()) + ".";
+ assertEquals(errMsg, 0, savepointRootDir.listFiles().length);
// - Verification END ---------------------------------------------
} finally {
@@ -468,7 +479,7 @@ public class SavepointITCase extends TestLogger {
flink.submitJobAndWait(jobGraph, false);
} catch (Exception e) {
assertEquals(JobExecutionException.class, e.getClass());
- assertEquals(IllegalArgumentException.class, e.getCause().getClass());
+ assertEquals(FileNotFoundException.class, e.getCause().getClass());
}
} finally {
if (flink != null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
index ec6a8f5..79665dd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
@@ -86,7 +86,6 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
}
}
-
public static class FailingStateBackend extends AbstractStateBackend {
private static final long serialVersionUID = 1L;
@@ -97,6 +96,12 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
}
@Override
+ public CheckpointStreamFactory createSavepointStreamFactory(JobID jobId,
+ String operatorIdentifier, String targetLocation) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
Environment env,
JobID jobID,