Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/27 13:55:03 UTC

[GitHub] [flink] dawidwys commented on a change in pull request #14765: [FLINK-21151] Extract common full-snapshot writer from RocksDB full-snapshot strategy

dawidwys commented on a change in pull request #14765:
URL: https://github.com/apache/flink/pull/14765#discussion_r565295292



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/FullSnapshotUtil.java
##########
@@ -16,13 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.flink.contrib.streaming.state.snapshot;
+package org.apache.flink.runtime.state;
 
 /**
- * Utility methods and constants around RocksDB creating and restoring snapshots for {@link
- * org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend}.
+ * Utility methods and constants around creating and restoring full snapshots using {@link
+ * FullSnapshotAsyncWriter}.
  */
-public class RocksSnapshotUtil {
+public class FullSnapshotUtil {
 
     /** File suffix of sstable files. */
     public static final String SST_FILE_SUFFIX = ".sst";

Review comment:
       nit: This constant is RocksDB-specific, isn't it?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/FullSnapshotAsyncWriter.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.SupplierWithException;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Objects;
+
+import static org.apache.flink.runtime.state.FullSnapshotUtil.END_OF_KEY_GROUP_MARK;
+import static org.apache.flink.runtime.state.FullSnapshotUtil.hasMetaDataFollowsFlag;
+import static org.apache.flink.runtime.state.FullSnapshotUtil.setMetaDataFollowsFlagInKey;
+
+/**
+ * An asynchronous writer that can write a full snapshot/savepoint from a {@link
+ * FullSnapshotResources}.
+ *
+ * @param <K> type of the backend keys.
+ */
+public class FullSnapshotAsyncWriter<K>

Review comment:
       How do you feel about renaming it to e.g. `SavepointAsyncWriter`? I think that would make it more prominent that this is the unified savepoint format. Moreover, I find it a bit unclear what `Full` refers to.
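The writer's static imports (`END_OF_KEY_GROUP_MARK`, `hasMetaDataFollowsFlag`, `setMetaDataFollowsFlagInKey`) suggest how the format delimits entries: a flag on a key signals that metadata follows it, and a marker closes each key group. The sketch below assumes the flag is the most significant bit of the first key byte and that the marker is the 16-bit value `0xFFFF`; neither value is shown in this diff, and the class name `FullSnapshotFlagsSketch` and the `clearMetaDataFollowsFlag` helper are hypothetical.

```java
/**
 * Hypothetical sketch of the key-flag helpers that FullSnapshotAsyncWriter imports from
 * FullSnapshotUtil. The bit position and the marker value are assumptions for illustration.
 */
final class FullSnapshotFlagsSketch {
    /** Assumed: the "meta data follows" flag is the most significant bit of the first key byte. */
    static final int FIRST_BIT_IN_BYTE_MASK = 0x80;

    /** Assumed: a 16-bit marker written after the last entry of a key group. */
    static final int END_OF_KEY_GROUP_MARK = 0xFFFF;

    private FullSnapshotFlagsSketch() {}

    /** Sets the flag in place, signalling that metadata follows this entry. */
    static void setMetaDataFollowsFlagInKey(byte[] key) {
        key[0] |= FIRST_BIT_IN_BYTE_MASK;
    }

    /** Tests the flag without modifying the key. */
    static boolean hasMetaDataFollowsFlag(byte[] key) {
        return (key[0] & FIRST_BIT_IN_BYTE_MASK) != 0;
    }

    /** Clears the flag in place so a reader gets back the original key bytes. */
    static void clearMetaDataFollowsFlag(byte[] key) {
        key[0] &= ~FIRST_BIT_IN_BYTE_MASK;
    }
}
```

Under these assumptions, the flag costs no extra bytes per entry, but a reader has to test and clear it before using the key.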

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java
##########
@@ -106,7 +118,8 @@ public void next() {
                 detectNewKeyGroup(oldKey);
             }
         } else {
-            IOUtils.closeQuietly(rocksIterator);
+            IOUtils.closeQuietly(currentSubIterator);
+            closeableRegistry.unregisterCloseable(currentSubIterator);

Review comment:
       Usually the pattern is:
   
   ```
               if (closeableRegistry.unregisterCloseable(currentSubIterator)) {
                   IOUtils.closeQuietly(currentSubIterator);
               }
   ```
   BTW, do we even need to unregister it from the registry? I don't think leaving it registered would do any harm: in the worst case it will be closed twice (which was already the case before).
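The guarded pattern above ensures that whichever side removes the resource from the registry first is the one that closes it. The sketch below uses a hypothetical `SimpleRegistry` (not Flink's `CloseableRegistry`) whose `unregister` returns `true` only if the resource was still registered, and a hypothetical `CountingResource` that records how often it is closed.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical minimal registry; not Flink's CloseableRegistry. */
final class SimpleRegistry implements Closeable {
    private final Set<Closeable> registered = new LinkedHashSet<>();

    synchronized void register(Closeable c) {
        registered.add(c);
    }

    /** Returns true only if c was still registered, i.e. the caller now owns closing it. */
    synchronized boolean unregister(Closeable c) {
        return registered.remove(c);
    }

    /** Closes everything still registered, ignoring individual failures to keep the sketch short. */
    @Override
    public synchronized void close() {
        for (Closeable c : registered) {
            try {
                c.close();
            } catch (IOException ignored) {
                // a real registry would collect and rethrow these
            }
        }
        registered.clear();
    }
}

/** Hypothetical resource that counts how many times it was closed. */
final class CountingResource implements Closeable {
    int closeCount = 0;

    @Override
    public void close() {
        closeCount++;
    }
}

final class CloseOncePattern {
    /** Closes c early, but only if this call was the one that removed it from the registry. */
    static void releaseEarly(SimpleRegistry registry, Closeable c) {
        if (registry.unregister(c)) {
            try {
                c.close();
            } catch (IOException ignored) {
                // mirrors IOUtils.closeQuietly
            }
        }
    }
}
```

Without the guard, an early release racing with a registry close could close the resource twice, which is harmless only if its `close()` is idempotent.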

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBFullSnapshotResources.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.snapshot;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator;
+import org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.state.FullSnapshotResources;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyValueStateIterator;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.StreamCompressionDecorator;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.ResourceGuard;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Snapshot;
+
+import javax.annotation.Nonnegative;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** A {@link FullSnapshotResources} for the RocksDB backend. */
+class RocksDBFullSnapshotResources<K> implements FullSnapshotResources<K> {
+    private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
+    private final ResourceGuard.Lease lease;
+    private final Snapshot snapshot;
+    private final RocksDB db;
+    private final List<MetaData> metaData;
+
+    /** Number of bytes in the key-group prefix. */
+    @Nonnegative private final int keyGroupPrefixBytes;
+
+    private final KeyGroupRange keyGroupRange;
+    private final TypeSerializer<K> keySerializer;
+    private final StreamCompressionDecorator streamCompressionDecorator;
+
+    public RocksDBFullSnapshotResources(
+            ResourceGuard.Lease lease,
+            Snapshot snapshot,
+            List<RocksDBKeyedStateBackend.RocksDbKvStateInfo> metaDataCopy,
+            List<StateMetaInfoSnapshot> stateMetaInfoSnapshots,
+            RocksDB db,
+            int keyGroupPrefixBytes,
+            KeyGroupRange keyGroupRange,
+            TypeSerializer<K> keySerializer,
+            StreamCompressionDecorator streamCompressionDecorator) {
+        this.lease = lease;
+        this.snapshot = snapshot;
+        this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
+        this.db = db;
+        this.keyGroupPrefixBytes = keyGroupPrefixBytes;
+        this.keyGroupRange = keyGroupRange;
+        this.keySerializer = keySerializer;
+        this.streamCompressionDecorator = streamCompressionDecorator;
+
+        // we need to to this in the constructor, i.e. in the synchronous part of the snapshot
+        // TODO: better yet, we can do it outside the constructor
+        this.metaData = fillMetaData(metaDataCopy);
+    }
+
+    private List<MetaData> fillMetaData(
+            List<RocksDBKeyedStateBackend.RocksDbKvStateInfo> metaDataCopy) {
+        List<MetaData> metaData = new ArrayList<>(metaDataCopy.size());
+        for (RocksDBKeyedStateBackend.RocksDbKvStateInfo rocksDbKvStateInfo : metaDataCopy) {
+            StateSnapshotTransformer<byte[]> stateSnapshotTransformer = null;
+            if (rocksDbKvStateInfo.metaInfo instanceof RegisteredKeyValueStateBackendMetaInfo) {
+                stateSnapshotTransformer =
+                        ((RegisteredKeyValueStateBackendMetaInfo<?, ?>) rocksDbKvStateInfo.metaInfo)
+                                .getStateSnapshotTransformFactory()
+                                .createForSerializedState()
+                                .orElse(null);
+            }
+            metaData.add(new MetaData(rocksDbKvStateInfo, stateSnapshotTransformer));
+        }
+        return metaData;
+    }
+
+    @Override
+    public KeyValueStateIterator createKVStateIterator() throws IOException {
+        CloseableRegistry closeableRegistry = new CloseableRegistry();
+
+        try {
+            ReadOptions readOptions = new ReadOptions();
+            closeableRegistry.registerCloseable(readOptions::close);
+            readOptions.setSnapshot(snapshot);
+
+            List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators =
+                    createKVStateIterators(readOptions);
+
+            for (Tuple2<RocksIteratorWrapper, Integer> iter : kvStateIterators) {

Review comment:
       nit: We could do that in a single pass when creating iterators.
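A minimal sketch of the single-pass suggestion, assuming the second loop only registers each created iterator with the `closeableRegistry` (its body is cut off in this excerpt). `SinglePassRegistration`, `FakeIterator`, and the `Consumer`-based `register` callback are hypothetical stand-ins for the RocksDB iterators and the registry.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class SinglePassRegistration {
    /** Hypothetical stand-in for a RocksDB iterator bound to one k/v state. */
    static final class FakeIterator implements Closeable {
        final int stateId;
        boolean closed;

        FakeIterator(int stateId) {
            this.stateId = stateId;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Creates one iterator per state id and hands each to {@code register} right after creating it. */
    static List<FakeIterator> createAndRegister(int numStates, Consumer<Closeable> register) {
        List<FakeIterator> iterators = new ArrayList<>(numStates);
        for (int id = 0; id < numStates; id++) {
            FakeIterator it = new FakeIterator(id);
            register.accept(it); // tracked immediately, before the next iterator is created
            iterators.add(it);
        }
        return iterators;
    }
}
```

Apart from saving a pass, registering each iterator as soon as it exists means that if creating a later one throws, the earlier ones are already tracked and can be closed through the registry.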

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyValueStateIterator.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+/**
+ * Iterator that over all key-value state entries in a {@link KeyedStateBackend}. For use during
+ * snapshotting.
+ *
+ * <p>This is required to partition all states into contiguous key-groups. The resulting iteration
+ * sequence is ordered by (key-group, kv-state).
+ */
+public interface KeyValueStateIterator extends AutoCloseable {
+
+    /**
+     * Advances the iterator. Should only be called if {@link #isValid()} returned true. Valid flag
+     * can only change after calling {@link #next()}.
+     */
+    void next();
+
+    /** Returns the key-group for the current key. */
+    int keyGroup();
+
+    byte[] key();
+
+    byte[] value();
+
+    /** Returns the Id of the K/V state to which the current key belongs. */
+    int kvStateId();
+
+    /**
+     * Indicates if current key starts a new k/v-state, i.e. belong to a different k/v-state than
+     * it's predecessor.
+     *
+     * @return true iff the current key belong to a different k/v-state than it's predecessor.
+     */
+    boolean isNewKeyValueState();
+
+    /**
+     * Indicates if current key starts a new key-group, i.e. belong to a different key-group than
+     * it's predecessor.
+     *
+     * @return true iff the current key belong to a different key-group than it's predecessor.
+     */
+    boolean isNewKeyGroup();
+
+    /**
+     * Check if the iterator is still valid. Getters like {@link #key()}, {@link #value()}, etc. as
+     * well as {@link #next()} should only be called if valid returned true. Should be checked after
+     * each call to {@link #next()} before accessing iterator state.
+     *
+     * @return True iff this iterator is valid.
+     */
+    boolean isValid();
+
+    @Override
+    void close();

Review comment:
       Do we want to override it without declaring an exception (i.e. without `throws Exception`)?
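Narrowing the `close()` override has a concrete effect on callers: `AutoCloseable.close()` declares `throws Exception`, so without the override every try-with-resources block over the iterator would need to catch or propagate `Exception`. The sketch below uses a trimmed, hypothetical copy of the interface (`KvIterator`) and an in-memory implementation (`ListKvIterator`) to show that, together with the `isValid()`/`next()` protocol described in the Javadoc.

```java
import java.util.List;

/** Trimmed-down, hypothetical copy of the interface under review. */
interface KvIterator extends AutoCloseable {
    void next();

    int keyGroup();

    byte[] key();

    boolean isNewKeyGroup();

    boolean isValid();

    /** Narrowed override: no checked exception, so callers need no catch clause. */
    @Override
    void close();
}

/** Hypothetical in-memory implementation over entries already sorted by key group. */
final class ListKvIterator implements KvIterator {
    private final List<int[]> entries; // each entry: {keyGroup, keyByte}
    private int pos = 0;
    boolean closed = false;

    ListKvIterator(List<int[]> entries) {
        this.entries = entries;
    }

    @Override public void next() { pos++; }
    @Override public int keyGroup() { return entries.get(pos)[0]; }
    @Override public byte[] key() { return new byte[] {(byte) entries.get(pos)[1]}; }
    @Override public boolean isNewKeyGroup() {
        return pos == 0 || entries.get(pos - 1)[0] != entries.get(pos)[0];
    }
    @Override public boolean isValid() { return pos < entries.size(); }
    @Override public void close() { closed = true; }
}

final class KvIteratorDemo {
    /** Counts key groups, checking isValid() before every access as the Javadoc requires. */
    static int countKeyGroups(KvIterator it) {
        int groups = 0;
        try (KvIterator iter = it) { // compiles without catch (Exception e) thanks to the override
            while (iter.isValid()) {
                if (iter.isNewKeyGroup()) {
                    groups++;
                }
                iter.next();
            }
        }
        return groups;
    }
}
```

The trade-off is that implementations whose cleanup can fail must handle or wrap those failures internally, since they can no longer throw checked exceptions from `close()`.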

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksSingleStateIterator.java
##########
@@ -23,11 +23,13 @@
 
 import javax.annotation.Nonnull;
 
+import java.io.Closeable;
+
 /**
  * Wraps a RocksDB iterator to cache it's current key and assigns an id for the key/value state to
  * the iterator. Used by {@link RocksStatesPerKeyGroupMergeIterator}.
  */
-class RocksSingleStateIterator implements AutoCloseable {
+class RocksSingleStateIterator implements Closeable {

Review comment:
       Is this change necessary? `AutoCloseable` is the newer interface.
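One plausible motivation for the switch, which the diff does not confirm: if the registry only accepts `java.io.Closeable`, a class that is merely `AutoCloseable` can be registered only through an adapter such as a `::close` method reference (as `readOptions::close` is registered in `createKVStateIterator`). Unregistering it later then requires keeping that exact adapter object, because the JLS does not guarantee that two evaluations of the same method reference yield the same object. Since `Closeable` extends `AutoCloseable`, callers expecting `AutoCloseable` are unaffected; the difference is that `Closeable.close()` declares `IOException` and is documented to have no effect when called again. The `Registry`, `AutoOnly`, and `NowCloseable` types below are hypothetical.

```java
import java.io.Closeable;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

/** Hypothetical sketch; not Flink's CloseableRegistry. */
final class CloseableVsAutoCloseable {
    /** A registry typed on java.io.Closeable that tracks entries by identity. */
    static final class Registry {
        private final Set<Closeable> entries = Collections.newSetFromMap(new IdentityHashMap<>());

        void register(Closeable c) {
            entries.add(c);
        }

        boolean unregister(Closeable c) {
            return entries.remove(c);
        }
    }

    /** Pre-change shape: only AutoCloseable, so register(new AutoOnly()) would not compile. */
    static final class AutoOnly implements AutoCloseable {
        @Override
        public void close() {}
    }

    /** Post-change shape: a Closeable is also an AutoCloseable and can be registered directly. */
    static final class NowCloseable implements Closeable {
        @Override
        public void close() {} // nothing to release in this sketch
    }
}
```

With `AutoOnly`, the adapter (for example `Closeable adapter = a::close;`) has to be stored and passed to both `register` and `unregister`; with `NowCloseable`, the instance itself serves as the key.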

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java
##########
@@ -132,10 +145,14 @@ public void next() {
             final RocksIteratorWrapper rocksIterator = rocksIteratorWithKVStateId.f0;
             rocksIterator.seekToFirst();
             if (rocksIterator.isValid()) {
-                iteratorPriorityQueue.offer(
-                        new RocksSingleStateIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
+                RocksSingleStateIterator wrappingIterator =
+                        new RocksSingleStateIterator(rocksIterator, rocksIteratorWithKVStateId.f1);
+                iteratorPriorityQueue.offer(wrappingIterator);
+                closeableRegistry.registerCloseable(wrappingIterator);
+                closeableRegistry.unregisterCloseable(rocksIterator);

Review comment:
       Same as above. Do we care about unregistering here?
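For reference, the order that the merge iterator maintains with `iteratorPriorityQueue` is (key-group, kv-state), as documented on `KeyValueStateIterator`. The sketch below reproduces that k-way merge over plain sorted lists. `KeyGroupMergeSketch` and `StateCursor` are hypothetical, the comparator is an assumption based on that Javadoc, and since there are no native resources here, a comment marks where the real iterator would close an exhausted sub-iterator.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical sketch of the (key-group, kv-state id) merge; not the RocksDB implementation. */
final class KeyGroupMergeSketch {
    /** One per-state source: entries {keyGroup, key} sorted by key group, plus its kv-state id. */
    static final class StateCursor {
        final int kvStateId;
        final List<int[]> entries;
        int pos = 0;

        StateCursor(int kvStateId, List<int[]> entries) {
            this.kvStateId = kvStateId;
            this.entries = entries;
        }

        int keyGroup() { return entries.get(pos)[0]; }

        boolean isValid() { return pos < entries.size(); }
    }

    /** Assumed order: key group first, kv-state id as the tie-breaker. */
    private static final Comparator<StateCursor> ORDER =
            Comparator.comparingInt(StateCursor::keyGroup).thenComparingInt(c -> c.kvStateId);

    /** Returns "keyGroup/kvStateId/key" strings in merged order. */
    static List<String> merge(List<StateCursor> cursors) {
        PriorityQueue<StateCursor> queue = new PriorityQueue<>(ORDER);
        for (StateCursor c : cursors) {
            if (c.isValid()) { // only non-empty sources enter the queue, like seekToFirst() + isValid()
                queue.offer(c);
            }
        }
        List<String> out = new ArrayList<>();
        while (!queue.isEmpty()) {
            StateCursor head = queue.poll(); // removed before advancing, so its key never changes in the queue
            int[] e = head.entries.get(head.pos);
            out.add(e[0] + "/" + head.kvStateId + "/" + e[1]);
            head.pos++;
            if (head.isValid()) {
                queue.offer(head); // re-inserted at its new key group
            }
            // else: exhausted; the real iterator would close this sub-iterator here
        }
        return out;
    }
}
```

Because a cursor is re-offered after every step, all entries of the lower-id state in a key group are emitted before any entry of the next state in that group, which keeps each key group contiguous in the output.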




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org