You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/23 19:10:32 UTC
[4/7] flink git commit: [FLINK-5763] [checkpoints] Add CheckpointOptions
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index c2ada3b..d8e46fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -313,8 +313,8 @@ public class CheckpointCoordinatorTest {
assertFalse(checkpoint.isFullyAcknowledged());
// check that the vertices received the trigger checkpoint message
- verify(vertex1.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, timestamp);
- verify(vertex2.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, timestamp);
+ verify(vertex1.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, timestamp, CheckpointOptions.forFullCheckpoint());
+ verify(vertex2.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, timestamp, CheckpointOptions.forFullCheckpoint());
CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
@@ -428,14 +428,14 @@ public class CheckpointCoordinatorTest {
// check that the vertices received the trigger checkpoint message
{
- verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint1Id), eq(timestamp));
- verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint1Id), eq(timestamp));
+ verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint1Id), eq(timestamp), any(CheckpointOptions.class));
+ verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint1Id), eq(timestamp), any(CheckpointOptions.class));
}
// check that the vertices received the trigger checkpoint message for the second checkpoint
{
- verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint2Id), eq(timestamp + 2));
- verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint2Id), eq(timestamp + 2));
+ verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint2Id), eq(timestamp + 2), any(CheckpointOptions.class));
+ verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint2Id), eq(timestamp + 2), any(CheckpointOptions.class));
}
// decline checkpoint from one of the tasks, this should cancel the checkpoint
@@ -529,8 +529,8 @@ public class CheckpointCoordinatorTest {
// check that the vertices received the trigger checkpoint message
{
- verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp));
- verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp));
+ verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class));
+ verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class));
}
// acknowledge from one of the tasks
@@ -558,8 +558,8 @@ public class CheckpointCoordinatorTest {
// validate that the relevant tasks got a confirmation message
{
- verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp));
- verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp));
+ verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class));
+ verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class));
}
CompletedCheckpoint success = coord.getSuccessfulCheckpoints().get(0);
@@ -589,8 +589,8 @@ public class CheckpointCoordinatorTest {
// validate that the relevant tasks got a confirmation message
{
- verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew));
- verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew));
+ verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew), any(CheckpointOptions.class));
+ verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew), any(CheckpointOptions.class));
verify(vertex1.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointIdNew), eq(timestampNew));
verify(vertex2.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointIdNew), eq(timestampNew));
@@ -660,8 +660,8 @@ public class CheckpointCoordinatorTest {
long checkpointId1 = pending1.getCheckpointId();
// trigger messages should have been sent
- verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1));
- verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1));
+ verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1), any(CheckpointOptions.class));
+ verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1), any(CheckpointOptions.class));
CheckpointMetaData checkpointMetaData1 = new CheckpointMetaData(checkpointId1, 0L);
@@ -687,8 +687,8 @@ public class CheckpointCoordinatorTest {
CheckpointMetaData checkpointMetaData2 = new CheckpointMetaData(checkpointId2, 0L);
// trigger messages should have been sent
- verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2));
- verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2));
+ verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2), any(CheckpointOptions.class));
+ verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2), any(CheckpointOptions.class));
// we acknowledge the remaining two tasks from the first
// checkpoint and two tasks from the second checkpoint
@@ -794,8 +794,8 @@ public class CheckpointCoordinatorTest {
long checkpointId1 = pending1.getCheckpointId();
// trigger messages should have been sent
- verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1));
- verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1));
+ verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1), any(CheckpointOptions.class));
+ verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1), any(CheckpointOptions.class));
CheckpointMetaData checkpointMetaData1 = new CheckpointMetaData(checkpointId1, 0L);
@@ -819,8 +819,8 @@ public class CheckpointCoordinatorTest {
long checkpointId2 = pending2.getCheckpointId();
// trigger messages should have been sent
- verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2));
- verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2));
+ verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2), any(CheckpointOptions.class));
+ verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2), any(CheckpointOptions.class));
// we acknowledge one more task from the first checkpoint and the second
// checkpoint completely. The second checkpoint should then subsume the first checkpoint
@@ -1142,7 +1142,7 @@ public class CheckpointCoordinatorTest {
numCalls.incrementAndGet();
return null;
}
- }).when(execution).triggerCheckpoint(anyLong(), anyLong());
+ }).when(execution).triggerCheckpoint(anyLong(), anyLong(), any(CheckpointOptions.class));
CheckpointCoordinator coord = new CheckpointCoordinator(
jid,
@@ -1232,7 +1232,7 @@ public class CheckpointCoordinatorTest {
triggerCalls.add((Long) invocation.getArguments()[0]);
return null;
}
- }).when(executionAttempt).triggerCheckpoint(anyLong(), anyLong());
+ }).when(executionAttempt).triggerCheckpoint(anyLong(), anyLong(), any(CheckpointOptions.class));
final long delay = 50;
@@ -1398,7 +1398,6 @@ public class CheckpointCoordinatorTest {
assertFalse(savepointFuture.isDone());
long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
- CheckpointMetaData checkpointMetaDataNew = new CheckpointMetaData(checkpointIdNew, 0L);
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew));
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew));
@@ -1414,8 +1413,8 @@ public class CheckpointCoordinatorTest {
// validate that the relevant tasks got a confirmation message
{
- verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew));
- verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew));
+ verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew), any(CheckpointOptions.class));
+ verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew), any(CheckpointOptions.class));
verify(vertex1.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointIdNew), eq(timestampNew));
verify(vertex2.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointIdNew), eq(timestampNew));
@@ -1537,7 +1536,7 @@ public class CheckpointCoordinatorTest {
numCalls.incrementAndGet();
return null;
}
- }).when(execution).triggerCheckpoint(anyLong(), anyLong());
+ }).when(execution).triggerCheckpoint(anyLong(), anyLong(), any(CheckpointOptions.class));
doAnswer(new Answer<Void>() {
@Override
@@ -1578,7 +1577,7 @@ public class CheckpointCoordinatorTest {
assertEquals(maxConcurrentAttempts, numCalls.get());
verify(triggerVertex.getCurrentExecutionAttempt(), times(maxConcurrentAttempts))
- .triggerCheckpoint(anyLong(), anyLong());
+ .triggerCheckpoint(anyLong(), anyLong(), any(CheckpointOptions.class));
// now, once we acknowledge one checkpoint, it should trigger the next one
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 1L));
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
new file mode 100644
index 0000000..6788338
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
+import org.junit.Test;
+
+public class CheckpointOptionsTest {
+
+ @Test
+ public void testFullCheckpoint() throws Exception {
+ CheckpointOptions options = CheckpointOptions.forFullCheckpoint();
+ assertEquals(CheckpointType.FULL_CHECKPOINT, options.getCheckpointType());
+ assertNull(options.getTargetLocation());
+ }
+
+ @Test
+ public void testSavepoint() throws Exception {
+ String location = "asdasdadasdasdja7931481398123123123kjhasdkajsd";
+ CheckpointOptions options = CheckpointOptions.forSavepoint(location);
+ assertEquals(CheckpointType.SAVEPOINT, options.getCheckpointType());
+ assertEquals(location, options.getTargetLocation());
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testSavepointNullCheck() throws Exception {
+ CheckpointOptions.forSavepoint(null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
index 3c373f1..95a31d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
@@ -184,6 +184,7 @@ public class CheckpointStatsHistoryTest {
when(completed.getStatus()).thenReturn(CheckpointStatsStatus.COMPLETED);
when(completed.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
when(completed.getCheckpointId()).thenReturn(checkpointId);
+ when(completed.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
return completed;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
index 512768d..6ab8620 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
@@ -183,7 +183,7 @@ public class MigrationV0ToV1Test {
} finally {
// Dispose
- SavepointStore.removeSavepoint(path.toString());
+ SavepointStore.removeSavepointFile(path.toString());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
index 6471d6f..c66b29d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
@@ -64,12 +64,12 @@ public class SavepointLoaderTest {
Map<JobVertexID, TaskState> taskStates = new HashMap<>();
taskStates.put(vertexId, state);
+ JobID jobId = new JobID();
+
// Store savepoint
SavepointV1 savepoint = new SavepointV1(checkpointId, taskStates.values());
String path = SavepointStore.storeSavepoint(tmp.getAbsolutePath(), savepoint);
- JobID jobId = new JobID();
-
ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
when(vertex.getParallelism()).thenReturn(parallelism);
when(vertex.getMaxParallelism()).thenReturn(parallelism);
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
index 3398341..dc19e47 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
@@ -18,6 +18,9 @@
package org.apache.flink.runtime.checkpoint.savepoint;
+import java.io.File;
+import java.util.Arrays;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
@@ -38,6 +41,7 @@ import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
@@ -54,14 +58,22 @@ public class SavepointStoreTest {
*/
@Test
public void testStoreLoadDispose() throws Exception {
- String target = tmp.getRoot().getAbsolutePath();
+ String root = tmp.getRoot().getAbsolutePath();
+ File rootFile = new File(root);
- assertEquals(0, tmp.getRoot().listFiles().length);
+ File[] list = rootFile.listFiles();
+
+ assertNotNull(list);
+ assertEquals(0, list.length);
// Store
+ String savepointDirectory = SavepointStore.createSavepointDirectory(root, new JobID());
SavepointV1 stored = new SavepointV1(1929292, SavepointV1Test.createTaskStates(4, 24));
- String path = SavepointStore.storeSavepoint(target, stored);
- assertEquals(1, tmp.getRoot().listFiles().length);
+ String path = SavepointStore.storeSavepoint(savepointDirectory, stored);
+
+ list = rootFile.listFiles();
+ assertNotNull(list);
+ assertEquals(1, list.length);
// Load
Savepoint loaded = SavepointStore.loadSavepoint(path, Thread.currentThread().getContextClassLoader());
@@ -70,9 +82,11 @@ public class SavepointStoreTest {
loaded.dispose();
// Dispose
- SavepointStore.removeSavepoint(path);
+ SavepointStore.deleteSavepointDirectory(path);
- assertEquals(0, tmp.getRoot().listFiles().length);
+ list = rootFile.listFiles();
+ assertNotNull(list);
+ assertEquals(0, list.length);
}
/**
@@ -108,8 +122,8 @@ public class SavepointStoreTest {
assertTrue(serializers.size() >= 1);
- String target = tmp.getRoot().getAbsolutePath();
- assertEquals(0, tmp.getRoot().listFiles().length);
+ String root = tmp.getRoot().getAbsolutePath();
+ File rootFile = new File(root);
// New savepoint type for test
int version = ThreadLocalRandom.current().nextInt();
@@ -118,14 +132,24 @@ public class SavepointStoreTest {
// Add serializer
serializers.put(version, NewSavepointSerializer.INSTANCE);
+ String savepointDirectory1 = SavepointStore.createSavepointDirectory(root, new JobID());
TestSavepoint newSavepoint = new TestSavepoint(version, checkpointId);
- String pathNewSavepoint = SavepointStore.storeSavepoint(target, newSavepoint);
- assertEquals(1, tmp.getRoot().listFiles().length);
+ String pathNewSavepoint = SavepointStore.storeSavepoint(savepointDirectory1, newSavepoint);
+
+ File[] list = rootFile.listFiles();
+
+ assertNotNull(list);
+ assertEquals(1, list.length);
// Savepoint v0
+ String savepointDirectory2 = SavepointStore.createSavepointDirectory(root, new JobID());
Savepoint savepoint = new SavepointV1(checkpointId, SavepointV1Test.createTaskStates(4, 32));
- String pathSavepoint = SavepointStore.storeSavepoint(target, savepoint);
- assertEquals(2, tmp.getRoot().listFiles().length);
+ String pathSavepoint = SavepointStore.storeSavepoint(savepointDirectory2, savepoint);
+
+ list = rootFile.listFiles();
+
+ assertNotNull(list);
+ assertEquals(2, list.length);
// Load
Savepoint loaded = SavepointStore.loadSavepoint(pathNewSavepoint, Thread.currentThread().getContextClassLoader());
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
new file mode 100644
index 0000000..dd5b0b6
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.util.DataInputDeserializer;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+import org.junit.Test;
+
+public class CheckpointBarrierTest {
+
+ /**
+ * Test serialization of the checkpoint barrier.
+ */
+ @Test
+ public void testSerialization() throws Exception {
+ long id = Integer.MAX_VALUE + 123123L;
+ long timestamp = Integer.MAX_VALUE + 1228L;
+
+ CheckpointOptions checkpoint = CheckpointOptions.forFullCheckpoint();
+ testSerialization(id, timestamp, checkpoint);
+
+ CheckpointOptions savepoint = CheckpointOptions.forSavepoint("1289031838919123");
+ testSerialization(id, timestamp, savepoint);
+ }
+
+ private void testSerialization(long id, long timestamp, CheckpointOptions options) throws IOException {
+ CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, options);
+
+ DataOutputSerializer out = new DataOutputSerializer(1024);
+ barrier.write(out);
+
+ DataInputDeserializer in = new DataInputDeserializer(out.wrapAsByteBuffer());
+ CheckpointBarrier deserialized = new CheckpointBarrier();
+ deserialized.read(in);
+
+ assertEquals(id, deserialized.getId());
+ assertEquals(timestamp, deserialized.getTimestamp());
+ assertEquals(options.getCheckpointType(), deserialized.getCheckpointOptions().getCheckpointType());
+ assertEquals(options.getTargetLocation(), deserialized.getCheckpointOptions().getTargetLocation());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index 271d0d2..e674eb7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -18,6 +18,14 @@
package org.apache.flink.runtime.io.network.api.serialization;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -25,25 +33,42 @@ import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.util.TestTaskEvent;
-
import org.junit.Test;
-import java.io.IOException;
-import java.nio.ByteBuffer;
+public class EventSerializerTest {
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+ @Test
+ public void testCheckpointBarrierSerialization() throws Exception {
+ long id = Integer.MAX_VALUE + 123123L;
+ long timestamp = Integer.MAX_VALUE + 1228L;
-public class EventSerializerTest {
+ CheckpointOptions checkpoint = CheckpointOptions.forFullCheckpoint();
+ testCheckpointBarrierSerialization(id, timestamp, checkpoint);
+
+ CheckpointOptions savepoint = CheckpointOptions.forSavepoint("1289031838919123");
+ testCheckpointBarrierSerialization(id, timestamp, savepoint);
+ }
+
+ private void testCheckpointBarrierSerialization(long id, long timestamp, CheckpointOptions options) throws IOException {
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+
+ CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, options);
+ ByteBuffer serialized = EventSerializer.toSerializedEvent(barrier);
+ CheckpointBarrier deserialized = (CheckpointBarrier) EventSerializer.fromSerializedEvent(serialized, cl);
+ assertFalse(serialized.hasRemaining());
+
+ assertEquals(id, deserialized.getId());
+ assertEquals(timestamp, deserialized.getTimestamp());
+ assertEquals(options.getCheckpointType(), deserialized.getCheckpointOptions().getCheckpointType());
+ assertEquals(options.getTargetLocation(), deserialized.getCheckpointOptions().getTargetLocation());
+ }
@Test
public void testSerializeDeserializeEvent() throws Exception {
AbstractEvent[] events = {
EndOfPartitionEvent.INSTANCE,
EndOfSuperstepEvent.INSTANCE,
- new CheckpointBarrier(1678L, 4623784L),
+ new CheckpointBarrier(1678L, 4623784L, CheckpointOptions.forFullCheckpoint()),
new TestTaskEvent(Math.random(), 12361231273L),
new CancelCheckpointMarker(287087987329842L)
};
@@ -94,7 +119,7 @@ public class EventSerializerTest {
AbstractEvent[] events = {
EndOfPartitionEvent.INSTANCE,
EndOfSuperstepEvent.INSTANCE,
- new CheckpointBarrier(1678L, 4623784L),
+ new CheckpointBarrier(1678L, 4623784L, CheckpointOptions.forFullCheckpoint()),
new TestTaskEvent(Math.random(), 12361231273L),
new CancelCheckpointMarker(287087987329842L)
};
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 63175ed..900b5c3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
@@ -327,7 +328,7 @@ public class RecordWriterTest {
ResultPartitionWriter partitionWriter = createCollectingPartitionWriter(queues, bufferProvider);
RecordWriter<ByteArrayIO> writer = new RecordWriter<>(partitionWriter, new RoundRobin<ByteArrayIO>());
- CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 919192L, Integer.MAX_VALUE + 18828228L);
+ CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 919192L, Integer.MAX_VALUE + 18828228L, CheckpointOptions.forFullCheckpoint());
// No records emitted yet, broadcast should not request a buffer
writer.broadcastEvent(barrier);
@@ -363,7 +364,7 @@ public class RecordWriterTest {
ResultPartitionWriter partitionWriter = createCollectingPartitionWriter(queues, bufferProvider);
RecordWriter<ByteArrayIO> writer = new RecordWriter<>(partitionWriter, new RoundRobin<ByteArrayIO>());
- CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 1292L, Integer.MAX_VALUE + 199L);
+ CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 1292L, Integer.MAX_VALUE + 199L, CheckpointOptions.forFullCheckpoint());
// Emit records on some channels first (requesting buffers), then
// broadcast the event. The record buffers should be emitted first, then
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index de54d1f..5a38be2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.blob.BlobService;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
@@ -601,7 +602,7 @@ public class JobManagerHARecoveryTest {
}
@Override
- public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
+ public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
ByteStreamStateHandle byteStreamStateHandle = new TestByteStreamStateHandleDeepCompare(
String.valueOf(UUID.randomUUID()),
InstantiationUtil.serializeObject(checkpointMetaData.getCheckpointId()));
@@ -619,7 +620,7 @@ public class JobManagerHARecoveryTest {
}
@Override
- public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
+ public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
throw new UnsupportedOperationException("should not be called!");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
index db45231..bc420cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -49,7 +50,7 @@ public class CheckpointMessagesTest {
NotifyCheckpointComplete cc = new NotifyCheckpointComplete(new JobID(), new ExecutionAttemptID(), 45287698767345L, 467L);
testSerializabilityEqualsHashCode(cc);
- TriggerCheckpoint tc = new TriggerCheckpoint(new JobID(), new ExecutionAttemptID(), 347652734L, 7576752L);
+ TriggerCheckpoint tc = new TriggerCheckpoint(new JobID(), new ExecutionAttemptID(), 347652734L, 7576752L, CheckpointOptions.forFullCheckpoint());
testSerializabilityEqualsHashCode(tc);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 5bd085f..94df524 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.junit.Test;
@@ -165,7 +166,7 @@ public class OperatorStateBackendTest {
listState3.add(20);
CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
- OperatorStateHandle stateHandle = operatorStateBackend.snapshot(1, 1, streamFactory).get();
+ OperatorStateHandle stateHandle = operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint()).get();
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 3b0350d..f2416b9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -41,6 +41,7 @@ import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -191,7 +192,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
// draw a snapshot
- KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+ KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
// make some more modifications
backend.setCurrentKey(1);
@@ -202,7 +203,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
state.update("u3");
// draw another snapshot
- KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
+ KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
// validate the original state
backend.setCurrentKey(1);
@@ -403,7 +404,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
assertEquals(13, (int) state2.value());
// draw a snapshot
- KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+ KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
backend.dispose();
backend = restoreKeyedBackend(
@@ -476,7 +477,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
assertEquals(42L, (long) state.value());
// draw a snapshot
- KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+ KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
backend.dispose();
backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1);
@@ -521,7 +522,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
assertEquals("1", joiner.join(getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
// draw a snapshot
- KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+ KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
// make some more modifications
backend.setCurrentKey(1);
@@ -532,7 +533,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
state.add("u3");
// draw another snapshot
- KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
+ KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
// validate the original state
backend.setCurrentKey(1);
@@ -620,7 +621,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
// draw a snapshot
- KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+ KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
// make some more modifications
backend.setCurrentKey(1);
@@ -631,7 +632,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
state.add("u3");
// draw another snapshot
- KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
+ KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
// validate the original state
backend.setCurrentKey(1);
@@ -722,7 +723,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
assertEquals("Fold-Initial:,1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
// draw a snapshot
- KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+ KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
// make some more modifications
backend.setCurrentKey(1);
@@ -734,7 +735,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
state.add(103);
// draw another snapshot
- KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
+ KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
// validate the original state
backend.setCurrentKey(1);
@@ -829,7 +830,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
// draw a snapshot
- KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+ KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
// make some more modifications
backend.setCurrentKey(1);
@@ -841,7 +842,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
state.putAll(new HashMap<Integer, String>() {{ put(1031, "1031"); put(1032, "1032"); }});
// draw another snapshot
- KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
+ KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
// validate the original state
backend.setCurrentKey(1);
@@ -1163,7 +1164,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
state.update("ShouldBeInSecondHalf");
- KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(0, 0, streamFactory));
+ KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(0, 0, streamFactory, CheckpointOptions.forFullCheckpoint()));
List<KeyGroupsStateHandle> firstHalfKeyGroupStates = StateAssignmentOperation.getKeyGroupsStateHandles(
Collections.singletonList(snapshot),
@@ -1230,7 +1231,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
state.update("2");
// draw a snapshot
- KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+ KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
backend.dispose();
// restore the first snapshot and validate it
@@ -1281,7 +1282,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
state.add("2");
// draw a snapshot
- KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+ KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
backend.dispose();
// restore the first snapshot and validate it
@@ -1334,7 +1335,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
state.add("2");
// draw a snapshot
- KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+ KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
backend.dispose();
// restore the first snapshot and validate it
@@ -1385,7 +1386,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
state.put("2", "Second");
// draw a snapshot
- KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+ KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
backend.dispose();
// restore the first snapshot and validate it
@@ -1661,7 +1662,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
eq(env.getJobID()), eq(env.getJobVertexId()), eq(expectedKeyGroupRange), eq("banana"), any(KvStateID.class));
- KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
+ KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
backend.dispose();
@@ -1692,7 +1693,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class);
// draw a snapshot
- KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 1, streamFactory));
+ KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 1, streamFactory, CheckpointOptions.forFullCheckpoint()));
assertNull(snapshot);
backend.dispose();
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java
new file mode 100644
index 0000000..a29d29c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.File;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class FsSavepointStreamFactoryTest {
+ // JUnit-managed temporary folder; the directory handed to the factory is created inside it.
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+ /**
+ * Verifies that a state file written through the factory ends up directly in
+ * the given directory (no sub directories) and is the only entry there.
+ */
+ @Test
+ public void testSavepointStreamDirectoryLayout() throws Exception {
+ File testRoot = folder.newFolder();
+ JobID jobId = new JobID();
+
+ FsSavepointStreamFactory savepointStreamFactory = new FsSavepointStreamFactory(
+ new Path(testRoot.getAbsolutePath()),
+ jobId,
+ 0);
+ // Constructing the factory alone must leave the target directory empty.
+ File[] listed = testRoot.listFiles();
+ assertNotNull(listed);
+ assertEquals(0, listed.length);
+ // Open a state stream through the factory and write a single byte to it.
+ FsCheckpointStateOutputStream stream = savepointStreamFactory
+ .createCheckpointStateOutputStream(1273, 19231);
+
+ stream.write(1);
+ // Closing the stream yields the handle that points at the written file.
+ FileStateHandle handle = (FileStateHandle) stream.closeAndGetHandle();
+ // Exactly one entry may exist in the directory, and it must be the handle's file.
+ listed = testRoot.listFiles();
+ assertNotNull(listed);
+ assertEquals(1, listed.length);
+ assertEquals(handle.getFilePath().getPath(), listed[0].getPath());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 187163d..89ae5da 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -91,7 +92,7 @@ public class TaskAsyncCallTest {
awaitLatch.await();
for (int i = 1; i <= NUM_CALLS; i++) {
- task.triggerCheckpointBarrier(i, 156865867234L);
+ task.triggerCheckpointBarrier(i, 156865867234L, CheckpointOptions.forFullCheckpoint());
}
triggerLatch.await();
@@ -121,7 +122,7 @@ public class TaskAsyncCallTest {
awaitLatch.await();
for (int i = 1; i <= NUM_CALLS; i++) {
- task.triggerCheckpointBarrier(i, 156865867234L);
+ task.triggerCheckpointBarrier(i, 156865867234L, CheckpointOptions.forFullCheckpoint());
task.notifyCheckpointComplete(i);
}
@@ -226,7 +227,7 @@ public class TaskAsyncCallTest {
public void setInitialState(TaskStateHandles taskStateHandles) throws Exception {}
@Override
- public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) {
+ public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
lastCheckpointId++;
if (checkpointMetaData.getCheckpointId() == lastCheckpointId) {
if (lastCheckpointId == NUM_CALLS) {
@@ -243,7 +244,7 @@ public class TaskAsyncCallTest {
}
@Override
- public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
+ public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
throw new UnsupportedOperationException("Should not be called");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 144247f..05fda28 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.operators;
+import java.io.IOException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.apache.flink.annotation.PublicEvolving;
@@ -36,6 +37,8 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -340,17 +343,19 @@ public abstract class AbstractStreamOperator<OUT>
}
@Override
- public final OperatorSnapshotResult snapshotState(long checkpointId, long timestamp) throws Exception {
+ public final OperatorSnapshotResult snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception {
KeyGroupRange keyGroupRange = null != keyedStateBackend ?
keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
OperatorSnapshotResult snapshotInProgress = new OperatorSnapshotResult();
+ CheckpointStreamFactory factory = getCheckpointStreamFactory(checkpointOptions);
+
try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
checkpointId,
timestamp,
- checkpointStreamFactory,
+ factory,
keyGroupRange,
getContainingTask().getCancelables())) {
@@ -361,12 +366,12 @@ public abstract class AbstractStreamOperator<OUT>
if (null != operatorStateBackend) {
snapshotInProgress.setOperatorStateManagedFuture(
- operatorStateBackend.snapshot(checkpointId, timestamp, checkpointStreamFactory));
+ operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
}
if (null != keyedStateBackend) {
snapshotInProgress.setKeyedStateManagedFuture(
- keyedStateBackend.snapshot(checkpointId, timestamp, checkpointStreamFactory));
+ keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
}
} catch (Exception snapshotException) {
try {
@@ -431,11 +436,12 @@ public abstract class AbstractStreamOperator<OUT>
@SuppressWarnings("deprecation")
@Deprecated
@Override
- public StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp) throws Exception {
+ public StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception {
if (this instanceof StreamCheckpointedOperator) {
+ CheckpointStreamFactory factory = getCheckpointStreamFactory(checkpointOptions);
final CheckpointStreamFactory.CheckpointStateOutputStream outStream =
- checkpointStreamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
+ factory.createCheckpointStateOutputStream(checkpointId, timestamp);
getContainingTask().getCancelables().registerClosable(outStream);
@@ -495,6 +501,31 @@ public abstract class AbstractStreamOperator<OUT>
@Override
public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {}
+ /**
+ * Selects the checkpoint stream factory that matches the given checkpoint options.
+ *
+ * <p>For {@link CheckpointType#FULL_CHECKPOINT} the operator's shared
+ * {@code checkpointStreamFactory} is returned. For {@link CheckpointType#SAVEPOINT}
+ * a factory is obtained from {@code container.createSavepointStreamFactory} for the
+ * target location carried by the options, i.e. a custom factory per savepoint.
+ *
+ * @param checkpointOptions Options for the checkpoint (type and target location)
+ * @return Checkpoint stream factory to use for this checkpoint
+ * @throws IOException Failures while creating a new stream factory are forwarded
+ * @throws IllegalStateException If the checkpoint type is neither of the two types above
+ */
+ @VisibleForTesting
+ CheckpointStreamFactory getCheckpointStreamFactory(CheckpointOptions checkpointOptions) throws IOException {
+ CheckpointType checkpointType = checkpointOptions.getCheckpointType();
+ if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
+ return checkpointStreamFactory;
+ } else if (checkpointType == CheckpointType.SAVEPOINT) {
+ return container.createSavepointStreamFactory(this, checkpointOptions.getTargetLocation());
+ } else {
+ throw new IllegalStateException("Unknown checkpoint type " + checkpointType);
+ }
+ }
+
// ------------------------------------------------------------------------
// Properties and Services
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
index 5a6c37b..83697ae 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
@@ -26,7 +26,7 @@ import org.apache.flink.util.ExceptionUtils;
import java.util.concurrent.RunnableFuture;
/**
- * Result of {@link AbstractStreamOperator#snapshotState}.
+ * Result of {@link StreamOperator#snapshotState}.
*/
public class OperatorSnapshotResult {
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index d8e4d08..006e910 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.operators;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -98,7 +99,10 @@ public interface StreamOperator<OUT> extends Serializable {
*
* @throws Exception exception that happened during snapshotting.
*/
- OperatorSnapshotResult snapshotState(long checkpointId, long timestamp) throws Exception;
+ OperatorSnapshotResult snapshotState(
+ long checkpointId,
+ long timestamp,
+ CheckpointOptions checkpointOptions) throws Exception;
/**
* Takes a snapshot of the legacy operator state defined via {@link StreamCheckpointedOperator}.
@@ -110,7 +114,10 @@ public interface StreamOperator<OUT> extends Serializable {
*/
@SuppressWarnings("deprecation")
@Deprecated
- StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp) throws Exception;
+ StreamStateHandle snapshotLegacyOperatorState(
+ long checkpointId,
+ long timestamp,
+ CheckpointOptions checkpointOptions) throws Exception;
/**
* Provides state handles to restore the operator state.
@@ -142,4 +149,5 @@ public interface StreamOperator<OUT> extends Serializable {
void setChainingStrategy(ChainingStrategy strategy);
MetricGroup getMetricGroup();
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 611bd44..2da8389 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -368,7 +368,10 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
.setBytesBufferedInAlignment(bytesBuffered)
.setAlignmentDurationNanos(latestAlignmentDurationNanos);
- toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData, checkpointMetrics);
+ toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
+ checkpointMetaData,
+ checkpointBarrier.getCheckpointOptions(),
+ checkpointMetrics);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 77608c6..8b1b65b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
@@ -132,7 +133,7 @@ public class BarrierTracker implements CheckpointBarrierHandler {
// fast path for single channel trackers
if (totalNumberOfInputChannels == 1) {
- notifyCheckpoint(barrierId, receivedBarrier.getTimestamp());
+ notifyCheckpoint(barrierId, receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());
return;
}
@@ -170,7 +171,7 @@ public class BarrierTracker implements CheckpointBarrierHandler {
LOG.debug("Received all barriers for checkpoint {}", barrierId);
}
- notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp());
+ notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());
}
}
}
@@ -248,14 +249,14 @@ public class BarrierTracker implements CheckpointBarrierHandler {
}
}
- private void notifyCheckpoint(long checkpointId, long timestamp) throws Exception {
+ private void notifyCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception {
if (toNotifyOnCheckpoint != null) {
CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
.setBytesBufferedInAlignment(0L)
.setAlignmentDurationNanos(0L);
- toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData, checkpointMetrics);
+ toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData, checkpointOptions, checkpointMetrics);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 4f07182..dd93592 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -164,9 +165,9 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
}
}
- public void broadcastCheckpointBarrier(long id, long timestamp) throws IOException {
+ public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
try {
- CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp);
+ CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
for (RecordWriterOutput<?> streamOutput : streamOutputs) {
streamOutput.broadcastEvent(barrier);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 62cfb8f..938ffd2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
@@ -163,7 +164,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
private TaskStateHandles restoreStateHandles;
-
/** The currently active background materialization threads */
private final CloseableRegistry cancelables = new CloseableRegistry();
@@ -520,14 +520,14 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
@Override
- public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
+ public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
try {
// No alignment if we inject a checkpoint
CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
.setBytesBufferedInAlignment(0L)
.setAlignmentDurationNanos(0L);
- return performCheckpoint(checkpointMetaData, checkpointMetrics);
+ return performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
}
catch (Exception e) {
// propagate exceptions only if the task is still in "running" state
@@ -543,9 +543,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
@Override
- public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
+ public void triggerCheckpointOnBarrier(
+ CheckpointMetaData checkpointMetaData,
+ CheckpointOptions checkpointOptions,
+ CheckpointMetrics checkpointMetrics) throws Exception {
+
try {
- performCheckpoint(checkpointMetaData, checkpointMetrics);
+ performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
}
catch (CancelTaskException e) {
throw new Exception("Operator " + getName() + " was cancelled while performing checkpoint " +
@@ -570,8 +574,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
}
- private boolean performCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
- LOG.debug("Starting checkpoint {} on task {}", checkpointMetaData.getCheckpointId(), getName());
+ private boolean performCheckpoint(
+ CheckpointMetaData checkpointMetaData,
+ CheckpointOptions checkpointOptions,
+ CheckpointMetrics checkpointMetrics) throws Exception {
+
+ LOG.debug("Starting checkpoint ({}) {} on task {}",
+ checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());
synchronized (lock) {
if (isRunning) {
@@ -582,9 +591,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
// Given this, we immediately emit the checkpoint barriers, so the downstream operators
// can start their checkpoint work as soon as possible
operatorChain.broadcastCheckpointBarrier(
- checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp());
+ checkpointMetaData.getCheckpointId(),
+ checkpointMetaData.getTimestamp(),
+ checkpointOptions);
- checkpointState(checkpointMetaData, checkpointMetrics);
+ checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
return true;
}
else {
@@ -637,8 +648,17 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
}
- private void checkpointState(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
- CheckpointingOperation checkpointingOperation = new CheckpointingOperation(this, checkpointMetaData, checkpointMetrics);
+ private void checkpointState(
+ CheckpointMetaData checkpointMetaData,
+ CheckpointOptions checkpointOptions,
+ CheckpointMetrics checkpointMetrics) throws Exception {
+
+ CheckpointingOperation checkpointingOperation = new CheckpointingOperation(
+ this,
+ checkpointMetaData,
+ checkpointOptions,
+ checkpointMetrics);
+
checkpointingOperation.executeCheckpointing();
}
@@ -814,7 +834,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
return stateBackend.createStreamFactory(
getEnvironment().getJobID(),
createOperatorIdentifier(operator, configuration.getVertexID()));
+ }
+ public CheckpointStreamFactory createSavepointStreamFactory(StreamOperator<?> operator, String targetLocation) throws IOException {
+ return stateBackend.createSavepointStreamFactory(
+ getEnvironment().getJobID(),
+ createOperatorIdentifier(operator, configuration.getVertexID()),
+ targetLocation);
}
private String createOperatorIdentifier(StreamOperator<?> operator, int vertexId) {
@@ -1048,6 +1074,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
private final StreamTask<?, ?> owner;
private final CheckpointMetaData checkpointMetaData;
+ private final CheckpointOptions checkpointOptions;
private final CheckpointMetrics checkpointMetrics;
private final StreamOperator<?>[] allOperators;
@@ -1060,9 +1087,15 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
private final List<StreamStateHandle> nonPartitionedStates;
private final List<OperatorSnapshotResult> snapshotInProgressList;
- public CheckpointingOperation(StreamTask<?, ?> owner, CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) {
+ public CheckpointingOperation(
+ StreamTask<?, ?> owner,
+ CheckpointMetaData checkpointMetaData,
+ CheckpointOptions checkpointOptions,
+ CheckpointMetrics checkpointMetrics) {
+
this.owner = Preconditions.checkNotNull(owner);
this.checkpointMetaData = Preconditions.checkNotNull(checkpointMetaData);
+ this.checkpointOptions = Preconditions.checkNotNull(checkpointOptions);
this.checkpointMetrics = Preconditions.checkNotNull(checkpointMetrics);
this.allOperators = owner.operatorChain.getAllOperators();
this.nonPartitionedStates = new ArrayList<>(allOperators.length);
@@ -1137,14 +1170,16 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
@SuppressWarnings("deprecation")
private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
if (null != op) {
- // first call the legacy checkpoint code paths
+ // first call the legacy checkpoint code paths
nonPartitionedStates.add(op.snapshotLegacyOperatorState(
checkpointMetaData.getCheckpointId(),
- checkpointMetaData.getTimestamp()));
+ checkpointMetaData.getTimestamp(),
+ checkpointOptions));
OperatorSnapshotResult snapshotInProgress = op.snapshotState(
checkpointMetaData.getCheckpointId(),
- checkpointMetaData.getTimestamp());
+ checkpointMetaData.getTimestamp(),
+ checkpointOptions);
snapshotInProgressList.add(snapshotInProgress);
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
index 6751617..51b9d9a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
@@ -100,4 +100,4 @@ public class ListCheckpointedTest {
return restored;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index 274611a..8507200 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -17,12 +17,34 @@
*/
package org.apache.flink.streaming.api.operators;
+import static junit.framework.TestCase.assertTrue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.doReturn;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.spy;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.RunnableFuture;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
@@ -45,27 +67,6 @@ import org.mockito.internal.util.reflection.Whitebox;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.RunnableFuture;
-
-import static junit.framework.TestCase.assertTrue;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.contains;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.verify;
-import static org.powermock.api.mockito.PowerMockito.doReturn;
-import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.spy;
-import static org.powermock.api.mockito.PowerMockito.when;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-
/**
* Tests for the facilities provided by {@link AbstractStreamOperator}. This mostly
* tests timers and state and whether they are correctly checkpointed/restored
@@ -495,10 +496,10 @@ public class AbstractStreamOperatorTest {
when(containingTask.getCancelables()).thenReturn(closeableRegistry);
AbstractStreamOperator<Void> operator = mock(AbstractStreamOperator.class);
- when(operator.snapshotState(anyLong(), anyLong())).thenCallRealMethod();
+ when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenCallRealMethod();
doReturn(containingTask).when(operator).getContainingTask();
- operator.snapshotState(checkpointId, timestamp);
+ operator.snapshotState(checkpointId, timestamp, CheckpointOptions.forFullCheckpoint());
verify(context).close();
}
@@ -524,14 +525,14 @@ public class AbstractStreamOperatorTest {
when(containingTask.getCancelables()).thenReturn(closeableRegistry);
AbstractStreamOperator<Void> operator = mock(AbstractStreamOperator.class);
- when(operator.snapshotState(anyLong(), anyLong())).thenCallRealMethod();
+ when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenCallRealMethod();
doReturn(containingTask).when(operator).getContainingTask();
// lets fail when calling the actual snapshotState method
doThrow(failingException).when(operator).snapshotState(eq(context));
try {
- operator.snapshotState(checkpointId, timestamp);
+ operator.snapshotState(checkpointId, timestamp, CheckpointOptions.forFullCheckpoint());
fail("Exception expected.");
} catch (Exception e) {
assertEquals(failingException, e.getCause());
@@ -571,23 +572,29 @@ public class AbstractStreamOperatorTest {
when(containingTask.getCancelables()).thenReturn(closeableRegistry);
AbstractStreamOperator<Void> operator = mock(AbstractStreamOperator.class);
- when(operator.snapshotState(anyLong(), anyLong())).thenCallRealMethod();
+ when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenCallRealMethod();
+
+ // The amount of mocking in this test makes it necessary to make the
+ // getCheckpointStreamFactory method visible for the test and to
+ // overwrite its behaviour.
+ when(operator.getCheckpointStreamFactory(any(CheckpointOptions.class))).thenReturn(streamFactory);
+
doReturn(containingTask).when(operator).getContainingTask();
RunnableFuture<OperatorStateHandle> futureManagedOperatorStateHandle = mock(RunnableFuture.class);
OperatorStateBackend operatorStateBackend = mock(OperatorStateBackend.class);
- when(operatorStateBackend.snapshot(eq(checkpointId), eq(timestamp), eq(streamFactory))).thenReturn(futureManagedOperatorStateHandle);
+ when(operatorStateBackend.snapshot(eq(checkpointId), eq(timestamp), eq(streamFactory), any(CheckpointOptions.class))).thenReturn(futureManagedOperatorStateHandle);
AbstractKeyedStateBackend<?> keyedStateBackend = mock(AbstractKeyedStateBackend.class);
- when(keyedStateBackend.snapshot(eq(checkpointId), eq(timestamp), eq(streamFactory))).thenThrow(failingException);
+ when(keyedStateBackend.snapshot(eq(checkpointId), eq(timestamp), eq(streamFactory), eq(CheckpointOptions.forFullCheckpoint()))).thenThrow(failingException);
Whitebox.setInternalState(operator, "operatorStateBackend", operatorStateBackend);
Whitebox.setInternalState(operator, "keyedStateBackend", keyedStateBackend);
Whitebox.setInternalState(operator, "checkpointStreamFactory", streamFactory);
try {
- operator.snapshotState(checkpointId, timestamp);
+ operator.snapshotState(checkpointId, timestamp, CheckpointOptions.forFullCheckpoint());
fail("Exception expected.");
} catch (Exception e) {
assertEquals(failingException, e.getCause());
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
index c4ddea8..d331171 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -90,8 +91,8 @@ public class AbstractUdfStreamOperatorLifecycleTest {
"setup[class org.apache.flink.streaming.runtime.tasks.StreamTask, class " +
"org.apache.flink.streaming.api.graph.StreamConfig, interface " +
"org.apache.flink.streaming.api.operators.Output], " +
- "snapshotLegacyOperatorState[long, long], " +
- "snapshotState[long, long]]";
+ "snapshotLegacyOperatorState[long, long, class org.apache.flink.runtime.checkpoint.CheckpointOptions], " +
+ "snapshotState[long, long, class org.apache.flink.runtime.checkpoint.CheckpointOptions]]";
private static final String ALL_METHODS_RICH_FUNCTION = "[close[], getIterationRuntimeContext[], getRuntimeContext[]" +
", open[class org.apache.flink.configuration.Configuration], setRuntimeContext[interface " +
@@ -240,7 +241,8 @@ public class AbstractUdfStreamOperatorLifecycleTest {
try {
runStarted.await();
if (getContainingTask().isCanceled() || getContainingTask().triggerCheckpoint(
- new CheckpointMetaData(0, System.currentTimeMillis()))) {
+ new CheckpointMetaData(0, System.currentTimeMillis()),
+ CheckpointOptions.forFullCheckpoint())) {
LifecycleTrackingStreamSource.runFinish.trigger();
}
} catch (Exception e) {
@@ -260,9 +262,9 @@ public class AbstractUdfStreamOperatorLifecycleTest {
}
@Override
- public StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp) throws Exception {
+ public StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception {
ACTUAL_ORDER_TRACKING.add("OPERATOR::snapshotLegacyOperatorState");
- return super.snapshotLegacyOperatorState(checkpointId, timestamp);
+ return super.snapshotLegacyOperatorState(checkpointId, timestamp, checkpointOptions);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java
index b1689f9..ab4258f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java
@@ -184,4 +184,4 @@ public class WrappingFunctionSnapshotRestoreTest {
return value;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 907f8f1..c4867ff 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -522,7 +523,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, checkpointTimestamp);
- task.triggerCheckpoint(checkpointMetaData);
+ task.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint());
env.getCheckpointLatch().await();
@@ -557,7 +558,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
restoredTaskHarness.processElement(new StreamRecord<>(7, initialTime + 7));
// trigger the checkpoint while processing stream elements
- restoredTask.triggerCheckpoint(new CheckpointMetaData(checkpointId, checkpointTimestamp));
+ restoredTask.triggerCheckpoint(new CheckpointMetaData(checkpointId, checkpointTimestamp), CheckpointOptions.forFullCheckpoint());
restoredTaskHarness.processElement(new StreamRecord<>(8, initialTime + 8));