Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/26 15:46:56 UTC

[GitHub] [flink] aljoscha opened a new pull request #14765: [FLINK-21151] Extract common full-snapshot writer from RocksDB full-snapshot strategy

aljoscha opened a new pull request #14765:
URL: https://github.com/apache/flink/pull/14765


   As described in FLIP-41, the RocksDB full-snapshot format will serve as the common, unified savepoint format. We need to extract the common parts and make them reusable by other state backends.
   
   The commits themselves have good descriptions of what's going on, so it's easiest to follow the commits one by one instead of reviewing the final result.
   
   ## Verifying this change
   
   This is a pure refactoring and should be covered by existing tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
     - The S3 file system connector: no
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14765: [FLINK-21151] Extract common full-snapshot writer from RocksDB full-snapshot strategy

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14765:
URL: https://github.com/apache/flink/pull/14765#issuecomment-767648687


   ## CI report:
   
   * 8146ebc2b9a154ee4550f6a85e564e38f632eca7 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12593) 
   * c60a8a6704928508e7bedd8d727d3e97840a8dfc UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14765: [FLINK-21151] Extract common full-snapshot writer from RocksDB full-snapshot strategy

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14765:
URL: https://github.com/apache/flink/pull/14765#issuecomment-767648687


   ## CI report:
   
   * c60a8a6704928508e7bedd8d727d3e97840a8dfc Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12612) 
   * 5879d00c301300c2e84f50397ecc8130f36632c2 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] dawidwys commented on a change in pull request #14765: [FLINK-21151] Extract common full-snapshot writer from RocksDB full-snapshot strategy

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #14765:
URL: https://github.com/apache/flink/pull/14765#discussion_r565295292



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/FullSnapshotUtil.java
##########
@@ -16,13 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.flink.contrib.streaming.state.snapshot;
+package org.apache.flink.runtime.state;
 
 /**
- * Utility methods and constants around RocksDB creating and restoring snapshots for {@link
- * org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend}.
+ * Utility methods and constants around creating and restoring full snapshots using {@link
+ * FullSnapshotAsyncWriter}.
  */
-public class RocksSnapshotUtil {
+public class FullSnapshotUtil {
 
     /** File suffix of sstable files. */
     public static final String SST_FILE_SUFFIX = ".sst";

Review comment:
       nit: This constant is RocksDB-specific, isn't it?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/FullSnapshotAsyncWriter.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.SupplierWithException;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Objects;
+
+import static org.apache.flink.runtime.state.FullSnapshotUtil.END_OF_KEY_GROUP_MARK;
+import static org.apache.flink.runtime.state.FullSnapshotUtil.hasMetaDataFollowsFlag;
+import static org.apache.flink.runtime.state.FullSnapshotUtil.setMetaDataFollowsFlagInKey;
+
+/**
+ * An asynchronous writer that can write a full snapshot/savepoint from a {@link
+ * FullSnapshotResources}.
+ *
+ * @param <K> type of the backend keys.
+ */
+public class FullSnapshotAsyncWriter<K>

Review comment:
       How do you feel about renaming it to e.g. `SavepointAsyncWriter`? I think it would make it more prominent that it is the unified savepoint format. Moreover, I find it a bit unclear what `Full` refers to.

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java
##########
@@ -106,7 +118,8 @@ public void next() {
                 detectNewKeyGroup(oldKey);
             }
         } else {
-            IOUtils.closeQuietly(rocksIterator);
+            IOUtils.closeQuietly(currentSubIterator);
+            closeableRegistry.unregisterCloseable(currentSubIterator);

Review comment:
       Usually the pattern is:
   
   ```
               if (closeableRegistry.unregisterCloseable(currentSubIterator)) {
                   IOUtils.closeQuietly(currentSubIterator);
               }
   ```
   BTW, do we even need to unregister it from the registry? I think leaving it registered would do no harm: in the worst case it will be closed twice (which was the case previously as well).
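
A minimal sketch of the unregister-then-close pattern from the comment above, assuming only that `unregisterCloseable` returns `true` when the resource was still registered. `SimpleRegistry`, `CountingResource`, and `closeQuietly` are hypothetical stand-ins written for this illustration; they are not Flink's `CloseableRegistry` or `IOUtils`.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical, simplified stand-in for a closeable registry (not Flink's implementation). */
final class SimpleRegistry {
    private final Set<Closeable> registered = new HashSet<>();

    void registerCloseable(Closeable closeable) {
        registered.add(closeable);
    }

    /** Returns true only if the closeable was still registered, so the caller now owns it. */
    boolean unregisterCloseable(Closeable closeable) {
        return registered.remove(closeable);
    }

    /** Closes everything that is still registered. */
    void close() {
        for (Closeable closeable : registered) {
            UnregisterThenCloseDemo.closeQuietly(closeable);
        }
        registered.clear();
    }
}

/** Counts close() calls so that a double close becomes visible. */
final class CountingResource implements Closeable {
    int closeCalls = 0;

    @Override
    public void close() {
        closeCalls++;
    }
}

final class UnregisterThenCloseDemo {
    /** Hypothetical helper: closes a resource and swallows any IOException. */
    static void closeQuietly(Closeable closeable) {
        try {
            closeable.close();
        } catch (IOException ignored) {
            // intentionally ignored in this sketch
        }
    }

    /** Closes the resource only if this caller successfully unregistered it. */
    static void release(SimpleRegistry registry, CountingResource resource) {
        if (registry.unregisterCloseable(resource)) {
            closeQuietly(resource);
        }
    }

    public static void main(String[] args) {
        SimpleRegistry registry = new SimpleRegistry();
        CountingResource resource = new CountingResource();
        registry.registerCloseable(resource);

        release(registry, resource); // unregisters and closes
        release(registry, resource); // no-op: already unregistered
        registry.close();            // nothing left to close

        System.out.println("close calls: " + resource.closeCalls);
    }
}
```

Guarding the close with the result of `unregisterCloseable` means the resource is closed exactly once, whichever party gets there first. If the resource were closed without being unregistered, a later `registry.close()` would close it a second time, which is harmless only when `close()` is idempotent.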

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBFullSnapshotResources.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.snapshot;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator;
+import org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.state.FullSnapshotResources;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyValueStateIterator;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.StreamCompressionDecorator;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.ResourceGuard;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Snapshot;
+
+import javax.annotation.Nonnegative;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** A {@link FullSnapshotResources} for the RocksDB backend. */
+class RocksDBFullSnapshotResources<K> implements FullSnapshotResources<K> {
+    private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
+    private final ResourceGuard.Lease lease;
+    private final Snapshot snapshot;
+    private final RocksDB db;
+    private final List<MetaData> metaData;
+
+    /** Number of bytes in the key-group prefix. */
+    @Nonnegative private final int keyGroupPrefixBytes;
+
+    private final KeyGroupRange keyGroupRange;
+    private final TypeSerializer<K> keySerializer;
+    private final StreamCompressionDecorator streamCompressionDecorator;
+
+    public RocksDBFullSnapshotResources(
+            ResourceGuard.Lease lease,
+            Snapshot snapshot,
+            List<RocksDBKeyedStateBackend.RocksDbKvStateInfo> metaDataCopy,
+            List<StateMetaInfoSnapshot> stateMetaInfoSnapshots,
+            RocksDB db,
+            int keyGroupPrefixBytes,
+            KeyGroupRange keyGroupRange,
+            TypeSerializer<K> keySerializer,
+            StreamCompressionDecorator streamCompressionDecorator) {
+        this.lease = lease;
+        this.snapshot = snapshot;
+        this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
+        this.db = db;
+        this.keyGroupPrefixBytes = keyGroupPrefixBytes;
+        this.keyGroupRange = keyGroupRange;
+        this.keySerializer = keySerializer;
+        this.streamCompressionDecorator = streamCompressionDecorator;
+
+        // we need to to this in the constructor, i.e. in the synchronous part of the snapshot
+        // TODO: better yet, we can do it outside the constructor
+        this.metaData = fillMetaData(metaDataCopy);
+    }
+
+    private List<MetaData> fillMetaData(
+            List<RocksDBKeyedStateBackend.RocksDbKvStateInfo> metaDataCopy) {
+        List<MetaData> metaData = new ArrayList<>(metaDataCopy.size());
+        for (RocksDBKeyedStateBackend.RocksDbKvStateInfo rocksDbKvStateInfo : metaDataCopy) {
+            StateSnapshotTransformer<byte[]> stateSnapshotTransformer = null;
+            if (rocksDbKvStateInfo.metaInfo instanceof RegisteredKeyValueStateBackendMetaInfo) {
+                stateSnapshotTransformer =
+                        ((RegisteredKeyValueStateBackendMetaInfo<?, ?>) rocksDbKvStateInfo.metaInfo)
+                                .getStateSnapshotTransformFactory()
+                                .createForSerializedState()
+                                .orElse(null);
+            }
+            metaData.add(new MetaData(rocksDbKvStateInfo, stateSnapshotTransformer));
+        }
+        return metaData;
+    }
+
+    @Override
+    public KeyValueStateIterator createKVStateIterator() throws IOException {
+        CloseableRegistry closeableRegistry = new CloseableRegistry();
+
+        try {
+            ReadOptions readOptions = new ReadOptions();
+            closeableRegistry.registerCloseable(readOptions::close);
+            readOptions.setSnapshot(snapshot);
+
+            List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators =
+                    createKVStateIterators(readOptions);
+
+            for (Tuple2<RocksIteratorWrapper, Integer> iter : kvStateIterators) {

Review comment:
       nit: We could do that in a single pass when creating iterators.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyValueStateIterator.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+/**
+ * Iterator that over all key-value state entries in a {@link KeyedStateBackend}. For use during
+ * snapshotting.
+ *
+ * <p>This is required to partition all states into contiguous key-groups. The resulting iteration
+ * sequence is ordered by (key-group, kv-state).
+ */
+public interface KeyValueStateIterator extends AutoCloseable {
+
+    /**
+     * Advances the iterator. Should only be called if {@link #isValid()} returned true. Valid flag
+     * can only change after calling {@link #next()}.
+     */
+    void next();
+
+    /** Returns the key-group for the current key. */
+    int keyGroup();
+
+    byte[] key();
+
+    byte[] value();
+
+    /** Returns the Id of the K/V state to which the current key belongs. */
+    int kvStateId();
+
+    /**
+     * Indicates if current key starts a new k/v-state, i.e. belong to a different k/v-state than
+     * it's predecessor.
+     *
+     * @return true iff the current key belong to a different k/v-state than it's predecessor.
+     */
+    boolean isNewKeyValueState();
+
+    /**
+     * Indicates if current key starts a new key-group, i.e. belong to a different key-group than
+     * it's predecessor.
+     *
+     * @return true iff the current key belong to a different key-group than it's predecessor.
+     */
+    boolean isNewKeyGroup();
+
+    /**
+     * Check if the iterator is still valid. Getters like {@link #key()}, {@link #value()}, etc. as
+     * well as {@link #next()} should only be called if valid returned true. Should be checked after
+     * each call to {@link #next()} before accessing iterator state.
+     *
+     * @return True iff this iterator is valid.
+     */
+    boolean isValid();
+
+    @Override
+    void close();

Review comment:
       Do we want to override `close()` so that it declares no exception?
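
To illustrate what is at stake in this question: `AutoCloseable.close()` is declared as `throws Exception`, and an overriding declaration may drop the `throws` clause. The sketch below shows the effect on callers. `QuietlyCloseableIterator` and `RangeIterator` are hypothetical names made up for this example and only loosely mirror the shape of `KeyValueStateIterator`.

```java
/** Hypothetical iterator-like resource whose close() declares no checked exception. */
interface QuietlyCloseableIterator extends AutoCloseable {
    boolean isValid();

    void next();

    int current();

    // Narrows AutoCloseable.close(), which is declared as `throws Exception`.
    @Override
    void close();
}

/** Iterates over the integers in [start, end) and records whether it was closed. */
final class RangeIterator implements QuietlyCloseableIterator {
    private int position;
    private final int end;
    boolean closed = false;

    RangeIterator(int start, int end) {
        this.position = start;
        this.end = end;
    }

    @Override
    public boolean isValid() {
        return position < end;
    }

    @Override
    public void next() {
        position++;
    }

    @Override
    public int current() {
        return position;
    }

    @Override
    public void close() {
        closed = true;
    }
}

final class CloseWithoutExceptionDemo {
    /** Needs neither a catch block nor a throws clause, because close() cannot throw a checked exception. */
    static int sum(QuietlyCloseableIterator iterator) {
        int total = 0;
        try (QuietlyCloseableIterator it = iterator) {
            while (it.isValid()) {
                total += it.current();
                it.next();
            }
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(sum(new RangeIterator(1, 4)));
    }
}
```

The trade-off is that implementations can no longer propagate a checked exception from `close()`; they have to handle it internally or wrap it in an unchecked exception.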

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksSingleStateIterator.java
##########
@@ -23,11 +23,13 @@
 
 import javax.annotation.Nonnull;
 
+import java.io.Closeable;
+
 /**
  * Wraps a RocksDB iterator to cache it's current key and assigns an id for the key/value state to
  * the iterator. Used by {@link RocksStatesPerKeyGroupMergeIterator}.
  */
-class RocksSingleStateIterator implements AutoCloseable {
+class RocksSingleStateIterator implements Closeable {

Review comment:
       Is this change necessary? `AutoCloseable` is the newer interface.

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java
##########
@@ -132,10 +145,14 @@ public void next() {
             final RocksIteratorWrapper rocksIterator = rocksIteratorWithKVStateId.f0;
             rocksIterator.seekToFirst();
             if (rocksIterator.isValid()) {
-                iteratorPriorityQueue.offer(
-                        new RocksSingleStateIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
+                RocksSingleStateIterator wrappingIterator =
+                        new RocksSingleStateIterator(rocksIterator, rocksIteratorWithKVStateId.f1);
+                iteratorPriorityQueue.offer(wrappingIterator);
+                closeableRegistry.registerCloseable(wrappingIterator);
+                closeableRegistry.unregisterCloseable(rocksIterator);

Review comment:
       Same as above. Do we care about unregistering here?







[GitHub] [flink] dawidwys commented on a change in pull request #14765: [FLINK-21151] Extract common full-snapshot writer from RocksDB full-snapshot strategy

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #14765:
URL: https://github.com/apache/flink/pull/14765#discussion_r565899941



##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksSnapshotUtil.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.snapshot;
+
+/**
+ * Utility methods and constants around RocksDB creating and restoring snapshots for {@link
+ * org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend}.
+ */
+public class RocksSnapshotUtil {
+
+    /** File suffix of sstable files. */
+    public static final String SST_FILE_SUFFIX = ".sst";

Review comment:
       Shall we leave just the `SST_FILE_SUFFIX` constant here?







[GitHub] [flink] dawidwys commented on a change in pull request #14765: [FLINK-21151] Extract common full-snapshot writer from RocksDB full-snapshot strategy

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #14765:
URL: https://github.com/apache/flink/pull/14765#discussion_r565899313



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/FullSnapshotAsyncWriter.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.SupplierWithException;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Objects;
+
+import static org.apache.flink.runtime.state.FullSnapshotUtil.END_OF_KEY_GROUP_MARK;
+import static org.apache.flink.runtime.state.FullSnapshotUtil.hasMetaDataFollowsFlag;
+import static org.apache.flink.runtime.state.FullSnapshotUtil.setMetaDataFollowsFlagInKey;
+
+/**
+ * An asynchronous writer that can write a full snapshot/savepoint from a {@link
+ * FullSnapshotResources}.
+ *
+ * @param <K> type of the backend keys.
+ */
+public class FullSnapshotAsyncWriter<K>

Review comment:
       Got it. I think it's fine to leave it as is then.







[GitHub] [flink] aljoscha commented on a change in pull request #14765: [FLINK-21151] Extract common full-snapshot writer from RocksDB full-snapshot strategy

Posted by GitBox <gi...@apache.org>.
aljoscha commented on a change in pull request #14765:
URL: https://github.com/apache/flink/pull/14765#discussion_r565454972



##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksSingleStateIterator.java
##########
@@ -23,11 +23,13 @@
 
 import javax.annotation.Nonnull;
 
+import java.io.Closeable;
+
 /**
  * Wraps a RocksDB iterator to cache it's current key and assigns an id for the key/value state to
  * the iterator. Used by {@link RocksStatesPerKeyGroupMergeIterator}.
  */
-class RocksSingleStateIterator implements AutoCloseable {
+class RocksSingleStateIterator implements Closeable {

Review comment:
       I made it `Closeable` because the `RocksStatesPerKeyGroupMergeIterator` now uses a `CloseableRegistry` which requires things to be `Closeable`. 
   
   Alternatively, I could change `CloseableRegistry` to take `AutoCloseable`s, but I felt that would be the bigger change.
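
A third option, sketched here under assumptions, is to adapt an `AutoCloseable` at the call site with a method reference, in the way the diff registers `readOptions::close`. This works when the resource's `close()` declares no checked exception other than `IOException`. `NativeHandle` and `closeAll` are hypothetical and stand in for a resource and for an API that accepts only `Closeable`.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical resource that implements only AutoCloseable and declares no checked exception. */
final class NativeHandle implements AutoCloseable {
    boolean closed = false;

    @Override
    public void close() {
        closed = true;
    }
}

final class CloseableAdapterDemo {
    /** Stand-in for an API that accepts only Closeable instances. */
    static void closeAll(List<Closeable> closeables) {
        for (Closeable closeable : closeables) {
            try {
                closeable.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    public static void main(String[] args) {
        NativeHandle handle = new NativeHandle();
        List<Closeable> closeables = new ArrayList<>();

        // Closeable has a single abstract method, so a method reference can adapt the handle.
        Closeable adapter = handle::close;
        closeables.add(adapter);

        closeAll(closeables);
        System.out.println("closed: " + handle.closed);
    }
}
```

One caveat with this approach: evaluating `handle::close` a second time is not guaranteed to yield the same object, so code that later needs to unregister the adapter should keep a reference to the instance it registered. Implementing `Closeable` directly on the class, as the PR does, avoids that bookkeeping.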







[GitHub] [flink] aljoscha commented on a change in pull request #14765: [FLINK-21151] Extract common full-snapshot writer from RocksDB full-snapshot strategy

Posted by GitBox <gi...@apache.org>.
aljoscha commented on a change in pull request #14765:
URL: https://github.com/apache/flink/pull/14765#discussion_r565458656



##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBFullSnapshotResources.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.snapshot;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator;
+import org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.state.FullSnapshotResources;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyValueStateIterator;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.StreamCompressionDecorator;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.ResourceGuard;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Snapshot;
+
+import javax.annotation.Nonnegative;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** A {@link FullSnapshotResources} for the RocksDB backend. */
+class RocksDBFullSnapshotResources<K> implements FullSnapshotResources<K> {
+    private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
+    private final ResourceGuard.Lease lease;
+    private final Snapshot snapshot;
+    private final RocksDB db;
+    private final List<MetaData> metaData;
+
+    /** Number of bytes in the key-group prefix. */
+    @Nonnegative private final int keyGroupPrefixBytes;
+
+    private final KeyGroupRange keyGroupRange;
+    private final TypeSerializer<K> keySerializer;
+    private final StreamCompressionDecorator streamCompressionDecorator;
+
+    public RocksDBFullSnapshotResources(
+            ResourceGuard.Lease lease,
+            Snapshot snapshot,
+            List<RocksDBKeyedStateBackend.RocksDbKvStateInfo> metaDataCopy,
+            List<StateMetaInfoSnapshot> stateMetaInfoSnapshots,
+            RocksDB db,
+            int keyGroupPrefixBytes,
+            KeyGroupRange keyGroupRange,
+            TypeSerializer<K> keySerializer,
+            StreamCompressionDecorator streamCompressionDecorator) {
+        this.lease = lease;
+        this.snapshot = snapshot;
+        this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
+        this.db = db;
+        this.keyGroupPrefixBytes = keyGroupPrefixBytes;
+        this.keyGroupRange = keyGroupRange;
+        this.keySerializer = keySerializer;
+        this.streamCompressionDecorator = streamCompressionDecorator;
+
+        // we need to do this in the constructor, i.e. in the synchronous part of the snapshot
+        // TODO: better yet, we can do it outside the constructor
+        this.metaData = fillMetaData(metaDataCopy);
+    }
+
+    private List<MetaData> fillMetaData(
+            List<RocksDBKeyedStateBackend.RocksDbKvStateInfo> metaDataCopy) {
+        List<MetaData> metaData = new ArrayList<>(metaDataCopy.size());
+        for (RocksDBKeyedStateBackend.RocksDbKvStateInfo rocksDbKvStateInfo : metaDataCopy) {
+            StateSnapshotTransformer<byte[]> stateSnapshotTransformer = null;
+            if (rocksDbKvStateInfo.metaInfo instanceof RegisteredKeyValueStateBackendMetaInfo) {
+                stateSnapshotTransformer =
+                        ((RegisteredKeyValueStateBackendMetaInfo<?, ?>) rocksDbKvStateInfo.metaInfo)
+                                .getStateSnapshotTransformFactory()
+                                .createForSerializedState()
+                                .orElse(null);
+            }
+            metaData.add(new MetaData(rocksDbKvStateInfo, stateSnapshotTransformer));
+        }
+        return metaData;
+    }
+
+    @Override
+    public KeyValueStateIterator createKVStateIterator() throws IOException {
+        CloseableRegistry closeableRegistry = new CloseableRegistry();
+
+        try {
+            ReadOptions readOptions = new ReadOptions();
+            closeableRegistry.registerCloseable(readOptions::close);
+            readOptions.setSnapshot(snapshot);
+
+            List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators =
+                    createKVStateIterators(readOptions);
+
+            for (Tuple2<RocksIteratorWrapper, Integer> iter : kvStateIterators) {

Review comment:
       fixing

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/FullSnapshotAsyncWriter.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.SupplierWithException;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Objects;
+
+import static org.apache.flink.runtime.state.FullSnapshotUtil.END_OF_KEY_GROUP_MARK;
+import static org.apache.flink.runtime.state.FullSnapshotUtil.hasMetaDataFollowsFlag;
+import static org.apache.flink.runtime.state.FullSnapshotUtil.setMetaDataFollowsFlagInKey;
+
+/**
+ * An asynchronous writer that can write a full snapshot/savepoint from a {@link
+ * FullSnapshotResources}.
+ *
+ * @param <K> type of the backend keys.
+ */
+public class FullSnapshotAsyncWriter<K>

Review comment:
       Can do, I have two thoughts on it:
    - the `RocksFullSnapshotStrategy` still uses this, and not just for savepoints
    - my latest commit introduces a `SavepointSnapshotStrategy` that uses this. In my head the `FullSnapshotAsyncWriter` is an implementation detail that happens to also be used for savepoints
   
   Agreed that `Full` is not very clear here. Also, I'm not at all set on the naming.







[GitHub] [flink] aljoscha commented on a change in pull request #14765: [FLINK-21151] Extract common full-snapshot writer from RocksDB full-snapshot strategy

Posted by GitBox <gi...@apache.org>.
aljoscha commented on a change in pull request #14765:
URL: https://github.com/apache/flink/pull/14765#discussion_r565455652



##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java
##########
@@ -106,7 +118,8 @@ public void next() {
                 detectNewKeyGroup(oldKey);
             }
         } else {
-            IOUtils.closeQuietly(rocksIterator);
+            IOUtils.closeQuietly(currentSubIterator);
+            closeableRegistry.unregisterCloseable(currentSubIterator);

Review comment:
       Thanks, I will use this pattern! And yes, it's actually important to unregister, we have tests that verify that exactly the right amount of `close()` calls happen on the Rocks classes. I was surprised myself. 🎊 
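
       To illustrate the close-then-unregister pattern discussed in this comment, here is a minimal, self-contained sketch. `SimpleRegistry` and `CountingResource` are hypothetical stand-ins written for this example, not Flink's `CloseableRegistry` or the RocksDB classes. The sketch only assumes that a registry closes everything still registered when it is itself closed.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical stand-in for a closeable registry (not Flink's CloseableRegistry). */
class SimpleRegistry implements Closeable {
    private final Set<Closeable> registered = new LinkedHashSet<>();

    void register(Closeable c) {
        registered.add(c);
    }

    /** Returns true if the closeable was registered and has now been removed. */
    boolean unregister(Closeable c) {
        return registered.remove(c);
    }

    /** Closes everything that is still registered. */
    @Override
    public void close() {
        for (Closeable c : registered) {
            try {
                c.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        registered.clear();
    }
}

/** Hypothetical resource that counts how often close() is called. */
class CountingResource implements Closeable {
    int closeCalls = 0;

    @Override
    public void close() {
        closeCalls++;
    }
}

class CloseOnceDemo {
    /** Closes an exhausted resource early, then removes it from the registry. */
    static void closeEarly(SimpleRegistry registry, CountingResource resource) {
        resource.close();
        registry.unregister(resource);
    }

    public static void main(String[] args) {
        SimpleRegistry registry = new SimpleRegistry();
        CountingResource exhausted = new CountingResource();
        CountingResource stillOpen = new CountingResource();
        registry.register(exhausted);
        registry.register(stillOpen);

        closeEarly(registry, exhausted); // closed once, no longer tracked
        registry.close();                // closes only what is still registered

        System.out.println(exhausted.closeCalls + " " + stillOpen.closeCalls);
    }
}
```

       Without the `unregister` call, the registry would close the exhausted resource a second time, which is the kind of double close that a test counting `close()` calls would flag.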







[GitHub] [flink] flinkbot edited a comment on pull request #14765: [FLINK-21151] Extract common full-snapshot writer from RocksDB full-snapshot strategy

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14765:
URL: https://github.com/apache/flink/pull/14765#issuecomment-767648687


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "52588009ab10795ee7c7e576ad11879760350871",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12523",
       "triggerID" : "52588009ab10795ee7c7e576ad11879760350871",
       "triggerType" : "PUSH"
     }, {
       "hash" : "08c35397ba79eab9e80cfcb2b4d58b7df5da1f1f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12566",
       "triggerID" : "08c35397ba79eab9e80cfcb2b4d58b7df5da1f1f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "31d80dfb565af80b87945d5b717c6ea3d2b8d0ec",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12568",
       "triggerID" : "31d80dfb565af80b87945d5b717c6ea3d2b8d0ec",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8146ebc2b9a154ee4550f6a85e564e38f632eca7",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "8146ebc2b9a154ee4550f6a85e564e38f632eca7",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 31d80dfb565af80b87945d5b717c6ea3d2b8d0ec Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12568) 
   * 8146ebc2b9a154ee4550f6a85e564e38f632eca7 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14765: [FLINK-21151] Extract common full-snapshot writer from RocksDB full-snapshot strategy

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14765:
URL: https://github.com/apache/flink/pull/14765#issuecomment-767648687


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "52588009ab10795ee7c7e576ad11879760350871",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12523",
       "triggerID" : "52588009ab10795ee7c7e576ad11879760350871",
       "triggerType" : "PUSH"
     }, {
       "hash" : "08c35397ba79eab9e80cfcb2b4d58b7df5da1f1f",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "08c35397ba79eab9e80cfcb2b4d58b7df5da1f1f",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 52588009ab10795ee7c7e576ad11879760350871 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12523) 
   * 08c35397ba79eab9e80cfcb2b4d58b7df5da1f1f UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on pull request #14765: [FLINK-21151] Extract common full-snapshot writer from RocksDB full-snapshot strategy

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14765:
URL: https://github.com/apache/flink/pull/14765#issuecomment-767648687


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "52588009ab10795ee7c7e576ad11879760350871",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12523",
       "triggerID" : "52588009ab10795ee7c7e576ad11879760350871",
       "triggerType" : "PUSH"
     }, {
       "hash" : "08c35397ba79eab9e80cfcb2b4d58b7df5da1f1f",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12566",
       "triggerID" : "08c35397ba79eab9e80cfcb2b4d58b7df5da1f1f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "31d80dfb565af80b87945d5b717c6ea3d2b8d0ec",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "31d80dfb565af80b87945d5b717c6ea3d2b8d0ec",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 08c35397ba79eab9e80cfcb2b4d58b7df5da1f1f Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12566) 
   * 31d80dfb565af80b87945d5b717c6ea3d2b8d0ec UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on pull request #14765: [FLINK-21151] Extract common full-snapshot writer from RocksDB full-snapshot strategy

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14765:
URL: https://github.com/apache/flink/pull/14765#issuecomment-767648687


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "52588009ab10795ee7c7e576ad11879760350871",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12523",
       "triggerID" : "52588009ab10795ee7c7e576ad11879760350871",
       "triggerType" : "PUSH"
     }, {
       "hash" : "08c35397ba79eab9e80cfcb2b4d58b7df5da1f1f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12566",
       "triggerID" : "08c35397ba79eab9e80cfcb2b4d58b7df5da1f1f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "31d80dfb565af80b87945d5b717c6ea3d2b8d0ec",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12568",
       "triggerID" : "31d80dfb565af80b87945d5b717c6ea3d2b8d0ec",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 31d80dfb565af80b87945d5b717c6ea3d2b8d0ec Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12568) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #14765: [FLINK-21151] Extract common full-snapshot writer from RocksDB full-snapshot strategy

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14765:
URL: https://github.com/apache/flink/pull/14765#issuecomment-767648687


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "52588009ab10795ee7c7e576ad11879760350871",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12523",
       "triggerID" : "52588009ab10795ee7c7e576ad11879760350871",
       "triggerType" : "PUSH"
     }, {
       "hash" : "08c35397ba79eab9e80cfcb2b4d58b7df5da1f1f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12566",
       "triggerID" : "08c35397ba79eab9e80cfcb2b4d58b7df5da1f1f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "31d80dfb565af80b87945d5b717c6ea3d2b8d0ec",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12568",
       "triggerID" : "31d80dfb565af80b87945d5b717c6ea3d2b8d0ec",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8146ebc2b9a154ee4550f6a85e564e38f632eca7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12593",
       "triggerID" : "8146ebc2b9a154ee4550f6a85e564e38f632eca7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c60a8a6704928508e7bedd8d727d3e97840a8dfc",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12612",
       "triggerID" : "c60a8a6704928508e7bedd8d727d3e97840a8dfc",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5879d00c301300c2e84f50397ecc8130f36632c2",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12654",
       "triggerID" : "5879d00c301300c2e84f50397ecc8130f36632c2",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c60a8a6704928508e7bedd8d727d3e97840a8dfc Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12612) 
   * 5879d00c301300c2e84f50397ecc8130f36632c2 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12654) 
   



[GitHub] [flink] aljoscha commented on pull request #14765: [FLINK-21151] Extract common full-snapshot writer from RocksDB full-snapshot strategy

Posted by GitBox <gi...@apache.org>.
aljoscha commented on pull request #14765:
URL: https://github.com/apache/flink/pull/14765#issuecomment-769858222


   merged! Thanks for the reviews, Dawid!





[GitHub] [flink] aljoscha closed pull request #14765: [FLINK-21151] Extract common full-snapshot writer from RocksDB full-snapshot strategy

Posted by GitBox <gi...@apache.org>.
aljoscha closed pull request #14765:
URL: https://github.com/apache/flink/pull/14765


   





[GitHub] [flink] flinkbot edited a comment on pull request #14765: [FLINK-21151] Extract common full-snapshot writer from RocksDB full-snapshot strategy

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14765:
URL: https://github.com/apache/flink/pull/14765#issuecomment-767648687


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "52588009ab10795ee7c7e576ad11879760350871",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12523",
       "triggerID" : "52588009ab10795ee7c7e576ad11879760350871",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 52588009ab10795ee7c7e576ad11879760350871 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12523) 
   



[GitHub] [flink] aljoscha commented on a change in pull request #14765: [FLINK-21151] Extract common full-snapshot writer from RocksDB full-snapshot strategy

Posted by GitBox <gi...@apache.org>.
aljoscha commented on a change in pull request #14765:
URL: https://github.com/apache/flink/pull/14765#discussion_r565457375



##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java
##########
@@ -132,10 +145,14 @@ public void next() {
             final RocksIteratorWrapper rocksIterator = rocksIteratorWithKVStateId.f0;
             rocksIterator.seekToFirst();
             if (rocksIterator.isValid()) {
-                iteratorPriorityQueue.offer(
-                        new RocksSingleStateIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
+                RocksSingleStateIterator wrappingIterator =
+                        new RocksSingleStateIterator(rocksIterator, rocksIteratorWithKVStateId.f1);
+                iteratorPriorityQueue.offer(wrappingIterator);
+                closeableRegistry.registerCloseable(wrappingIterator);
+                closeableRegistry.unregisterCloseable(rocksIterator);

Review comment:
       See answer above.







[GitHub] [flink] flinkbot edited a comment on pull request #14765: [FLINK-21151] Extract common full-snapshot writer from RocksDB full-snapshot strategy

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14765:
URL: https://github.com/apache/flink/pull/14765#issuecomment-767648687


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "52588009ab10795ee7c7e576ad11879760350871",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12523",
       "triggerID" : "52588009ab10795ee7c7e576ad11879760350871",
       "triggerType" : "PUSH"
     }, {
       "hash" : "08c35397ba79eab9e80cfcb2b4d58b7df5da1f1f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12566",
       "triggerID" : "08c35397ba79eab9e80cfcb2b4d58b7df5da1f1f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "31d80dfb565af80b87945d5b717c6ea3d2b8d0ec",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12568",
       "triggerID" : "31d80dfb565af80b87945d5b717c6ea3d2b8d0ec",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8146ebc2b9a154ee4550f6a85e564e38f632eca7",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12593",
       "triggerID" : "8146ebc2b9a154ee4550f6a85e564e38f632eca7",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 31d80dfb565af80b87945d5b717c6ea3d2b8d0ec Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12568) 
   * 8146ebc2b9a154ee4550f6a85e564e38f632eca7 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12593) 
   



[GitHub] [flink] aljoscha commented on a change in pull request #14765: [FLINK-21151] Extract common full-snapshot writer from RocksDB full-snapshot strategy

Posted by GitBox <gi...@apache.org>.
aljoscha commented on a change in pull request #14765:
URL: https://github.com/apache/flink/pull/14765#discussion_r565454247



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyValueStateIterator.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+/**
+ * Iterator over all key-value state entries in a {@link KeyedStateBackend}. For use during
+ * snapshotting.
+ *
+ * <p>This is required to partition all states into contiguous key-groups. The resulting iteration
+ * sequence is ordered by (key-group, kv-state).
+ */
+public interface KeyValueStateIterator extends AutoCloseable {
+
+    /**
+     * Advances the iterator. Should only be called if {@link #isValid()} returned true. Valid flag
+     * can only change after calling {@link #next()}.
+     */
+    void next();
+
+    /** Returns the key-group for the current key. */
+    int keyGroup();
+
+    byte[] key();
+
+    byte[] value();
+
+    /** Returns the Id of the K/V state to which the current key belongs. */
+    int kvStateId();
+
+    /**
+     * Indicates if current key starts a new k/v-state, i.e. belongs to a different k/v-state than
+     * its predecessor.
+     *
+     * @return true iff the current key belongs to a different k/v-state than its predecessor.
+     */
+    boolean isNewKeyValueState();
+
+    /**
+     * Indicates if current key starts a new key-group, i.e. belongs to a different key-group than
+     * its predecessor.
+     *
+     * @return true iff the current key belongs to a different key-group than its predecessor.
+     */
+    boolean isNewKeyGroup();
+
+    /**
+     * Check if the iterator is still valid. Getters like {@link #key()}, {@link #value()}, etc. as
+     * well as {@link #next()} should only be called if valid returned true. Should be checked after
+     * each call to {@link #next()} before accessing iterator state.
+     *
+     * @return True iff this iterator is valid.
+     */
+    boolean isValid();
+
+    @Override
+    void close();

Review comment:
       I kept this from `RocksStatesPerKeyGroupMergeIterator`, which also overrode `close()` without exception. And some of the code that uses it expects it not to throw.
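
       The iteration contract described in the Javadoc above can be sketched as a consumer loop. `KvIterator` is a trimmed, hypothetical copy of the interface (only the methods the loop uses), and `ListKvIterator` is an in-memory stand-in for a real backend iterator. Treating the first entry as the start of a new key-group is an assumption of this sketch. Because `close()` declares no checked exception, the iterator can be used in try-with-resources without a catch block.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Trimmed, hypothetical copy of the interface from the diff (only what the loop needs). */
interface KvIterator extends AutoCloseable {
    boolean isValid();
    void next();
    int keyGroup();
    int kvStateId();
    byte[] key();
    boolean isNewKeyGroup();
    @Override
    void close(); // no checked exception, as in the diff
}

/** One entry of the hypothetical in-memory state. */
final class StateEntry {
    final int keyGroup;
    final int kvStateId;
    final byte[] key;

    StateEntry(int keyGroup, int kvStateId, String key) {
        this.keyGroup = keyGroup;
        this.kvStateId = kvStateId;
        this.key = key.getBytes(StandardCharsets.UTF_8);
    }
}

/** In-memory iterator; entries are assumed to be sorted by (key-group, kv-state). */
class ListKvIterator implements KvIterator {
    private final List<StateEntry> entries;
    private int pos = 0;
    boolean closed = false;

    ListKvIterator(List<StateEntry> entries) {
        this.entries = entries;
    }

    public boolean isValid() { return pos < entries.size(); }
    public void next() { pos++; }
    public int keyGroup() { return entries.get(pos).keyGroup; }
    public int kvStateId() { return entries.get(pos).kvStateId; }
    public byte[] key() { return entries.get(pos).key; }

    // Assumption of this sketch: the very first entry starts a new key-group.
    public boolean isNewKeyGroup() {
        return pos == 0 || entries.get(pos - 1).keyGroup != entries.get(pos).keyGroup;
    }

    public void close() { closed = true; }
}

class IteratorWalk {
    /** Walks the iterator: check isValid(), read the getters, then call next(). */
    static List<String> walk(KvIterator it) {
        List<String> out = new ArrayList<>();
        try (KvIterator iter = it) {
            while (iter.isValid()) {
                if (iter.isNewKeyGroup()) {
                    out.add("key-group " + iter.keyGroup());
                }
                String key = new String(iter.key(), StandardCharsets.UTF_8);
                out.add("state " + iter.kvStateId() + " key " + key);
                iter.next();
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<StateEntry> entries = Arrays.asList(
                new StateEntry(0, 1, "a"), new StateEntry(0, 2, "b"), new StateEntry(1, 1, "c"));
        for (String line : walk(new ListKvIterator(entries))) {
            System.out.println(line);
        }
    }
}
```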







[GitHub] [flink] flinkbot edited a comment on pull request #14765: [FLINK-21151] Extract common full-snapshot writer from RocksDB full-snapshot strategy

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14765:
URL: https://github.com/apache/flink/pull/14765#issuecomment-767648687


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "52588009ab10795ee7c7e576ad11879760350871",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12523",
       "triggerID" : "52588009ab10795ee7c7e576ad11879760350871",
       "triggerType" : "PUSH"
     }, {
       "hash" : "08c35397ba79eab9e80cfcb2b4d58b7df5da1f1f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12566",
       "triggerID" : "08c35397ba79eab9e80cfcb2b4d58b7df5da1f1f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "31d80dfb565af80b87945d5b717c6ea3d2b8d0ec",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12568",
       "triggerID" : "31d80dfb565af80b87945d5b717c6ea3d2b8d0ec",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8146ebc2b9a154ee4550f6a85e564e38f632eca7",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12593",
       "triggerID" : "8146ebc2b9a154ee4550f6a85e564e38f632eca7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c60a8a6704928508e7bedd8d727d3e97840a8dfc",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12612",
       "triggerID" : "c60a8a6704928508e7bedd8d727d3e97840a8dfc",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8146ebc2b9a154ee4550f6a85e564e38f632eca7 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12593) 
   * c60a8a6704928508e7bedd8d727d3e97840a8dfc Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12612) 
   



[GitHub] [flink] aljoscha commented on a change in pull request #14765: [FLINK-21151] Extract common full-snapshot writer from RocksDB full-snapshot strategy

Posted by GitBox <gi...@apache.org>.
aljoscha commented on a change in pull request #14765:
URL: https://github.com/apache/flink/pull/14765#discussion_r565449964



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/FullSnapshotUtil.java
##########
@@ -16,13 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.flink.contrib.streaming.state.snapshot;
+package org.apache.flink.runtime.state;
 
 /**
- * Utility methods and constants around RocksDB creating and restoring snapshots for {@link
- * org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend}.
+ * Utility methods and constants around creating and restoring full snapshots using {@link
+ * FullSnapshotAsyncWriter}.
  */
-public class RocksSnapshotUtil {
+public class FullSnapshotUtil {
 
     /** File suffix of sstable files. */
     public static final String SST_FILE_SUFFIX = ".sst";

Review comment:
       True, I will have to move some of them back to a Rocks-specific `RocksSnapshotUtil`.
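The split described in this comment can be sketched as follows. This is a minimal illustration, not the code from the pull request: the class names `GenericSnapshotConstants` and `RocksOnlyConstants`, the helper `isSstFile`, and the constant `END_OF_KEY_GROUP_MARK` with its value are assumptions made for the example. Only `SST_FILE_SUFFIX = ".sst"` appears in the diff above.

```java
public class SnapshotUtilSplitSketch {

    /** Backend-independent constants that any full-snapshot writer could share (hypothetical). */
    public static final class GenericSnapshotConstants {
        // Assumed example of a format-level marker; name and value are illustrative only.
        public static final int END_OF_KEY_GROUP_MARK = 0xFFFF;

        private GenericSnapshotConstants() {}
    }

    /** Constants that only make sense for RocksDB and therefore stay in the RocksDB module. */
    public static final class RocksOnlyConstants {
        /** File suffix of sstable files, as shown in the diff. */
        public static final String SST_FILE_SUFFIX = ".sst";

        private RocksOnlyConstants() {}

        /** Hypothetical helper: true if the file name denotes an sstable file. */
        public static boolean isSstFile(String fileName) {
            return fileName.endsWith(SST_FILE_SUFFIX);
        }
    }

    public static void main(String[] args) {
        System.out.println(RocksOnlyConstants.isSstFile("000012.sst"));
        System.out.println(RocksOnlyConstants.isSstFile("MANIFEST-000005"));
    }
}
```

Keeping RocksDB-specific details such as the sstable suffix out of the shared class means other state backends can reuse the generic part without depending on RocksDB concepts.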







[GitHub] [flink] flinkbot commented on pull request #14765: [FLINK-21151] Extract common full-snapshot writer from RocksDB full-snapshot strategy

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14765:
URL: https://github.com/apache/flink/pull/14765#issuecomment-767635997


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 52588009ab10795ee7c7e576ad11879760350871 (Tue Jan 26 15:50:36 UTC 2021)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.
   
   The bot tracks the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot commented on pull request #14765: [FLINK-21151] Extract common full-snapshot writer from RocksDB full-snapshot strategy

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14765:
URL: https://github.com/apache/flink/pull/14765#issuecomment-767648687


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "52588009ab10795ee7c7e576ad11879760350871",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "52588009ab10795ee7c7e576ad11879760350871",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 52588009ab10795ee7c7e576ad11879760350871 UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #14765: [FLINK-21151] Extract common full-snapshot writer from RocksDB full-snapshot strategy

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14765:
URL: https://github.com/apache/flink/pull/14765#issuecomment-767648687


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "52588009ab10795ee7c7e576ad11879760350871",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12523",
       "triggerID" : "52588009ab10795ee7c7e576ad11879760350871",
       "triggerType" : "PUSH"
     }, {
       "hash" : "08c35397ba79eab9e80cfcb2b4d58b7df5da1f1f",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12566",
       "triggerID" : "08c35397ba79eab9e80cfcb2b4d58b7df5da1f1f",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 08c35397ba79eab9e80cfcb2b4d58b7df5da1f1f Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12566) 
   





[GitHub] [flink] aljoscha commented on a change in pull request #14765: [FLINK-21151] Extract common full-snapshot writer from RocksDB full-snapshot strategy

Posted by GitBox <gi...@apache.org>.
aljoscha commented on a change in pull request #14765:
URL: https://github.com/apache/flink/pull/14765#discussion_r565906863



##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksSnapshotUtil.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.snapshot;
+
+/**
+ * Utility methods and constants around RocksDB creating and restoring snapshots for {@link
+ * org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend}.
+ */
+public class RocksSnapshotUtil {
+
+    /** File suffix of sstable files. */
+    public static final String SST_FILE_SUFFIX = ".sst";

Review comment:
       Ah damn, I meant to remove the other ones, yes.







[GitHub] [flink] flinkbot edited a comment on pull request #14765: [FLINK-21151] Extract common full-snapshot writer from RocksDB full-snapshot strategy

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14765:
URL: https://github.com/apache/flink/pull/14765#issuecomment-767648687


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "52588009ab10795ee7c7e576ad11879760350871",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12523",
       "triggerID" : "52588009ab10795ee7c7e576ad11879760350871",
       "triggerType" : "PUSH"
     }, {
       "hash" : "08c35397ba79eab9e80cfcb2b4d58b7df5da1f1f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12566",
       "triggerID" : "08c35397ba79eab9e80cfcb2b4d58b7df5da1f1f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "31d80dfb565af80b87945d5b717c6ea3d2b8d0ec",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12568",
       "triggerID" : "31d80dfb565af80b87945d5b717c6ea3d2b8d0ec",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8146ebc2b9a154ee4550f6a85e564e38f632eca7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12593",
       "triggerID" : "8146ebc2b9a154ee4550f6a85e564e38f632eca7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c60a8a6704928508e7bedd8d727d3e97840a8dfc",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12612",
       "triggerID" : "c60a8a6704928508e7bedd8d727d3e97840a8dfc",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c60a8a6704928508e7bedd8d727d3e97840a8dfc Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12612) 
   





[GitHub] [flink] dawidwys commented on a change in pull request #14765: [FLINK-21151] Extract common full-snapshot writer from RocksDB full-snapshot strategy

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #14765:
URL: https://github.com/apache/flink/pull/14765#discussion_r565462300



##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksSingleStateIterator.java
##########
@@ -23,11 +23,13 @@
 
 import javax.annotation.Nonnull;
 
+import java.io.Closeable;
+
 /**
  * Wraps a RocksDB iterator to cache it's current key and assigns an id for the key/value state to
  * the iterator. Used by {@link RocksStatesPerKeyGroupMergeIterator}.
  */
-class RocksSingleStateIterator implements AutoCloseable {
+class RocksSingleStateIterator implements Closeable {

Review comment:
       got it, I think that's fine to use Closeable for now.
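The thread does not spell out why the iterator switches from `AutoCloseable` to `Closeable`. One general difference is that `java.io.Closeable` extends `AutoCloseable` and narrows `close()` to throw `IOException`, so such a class still works in try-with-resources and can also be passed to helpers that only accept `Closeable`. The sketch below illustrates that with made-up names (`DemoIterator`, `closeAllQuietly`); it is not the Flink class or any Flink utility.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CloseableSketch {

    /** Hypothetical stand-in for an iterator wrapper; records when it is closed. */
    static final class DemoIterator implements Closeable {
        private final String name;
        private final List<String> log;

        DemoIterator(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void close() {
            // An override may declare fewer exceptions than Closeable.close().
            log.add("closed " + name);
        }
    }

    /** Hypothetical helper that accepts only Closeable and swallows IOException. */
    static void closeAllQuietly(Iterable<? extends Closeable> resources) {
        for (Closeable resource : resources) {
            try {
                if (resource != null) {
                    resource.close();
                }
            } catch (IOException ignored) {
                // Best-effort cleanup: ignore failures while closing.
            }
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        // Closeable is an AutoCloseable, so try-with-resources still works.
        try (DemoIterator it = new DemoIterator("a", log)) {
            log.add("using a");
        }
        // The same type can be handed to a Closeable-only helper.
        closeAllQuietly(Arrays.asList(new DemoIterator("b", log), new DemoIterator("c", log)));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```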







[GitHub] [flink] flinkbot edited a comment on pull request #14765: [FLINK-21151] Extract common full-snapshot writer from RocksDB full-snapshot strategy

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14765:
URL: https://github.com/apache/flink/pull/14765#issuecomment-767648687


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "52588009ab10795ee7c7e576ad11879760350871",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12523",
       "triggerID" : "52588009ab10795ee7c7e576ad11879760350871",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 52588009ab10795ee7c7e576ad11879760350871 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12523) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #14765: [FLINK-21151] Extract common full-snapshot writer from RocksDB full-snapshot strategy

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14765:
URL: https://github.com/apache/flink/pull/14765#issuecomment-767648687


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "52588009ab10795ee7c7e576ad11879760350871",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12523",
       "triggerID" : "52588009ab10795ee7c7e576ad11879760350871",
       "triggerType" : "PUSH"
     }, {
       "hash" : "08c35397ba79eab9e80cfcb2b4d58b7df5da1f1f",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12566",
       "triggerID" : "08c35397ba79eab9e80cfcb2b4d58b7df5da1f1f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "31d80dfb565af80b87945d5b717c6ea3d2b8d0ec",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12568",
       "triggerID" : "31d80dfb565af80b87945d5b717c6ea3d2b8d0ec",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 08c35397ba79eab9e80cfcb2b4d58b7df5da1f1f Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12566) 
   * 31d80dfb565af80b87945d5b717c6ea3d2b8d0ec Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12568) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #14765: [FLINK-21151] Extract common full-snapshot writer from RocksDB full-snapshot strategy

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14765:
URL: https://github.com/apache/flink/pull/14765#issuecomment-767648687


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "52588009ab10795ee7c7e576ad11879760350871",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12523",
       "triggerID" : "52588009ab10795ee7c7e576ad11879760350871",
       "triggerType" : "PUSH"
     }, {
       "hash" : "08c35397ba79eab9e80cfcb2b4d58b7df5da1f1f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12566",
       "triggerID" : "08c35397ba79eab9e80cfcb2b4d58b7df5da1f1f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "31d80dfb565af80b87945d5b717c6ea3d2b8d0ec",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12568",
       "triggerID" : "31d80dfb565af80b87945d5b717c6ea3d2b8d0ec",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8146ebc2b9a154ee4550f6a85e564e38f632eca7",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12593",
       "triggerID" : "8146ebc2b9a154ee4550f6a85e564e38f632eca7",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8146ebc2b9a154ee4550f6a85e564e38f632eca7 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12593) 
   

