Posted to commits@flink.apache.org by dw...@apache.org on 2021/02/26 08:18:25 UTC

[flink] branch release-1.11 updated: [FLINK-21486] Throw exception when restoring Rocks timers with Heap timers enabled

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new c6b10f6  [FLINK-21486] Throw exception when restoring Rocks timers with Heap timers enabled
c6b10f6 is described below

commit c6b10f60db09852ca5126b2ce9f71aa17ce207d8
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Mon Feb 22 16:49:43 2021 +0100

    [FLINK-21486] Throw exception when restoring Rocks timers with Heap timers enabled
    
    This closes #15022
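
Background: with the RocksDB state backend, timers can live either inside
RocksDB itself (PriorityQueueStateType.ROCKSDB) or on the JVM heap
(PriorityQueueStateType.HEAP). Before this fix, restoring a snapshot that
contains RocksDB-based timers into a backend configured for heap timers would
silently ignore the stored timers; with this commit, both the full and the
incremental restore path fail fast with a StateMigrationException instead.
A minimal sketch of the job configuration that now triggers the exception --
the checkpoint URI, class name, and environment setup are illustrative
assumptions, while the backend calls are the ones exercised by the commit's
tests:

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend.PriorityQueueStateType;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class TimerMigrationSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // The savepoint being restored was taken with timers kept in RocksDB:
            //     backend.setPriorityQueueStateType(PriorityQueueStateType.ROCKSDB);
            // Reconfiguring the same job for heap timers before restoring ...
            RocksDBStateBackend backend =
                    new RocksDBStateBackend("file:///tmp/checkpoints");
            backend.setPriorityQueueStateType(PriorityQueueStateType.HEAP);
            env.setStateBackend(backend);

            // ... now fails during restore with a StateMigrationException
            // instead of silently dropping the RocksDB-based timer state.
        }
    }
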
---
 .../flink-statebackend-rocksdb/pom.xml             |  19 ++-
 .../state/RocksDBKeyedStateBackendBuilder.java     |  11 +-
 .../streaming/state/restore/PriorityQueueFlag.java |  29 +++++
 .../state/restore/RocksDBFullRestoreOperation.java |  13 +-
 .../RocksDBIncrementalRestoreOperation.java        |  16 ++-
 .../state/HeapTimersSnapshottingTest.java          | 135 +++++++++++++++++++++
 .../util/AbstractStreamOperatorTestHarness.java    |  16 ++-
 7 files changed, 228 insertions(+), 11 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/pom.xml b/flink-state-backends/flink-statebackend-rocksdb/pom.xml
index 1ae62e6..1bdd5cf 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/pom.xml
+++ b/flink-state-backends/flink-statebackend-rocksdb/pom.xml
@@ -19,8 +19,8 @@ under the License.
 
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 
 	<modelVersion>4.0.0</modelVersion>
 
@@ -81,6 +81,13 @@ under the License.
 			<type>test-jar</type>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 
 	<!-- build a test jar -->
@@ -96,8 +103,12 @@ under the License.
 						</goals>
 						<configuration>
 							<includes>
-								<include>**/org/apache/flink/contrib/streaming/state/RocksDBTestUtils*</include>
-								<include>**/org/apache/flink/contrib/streaming/state/benchmark/StateBackendBenchmarkUtils*</include>
+								<include>
+									**/org/apache/flink/contrib/streaming/state/RocksDBTestUtils*
+								</include>
+								<include>
+									**/org/apache/flink/contrib/streaming/state/benchmark/StateBackendBenchmarkUtils*
+								</include>
 								<include>META-INF/LICENSE</include>
 								<include>META-INF/NOTICE</include>
 							</includes>
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index 83faeb3..1e101e8 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation;
+import org.apache.flink.contrib.streaming.state.restore.PriorityQueueFlag;
 import org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation;
 import org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation;
 import org.apache.flink.contrib.streaming.state.restore.RocksDBNoneRestoreOperation;
@@ -419,6 +420,10 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
                     ttlCompactFiltersManager);
         }
         KeyedStateHandle firstStateHandle = restoreStateHandles.iterator().next();
+        PriorityQueueFlag queueRestoreEnabled =
+                priorityQueueStateType == RocksDBStateBackend.PriorityQueueStateType.HEAP
+                        ? PriorityQueueFlag.THROW_ON_PRIORITY_QUEUE
+                        : PriorityQueueFlag.RESTORE_PRIORITY_QUEUE;
         if (firstStateHandle instanceof IncrementalKeyedStateHandle) {
             return new RocksDBIncrementalRestoreOperation<>(
                     operatorIdentifier,
@@ -437,7 +442,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
                     metricGroup,
                     restoreStateHandles,
                     ttlCompactFiltersManager,
-                    writeBatchSize);
+                    writeBatchSize,
+                    queueRestoreEnabled);
         } else {
             return new RocksDBFullRestoreOperation<>(
                     keyGroupRange,
@@ -455,7 +461,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
                     metricGroup,
                     restoreStateHandles,
                     ttlCompactFiltersManager,
-                    writeBatchSize);
+                    writeBatchSize,
+                    queueRestoreEnabled);
         }
     }
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/PriorityQueueFlag.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/PriorityQueueFlag.java
new file mode 100644
index 0000000..3fb2fa1
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/PriorityQueueFlag.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.restore;
+
+/**
+ * Flag indicating what to do when a priority queue is encountered while restoring a RocksDB
+ * snapshot. Prior to Flink 1.13 it was not possible to switch from RocksDB timers to heap timers,
+ * so in that case the restore throws an exception.
+ */
+public enum PriorityQueueFlag {
+    THROW_ON_PRIORITY_QUEUE,
+    RESTORE_PRIORITY_QUEUE
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
index 00163bc..a1d13cf 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.state.StateSerializerProvider;
 import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot.BackendStateType;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StateMigrationException;
@@ -86,6 +87,8 @@ public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperat
     /** Write batch size used in {@link RocksDBWriteBatchWrapper}. */
     private final long writeBatchSize;
 
+    private final PriorityQueueFlag queueRestoreEnabled;
+
     public RocksDBFullRestoreOperation(
             KeyGroupRange keyGroupRange,
             int keyGroupPrefixBytes,
@@ -102,7 +105,8 @@ public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperat
             MetricGroup metricGroup,
             @Nonnull Collection<KeyedStateHandle> restoreStateHandles,
             @Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
-            @Nonnegative long writeBatchSize) {
+            @Nonnegative long writeBatchSize,
+            PriorityQueueFlag queueRestoreEnabled) {
         super(
                 keyGroupRange,
                 keyGroupPrefixBytes,
@@ -121,6 +125,7 @@ public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperat
                 ttlCompactFiltersManager);
         checkArgument(writeBatchSize >= 0, "Write batch size must be non-negative.");
         this.writeBatchSize = writeBatchSize;
+        this.queueRestoreEnabled = queueRestoreEnabled;
     }
 
     /** Restores all key-groups data that is referenced by the passed state handles. */
@@ -181,6 +186,12 @@ public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperat
         currentStateHandleKVStateColumnFamilies = new ArrayList<>(restoredMetaInfos.size());
 
         for (StateMetaInfoSnapshot restoredMetaInfo : restoredMetaInfos) {
+            if (restoredMetaInfo.getBackendStateType() == BackendStateType.PRIORITY_QUEUE
+                    && queueRestoreEnabled == PriorityQueueFlag.THROW_ON_PRIORITY_QUEUE) {
+                throw new StateMigrationException(
+                        "Can not restore savepoint taken with RocksDB timers enabled with Heap timers!");
+            }
+
             RocksDbKvStateInfo registeredStateCFHandle =
                     getOrRegisterStateColumnFamilyHandle(null, restoredMetaInfo);
             currentStateHandleKVStateColumnFamilies.add(registeredStateCFHandle.columnFamilyHandle);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index f755940..9380bc1 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.StateMigrationException;
 
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
@@ -88,6 +89,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
 
     private final String operatorIdentifier;
     private final SortedMap<Long, Set<StateHandleID>> restoredSstFiles;
+    private final PriorityQueueFlag queueRestoreEnabled;
     private long lastCompletedCheckpointId;
     private UUID backendUID;
     private final long writeBatchSize;
@@ -109,7 +111,8 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
             MetricGroup metricGroup,
             @Nonnull Collection<KeyedStateHandle> restoreStateHandles,
             @Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
-            @Nonnegative long writeBatchSize) {
+            @Nonnegative long writeBatchSize,
+            PriorityQueueFlag queueRestoreEnabled) {
         super(
                 keyGroupRange,
                 keyGroupPrefixBytes,
@@ -130,6 +133,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
         this.restoredSstFiles = new TreeMap<>();
         this.lastCompletedCheckpointId = -1L;
         this.backendUID = UUID.randomUUID();
+        this.queueRestoreEnabled = queueRestoreEnabled;
         checkArgument(writeBatchSize >= 0, "Write batch size must be non-negative.");
         this.writeBatchSize = writeBatchSize;
     }
@@ -483,12 +487,20 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
      * meta data snapshot.
      */
     private List<ColumnFamilyDescriptor> createAndRegisterColumnFamilyDescriptors(
-            List<StateMetaInfoSnapshot> stateMetaInfoSnapshots, boolean registerTtlCompactFilter) {
+            List<StateMetaInfoSnapshot> stateMetaInfoSnapshots, boolean registerTtlCompactFilter)
+            throws StateMigrationException {
 
         List<ColumnFamilyDescriptor> columnFamilyDescriptors =
                 new ArrayList<>(stateMetaInfoSnapshots.size());
 
         for (StateMetaInfoSnapshot stateMetaInfoSnapshot : stateMetaInfoSnapshots) {
+            if (stateMetaInfoSnapshot.getBackendStateType()
+                            == StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE
+                    && queueRestoreEnabled == PriorityQueueFlag.THROW_ON_PRIORITY_QUEUE) {
+                throw new StateMigrationException(
+                        "Can not restore savepoint taken with RocksDB timers enabled with Heap timers!");
+            }
+
             RegisteredStateMetaInfoBase metaInfoBase =
                     RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot);
             ColumnFamilyDescriptor columnFamilyDescriptor =
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/HeapTimersSnapshottingTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/HeapTimersSnapshottingTest.java
new file mode 100644
index 0000000..5f81a57
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/HeapTimersSnapshottingTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend.PriorityQueueStateType;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.StateMigrationException;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.flink.util.CoreMatchers.containsCause;
+
+/**
+ * RocksDB-based timers are silently ignored when restored with the RocksDB backend and heap
+ * timers enabled. These tests verify that an exception is thrown on that incompatible migration.
+ */
+public class HeapTimersSnapshottingTest {
+
+    @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Rule public ExpectedException thrown = ExpectedException.none();
+
+    @Test
+    public void testThrowExceptionWhenRestoringRocksTimersWithHeapTimers() throws Exception {
+        OperatorSubtaskState state;
+        try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+                getTestHarness()) {
+            RocksDBStateBackend backend =
+                    new RocksDBStateBackend(temporaryFolder.newFolder().toURI());
+            backend.setPriorityQueueStateType(PriorityQueueStateType.ROCKSDB);
+            testHarness.setStateBackend(backend);
+            testHarness.open();
+            testHarness.processElement(0, 0L);
+
+            state =
+                    testHarness
+                            .snapshotWithLocalState(0L, 1L, CheckpointType.SAVEPOINT)
+                            .getJobManagerOwnedState();
+        }
+
+        try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+                getTestHarness()) {
+            RocksDBStateBackend backend =
+                    new RocksDBStateBackend(temporaryFolder.newFolder().toURI());
+            backend.setPriorityQueueStateType(PriorityQueueStateType.HEAP);
+            testHarness.setStateBackend(backend);
+
+            thrown.expect(
+                    containsCause(
+                            new StateMigrationException(
+                                    "Can not restore savepoint taken with RocksDB timers enabled"
+                                            + " with Heap timers!")));
+            testHarness.initializeState(state);
+        }
+    }
+
+    @Test
+    public void testThrowExceptionWhenRestoringRocksTimersWithHeapTimersIncrementalCheckpoints()
+            throws Exception {
+        OperatorSubtaskState state;
+        try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+                getTestHarness()) {
+            RocksDBStateBackend backend =
+                    new RocksDBStateBackend(temporaryFolder.newFolder().toURI(), true);
+            backend.setPriorityQueueStateType(PriorityQueueStateType.ROCKSDB);
+            testHarness.setStateBackend(backend);
+            testHarness.open();
+            testHarness.processElement(0, 0L);
+
+            state =
+                    testHarness
+                            .snapshotWithLocalState(0L, 1L, CheckpointType.SAVEPOINT)
+                            .getJobManagerOwnedState();
+        }
+
+        try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+                getTestHarness()) {
+            RocksDBStateBackend backend =
+                    new RocksDBStateBackend(temporaryFolder.newFolder().toURI(), true);
+            backend.setPriorityQueueStateType(PriorityQueueStateType.HEAP);
+            testHarness.setStateBackend(backend);
+
+            thrown.expect(
+                    containsCause(
+                            new StateMigrationException(
+                                    "Can not restore savepoint taken with RocksDB timers enabled"
+                                            + " with Heap timers!")));
+            testHarness.initializeState(state);
+        }
+    }
+
+    private KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> getTestHarness()
+            throws Exception {
+        return new KeyedOneInputStreamOperatorTestHarness<>(
+                new KeyedProcessOperator<>(
+                        new KeyedProcessFunction<Integer, Integer, Integer>() {
+                            @Override
+                            public void processElement(
+                                    Integer value, Context ctx, Collector<Integer> out) {
+                                ctx.timerService().registerEventTimeTimer(0L);
+                            }
+                        }),
+                (KeySelector<Integer, Integer>) value -> value,
+                BasicTypeInfo.INT_TYPE_INFO,
+                1,
+                1,
+                0);
+    }
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 641a7d4..7d10af8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.RoundRobinOperatorStateRepartitioner;
@@ -594,14 +595,25 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
      */
     public OperatorSnapshotFinalizer snapshotWithLocalState(long checkpointId, long timestamp)
             throws Exception {
+        return snapshotWithLocalState(checkpointId, timestamp, CheckpointType.CHECKPOINT);
+    }
+
+    /**
+     * Calls {@link StreamOperator#snapshotState(long, long, CheckpointOptions,
+     * org.apache.flink.runtime.state.CheckpointStreamFactory)}.
+     */
+    public OperatorSnapshotFinalizer snapshotWithLocalState(
+            long checkpointId, long timestamp, CheckpointType checkpointType) throws Exception {
 
+        CheckpointStorageLocationReference locationReference =
+                CheckpointStorageLocationReference.getDefault();
         OperatorSnapshotFutures operatorStateResult =
                 operator.snapshotState(
                         checkpointId,
                         timestamp,
-                        CheckpointOptions.forCheckpointWithDefaultLocation(),
+                        new CheckpointOptions(checkpointType, locationReference),
                         checkpointStorage.resolveCheckpointStorageLocation(
-                                checkpointId, CheckpointStorageLocationReference.getDefault()));
+                                checkpointId, locationReference));
 
         return new OperatorSnapshotFinalizer(operatorStateResult);
     }
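
The harness change above threads a CheckpointType through
snapshotWithLocalState, which is what lets HeapTimersSnapshottingTest take a
savepoint-type snapshot rather than a regular checkpoint; the existing
two-argument overload keeps its old behavior by delegating with
CheckpointType.CHECKPOINT. A minimal usage sketch, with the harness
construction elided (see getTestHarness() in the test above):

    // Take a savepoint-type snapshot through the new three-argument overload.
    OperatorSnapshotFinalizer finalizer =
            testHarness.snapshotWithLocalState(0L, 1L, CheckpointType.SAVEPOINT);
    // State as the JobManager would own it, used to drive the restore path.
    OperatorSubtaskState state = finalizer.getJobManagerOwnedState();
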