Posted to notifications@zookeeper.apache.org by GitBox <gi...@apache.org> on 2020/10/07 23:36:32 UTC

[GitHub] [zookeeper] muse-dev[bot] commented on a change in pull request #1491: ZOOKEEPER-3964: Introduce RocksDB snap and implement change data capture to enable incremental snapshot

muse-dev[bot] commented on a change in pull request #1491:
URL: https://github.com/apache/zookeeper/pull/1491#discussion_r501368906



##########
File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/RocksDBSnap.java
##########
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.persistence;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.Index;
+import org.apache.zookeeper.common.Time;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.server.DataNode;
+import org.apache.zookeeper.server.DataTree;
+import org.apache.zookeeper.server.DataTree.ZxidDigest;
+import org.apache.zookeeper.server.ReferenceCountedACLCache;
+import org.apache.zookeeper.server.ServerMetrics;
+import org.apache.zookeeper.server.TransactionChangeRecord;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements the snapshot interface.
+ * It is responsible for storing, serializing
+ * and deserializing the snapshot in RocksDB,
+ * and provides access to it.
+ */
+public class RocksDBSnap implements SnapShot {
+    File snapDir;
+    RocksDB db;
+    Options options;
+    WriteOptions writeOpts;
+    RocksIterator rocksIterator;
+
+    private volatile boolean close = false;
+
+    private static final boolean SYNC_WRITE = false;
+    private static final boolean DISABLE_WAL = true;
+
+    // VisibleForTesting
+    public static final String ROCKSDB_WRITE_BUFFER_SIZE = "zookeeper.rocksdbWriteBufferSize";
+
+    private static final int PREFIX_STARTING_INDEX = 0;
+    private static final int PREFIX_ENDING_INDEX = 3;
+
+    private static final String SESSION_KEY_PREFIX = "S::";
+    private static final String DATATREE_KEY_PREFIX = "T::";
+    private static final String ACL_KEY_PREFIX = "A::";
+
+    private static final String ZXID_KEY = "Zxid";
+    private static final String ZXIDDIGEST_KEY = "ZxidDigest";
+
+    // 4 GiB; the long literal keeps the arithmetic from overflowing int
+    private static final long DEFAULT_ROCKSDB_WRITE_BUFFER_SIZE = 4096L * 1024 * 1024;
+
+    private static final Logger LOG = LoggerFactory.getLogger(RocksDBSnap.class);
+
+    /**
+     * The constructor, which takes the snapshot directory. This class is
+     * instantiated via SnapshotFactory.
+     *
+     * @param snapDir the snapshot directory
+     */
+    public RocksDBSnap(File snapDir) throws IOException {
+        RocksDB.loadLibrary();
+        if (snapDir == null) {
+            throw new IllegalArgumentException("Snap Directory can't be null!");
+        }
+
+        this.snapDir = snapDir;
+
+        long rocksdbWriteBufferSize = Long.getLong(
+                ROCKSDB_WRITE_BUFFER_SIZE, DEFAULT_ROCKSDB_WRITE_BUFFER_SIZE);
+        this.options = new Options()
+                .setCreateIfMissing(true)
+                .setCreateMissingColumnFamilies(true)
+                .setDbWriteBufferSize(rocksdbWriteBufferSize);
+
+        try {
+            this.db = RocksDB.open(options, snapDir.getAbsolutePath());
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to open RocksDB. " + "error: " + e.getMessage(), e);
+        }
+        // setting Sync = true and DisableWAL = true will lead to writes failing
+        // and throwing an exception. So we set Sync = false here and let RocksDB
+        // flush after serialization.
+        this.writeOpts = new WriteOptions().setSync(SYNC_WRITE).setDisableWAL(DISABLE_WAL);
+    }
+
+    // VisibleForTesting
+    public void initializeIterator() {
+        rocksIterator = db.newIterator();
+        rocksIterator.seekToFirst();
+    }
+
+    // VisibleForTesting
+    public void closeIterator() {
+        rocksIterator.close();
+    }
+
+    public long deserialize(DataTree dt, Map<Long, Integer> sessions)
+            throws IOException {
+        File[] files = snapDir.listFiles();
+        if (files == null || files.length == 0) {
+            LOG.info("No snapshot found in {}", snapDir.getName());
+            return -1L;
+        }
+        long lastProcessedZxid;
+        long start = Time.currentElapsedTime();
+        try {
+            byte[] zxidBytes = db.get(ZXID_KEY.getBytes(StandardCharsets.UTF_8));
+            if (zxidBytes == null) {
+                // We didn't find zxid information in RocksDB, which means
+                // there is no RocksDB snapshot in the snapDir.
+                LOG.info("No snapshot found in {}", snapDir.getName());
+                return -1L;
+            }
+            lastProcessedZxid = Long.parseLong(new String(zxidBytes, StandardCharsets.UTF_8));
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to deserialize last processed zxid in RocksDB. " + "error: " + e.getMessage(), e);
+        }
+        LOG.info("RocksDB: Reading snapshot 0x{} from {}", Long.toHexString(lastProcessedZxid), snapDir);
+        dt.lastProcessedZxid = lastProcessedZxid;
+
+        rocksIterator = db.newIterator();
+        rocksIterator.seekToFirst();
+        while (rocksIterator.isValid()) {
+            String key = new String(rocksIterator.key(), StandardCharsets.UTF_8);
+            String prefix = key.substring(PREFIX_STARTING_INDEX, PREFIX_ENDING_INDEX);
+            switch (prefix) {
+                case SESSION_KEY_PREFIX:
+                    deserializeSessions(sessions);
+                    break;
+                case ACL_KEY_PREFIX:
+                    deserializeACL(dt.getReferenceCountedAclCache());
+                    break;
+                case DATATREE_KEY_PREFIX:
+                    dt.deserialize(this, "tree");
+                    break;
+                default:
+                    // last processed zxid or zxid digest
+                    rocksIterator.next();
+                    break;
+            }
+        }
+        rocksIterator.close();
+
+        deserializeZxidDigest(dt);
+        if (dt.getDigestFromLoadedSnapshot() != null) {
+            dt.compareSnapshotDigests(lastProcessedZxid);
+        }
+        long elapsed = Time.currentElapsedTime() - start;
+        LOG.info("RocksDBSnap deserialization takes " + elapsed + " ms");
+        ServerMetrics.getMetrics().ROCKSDB_SNAPSHOT_DESERIALIZATION_TIME.add(elapsed);
+        return lastProcessedZxid;
+    }
+
+    public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions,
+                                       long lastZxid, boolean fsync) throws IOException {
+        if (close) {
+            return;
+        }
+        if (fsync) {
+            // take a full snapshot when snap syncing with the leader
+
+            // close RocksDB before cleaning up the old snapshot,
+            // because destroyDB will fail while the DB is open and locked
+            db.close();
+            // clean up the old snapshot
+            try {
+                RocksDB.destroyDB(snapDir.getAbsolutePath(), options);
+            } catch (RocksDBException e) {
+                throw new IOException("Failed to clean old data in RocksDB files: " + "error: " + e.getMessage(), e);
+            }
+            // re-open RocksDB for taking a new snapshot
+            try {
+                db = RocksDB.open(options, snapDir.getAbsolutePath());
+            } catch (RocksDBException e) {
+                throw new IOException("Failed to open RocksDB. " + "error: " + e.getMessage(), e);
+            }
+
+            LOG.info("RocksDB: Snapshotting: 0x{} to {}", Long.toHexString(lastZxid), snapDir);
+
+            updateLastProcessedZxid(lastZxid, null);
+            serializeSessions(sessions);
+            serializeACL(dt.getReferenceCountedAclCache());
+            dt.serialize(this, "tree");
+            serializeZxidDigest(dt);
+        }
+        flush();
+    }
+
+    public void flush() throws IOException {
+        long start = Time.currentElapsedTime();
+        try (final FlushOptions flushOptions = new FlushOptions().setWaitForFlush(true)) {
+            db.flush(flushOptions);
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to flush in RocksDB: " + "error: " + e.getMessage(), e);
+        }
+        long elapsed = Time.currentElapsedTime() - start;
+        ServerMetrics.getMetrics().ROCKSDB_FLUSH_TIME.add(elapsed);
+    }
+
+    public File findMostRecentSnapshot() throws IOException {
+        // In RocksDB, we always apply transactions to the current snapshot.
+        // So we only keep one single folder for the RocksDB snapshot. If
+        // this snapshot cannot be loaded because of corrupted data, we will
+     * sync with the leader to get the latest data. Keeping multiple snapshots
+     * won't help here, since we would still need to take a new snapshot after
+     * syncing from an old one; that's why we keep only one.
+        return snapDir;
+    }
+
+    @SuppressWarnings("unchecked")
+    public void applyTxn(List<TransactionChangeRecord> changeList, long zxid) throws IOException {
+        // We use RocksDB WriteBatch to make atomic updates.
+        // We don't make applying clients' requests wait until the flush
+        // finishes, because flushing 200MB of memtable data takes nearly
+        // 1.5 seconds, flushing 500MB takes 4.5 seconds, and flushing 1GB
+        // takes more than 10 seconds.
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            // update sessions, ACL, DataTree and ZxidDigest in RocksDB
+            for (int i = 0; i < changeList.size(); i++) {
+                TransactionChangeRecord change = changeList.get(i);
+                switch (change.getType()) {
+                    case TransactionChangeRecord.DATANODE:
+                        String path = (String) change.getKey();
+                        DataNode node = (DataNode) change.getValue();
+                        String operation = change.getOperation();
+                        if (operation.equals(TransactionChangeRecord.ADD)
+                            || operation.equals(TransactionChangeRecord.UPDATE)) {
+                            addNode(path, node, writeBatch);
+                        } else {
+                            removeNode(path, writeBatch);
+                        }
+                        break;
+                    case TransactionChangeRecord.ACL:
+                        Long index = (Long) change.getKey();
+                        List<ACL> aclList = (List<ACL>) change.getValue();
+                        if (change.getOperation().equals(TransactionChangeRecord.ADD)) {
+                            addACLKeyValue(index, aclList, writeBatch);
+                        } else {
+                            removeACLKeyValue(index, writeBatch);
+                        }
+                        break;
+                    case TransactionChangeRecord.SESSION:
+                        Long id = (Long) change.getKey();
+                        Integer timeout = (Integer) change.getValue();
+                        if (change.getOperation().equals(TransactionChangeRecord.ADD)) {
+                            addSessionKeyValue(id, timeout, writeBatch);
+                        } else {
+                            removeSessionKeyValue(id, writeBatch);
+                        }
+                        break;
+                    case TransactionChangeRecord.ZXIDDIGEST:
+                        ZxidDigest zxidDigest = (ZxidDigest) change.getValue();
+                        if (change.getOperation().equals(TransactionChangeRecord.UPDATE)) {
+                            updateZxidDigest(zxidDigest, writeBatch);
+                        }
+                        break;
+                    default:
+                        LOG.warn("Unknown TransactionChangeRecord type {}", change);
+                        break;
+                }
+            }
+            // update the zxid in RocksDB
+            updateLastProcessedZxid(zxid, writeBatch);
+
+            // even if RocksDB's memtable is auto-flushed, the data and the zxid digest stay consistent.
+            db.write(writeOpts, writeBatch);
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to apply txns to RocksDB " + "error: " + e.getMessage(), e);
+        }
+    }
+
+    private void updateLastProcessedZxid(long zxid, WriteBatch writeBatch) throws IOException {
+        try {
+            if (writeBatch != null) {
+                writeBatch.put(ZXID_KEY.getBytes(StandardCharsets.UTF_8),
+                        Long.toString(zxid).getBytes(StandardCharsets.UTF_8));
+            } else {
+                db.put(writeOpts, ZXID_KEY.getBytes(StandardCharsets.UTF_8),
+                        Long.toString(zxid).getBytes(StandardCharsets.UTF_8));
+            }
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to serialize last processed zxid in RocksDB. "
+                    + "error: " + e.getMessage(), e);
+        }
+    }
+
+    public void serializeSessions(Map<Long, Integer> sessions) throws IOException {
+        HashMap<Long, Integer> sessSnap = new HashMap<Long, Integer>(sessions);
+        for (Entry<Long, Integer> entry : sessSnap.entrySet()) {
+            addSessionKeyValue(entry.getKey(), entry.getValue(), null);
+        }
+    }
+
+    private void addSessionKeyValue(Long id, Integer timeout, WriteBatch writeBatch) throws IOException {
+        try {
+            String key = SESSION_KEY_PREFIX + id;
+            if (writeBatch != null) {
+                writeBatch.put(key.getBytes(StandardCharsets.UTF_8),
+                        timeout.toString().getBytes(StandardCharsets.UTF_8));
+            } else {
+                db.put(writeOpts, key.getBytes(StandardCharsets.UTF_8),
+                        timeout.toString().getBytes(StandardCharsets.UTF_8));
+            }
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to serialize sessions in RocksDB " + "error: " + e.getMessage(), e);
+        }
+    }
+
+    private void removeSessionKeyValue(Long id, WriteBatch writeBatch) throws IOException {
+        try {
+            String key = SESSION_KEY_PREFIX + id;
+            writeBatch.delete(key.getBytes(StandardCharsets.UTF_8));
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to delete the session in RocksDB " + "error: " + e.getMessage(), e);
+        }
+    }
+
+    public void deserializeSessions(Map<Long, Integer> sessions) throws IOException {
+        while (rocksIterator.isValid()) {
+            String key = new String(rocksIterator.key(), StandardCharsets.UTF_8);
+            if (!key.startsWith(SESSION_KEY_PREFIX)) {
+                break;
+            }
+            key = key.substring(PREFIX_ENDING_INDEX);
+            long id = Long.parseLong(key);
+            int to = Integer.parseInt(new String(rocksIterator.value(), StandardCharsets.UTF_8));
+            sessions.put(id, to);
+            rocksIterator.next();
+        }
+    }
+
+    public synchronized void serializeACL(ReferenceCountedACLCache aclCache) throws IOException {
+        Set<Map.Entry<Long, List<ACL>>> set = aclCache.getLongKeyMap().entrySet();
+        for (Map.Entry<Long, List<ACL>> val : set) {
+            addACLKeyValue(val.getKey(), val.getValue(), null);
+        }
+    }
+
+    private void addACLKeyValue(Long index, List<ACL> aclList, WriteBatch writeBatch) throws IOException {
+        String key = ACL_KEY_PREFIX + index;
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+        boa.startVector(aclList, "acls");
+        for (ACL acl : aclList) {
+            acl.serialize(boa, "acl");
+        }
+        boa.endVector(aclList, "acls");
+        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
+        try {
+            if (writeBatch != null) {
+                writeBatch.put(key.getBytes(StandardCharsets.UTF_8), bb.array());
+            } else {
+                db.put(writeOpts, key.getBytes(StandardCharsets.UTF_8), bb.array());
+            }
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to serialize ACL lists in RocksDB " + "error: " + e.getMessage(), e);
+        }
+    }
+
+    private void removeACLKeyValue(Long index, WriteBatch writeBatch) throws IOException {
+        try {
+            String key = ACL_KEY_PREFIX + index;
+            writeBatch.delete(key.getBytes(StandardCharsets.UTF_8));
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to delete the ACL list in RocksDB " + "error: " + e.getMessage(), e);
+        }
+    }
+
+    public synchronized void deserializeACL(ReferenceCountedACLCache aclCache) throws IOException {
+        aclCache.clear();
+        ByteArrayInputStream bais;
+        BinaryInputArchive bia;
+        while (rocksIterator.isValid()) {
+            String key = new String(rocksIterator.key(), StandardCharsets.UTF_8);
+            if (!key.startsWith(ACL_KEY_PREFIX)) {
+                break;
+            }
+            key = key.substring(PREFIX_ENDING_INDEX);
+            long val = Long.parseLong(key);
+            List<ACL> aclList = new ArrayList<ACL>();
+            bais = new ByteArrayInputStream(rocksIterator.value());
+            bia = BinaryInputArchive.getArchive(bais);
+            Index j = bia.startVector("acls");
+            if (j == null) {
+                throw new RuntimeException("Incorrent format of InputArchive when deserialize DataTree - missing acls");

Review comment:
       *RESOURCE_LEAK:*  resource of type `java.io.DataInputStream` acquired by call to `getArchive(...)` at line 418 is not released after line 421.
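
       A minimal sketch of one possible fix, assuming that closing the
       ByteArrayInputStream is sufficient to release the DataInputStream
       that getArchive(...) wraps around it. Since close() on a
       ByteArrayInputStream is a no-op, behavior is unchanged; this mainly
       scopes the stream's lifetime explicitly and satisfies the analyzer.
       All names below come from the patch:

           // Sketch: per-entry body of the deserializeACL loop, with the
           // stream held in try-with-resources so it is always released.
           try (ByteArrayInputStream bais = new ByteArrayInputStream(rocksIterator.value())) {
               BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
               Index j = bia.startVector("acls");
               if (j == null) {
                   throw new RuntimeException("Incorrect format of InputArchive when deserializing DataTree - missing acls");
               }
               List<ACL> aclList = new ArrayList<>();
               while (!j.done()) {
                   ACL acl = new ACL();
                   acl.deserialize(bia, "acl");
                   aclList.add(acl);
                   j.incr();
               }
               aclCache.updateMaps(val, aclList);
           }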

##########
File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/RocksDBSnap.java
##########
@@ -0,0 +1,547 @@
+            }
+            while (!j.done()) {
+                ACL acl = new ACL();
+                acl.deserialize(bia, "acl");
+                aclList.add(acl);
+                j.incr();
+            }
+            aclCache.updateMaps(val, aclList);
+            rocksIterator.next();
+        }
+    }
+
+    public void writeNode(String pathString, DataNode node) throws IOException {
+        addNode(pathString, node, null);
+    }
+
+    private void addNode(String pathString, DataNode node, WriteBatch writeBatch) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+        boa.writeRecord(node, "node");
+        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
+        try {
+            pathString = DATATREE_KEY_PREFIX + pathString;
+            if (writeBatch != null) {
+                writeBatch.put(pathString.getBytes(StandardCharsets.UTF_8), bb.array());
+            } else {
+                db.put(writeOpts, pathString.getBytes(StandardCharsets.UTF_8), bb.array());
+            }
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to serialize data node in RocksDB " + "error: " + e.getMessage(), e);
+        }
+    }
+
+    private void removeNode(String pathString, WriteBatch writeBatch) throws IOException {
+        try {
+            pathString = DATATREE_KEY_PREFIX + pathString;
+            writeBatch.delete(pathString.getBytes(StandardCharsets.UTF_8));
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to delete the data node in RocksDB " + "error: " + e.getMessage(), e);
+        }
+    }
+
+    public void markEnd() throws IOException {
+        // nothing needs to be done here when taking a snapshot in RocksDB
+    }
+
+    public String readNode(DataNode node) throws IOException {
+        if (!rocksIterator.isValid()) {
+            // finished iterating over all data nodes in RocksDB snapshot
+            return "/";
+        }
+        String path = new String(rocksIterator.key(), StandardCharsets.UTF_8);
+        if (!path.startsWith(DATATREE_KEY_PREFIX)) {
+            // finished iterating over all data nodes in RocksDB snapshot
+            return "/";
+        }
+        path = path.substring(PREFIX_ENDING_INDEX);
+        ByteArrayInputStream bais = new ByteArrayInputStream(rocksIterator.value());
+        BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
+        bia.readRecord(node, "node");
+        rocksIterator.next();
+        return path;
+    }
+
+    public boolean serializeZxidDigest(DataTree dt) throws IOException {
+        if (dt.nodesDigestEnabled()) {
+            ZxidDigest zxidDigest = dt.getLastProcessedZxidDigest();
+            if (zxidDigest == null) {
+                zxidDigest = dt.getBlankDigest();
+            }
+            updateZxidDigest(zxidDigest, null);
+            return true;
+        }
+        return false;
+    }
+
+    private void updateZxidDigest(ZxidDigest zxidDigest, WriteBatch writeBatch) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+        zxidDigest.serialize(boa);
+        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
+        try {
+            if (writeBatch != null) {
+                writeBatch.put(ZXIDDIGEST_KEY.getBytes(StandardCharsets.UTF_8), bb.array());
+            } else {
+                db.put(writeOpts, ZXIDDIGEST_KEY.getBytes(StandardCharsets.UTF_8), bb.array());
+            }
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to serialize zxid digest in RocksDB " + "error: " + e.getMessage(), e);
+        }
+    }
+
+    public boolean deserializeZxidDigest(DataTree dt) throws IOException {
+        if (dt.nodesDigestEnabled()) {
+            try {
+                byte[] zxidDigestBytes = db.get(ZXIDDIGEST_KEY.getBytes(StandardCharsets.UTF_8));
+                ZxidDigest zxidDigest = dt.getBlankDigest();
+                ByteArrayInputStream bais = new ByteArrayInputStream(zxidDigestBytes);
+                BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
+                zxidDigest.deserialize(bia);

Review comment:
       *RESOURCE_LEAK:*  resource of type `java.io.DataInputStream` acquired by call to `getArchive(...)` at line 520 is not released after line 521.
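
       A minimal sketch along the same lines for deserializeZxidDigest,
       kept inside the existing try/catch for RocksDBException. The null
       check is an assumption on my part, since db.get(...) returns null
       when the key is absent and the quoted code would otherwise throw a
       NullPointerException; the rest uses only names from the patch:

           byte[] zxidDigestBytes = db.get(ZXIDDIGEST_KEY.getBytes(StandardCharsets.UTF_8));
           if (zxidDigestBytes == null) {
               // assumed policy: treat a missing digest key as "nothing to deserialize"
               return false;
           }
           ZxidDigest zxidDigest = dt.getBlankDigest();
           try (ByteArrayInputStream bais = new ByteArrayInputStream(zxidDigestBytes)) {
               BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
               zxidDigest.deserialize(bia);
           }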

##########
File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/RocksDBSnap.java
##########
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.persistence;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.Index;
+import org.apache.zookeeper.common.Time;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.server.DataNode;
+import org.apache.zookeeper.server.DataTree;
+import org.apache.zookeeper.server.DataTree.ZxidDigest;
+import org.apache.zookeeper.server.ReferenceCountedACLCache;
+import org.apache.zookeeper.server.ServerMetrics;
+import org.apache.zookeeper.server.TransactionChangeRecord;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements the snapshot interface.
+ * It is responsible for storing, serializing
+ * and deserializing the right snapshot in RocksDB,
+ * and provides access to the snapshots.
+ */
+public class RocksDBSnap implements SnapShot {
+    File snapDir;
+    RocksDB db;
+    Options options;
+    WriteOptions writeOpts;
+    RocksIterator rocksIterator;
+
+    private volatile boolean close = false;
+
+    private static final boolean SYNC_WRITE = false;
+    private static final boolean DISABLE_WAL = true;
+
+    //VisibleForTesting
+    public static final String ROCKSDB_WRITE_BUFFER_SIZE = "zookeeper.rocksdbWriteBufferSize";
+
+    private static final int PREFIX_STARTING_INDEX = 0;
+    private static final int PREFIX_ENDING_INDEX = 3;
+
+    private static final String SESSION_KEY_PREFIX = "S::";
+    private static final String DATATREE_KEY_PREFIX = "T::";
+    private static final String ACL_KEY_PREFIX = "A::";
+
+    private static final String ZXID_KEY = "Zxid";
+    private static final String ZXIDDIGEST_KEY = "ZxidDigest";
+
+    private static final long DEFAULT_ROCKSDB_WRITE_BUFFER_SIZE = 4096 * 1024 * 1024;
+
+    private static final Logger LOG = LoggerFactory.getLogger(RocksDBSnap.class);
+
+    /**
+     * The constructor which takes the snapDir. This class is instantiated
+     * via SnapshotFactory
+     *
+     * @param snapDir the snapshot directory
+     */
+    public RocksDBSnap(File snapDir) throws IOException {
+        RocksDB.loadLibrary();
+        if (snapDir == null) {
+            throw new IllegalArgumentException("Snap Directory can't be null!");
+        }
+
+        this.snapDir = snapDir;
+
+        long rocksdbWriteBufferSize = Long.getLong(
+                ROCKSDB_WRITE_BUFFER_SIZE, DEFAULT_ROCKSDB_WRITE_BUFFER_SIZE);
+        this.options = new Options()
+                .setCreateIfMissing(true)
+                .setCreateMissingColumnFamilies(true)
+                .setDbWriteBufferSize(rocksdbWriteBufferSize);
+
+        try {
+            this.db = RocksDB.open(options, snapDir.getAbsolutePath());
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to open RocksDB. " + "error: " + e.getMessage(), e);
+        }
+        // setting Sync = true and DisableWAL = true will lead to writes failing
+        // and throwing an exception. So we set Sync = false here and let RocksDB
+        // flush after serialization.
+        this.writeOpts = new WriteOptions().setSync(SYNC_WRITE).setDisableWAL(DISABLE_WAL);
+    }
+
+    // VisibleForTesting
+    public void initializeIterator() {
+        rocksIterator = db.newIterator();
+        rocksIterator.seekToFirst();
+    }
+
+    // VisibleForTesting
+    public void closeIterator() {
+        rocksIterator.close();
+    }
+
+    public long deserialize(DataTree dt, Map<Long, Integer> sessions)
+            throws IOException {
+        File[] files = snapDir.listFiles();
+        if (files == null || files.length == 0) {
+            LOG.info("No snapshot found in {}", snapDir.getName());
+            return -1L;
+        }
+        long lastProcessedZxid;
+        long start = Time.currentElapsedTime();
+        try {
+            byte[] zxidBytes = db.get(ZXID_KEY.getBytes(StandardCharsets.UTF_8));
+            if (zxidBytes == null) {
+                // We didn't find zxid infomation in RocksDB, which means
+                // there is no RocksDB snapshot in the snapDir.
+                LOG.info("No snapshot found in {}", snapDir.getName());
+                return -1L;
+            }
+            lastProcessedZxid = Long.parseLong(new String(zxidBytes, StandardCharsets.UTF_8));
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to deserialize last processed zxid in RocksDB. " + "error: " + e.getMessage(), e);
+        }
+        LOG.info("RocksDB: Reading snapshot 0x{} from {}", Long.toHexString(lastProcessedZxid), snapDir);
+        dt.lastProcessedZxid = lastProcessedZxid;
+
+        rocksIterator = db.newIterator();
+        rocksIterator.seekToFirst();
+        ByteArrayInputStream bais;
+        BinaryInputArchive bia;
+        while (rocksIterator.isValid()) {
+            String key = new String(rocksIterator.key(), StandardCharsets.UTF_8);
+            String prefix = key.substring(PREFIX_STARTING_INDEX, PREFIX_ENDING_INDEX);
+            switch (prefix) {
+                case SESSION_KEY_PREFIX:
+                    deserializeSessions(sessions);
+                    break;
+                case ACL_KEY_PREFIX:
+                    deserializeACL(dt.getReferenceCountedAclCache());
+                    break;
+                case DATATREE_KEY_PREFIX:
+                    dt.deserialize(this, "tree");
+                    break;
+                default:
+                    // last processed zxid or zxid digest
+                    rocksIterator.next();
+                    break;
+            }
+        }
+        rocksIterator.close();
+
+        deserializeZxidDigest(dt);
+        if (dt.getDigestFromLoadedSnapshot() != null) {
+            dt.compareSnapshotDigests(lastProcessedZxid);
+        }
+        long elapsed = Time.currentElapsedTime() - start;
+        LOG.info("RocksDBSnap deserialization takes " + elapsed + " ms");
+        ServerMetrics.getMetrics().ROCKSDB_SNAPSHOT_DESERIALIZATION_TIME.add(elapsed);
+        return lastProcessedZxid;
+    }
+
+    public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions,
+                                       long lastZxid, boolean fsync) throws IOException {
+        if (close) {
+            return;
+        }
+        if (fsync) {
+            // take a full snapshot when snap sync with the leader
+
+            // close RocksDB for cleaning up the old snapshot,
+            // because destroyDB will fail if the RocksDB is open and locked
+            db.close();
+            // clean up the old snapshot
+            try {
+                RocksDB.destroyDB(snapDir.getAbsolutePath(), options);
+            } catch (RocksDBException e) {
+                throw new IOException("Failed to clean old data in RocksDB files: " + "error: " + e.getMessage(), e);
+            }
+            // re-open RocksDB for taking a new snapshot
+            try {
+                db = RocksDB.open(options, snapDir.getAbsolutePath());
+            } catch (RocksDBException e) {
+                throw new IOException("Failed to open RocksDB. " + "error: " + e.getMessage(), e);
+            }
+
+            LOG.info("RocksDB: Snapshotting: 0x{} to {}", Long.toHexString(lastZxid), snapDir);
+
+            updateLastProcessedZxid(lastZxid, null);
+            serializeSessions(sessions);
+            serializeACL(dt.getReferenceCountedAclCache());
+            dt.serialize(this, "tree");
+            serializeZxidDigest(dt);
+        }
+        flush();
+    }
+
+    public void flush() throws IOException {
+        long start = Time.currentElapsedTime();
+        try (final FlushOptions flushOptions = new FlushOptions().setWaitForFlush(true)) {
+            db.flush(flushOptions);
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to flush in RocksDB: " + "error: " + e.getMessage(), e);
+        }
+        long elapsed = Time.currentElapsedTime() - start;
+        ServerMetrics.getMetrics().ROCKSDB_FLUSH_TIME.add(elapsed);
+    }
+
+    public File findMostRecentSnapshot() throws IOException {
+        // In RocksDB, we always apply transactions to the current snapshot.
+        // So we only keep one single folder for the RocksDB snapshot. If
+        // this snapshot cannot be loaded because of corrupted data, we will
+        // sync with leader to get the latest data. Keeping multiple snapshots
+        // won't help here, since we still need to take snapshot syncing with
+        // the old snapshot, that's why we only keep one here.
+        return snapDir;
+    }
+
+    @SuppressWarnings("unchecked")
+    public void applyTxn(List<TransactionChangeRecord> changeList, long zxid) throws IOException {
+        // We use RocksDB WriteBatch to make atomic updates.
+        // We didn't let applying client's requests wait until flush finished because
+        // flushing 200MB of data in memtables takes nearly 1.5 seconds, flushing 500MB
+        // of data takes 4.5 seconds, and flushing 1GB of data takes more than 10 seconds.
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            // update sessions, ACL, DataTree and ZxidDigest in RocksDB
+            for (int i = 0; i < changeList.size(); i++) {
+                TransactionChangeRecord change = changeList.get(i);
+                switch (change.getType()) {
+                    case TransactionChangeRecord.DATANODE:
+                        String path = (String) change.getKey();
+                        DataNode node = (DataNode) change.getValue();
+                        String operation = change.getOperation();
+                        if (operation.equals(TransactionChangeRecord.ADD)
+                            || operation.equals(TransactionChangeRecord.UPDATE)) {
+                            addNode(path, node, writeBatch);
+                        } else {
+                            removeNode(path, writeBatch);
+                        }
+                        break;
+                    case TransactionChangeRecord.ACL:
+                        Long index = (Long) change.getKey();
+                        List<ACL> aclList = (List<ACL>) change.getValue();
+                        if (change.getOperation().equals(TransactionChangeRecord.ADD)) {
+                            addACLKeyValue(index, aclList, writeBatch);
+                        } else {
+                            removeACLKeyValue(index, writeBatch);
+                        }
+                        break;
+                    case TransactionChangeRecord.SESSION:
+                        Long id = (Long) change.getKey();
+                        Integer timeout = (Integer) change.getValue();
+                        if (change.getOperation().equals(TransactionChangeRecord.ADD)) {
+                            addSessionKeyValue(id, timeout, writeBatch);
+                        } else {
+                            removeSessionKeyValue(id, writeBatch);
+                        }
+                        break;
+                    case TransactionChangeRecord.ZXIDDIGEST:
+                        ZxidDigest zxidDigest = (ZxidDigest) change.getValue();
+                        if (change.getOperation().equals(TransactionChangeRecord.UPDATE)) {
+                            updateZxidDigest(zxidDigest, writeBatch);
+                        }
+                        break;
+                    default:
+                        LOG.warn("Unknown TransactionChangeRecord type {}", change);
+                        break;
+                }
+            }
+            // update the zxid in RocksDB
+            updateLastProcessedZxid(zxid, writeBatch);
+
+            // even if RocksDB's memTable is auto flushed, we always have consistent data and zxid digest.
+            db.write(writeOpts, writeBatch);
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to apply txns to RocksDB " + "error: " + e.getMessage(), e);
+        }
+    }
+
+    private void updateLastProcessedZxid(long zxid, WriteBatch writeBatch) throws IOException {
+        try {
+            if (writeBatch != null) {
+                writeBatch.put(ZXID_KEY.getBytes(StandardCharsets.UTF_8),
+                        Long.toString(zxid).getBytes(StandardCharsets.UTF_8));
+            } else {
+                db.put(writeOpts, ZXID_KEY.getBytes(StandardCharsets.UTF_8),
+                        Long.toString(zxid).getBytes(StandardCharsets.UTF_8));
+            }
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to serialize last processed zxid in RocksDB. "
+                    + "error: " + e.getMessage(), e);
+        }
+    }
+
+    public void serializeSessions(Map<Long, Integer> sessions) throws IOException {
+        HashMap<Long, Integer> sessSnap = new HashMap<Long, Integer>(sessions);
+        for (Entry<Long, Integer> entry : sessSnap.entrySet()) {
+            addSessionKeyValue(entry.getKey(), entry.getValue(), null);
+        }
+    }
+
+    private void addSessionKeyValue(Long id, Integer timeout, WriteBatch writeBatch) throws IOException {
+        try {
+            String key = SESSION_KEY_PREFIX + id;
+            if (writeBatch != null) {
+                writeBatch.put(key.getBytes(StandardCharsets.UTF_8),
+                        timeout.toString().getBytes(StandardCharsets.UTF_8));
+            } else {
+                db.put(writeOpts, key.getBytes(StandardCharsets.UTF_8),
+                        timeout.toString().getBytes(StandardCharsets.UTF_8));
+            }
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to serialize sessions in RocksDB " + "error: " + e.getMessage(), e);
+        }
+    }
+
+    private void removeSessionKeyValue(Long id, WriteBatch writeBatch) throws IOException {
+        try {
+            String key = SESSION_KEY_PREFIX + id;
+            writeBatch.delete(key.getBytes(StandardCharsets.UTF_8));
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to delete the session in RocksDB " + "error: " + e.getMessage(), e);
+        }
+    }
+
+    public void deserializeSessions(Map<Long, Integer> sessions) throws IOException {
+        while (rocksIterator.isValid()) {
+            String key = new String(rocksIterator.key(), StandardCharsets.UTF_8);
+            if (!key.startsWith(SESSION_KEY_PREFIX)) {
+                break;
+            }
+            key = key.substring(PREFIX_ENDING_INDEX);
+            long id = Long.parseLong(key);
+            int to = Integer.parseInt(new String(rocksIterator.value(), StandardCharsets.UTF_8));
+            sessions.put(id, to);
+            rocksIterator.next();
+        }
+    }
+
+    public synchronized void serializeACL(ReferenceCountedACLCache aclCache) throws IOException {
+        Set<Map.Entry<Long, List<ACL>>> set = aclCache.getLongKeyMap().entrySet();
+        for (Map.Entry<Long, List<ACL>> val : set) {
+            addACLKeyValue(val.getKey(), val.getValue(), null);
+        }
+    }
+
+    private void addACLKeyValue(Long index, List<ACL> aclList, WriteBatch writeBatch) throws IOException {
+        String key = ACL_KEY_PREFIX + index;
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+        boa.startVector(aclList, "acls");
+        for (ACL acl : aclList) {
+            acl.serialize(boa, "acl");
+        }
+        boa.endVector(aclList, "acls");
+        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
+        try {
+            if (writeBatch != null) {
+                writeBatch.put(key.getBytes(StandardCharsets.UTF_8), bb.array());
+            } else {
+                db.put(writeOpts, key.getBytes(StandardCharsets.UTF_8), bb.array());
+            }
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to serialize ACL lists in RocksDB " + "error: " + e.getMessage(), e);
+        }
+    }
+
+    private void removeACLKeyValue(Long index, WriteBatch writeBatch) throws IOException {
+        try {
+            String key = ACL_KEY_PREFIX + index;
+            writeBatch.delete(key.getBytes(StandardCharsets.UTF_8));
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to delete the ACL list in RocksDB " + "error: " + e.getMessage(), e);
+        }
+    }
+
+    public synchronized void deserializeACL(ReferenceCountedACLCache aclCache) throws IOException {
+        aclCache.clear();
+        ByteArrayInputStream bais;
+        BinaryInputArchive bia;
+        while (rocksIterator.isValid()) {
+            String key = new String(rocksIterator.key(), StandardCharsets.UTF_8);
+            if (!key.startsWith(ACL_KEY_PREFIX)) {
+                break;
+            }
+            key = key.substring(PREFIX_ENDING_INDEX);
+            long val = Long.parseLong(key);
+            List<ACL> aclList = new ArrayList<ACL>();
+            bais = new ByteArrayInputStream(rocksIterator.value());
+            bia = BinaryInputArchive.getArchive(bais);
+            Index j = bia.startVector("acls");
+            if (j == null) {
+                throw new RuntimeException("Incorrent format of InputArchive when deserialize DataTree - missing acls");
+            }
+            while (!j.done()) {
+                ACL acl = new ACL();
+                acl.deserialize(bia, "acl");
+                aclList.add(acl);
+                j.incr();
+            }
+            aclCache.updateMaps(val, aclList);
+            rocksIterator.next();
+        }
+    }
+
+    public void writeNode(String pathString, DataNode node) throws IOException {
+        addNode(pathString, node, null);
+    }
+
+    private void addNode(String pathString, DataNode node, WriteBatch writeBatch) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+        boa.writeRecord(node, "node");
+        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
+        try {
+            pathString = DATATREE_KEY_PREFIX + pathString;
+            if (writeBatch != null) {
+                writeBatch.put(pathString.getBytes(StandardCharsets.UTF_8), bb.array());
+            } else {
+                db.put(writeOpts, pathString.getBytes(StandardCharsets.UTF_8), bb.array());
+            }
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to serialize data node in RocksDB " + "error: " + e.getMessage(), e);
+        }
+    }
+
+    private void removeNode(String pathString, WriteBatch writeBatch) throws IOException {
+        try {
+            pathString = DATATREE_KEY_PREFIX + pathString;
+            writeBatch.delete(pathString.getBytes(StandardCharsets.UTF_8));
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to delete the data node in RocksDB " + "error: " + e.getMessage(), e);
+        }
+    }
+
+    public void markEnd() throws IOException {
+        // nothing needs to be done here when taking a snapshot in RocksDB
+    }
+
+    public String readNode(DataNode node) throws IOException {
+        if (!rocksIterator.isValid()) {
+            // finished iterating over all data nodes in RocksDB snapshot
+            return "/";
+        }
+        String path = new String(rocksIterator.key(), StandardCharsets.UTF_8);
+        if (!path.startsWith(DATATREE_KEY_PREFIX)) {
+            // finished iterating over all data nodes in RocksDB snapshot
+            return "/";
+        }
+        path = path.substring(PREFIX_ENDING_INDEX);
+        ByteArrayInputStream bais = new ByteArrayInputStream(rocksIterator.value());
+        BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
+        bia.readRecord(node, "node");

Review comment:
       *RESOURCE_LEAK:*  resource of type `java.io.DataInputStream` acquired by call to `getArchive(...)` at line 480 is not released after line 481.
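
A possible remediation — a sketch, not the PR's code, reusing the `rocksIterator` and `node` names from the diff — is to scope the stream with try-with-resources so the archive's underlying `DataInputStream` can be reclaimed deterministically. `ByteArrayInputStream` holds no native resources, so the leak is benign in practice, but this makes the lifetime explicit:

```java
try (ByteArrayInputStream bais = new ByteArrayInputStream(rocksIterator.value())) {
    BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
    bia.readRecord(node, "node");
} // bais is closed here; nothing native is held, so this mainly documents the lifetime
```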

##########
File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
##########
@@ -1137,7 +1198,18 @@ public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTx
         return rc;
     }
 
+    void createSession(long id, int timeout) {
+        if (changeList != null) {
+            changeList.add(new TransactionChangeRecord(TransactionChangeRecord.SESSION,

Review comment:
       *THREAD_SAFETY_VIOLATION:*  Unprotected write. Non-private method `DataTree.createSession(...)` mutates container `this.changeList` via call to `List.add(...)` outside of synchronization.
    Reporting because another access to the same memory occurs on a background thread, although this access may not.
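
A minimal sketch of one way to close this family of `changeList` findings (the lock object is an assumption, not something the PR defines): funnel every mutation through a single guard that the background reader also takes.

```java
// Hypothetical guard; all readers and writers of changeList would synchronize on it.
private final Object changeListLock = new Object();

void createSession(long id, int timeout) {
    synchronized (changeListLock) {
        if (changeList != null) {
            changeList.add(new TransactionChangeRecord(TransactionChangeRecord.SESSION,
                    TransactionChangeRecord.ADD, id, timeout));
        }
    }
}
```

The same lock would wrap the `deleteNode`, `killSession`, and `resetChangeList` mutations flagged below, and the drain in `ZKDatabase.processTxn`.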

##########
File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
##########
@@ -585,15 +615,24 @@ public void deleteNode(String path, long zxid) throws KeeperException.NoNodeExce
                 parent.stat.setPzxid(zxid);
             }
             nodes.postChange(parentName, parent);
+
+            if (changeList != null) {
+                changeList.add(new TransactionChangeRecord(TransactionChangeRecord.DATANODE,
+                        TransactionChangeRecord.UPDATE, parentName, parent));
+            }
         }
 
         DataNode node = nodes.get(path);
         if (node == null) {
             throw new KeeperException.NoNodeException();
         }
         nodes.remove(path);
+        if (changeList != null) {
+            changeList.add(new TransactionChangeRecord(TransactionChangeRecord.DATANODE,

Review comment:
       *THREAD_SAFETY_VIOLATION:*  Unprotected write. Non-private method `DataTree.deleteNode(...)` mutates container `this.changeList` via call to `List.add(...)` outside of synchronization (the same guard sketched above would apply).
    Reporting because this access may occur on a background thread.

##########
File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
##########
@@ -1677,6 +1744,10 @@ private void updateWriteStat(String path, long bytes) {

Review comment:
       *THREAD_SAFETY_VIOLATION:*  Read/Write race. Non-private method `DataTree.deserialize(...)` reads without synchronization from `this.root`. Potentially races with write in method `DataTree.deserialize(...)`.
    Reporting because this access may occur on a background thread.
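
One low-touch option for the `root` read/write races flagged here and in `serialize(...)` below — a sketch, assuming `root` is only ever swapped wholesale during deserialization — is to publish the field as volatile so a background serializer never observes a half-written reference:

```java
// Hypothetical variant of the DataTree field; the PR keeps it non-volatile.
private volatile DataNode root;
```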

##########
File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
##########
@@ -1137,7 +1198,18 @@ public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTx
         return rc;
     }
 
+    void createSession(long id, int timeout) {
+        if (changeList != null) {
+            changeList.add(new TransactionChangeRecord(TransactionChangeRecord.SESSION,
+                    TransactionChangeRecord.ADD, id, timeout));
+        }
+    }
+
     void killSession(long session, long zxid) {
+        if (changeList != null) {
+            changeList.add(new TransactionChangeRecord(TransactionChangeRecord.SESSION,

Review comment:
       *THREAD_SAFETY_VIOLATION:*  Unprotected write. Non-private method `DataTree.killSession(...)` mutates container `this.changeList` via call to `List.add(...)` outside of synchronization.
    Reporting because another access to the same memory occurs on a background thread, although this access may not.

##########
File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
##########
@@ -874,6 +927,10 @@ public ProcessTxnResult processTxn(TxnHeader header, Record txn) {
     }
 
     public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
+        if (!isSubTxn) {
+            resetChangeList();

Review comment:
       *THREAD_SAFETY_VIOLATION:*  Unprotected write. Non-private method `DataTree.processTxn(...)` indirectly mutates container `this.changeList` via call to `List.clear()` outside of synchronization.
    Reporting because another access to the same memory occurs on a background thread, although this access may not.

##########
File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
##########
@@ -874,6 +927,10 @@ public ProcessTxnResult processTxn(TxnHeader header, Record txn) {

Review comment:
       *THREAD_SAFETY_VIOLATION:*  Read/Write race. Non-private method `DataTree.processTxn(...)` indirectly reads with synchronization from container `this.ephemerals` via call to `Map.get(...)`. Potentially races with unsynchronized write in method `DataTree.deserialize(...)`.
    Reporting because another access to the same memory occurs on a background thread, although this access may not.

##########
File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
##########
@@ -1345,43 +1416,40 @@ void serializeNode(OutputArchive oa, StringBuilder path) throws IOException {
             // to truncate the previous bytes of string.
             path.delete(off, Integer.MAX_VALUE);
             path.append(child);
-            serializeNode(oa, path);
+            serializeNode(snapLog, path);
         }
     }
 
     // visible for test
-    public void serializeNodeData(OutputArchive oa, String path, DataNode node) throws IOException {
-        oa.writeString(path, "path");
-        oa.writeRecord(node, "node");
+    public void serializeNodeData(final SnapShot snapLog, String path, DataNode node) throws IOException {
+        snapLog.writeNode(path, node);
     }
 
-    public void serializeAcls(OutputArchive oa) throws IOException {
-        aclCache.serialize(oa);
+    public void serializeNodes(SnapShot snapLog) throws IOException {
+        serializeNode(snapLog, new StringBuilder(""));
     }
 
-    public void serializeNodes(OutputArchive oa) throws IOException {
-        serializeNode(oa, new StringBuilder());
+    public void serialize(SnapShot snapLog, String tag) throws IOException {
+        serializeNodes(snapLog);
         // / marks end of stream
         // we need to check if clear had been called in between the snapshot.
         if (root != null) {

Review comment:
       *THREAD_SAFETY_VIOLATION:*  Read/Write race. Non-private method `DataTree.serialize(...)` reads without synchronization from `this.root`. Potentially races with write in method `DataTree.deserialize(...)`.
    Reporting because another access to the same memory occurs on a background thread, although this access may not.

##########
File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/ReferenceCountedACLCache.java
##########
@@ -99,67 +103,33 @@ private long incrementIndex() {
         return ++aclIndex;
     }
 
-    public void deserialize(InputArchive ia) throws IOException {
-        clear();
-        int i = ia.readInt("map");
-
-        LinkedHashMap<Long, List<ACL>> deserializedMap = new LinkedHashMap<>();
-        // keep read operations out of synchronization block
-        while (i > 0) {
-            Long val = ia.readLong("long");
-            List<ACL> aclList = new ArrayList<ACL>();
-            Index j = ia.startVector("acls");
-            if (j == null) {
-                throw new RuntimeException("Incorrent format of InputArchive when deserialize DataTree - missing acls");
-            }
-            while (!j.done()) {
-                ACL acl = new ACL();
-                acl.deserialize(ia, "acl");
-                aclList.add(acl);
-                j.incr();
-            }
-
-            deserializedMap.put(val, aclList);
-            i--;
+    public synchronized void updateMaps(long val, List<ACL> aclList) {
+        if (aclIndex < val) {
+            aclIndex = val;
         }
 
-        synchronized (this) {
-            for (Map.Entry<Long, List<ACL>> entry : deserializedMap.entrySet()) {
-                Long val = entry.getKey();
-                List<ACL> aclList = entry.getValue();
-                if (aclIndex < val) {
-                    aclIndex = val;
-                }
+        longKeyMap.put(val, aclList);
+        aclKeyMap.put(aclList, val);
+        referenceCounter.put(val, new AtomicLongWithEquals(0));
+    }
 
-                longKeyMap.put(val, aclList);
-                aclKeyMap.put(aclList, val);
-                referenceCounter.put(val, new AtomicLongWithEquals(0));
-            }
-        }
+    public synchronized HashMap<Long, List<ACL>> getLongKeyMap() {
+        return new HashMap<>(longKeyMap);
     }
 
-    public void serialize(OutputArchive oa) throws IOException {
-        Map<Long, List<ACL>> clonedLongKeyMap;
-        synchronized (this) {
-            clonedLongKeyMap = new HashMap<>(longKeyMap);
-        }
-        oa.writeInt(clonedLongKeyMap.size(), "map");
-        for (Map.Entry<Long, List<ACL>> val : clonedLongKeyMap.entrySet()) {
-            oa.writeLong(val.getKey(), "long");
-            List<ACL> aclList = val.getValue();
-            oa.startVector(aclList, "acls");
-            for (ACL acl : aclList) {
-                acl.serialize(oa, "acl");
-            }
-            oa.endVector(aclList, "acls");
-        }
+    public synchronized Map<List<ACL>, Long> getAclKeyMap() {
+        return new HashMap<>(aclKeyMap);
+    }
+
+    public synchronized Map<Long, AtomicLongWithEquals> getReferenceCounter() {
+        return new HashMap<>(referenceCounter);
     }
 
     public int size() {
         return aclKeyMap.size();
     }
 
-    private void clear() {
+    public void clear() {
         aclKeyMap.clear();

Review comment:
       *THREAD_SAFETY_VIOLATION:*  Unprotected write. Non-private method `ReferenceCountedACLCache.clear()` mutates container `this.aclKeyMap` via call to `Map.clear()` outside of synchronization.
    Reporting because another access to the same memory occurs on a background thread, although this access may not.
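
Since `updateMaps(...)` and the new getters above already synchronize on `this`, a straightforward sketch of a fix for the three `clear()` findings is to do the same:

```java
// clear() joins the other mutators under the instance monitor, so a
// concurrent updateMaps(...) can no longer interleave with the wipes.
public synchronized void clear() {
    aclKeyMap.clear();
    longKeyMap.clear();
    referenceCounter.clear();
}
```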

##########
File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/ReferenceCountedACLCache.java
##########
@@ -99,67 +103,33 @@ private long incrementIndex() {
         return ++aclIndex;
     }
 
-    public void deserialize(InputArchive ia) throws IOException {
-        clear();
-        int i = ia.readInt("map");
-
-        LinkedHashMap<Long, List<ACL>> deserializedMap = new LinkedHashMap<>();
-        // keep read operations out of synchronization block
-        while (i > 0) {
-            Long val = ia.readLong("long");
-            List<ACL> aclList = new ArrayList<ACL>();
-            Index j = ia.startVector("acls");
-            if (j == null) {
-                throw new RuntimeException("Incorrent format of InputArchive when deserialize DataTree - missing acls");
-            }
-            while (!j.done()) {
-                ACL acl = new ACL();
-                acl.deserialize(ia, "acl");
-                aclList.add(acl);
-                j.incr();
-            }
-
-            deserializedMap.put(val, aclList);
-            i--;
+    public synchronized void updateMaps(long val, List<ACL> aclList) {
+        if (aclIndex < val) {
+            aclIndex = val;
         }
 
-        synchronized (this) {
-            for (Map.Entry<Long, List<ACL>> entry : deserializedMap.entrySet()) {
-                Long val = entry.getKey();
-                List<ACL> aclList = entry.getValue();
-                if (aclIndex < val) {
-                    aclIndex = val;
-                }
+        longKeyMap.put(val, aclList);
+        aclKeyMap.put(aclList, val);
+        referenceCounter.put(val, new AtomicLongWithEquals(0));
+    }
 
-                longKeyMap.put(val, aclList);
-                aclKeyMap.put(aclList, val);
-                referenceCounter.put(val, new AtomicLongWithEquals(0));
-            }
-        }
+    public synchronized HashMap<Long, List<ACL>> getLongKeyMap() {
+        return new HashMap<>(longKeyMap);
     }
 
-    public void serialize(OutputArchive oa) throws IOException {
-        Map<Long, List<ACL>> clonedLongKeyMap;
-        synchronized (this) {
-            clonedLongKeyMap = new HashMap<>(longKeyMap);
-        }
-        oa.writeInt(clonedLongKeyMap.size(), "map");
-        for (Map.Entry<Long, List<ACL>> val : clonedLongKeyMap.entrySet()) {
-            oa.writeLong(val.getKey(), "long");
-            List<ACL> aclList = val.getValue();
-            oa.startVector(aclList, "acls");
-            for (ACL acl : aclList) {
-                acl.serialize(oa, "acl");
-            }
-            oa.endVector(aclList, "acls");
-        }
+    public synchronized Map<List<ACL>, Long> getAclKeyMap() {
+        return new HashMap<>(aclKeyMap);
+    }
+
+    public synchronized Map<Long, AtomicLongWithEquals> getReferenceCounter() {
+        return new HashMap<>(referenceCounter);
     }
 
     public int size() {
         return aclKeyMap.size();
     }
 
-    private void clear() {
+    public void clear() {
         aclKeyMap.clear();
         longKeyMap.clear();

Review comment:
       *THREAD_SAFETY_VIOLATION:*  Unprotected write. Non-private method `ReferenceCountedACLCache.clear()` mutates container `this.longKeyMap` via call to `Map.clear()` outside of synchronization.
    Reporting because another access to the same memory occurs on a background thread, although this access may not.

##########
File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/ReferenceCountedACLCache.java
##########
@@ -99,67 +103,33 @@ private long incrementIndex() {
         return ++aclIndex;
     }
 
-    public void deserialize(InputArchive ia) throws IOException {
-        clear();
-        int i = ia.readInt("map");
-
-        LinkedHashMap<Long, List<ACL>> deserializedMap = new LinkedHashMap<>();
-        // keep read operations out of synchronization block
-        while (i > 0) {
-            Long val = ia.readLong("long");
-            List<ACL> aclList = new ArrayList<ACL>();
-            Index j = ia.startVector("acls");
-            if (j == null) {
-                throw new RuntimeException("Incorrent format of InputArchive when deserialize DataTree - missing acls");
-            }
-            while (!j.done()) {
-                ACL acl = new ACL();
-                acl.deserialize(ia, "acl");
-                aclList.add(acl);
-                j.incr();
-            }
-
-            deserializedMap.put(val, aclList);
-            i--;
+    public synchronized void updateMaps(long val, List<ACL> aclList) {
+        if (aclIndex < val) {
+            aclIndex = val;
         }
 
-        synchronized (this) {
-            for (Map.Entry<Long, List<ACL>> entry : deserializedMap.entrySet()) {
-                Long val = entry.getKey();
-                List<ACL> aclList = entry.getValue();
-                if (aclIndex < val) {
-                    aclIndex = val;
-                }
+        longKeyMap.put(val, aclList);
+        aclKeyMap.put(aclList, val);
+        referenceCounter.put(val, new AtomicLongWithEquals(0));
+    }
 
-                longKeyMap.put(val, aclList);
-                aclKeyMap.put(aclList, val);
-                referenceCounter.put(val, new AtomicLongWithEquals(0));
-            }
-        }
+    public synchronized HashMap<Long, List<ACL>> getLongKeyMap() {
+        return new HashMap<>(longKeyMap);
     }
 
-    public void serialize(OutputArchive oa) throws IOException {
-        Map<Long, List<ACL>> clonedLongKeyMap;
-        synchronized (this) {
-            clonedLongKeyMap = new HashMap<>(longKeyMap);
-        }
-        oa.writeInt(clonedLongKeyMap.size(), "map");
-        for (Map.Entry<Long, List<ACL>> val : clonedLongKeyMap.entrySet()) {
-            oa.writeLong(val.getKey(), "long");
-            List<ACL> aclList = val.getValue();
-            oa.startVector(aclList, "acls");
-            for (ACL acl : aclList) {
-                acl.serialize(oa, "acl");
-            }
-            oa.endVector(aclList, "acls");
-        }
+    public synchronized Map<List<ACL>, Long> getAclKeyMap() {
+        return new HashMap<>(aclKeyMap);
+    }
+
+    public synchronized Map<Long, AtomicLongWithEquals> getReferenceCounter() {
+        return new HashMap<>(referenceCounter);
     }
 
     public int size() {
         return aclKeyMap.size();
     }
 
-    private void clear() {
+    public void clear() {
         aclKeyMap.clear();
         longKeyMap.clear();
         referenceCounter.clear();

Review comment:
       *THREAD_SAFETY_VIOLATION:*  Unprotected write. Non-private method `ReferenceCountedACLCache.clear()` mutates container `this.referenceCounter` via call to `Map.clear()` outside of synchronization.
    Reporting because another access to the same memory occurs on a background thread, although this access may not.

##########
File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/ReferenceCountedACLCache.java
##########
@@ -99,67 +103,33 @@ private long incrementIndex() {
         return ++aclIndex;
     }
 
-    public void deserialize(InputArchive ia) throws IOException {
-        clear();
-        int i = ia.readInt("map");
-
-        LinkedHashMap<Long, List<ACL>> deserializedMap = new LinkedHashMap<>();
-        // keep read operations out of synchronization block
-        while (i > 0) {
-            Long val = ia.readLong("long");
-            List<ACL> aclList = new ArrayList<ACL>();
-            Index j = ia.startVector("acls");
-            if (j == null) {
-                throw new RuntimeException("Incorrent format of InputArchive when deserialize DataTree - missing acls");
-            }
-            while (!j.done()) {
-                ACL acl = new ACL();
-                acl.deserialize(ia, "acl");
-                aclList.add(acl);
-                j.incr();
-            }
-
-            deserializedMap.put(val, aclList);
-            i--;
+    public synchronized void updateMaps(long val, List<ACL> aclList) {
+        if (aclIndex < val) {
+            aclIndex = val;
         }
 
-        synchronized (this) {
-            for (Map.Entry<Long, List<ACL>> entry : deserializedMap.entrySet()) {
-                Long val = entry.getKey();
-                List<ACL> aclList = entry.getValue();
-                if (aclIndex < val) {
-                    aclIndex = val;
-                }
+        longKeyMap.put(val, aclList);
+        aclKeyMap.put(aclList, val);
+        referenceCounter.put(val, new AtomicLongWithEquals(0));
+    }
 
-                longKeyMap.put(val, aclList);
-                aclKeyMap.put(aclList, val);
-                referenceCounter.put(val, new AtomicLongWithEquals(0));
-            }
-        }
+    public synchronized HashMap<Long, List<ACL>> getLongKeyMap() {
+        return new HashMap<>(longKeyMap);
     }
 
-    public void serialize(OutputArchive oa) throws IOException {
-        Map<Long, List<ACL>> clonedLongKeyMap;
-        synchronized (this) {
-            clonedLongKeyMap = new HashMap<>(longKeyMap);
-        }
-        oa.writeInt(clonedLongKeyMap.size(), "map");
-        for (Map.Entry<Long, List<ACL>> val : clonedLongKeyMap.entrySet()) {
-            oa.writeLong(val.getKey(), "long");
-            List<ACL> aclList = val.getValue();
-            oa.startVector(aclList, "acls");
-            for (ACL acl : aclList) {
-                acl.serialize(oa, "acl");
-            }
-            oa.endVector(aclList, "acls");
-        }
+    public synchronized Map<List<ACL>, Long> getAclKeyMap() {
+        return new HashMap<>(aclKeyMap);
+    }
+
+    public synchronized Map<Long, AtomicLongWithEquals> getReferenceCounter() {
+        return new HashMap<>(referenceCounter);
     }
 
     public int size() {
         return aclKeyMap.size();

Review comment:
       *THREAD_SAFETY_VIOLATION:*  Read/Write race. Non-private method `ReferenceCountedACLCache.size()` reads without synchronization from container `this.aclKeyMap` via call to `Map.size()`. Potentially races with write in method `ReferenceCountedACLCache.convertAcls(...)`.
    Reporting because another access to the same memory occurs on a background thread, although this access may not.
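
The same treatment would cover this read — a one-line sketch:

```java
public synchronized int size() {
    return aclKeyMap.size();
}
```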

##########
File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
##########
@@ -479,7 +481,19 @@ public void setlastProcessedZxid(long zxid) {
      * datatree/zkdatabase
      */
     public ProcessTxnResult processTxn(TxnHeader hdr, Record txn, TxnDigest digest) {
-        return dataTree.processTxn(hdr, txn, digest);
+        ProcessTxnResult rc = dataTree.processTxn(hdr, txn, digest);
+
+        if (dataTree.getChangeList() != null) {

Review comment:
       *THREAD_SAFETY_VIOLATION:*  Read/Write race. Non-private method `ZKDatabase.processTxn(...)` reads with synchronization from `this.dataTree`. Potentially races with unsynchronized write in method `ZKDatabase.clear()`.
    Reporting because another access to the same memory occurs on a background thread, although this access may not.

##########
File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
##########
@@ -752,4 +778,16 @@ public long getTxnSize() {
     public boolean compareDigest(TxnHeader header, Record txn, TxnDigest digest) {
         return dataTree.compareDigest(header, txn, digest);
     }
+
+    /**
+     * Take a snapshot of the zk database.
+     *
+     * @param syncSnap whether to take a full snapshot (true)
+     *                 or an incremental snapshot (false)
+     */
+    public void save(boolean syncSnap) throws IOException {
+        snapLog.save(dataTree, sessionsWithTimeouts, syncSnap);

Review comment:
       *THREAD_SAFETY_VIOLATION:*  Read/Write race. Non-private method `ZKDatabase.save(...)` reads without synchronization from `this.dataTree`. Potentially races with write in method `ZKDatabase.clear()`.
    Reporting because another access to the same memory occurs on a background thread, although this access may not.
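
A sketch of one way to close this race (and the similar one in `processTxn` above), assuming `clear()` reassigns `dataTree` under the instance monitor: capture the reference once, under the same lock, before snapshotting.

```java
public void save(boolean syncSnap) throws IOException {
    DataTree dt;
    synchronized (this) { // same monitor that would guard clear()'s reassignment
        dt = dataTree;
    }
    snapLog.save(dt, sessionsWithTimeouts, syncSnap);
}
```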

##########
File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
##########
@@ -209,22 +262,22 @@ public File findMostRecentSnapshot() throws IOException {
      * serialize the datatree and sessions
      * @param dt the datatree to be serialized
      * @param sessions the sessions to be serialized
-     * @param oa the output archive to serialize into
      * @param header the header of this snapshot
      * @throws IOException
      */
     protected void serialize(
         DataTree dt,
         Map<Long, Integer> sessions,
-        OutputArchive oa,
         FileHeader header) throws IOException {
         // this is really a programmatic error and not something that can
         // happen at runtime
         if (header == null) {
             throw new IllegalStateException("Snapshot's not open for writing: uninitialized header");
         }
         header.serialize(oa, "fileheader");

Review comment:
       *THREAD_SAFETY_VIOLATION:*  Read/Write race. Non-private method `FileSnap.serialize(...)` reads without synchronization from `this.oa`. Potentially races with write in method `FileSnap.serialize(...)`.
    Reporting because another access to the same memory occurs on a background thread, although this access may not.
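
One way to sidestep this race — a sketch that keeps the archive a local instead of a shared field, much like the pre-diff signature — is to thread the `OutputArchive` through as a parameter:

```java
protected void serialize(
    DataTree dt,
    Map<Long, Integer> sessions,
    OutputArchive oa, // passed in, so there is no racy instance field
    FileHeader header) throws IOException {
    if (header == null) {
        throw new IllegalStateException("Snapshot's not open for writing: uninitialized header");
    }
    header.serialize(oa, "fileheader");
    // ... serialize sessions and the tree through the same local archive
}
```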

##########
File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/RocksDBSnap.java
##########
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.persistence;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.Index;
+import org.apache.zookeeper.common.Time;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.server.DataNode;
+import org.apache.zookeeper.server.DataTree;
+import org.apache.zookeeper.server.DataTree.ZxidDigest;
+import org.apache.zookeeper.server.ReferenceCountedACLCache;
+import org.apache.zookeeper.server.ServerMetrics;
+import org.apache.zookeeper.server.TransactionChangeRecord;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements the snapshot interface.
+ * It is responsible for storing, serializing
+ * and deserializing the right snapshot in RocksDB,
+ * and provides access to the snapshots.
+ */
+public class RocksDBSnap implements SnapShot {
+    File snapDir;
+    RocksDB db;
+    Options options;
+    WriteOptions writeOpts;
+    RocksIterator rocksIterator;
+
+    private volatile boolean close = false;
+
+    private static final boolean SYNC_WRITE = false;
+    private static final boolean DISABLE_WAL = true;
+
+    //VisibleForTesting
+    public static final String ROCKSDB_WRITE_BUFFER_SIZE = "zookeeper.rocksdbWriteBufferSize";
+
+    private static final int PREFIX_STARTING_INDEX = 0;
+    private static final int PREFIX_ENDING_INDEX = 3;
+
+    private static final String SESSION_KEY_PREFIX = "S::";
+    private static final String DATATREE_KEY_PREFIX = "T::";
+    private static final String ACL_KEY_PREFIX = "A::";
+
+    private static final String ZXID_KEY = "Zxid";
+    private static final String ZXIDDIGEST_KEY = "ZxidDigest";
+
+    private static final long DEFAULT_ROCKSDB_WRITE_BUFFER_SIZE = 4096L * 1024 * 1024; // 4 GB; the L suffix avoids int overflow
+
+    private static final Logger LOG = LoggerFactory.getLogger(RocksDBSnap.class);
+
+    /**
+     * The constructor which takes the snapDir. This class is instantiated
+     * via SnapshotFactory
+     *
+     * @param snapDir the snapshot directory
+     */
+    public RocksDBSnap(File snapDir) throws IOException {
+        RocksDB.loadLibrary();
+        if (snapDir == null) {
+            throw new IllegalArgumentException("Snap Directory can't be null!");
+        }
+
+        this.snapDir = snapDir;
+
+        long rocksdbWriteBufferSize = Long.getLong(
+                ROCKSDB_WRITE_BUFFER_SIZE, DEFAULT_ROCKSDB_WRITE_BUFFER_SIZE);
+        this.options = new Options()
+                .setCreateIfMissing(true)
+                .setCreateMissingColumnFamilies(true)
+                .setDbWriteBufferSize(rocksdbWriteBufferSize);
+
+        try {
+            this.db = RocksDB.open(options, snapDir.getAbsolutePath());
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to open RocksDB. " + "error: " + e.getMessage(), e);
+        }
+        // setting Sync = true and DisableWAL = true will lead to writes failing
+        // and throwing an exception. So we set Sync = false here and let RocksDB
+        // flush after serialization.
+        this.writeOpts = new WriteOptions().setSync(SYNC_WRITE).setDisableWAL(DISABLE_WAL);
+    }
+
+    // VisibleForTesting
+    public void initializeIterator() {
+        rocksIterator = db.newIterator();
+        rocksIterator.seekToFirst();
+    }
+
+    // VisibleForTesting
+    public void closeIterator() {
+        rocksIterator.close();
+    }
+
+    public long deserialize(DataTree dt, Map<Long, Integer> sessions)
+            throws IOException {
+        File[] files = snapDir.listFiles();
+        if (files == null || files.length == 0) {
+            LOG.info("No snapshot found in {}", snapDir.getName());
+            return -1L;
+        }
+        long lastProcessedZxid;
+        long start = Time.currentElapsedTime();
+        try {
+            byte[] zxidBytes = db.get(ZXID_KEY.getBytes(StandardCharsets.UTF_8));
+            if (zxidBytes == null) {
+                // We didn't find zxid information in RocksDB, which means
+                // there is no RocksDB snapshot in the snapDir.
+                LOG.info("No snapshot found in {}", snapDir.getName());
+                return -1L;
+            }
+            lastProcessedZxid = Long.parseLong(new String(zxidBytes, StandardCharsets.UTF_8));
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to deserialize last processed zxid in RocksDB. " + "error: " + e.getMessage(), e);
+        }
+        LOG.info("RocksDB: Reading snapshot 0x{} from {}", Long.toHexString(lastProcessedZxid), snapDir);
+        dt.lastProcessedZxid = lastProcessedZxid;
+
+        rocksIterator = db.newIterator();
+        rocksIterator.seekToFirst();
+        ByteArrayInputStream bais;
+        BinaryInputArchive bia;
+        while (rocksIterator.isValid()) {
+            String key = new String(rocksIterator.key(), StandardCharsets.UTF_8);
+            String prefix = key.substring(PREFIX_STARTING_INDEX, PREFIX_ENDING_INDEX);
+            switch (prefix) {
+                case SESSION_KEY_PREFIX:
+                    deserializeSessions(sessions);
+                    break;
+                case ACL_KEY_PREFIX:
+                    deserializeACL(dt.getReferenceCountedAclCache());
+                    break;
+                case DATATREE_KEY_PREFIX:
+                    dt.deserialize(this, "tree");
+                    break;
+                default:
+                    // last processed zxid or zxid digest
+                    rocksIterator.next();
+                    break;
+            }
+        }
+        rocksIterator.close();
+
+        deserializeZxidDigest(dt);
+        if (dt.getDigestFromLoadedSnapshot() != null) {
+            dt.compareSnapshotDigests(lastProcessedZxid);
+        }
+        long elapsed = Time.currentElapsedTime() - start;
+        LOG.info("RocksDBSnap deserialization takes " + elapsed + " ms");
+        ServerMetrics.getMetrics().ROCKSDB_SNAPSHOT_DESERIALIZATION_TIME.add(elapsed);
+        return lastProcessedZxid;
+    }
+
+    public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions,
+                                       long lastZxid, boolean fsync) throws IOException {
+        if (close) {
+            return;
+        }
+        if (fsync) {
+            // take a full snapshot when doing a snap sync with the leader
+
+            // close RocksDB for cleaning up the old snapshot,
+            // because destroyDB will fail if the RocksDB is open and locked
+            db.close();
+            // clean up the old snapshot
+            try {
+                RocksDB.destroyDB(snapDir.getAbsolutePath(), options);
+            } catch (RocksDBException e) {
+                throw new IOException("Failed to clean old data in RocksDB files: " + "error: " + e.getMessage(), e);
+            }
+            // re-open RocksDB for taking a new snapshot
+            try {
+                db = RocksDB.open(options, snapDir.getAbsolutePath());
+            } catch (RocksDBException e) {
+                throw new IOException("Failed to open RocksDB. " + "error: " + e.getMessage(), e);
+            }
+
+            LOG.info("RocksDB: Snapshotting: 0x{} to {}", Long.toHexString(lastZxid), snapDir);
+
+            updateLastProcessedZxid(lastZxid, null);
+            serializeSessions(sessions);
+            serializeACL(dt.getReferenceCountedAclCache());
+            dt.serialize(this, "tree");
+            serializeZxidDigest(dt);
+        }
+        flush();
+    }
+
+    public void flush() throws IOException {
+        long start = Time.currentElapsedTime();
+        try (final FlushOptions flushOptions = new FlushOptions().setWaitForFlush(true)) {
+            db.flush(flushOptions);
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to flush in RocksDB: " + "error: " + e.getMessage(), e);
+        }
+        long elapsed = Time.currentElapsedTime() - start;
+        ServerMetrics.getMetrics().ROCKSDB_FLUSH_TIME.add(elapsed);
+    }
+
+    public File findMostRecentSnapshot() throws IOException {
+        // In RocksDB, we always apply transactions to the current snapshot,
+        // so we keep only a single folder for the RocksDB snapshot. If this
+        // snapshot cannot be loaded because of corrupted data, we sync with
+        // the leader to get the latest data. Keeping multiple snapshots
+        // wouldn't help here: after falling back to an older snapshot we
+        // would still have to sync with the leader, so we keep only one.
+        return snapDir;
+    }
+
+    @SuppressWarnings("unchecked")
+    public void applyTxn(List<TransactionChangeRecord> changeList, long zxid) throws IOException {
+        // We use RocksDB WriteBatch to make atomic updates.
+        // We don't make applying clients' requests wait until the flush finishes,
+        // because flushing 200MB of data in memtables takes nearly 1.5 seconds,
+        // flushing 500MB takes 4.5 seconds, and flushing 1GB takes more than 10 seconds.
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            // update sessions, ACL, DataTree and ZxidDigest in RocksDB
+            for (int i = 0; i < changeList.size(); i++) {
+                TransactionChangeRecord change = changeList.get(i);
+                switch (change.getType()) {
+                    case TransactionChangeRecord.DATANODE:
+                        String path = (String) change.getKey();
+                        DataNode node = (DataNode) change.getValue();
+                        String operation = change.getOperation();
+                        if (operation.equals(TransactionChangeRecord.ADD)
+                            || operation.equals(TransactionChangeRecord.UPDATE)) {
+                            addNode(path, node, writeBatch);

Review comment:
       *THREAD_SAFETY_VIOLATION:*  Read/Write race. Non-private method `RocksDBSnap.applyTxn(...)` indirectly reads without synchronization from `this.db`. Potentially races with write in method `RocksDBSnap.serialize(...)`.
    Reporting because another access to the same memory occurs on a background thread, although this access may not.
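
Because `serialize(...)` above is synchronized and can close and reopen `db` mid-snapshot, a minimal sketch of a fix is to put `applyTxn` under the same monitor so it never writes into a closed handle (the `deserialize(...)` finding below has the same shape):

```java
public synchronized void applyTxn(List<TransactionChangeRecord> changeList, long zxid)
        throws IOException {
    // body unchanged from the diff: build the WriteBatch and db.write(...),
    // now serialized against the db.close() / RocksDB.open(...) swap above
}
```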

##########
File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/RocksDBSnap.java
##########
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.persistence;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.Index;
+import org.apache.zookeeper.common.Time;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.server.DataNode;
+import org.apache.zookeeper.server.DataTree;
+import org.apache.zookeeper.server.DataTree.ZxidDigest;
+import org.apache.zookeeper.server.ReferenceCountedACLCache;
+import org.apache.zookeeper.server.ServerMetrics;
+import org.apache.zookeeper.server.TransactionChangeRecord;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements the snapshot interface.
+ * It is responsible for storing, serializing
+ * and deserializing the right snapshot in RocksDB,
+ * and provides access to the snapshots.
+ */
+public class RocksDBSnap implements SnapShot {
+    File snapDir;
+    RocksDB db;
+    Options options;
+    WriteOptions writeOpts;
+    RocksIterator rocksIterator;
+
+    private volatile boolean close = false;
+
+    private static final boolean SYNC_WRITE = false;
+    private static final boolean DISABLE_WAL = true;
+
+    //VisibleForTesting
+    public static final String ROCKSDB_WRITE_BUFFER_SIZE = "zookeeper.rocksdbWriteBufferSize";
+
+    private static final int PREFIX_STARTING_INDEX = 0;
+    private static final int PREFIX_ENDING_INDEX = 3;
+
+    private static final String SESSION_KEY_PREFIX = "S::";
+    private static final String DATATREE_KEY_PREFIX = "T::";
+    private static final String ACL_KEY_PREFIX = "A::";
+
+    private static final String ZXID_KEY = "Zxid";
+    private static final String ZXIDDIGEST_KEY = "ZxidDigest";
+
+    private static final long DEFAULT_ROCKSDB_WRITE_BUFFER_SIZE = 4096L * 1024 * 1024; // 4 GB; the L suffix avoids int overflow
+
+    private static final Logger LOG = LoggerFactory.getLogger(RocksDBSnap.class);
+
+    /**
+     * The constructor which takes the snapDir. This class is instantiated
+     * via SnapshotFactory
+     *
+     * @param snapDir the snapshot directory
+     */
+    public RocksDBSnap(File snapDir) throws IOException {
+        RocksDB.loadLibrary();
+        if (snapDir == null) {
+            throw new IllegalArgumentException("Snap Directory can't be null!");
+        }
+
+        this.snapDir = snapDir;
+
+        long rocksdbWriteBufferSize = Long.getLong(
+                ROCKSDB_WRITE_BUFFER_SIZE, DEFAULT_ROCKSDB_WRITE_BUFFER_SIZE);
+        this.options = new Options()
+                .setCreateIfMissing(true)
+                .setCreateMissingColumnFamilies(true)
+                .setDbWriteBufferSize(rocksdbWriteBufferSize);
+
+        try {
+            this.db = RocksDB.open(options, snapDir.getAbsolutePath());
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to open RocksDB. " + "error: " + e.getMessage(), e);
+        }
+        // setting Sync = true and DisableWAL = true will lead to writes failing
+        // and throwing an exception. So we set Sync = false here and let RocksDB
+        // flush after serialization.
+        this.writeOpts = new WriteOptions().setSync(SYNC_WRITE).setDisableWAL(DISABLE_WAL);
+    }
+
+    // VisibleForTesting
+    public void initializeIterator() {
+        rocksIterator = db.newIterator();
+        rocksIterator.seekToFirst();
+    }
+
+    // VisibleForTesting
+    public void closeIterator() {
+        rocksIterator.close();
+    }
+
+    public long deserialize(DataTree dt, Map<Long, Integer> sessions)
+            throws IOException {
+        File[] files = snapDir.listFiles();
+        if (files == null || files.length == 0) {
+            LOG.info("No snapshot found in {}", snapDir.getName());
+            return -1L;
+        }
+        long lastProcessedZxid;
+        long start = Time.currentElapsedTime();
+        try {
+            byte[] zxidBytes = db.get(ZXID_KEY.getBytes(StandardCharsets.UTF_8));

Review comment:
       *THREAD_SAFETY_VIOLATION:*  Read/Write race. Non-private method `RocksDBSnap.deserialize(...)` reads without synchronization from `this.db`. Potentially races with write in method `RocksDBSnap.serialize(...)`.
    Reporting because another access to the same memory occurs on a background thread, although this access may not.

##########
File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/RocksDBSnap.java
##########
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.persistence;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.Index;
+import org.apache.zookeeper.common.Time;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.server.DataNode;
+import org.apache.zookeeper.server.DataTree;
+import org.apache.zookeeper.server.DataTree.ZxidDigest;
+import org.apache.zookeeper.server.ReferenceCountedACLCache;
+import org.apache.zookeeper.server.ServerMetrics;
+import org.apache.zookeeper.server.TransactionChangeRecord;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements the snapshot interface.
+ * It is responsible for storing, serializing
+ * and deserializing the right snapshot in RocksDB,
+ * and provides access to the snapshots.
+ */
+public class RocksDBSnap implements SnapShot {
+    File snapDir;
+    RocksDB db;
+    Options options;
+    WriteOptions writeOpts;
+    RocksIterator rocksIterator;
+
+    private volatile boolean close = false;
+
+    private static final boolean SYNC_WRITE = false;
+    private static final boolean DISABLE_WAL = true;
+
+    //VisibleForTesting
+    public static final String ROCKSDB_WRITE_BUFFER_SIZE = "zookeeper.rocksdbWriteBufferSize";
+
+    private static final int PREFIX_STARTING_INDEX = 0;
+    private static final int PREFIX_ENDING_INDEX = 3;
+
+    private static final String SESSION_KEY_PREFIX = "S::";
+    private static final String DATATREE_KEY_PREFIX = "T::";
+    private static final String ACL_KEY_PREFIX = "A::";
+
+    private static final String ZXID_KEY = "Zxid";
+    private static final String ZXIDDIGEST_KEY = "ZxidDigest";
+
+    private static final long DEFAULT_ROCKSDB_WRITE_BUFFER_SIZE = 4096L * 1024 * 1024; // 4 GB; the L suffix avoids int overflow
+
+    private static final Logger LOG = LoggerFactory.getLogger(RocksDBSnap.class);
+
+    /**
+     * The constructor which takes the snapDir. This class is instantiated
+     * via SnapshotFactory
+     *
+     * @param snapDir the snapshot directory
+     */
+    public RocksDBSnap(File snapDir) throws IOException {
+        RocksDB.loadLibrary();
+        if (snapDir == null) {
+            throw new IllegalArgumentException("Snap Directory can't be null!");
+        }
+
+        this.snapDir = snapDir;
+
+        long rocksdbWriteBufferSize = Long.getLong(
+                ROCKSDB_WRITE_BUFFER_SIZE, DEFAULT_ROCKSDB_WRITE_BUFFER_SIZE);
+        this.options = new Options()
+                .setCreateIfMissing(true)
+                .setCreateMissingColumnFamilies(true)
+                .setDbWriteBufferSize(rocksdbWriteBufferSize);
+
+        try {
+            this.db = RocksDB.open(options, snapDir.getAbsolutePath());
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to open RocksDB. " + "error: " + e.getMessage(), e);
+        }
+        // setting Sync = true and DisableWAL = true will lead to writes failing
+        // and throwing an exception. So we set Sync = false here and let RocksDB
+        // flush after serialization.
+        this.writeOpts = new WriteOptions().setSync(SYNC_WRITE).setDisableWAL(DISABLE_WAL);
+    }
+
+    // VisibleForTesting
+    public void initializeIterator() {
+        rocksIterator = db.newIterator();
+        rocksIterator.seekToFirst();
+    }
+
+    // VisibleForTesting
+    public void closeIterator() {
+        rocksIterator.close();
+    }
+
+    public long deserialize(DataTree dt, Map<Long, Integer> sessions)
+            throws IOException {
+        File[] files = snapDir.listFiles();
+        if (files == null || files.length == 0) {
+            LOG.info("No snapshot found in {}", snapDir.getName());
+            return -1L;
+        }
+        long lastProcessedZxid;
+        long start = Time.currentElapsedTime();
+        try {
+            byte[] zxidBytes = db.get(ZXID_KEY.getBytes(StandardCharsets.UTF_8));
+            if (zxidBytes == null) {
+                // We didn't find zxid information in RocksDB, which means
+                // there is no RocksDB snapshot in the snapDir.
+                LOG.info("No snapshot found in {}", snapDir.getName());
+                return -1L;
+            }
+            lastProcessedZxid = Long.parseLong(new String(zxidBytes, StandardCharsets.UTF_8));
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to deserialize last processed zxid in RocksDB. " + "error: " + e.getMessage(), e);
+        }
+        LOG.info("RocksDB: Reading snapshot 0x{} from {}", Long.toHexString(lastProcessedZxid), snapDir);
+        dt.lastProcessedZxid = lastProcessedZxid;
+
+        rocksIterator = db.newIterator();
+        rocksIterator.seekToFirst();
+        ByteArrayInputStream bais;
+        BinaryInputArchive bia;
+        while (rocksIterator.isValid()) {
+            String key = new String(rocksIterator.key(), StandardCharsets.UTF_8);
+            String prefix = key.substring(PREFIX_STARTING_INDEX, PREFIX_ENDING_INDEX);
+            switch (prefix) {
+                case SESSION_KEY_PREFIX:
+                    deserializeSessions(sessions);
+                    break;
+                case ACL_KEY_PREFIX:
+                    deserializeACL(dt.getReferenceCountedAclCache());
+                    break;
+                case DATATREE_KEY_PREFIX:
+                    dt.deserialize(this, "tree");
+                    break;
+                default:
+                    // last processed zxid or zxid digest
+                    rocksIterator.next();
+                    break;
+            }
+        }
+        rocksIterator.close();
+
+        deserializeZxidDigest(dt);
+        if (dt.getDigestFromLoadedSnapshot() != null) {
+            dt.compareSnapshotDigests(lastProcessedZxid);
+        }
+        long elapsed = Time.currentElapsedTime() - start;
+        LOG.info("RocksDBSnap deserialization takes " + elapsed + " ms");
+        ServerMetrics.getMetrics().ROCKSDB_SNAPSHOT_DESERIALIZATION_TIME.add(elapsed);
+        return lastProcessedZxid;
+    }
+
+    public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions,
+                                       long lastZxid, boolean fsync) throws IOException {
+        if (close) {
+            return;
+        }
+        if (fsync) {
+            // take a full snapshot when doing a snap sync with the leader
+
+            // close RocksDB for cleaning up the old snapshot,
+            // because destroyDB will fail if the RocksDB is open and locked
+            db.close();
+            // clean up the old snapshot
+            try {
+                RocksDB.destroyDB(snapDir.getAbsolutePath(), options);
+            } catch (RocksDBException e) {
+                throw new IOException("Failed to clean old data in RocksDB files: " + "error: " + e.getMessage(), e);
+            }
+            // re-open RocksDB for taking a new snapshot
+            try {
+                db = RocksDB.open(options, snapDir.getAbsolutePath());
+            } catch (RocksDBException e) {
+                throw new IOException("Failed to open RocksDB. " + "error: " + e.getMessage(), e);
+            }
+
+            LOG.info("RocksDB: Snapshotting: 0x{} to {}", Long.toHexString(lastZxid), snapDir);
+
+            updateLastProcessedZxid(lastZxid, null);
+            serializeSessions(sessions);
+            serializeACL(dt.getReferenceCountedAclCache());
+            dt.serialize(this, "tree");
+            serializeZxidDigest(dt);
+        }
+        flush();
+    }
+
+    public void flush() throws IOException {
+        long start = Time.currentElapsedTime();
+        try (final FlushOptions flushOptions = new FlushOptions().setWaitForFlush(true)) {
+            db.flush(flushOptions);
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to flush in RocksDB: " + "error: " + e.getMessage(), e);
+        }
+        long elapsed = Time.currentElapsedTime() - start;
+        ServerMetrics.getMetrics().ROCKSDB_FLUSH_TIME.add(elapsed);
+    }
+
+    public File findMostRecentSnapshot() throws IOException {
+        // In RocksDB, we always apply transactions to the current snapshot,
+        // so we keep only a single folder for the RocksDB snapshot. If
+        // this snapshot cannot be loaded because of corrupted data, we will
+        // sync with the leader to get the latest data. Keeping multiple
+        // snapshots won't help here, since we would still need to take a
+        // new snapshot when syncing rather than reuse an old one; that's
+        // why we keep only one here.
+        return snapDir;
+    }
+
+    @SuppressWarnings("unchecked")
+    public void applyTxn(List<TransactionChangeRecord> changeList, long zxid) throws IOException {
+        // We use RocksDB WriteBatch to make atomic updates.
+        // We don't make applying clients' requests wait until the flush has
+        // finished, because flushing 200MB of data in memtables takes nearly
+        // 1.5 seconds, flushing 500MB takes 4.5 seconds, and flushing 1GB
+        // takes more than 10 seconds.
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            // update sessions, ACL, DataTree and ZxidDigest in RocksDB
+            for (TransactionChangeRecord change : changeList) {
+                switch (change.getType()) {
+                    case TransactionChangeRecord.DATANODE:
+                        String path = (String) change.getKey();
+                        DataNode node = (DataNode) change.getValue();
+                        String operation = change.getOperation();
+                        if (operation.equals(TransactionChangeRecord.ADD)
+                            || operation.equals(TransactionChangeRecord.UPDATE)) {
+                            addNode(path, node, writeBatch);
+                        } else {
+                            removeNode(path, writeBatch);
+                        }
+                        break;
+                    case TransactionChangeRecord.ACL:
+                        Long index = (Long) change.getKey();
+                        List<ACL> aclList = (List<ACL>) change.getValue();
+                        if (change.getOperation().equals(TransactionChangeRecord.ADD)) {
+                            addACLKeyValue(index, aclList, writeBatch);
+                        } else {
+                            removeACLKeyValue(index, writeBatch);
+                        }
+                        break;
+                    case TransactionChangeRecord.SESSION:
+                        Long id = (Long) change.getKey();
+                        Integer timeout = (Integer) change.getValue();
+                        if (change.getOperation().equals(TransactionChangeRecord.ADD)) {
+                            addSessionKeyValue(id, timeout, writeBatch);
+                        } else {
+                            removeSessionKeyValue(id, writeBatch);
+                        }
+                        break;
+                    case TransactionChangeRecord.ZXIDDIGEST:
+                        ZxidDigest zxidDigest = (ZxidDigest) change.getValue();
+                        if (change.getOperation().equals(TransactionChangeRecord.UPDATE)) {
+                            updateZxidDigest(zxidDigest, writeBatch);
+                        }
+                        break;
+                    default:
+                        LOG.warn("Unknown TransactionChangeRecord type {}", change);
+                        break;
+                }
+            }
+            // update the zxid in RocksDB
+            updateLastProcessedZxid(zxid, writeBatch);
+
+            // Even if RocksDB's memtable is auto-flushed, we always have consistent data and a consistent zxid digest.
+            db.write(writeOpts, writeBatch);
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to apply txns to RocksDB " + "error: " + e.getMessage(), e);
+        }
+    }
+
+    private void updateLastProcessedZxid(long zxid, WriteBatch writeBatch) throws IOException {
+        try {
+            if (writeBatch != null) {
+                writeBatch.put(ZXID_KEY.getBytes(StandardCharsets.UTF_8),
+                        Long.toString(zxid).getBytes(StandardCharsets.UTF_8));
+            } else {
+                db.put(writeOpts, ZXID_KEY.getBytes(StandardCharsets.UTF_8),
+                        Long.toString(zxid).getBytes(StandardCharsets.UTF_8));
+            }
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to serialize last processed zxid in RocksDB. "
+                    + "error: " + e.getMessage(), e);
+        }
+    }
+
+    public void serializeSessions(Map<Long, Integer> sessions) throws IOException {
+        HashMap<Long, Integer> sessSnap = new HashMap<Long, Integer>(sessions);
+        for (Entry<Long, Integer> entry : sessSnap.entrySet()) {
+            addSessionKeyValue(entry.getKey(), entry.getValue(), null);
+        }
+    }
+
+    private void addSessionKeyValue(Long id, Integer timeout, WriteBatch writeBatch) throws IOException {
+        try {
+            String key = SESSION_KEY_PREFIX + id;
+            if (writeBatch != null) {
+                writeBatch.put(key.getBytes(StandardCharsets.UTF_8),
+                        timeout.toString().getBytes(StandardCharsets.UTF_8));
+            } else {
+                db.put(writeOpts, key.getBytes(StandardCharsets.UTF_8),
+                        timeout.toString().getBytes(StandardCharsets.UTF_8));
+            }
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to serialize sessions in RocksDB " + "error: " + e.getMessage(), e);
+        }
+    }
+
+    private void removeSessionKeyValue(Long id, WriteBatch writeBatch) throws IOException {
+        try {
+            String key = SESSION_KEY_PREFIX + id;
+            writeBatch.delete(key.getBytes(StandardCharsets.UTF_8));
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to delete the session in RocksDB " + "error: " + e.getMessage(), e);
+        }
+    }
+
+    public void deserializeSessions(Map<Long, Integer> sessions) throws IOException {
+        while (rocksIterator.isValid()) {
+            String key = new String(rocksIterator.key(), StandardCharsets.UTF_8);
+            if (!key.startsWith(SESSION_KEY_PREFIX)) {
+                break;
+            }
+            key = key.substring(PREFIX_ENDING_INDEX);
+            long id = Long.parseLong(key);
+            int to = Integer.parseInt(new String(rocksIterator.value(), StandardCharsets.UTF_8));
+            sessions.put(id, to);
+            rocksIterator.next();
+        }
+    }
+
+    public synchronized void serializeACL(ReferenceCountedACLCache aclCache) throws IOException {
+        Set<Map.Entry<Long, List<ACL>>> set = aclCache.getLongKeyMap().entrySet();
+        for (Map.Entry<Long, List<ACL>> val : set) {
+            addACLKeyValue(val.getKey(), val.getValue(), null);
+        }
+    }
+
+    private void addACLKeyValue(Long index, List<ACL> aclList, WriteBatch writeBatch) throws IOException {
+        String key = ACL_KEY_PREFIX + index;
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+        boa.startVector(aclList, "acls");
+        for (ACL acl : aclList) {
+            acl.serialize(boa, "acl");
+        }
+        boa.endVector(aclList, "acls");
+        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
+        try {
+            if (writeBatch != null) {
+                writeBatch.put(key.getBytes(StandardCharsets.UTF_8), bb.array());
+            } else {
+                db.put(writeOpts, key.getBytes(StandardCharsets.UTF_8), bb.array());
+            }
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to serialize ACL lists in RocksDB " + "error: " + e.getMessage(), e);
+        }
+    }
+
+    private void removeACLKeyValue(Long index, WriteBatch writeBatch) throws IOException {
+        try {
+            String key = ACL_KEY_PREFIX + index;
+            writeBatch.delete(key.getBytes(StandardCharsets.UTF_8));
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to delete the ACL list in RocksDB " + "error: " + e.getMessage(), e);
+        }
+    }
+
+    public synchronized void deserializeACL(ReferenceCountedACLCache aclCache) throws IOException {
+        aclCache.clear();
+        ByteArrayInputStream bais;
+        BinaryInputArchive bia;
+        while (rocksIterator.isValid()) {
+            String key = new String(rocksIterator.key(), StandardCharsets.UTF_8);
+            if (!key.startsWith(ACL_KEY_PREFIX)) {
+                break;
+            }
+            key = key.substring(PREFIX_ENDING_INDEX);
+            long val = Long.parseLong(key);
+            List<ACL> aclList = new ArrayList<ACL>();
+            bais = new ByteArrayInputStream(rocksIterator.value());
+            bia = BinaryInputArchive.getArchive(bais);
+            Index j = bia.startVector("acls");
+            if (j == null) {
+                throw new RuntimeException("Incorrent format of InputArchive when deserialize DataTree - missing acls");
+            }
+            while (!j.done()) {
+                ACL acl = new ACL();
+                acl.deserialize(bia, "acl");
+                aclList.add(acl);
+                j.incr();
+            }
+            aclCache.updateMaps(val, aclList);
+            rocksIterator.next();
+        }
+    }
+
+    public void writeNode(String pathString, DataNode node) throws IOException {
+        addNode(pathString, node, null);
+    }
+
+    private void addNode(String pathString, DataNode node, WriteBatch writeBatch) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+        boa.writeRecord(node, "node");
+        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
+        try {
+            pathString = DATATREE_KEY_PREFIX + pathString;
+            if (writeBatch != null) {
+                writeBatch.put(pathString.getBytes(StandardCharsets.UTF_8), bb.array());
+            } else {
+                db.put(writeOpts, pathString.getBytes(StandardCharsets.UTF_8), bb.array());
+            }
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to serialize data node in RocksDB " + "error: " + e.getMessage(), e);
+        }
+    }
+
+    private void removeNode(String pathString, WriteBatch writeBatch) throws IOException {
+        try {
+            pathString = DATATREE_KEY_PREFIX + pathString;
+            writeBatch.delete(pathString.getBytes(StandardCharsets.UTF_8));
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to delete the data node in RocksDB " + "error: " + e.getMessage(), e);
+        }
+    }
+
+    public void markEnd() throws IOException {
+        // nothing needs to be done here when taking a snapshot in RocksDB
+    }
+
+    public String readNode(DataNode node) throws IOException {
+        if (!rocksIterator.isValid()) {
+            // finished iterating over all data nodes in RocksDB snapshot
+            return "/";
+        }
+        String path = new String(rocksIterator.key(), StandardCharsets.UTF_8);
+        if (!path.startsWith(DATATREE_KEY_PREFIX)) {
+            // finished iterating over all data nodes in RocksDB snapshot
+            return "/";
+        }
+        path = path.substring(PREFIX_ENDING_INDEX);
+        ByteArrayInputStream bais = new ByteArrayInputStream(rocksIterator.value());
+        BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
+        bia.readRecord(node, "node");
+        rocksIterator.next();
+        return path;
+    }
+
+    public boolean serializeZxidDigest(DataTree dt) throws IOException {
+        if (dt.nodesDigestEnabled()) {
+            ZxidDigest zxidDigest = dt.getLastProcessedZxidDigest();
+            if (zxidDigest == null) {
+                zxidDigest = dt.getBlankDigest();
+            }
+            updateZxidDigest(zxidDigest, null);
+            return true;
+        }
+        return false;
+    }
+
+    private void updateZxidDigest(ZxidDigest zxidDigest, WriteBatch writeBatch) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+        zxidDigest.serialize(boa);
+        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
+        try {
+            if (writeBatch != null) {
+                writeBatch.put(ZXIDDIGEST_KEY.getBytes(StandardCharsets.UTF_8), bb.array());
+            } else {
+                db.put(writeOpts, ZXIDDIGEST_KEY.getBytes(StandardCharsets.UTF_8), bb.array());
+            }
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to serialize zxid digest in RocksDB " + "error: " + e.getMessage(), e);
+        }
+    }
+
+    public boolean deserializeZxidDigest(DataTree dt) throws IOException {
+        if (dt.nodesDigestEnabled()) {
+            try {
+                byte[] zxidDigestBytes = db.get(ZXIDDIGEST_KEY.getBytes(StandardCharsets.UTF_8));

Review comment:
       *THREAD_SAFETY_VIOLATION:*  Read/Write race. Non-private method `RocksDBSnap.deserializeZxidDigest(...)` reads without synchronization from `this.db`. Potentially races with write in method `RocksDBSnap.serialize(...)`.
    Reporting because another access to the same memory occurs on a background thread, although this access may not.
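
    One hedged way to address this family of reports, assuming the flagged
    readers really can run concurrently with serialize(...), is to funnel
    every access to the mutable db handle through the monitor that
    serialize(...) already holds. A minimal, self-contained sketch of the
    pattern (the class and method names below are illustrative, not part
    of this patch):

        import org.rocksdb.Options;
        import org.rocksdb.RocksDB;
        import org.rocksdb.RocksDBException;

        class GuardedRocksDb {
            private final Options options = new Options().setCreateIfMissing(true);
            private RocksDB db; // only touched while holding 'this'

            GuardedRocksDb(String path) throws RocksDBException {
                RocksDB.loadLibrary();
                db = RocksDB.open(options, path);
            }

            // Writer side: the close()/destroy/reopen window stays invisible
            // to readers because both sides use the same monitor.
            synchronized void wipeAndReopen(String path) throws RocksDBException {
                db.close();
                RocksDB.destroyDB(path, options);
                db = RocksDB.open(options, path);
            }

            // Reader side: cannot observe a closed or half-reopened handle.
            synchronized byte[] read(byte[] key) throws RocksDBException {
                return db.get(key);
            }
        }

    Declaring deserializeZxidDigest(...) synchronized would be the direct
    analogue in RocksDBSnap, at the cost of serializing it against
    snapshot writes.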

##########
File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/RocksDBSnap.java
##########
@@ -0,0 +1,547 @@
+    public void flush() throws IOException {
+        long start = Time.currentElapsedTime();
+        try (final FlushOptions flushOptions = new FlushOptions().setWaitForFlush(true)) {
+            db.flush(flushOptions);

Review comment:
       *THREAD_SAFETY_VIOLATION:*  Read/Write race. Non-private method `RocksDBSnap.flush()` reads without synchronization from `this.db`. Potentially races with write in method `RocksDBSnap.serialize(...)`.
    Reporting because another access to the same memory occurs on a background thread, although this access may not.
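
    If taking the instance monitor inside flush() is too coarse (the
    comment in applyTxn notes that flushes can take seconds), a
    read-write lock is a hedged alternative: readers such as flush()
    share the lock, while the close()/reopen in serialize(...) takes it
    exclusively. This is a sketch under that assumption; the names are
    not from the patch:

        import java.util.concurrent.locks.ReentrantReadWriteLock;
        import org.rocksdb.FlushOptions;
        import org.rocksdb.RocksDB;
        import org.rocksdb.RocksDBException;

        class LockedDb {
            private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
            private RocksDB db; // replaced only while the write lock is held

            LockedDb(RocksDB db) {
                this.db = db;
            }

            // Readers share the lock, so concurrent flushes/gets never block
            // each other, but none of them can overlap the handle swap below.
            void flush() throws RocksDBException {
                rwLock.readLock().lock();
                try (FlushOptions opts = new FlushOptions().setWaitForFlush(true)) {
                    db.flush(opts);
                } finally {
                    rwLock.readLock().unlock();
                }
            }

            // The close()/reopen in serialize(...) would take the write lock.
            void swap(RocksDB fresh) {
                rwLock.writeLock().lock();
                try {
                    db.close();
                    db = fresh;
                } finally {
                    rwLock.writeLock().unlock();
                }
            }
        }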

##########
File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/RocksDBSnap.java
##########
@@ -0,0 +1,547 @@
+    // VisibleForTesting
+    public void initializeIterator() {
+        rocksIterator = db.newIterator();

Review comment:
       *THREAD_SAFETY_VIOLATION:*  Read/Write race. Non-private method `RocksDBSnap.initializeIterator()` reads without synchronization from `this.db`. Potentially races with write in method `RocksDBSnap.serialize(...)`.
    Reporting because another access to the same memory occurs on a background thread, although this access may not.
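
    Since initializeIterator() is test-only, the lightest hedged fix is
    to reuse the monitor that serialize(...) already holds. Note this
    only protects iterator creation; a test would still need to avoid
    holding the iterator across a concurrent close()/reopen of the
    handle. A sketch of the amended method (not the author's patch):

        // VisibleForTesting
        public synchronized void initializeIterator() {
            rocksIterator = db.newIterator();
            rocksIterator.seekToFirst();
        }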

##########
File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/RocksDBSnap.java
##########
@@ -0,0 +1,547 @@
+    public void serializeSessions(Map<Long, Integer> sessions) throws IOException {
+        HashMap<Long, Integer> sessSnap = new HashMap<Long, Integer>(sessions);
+        for (Entry<Long, Integer> entry : sessSnap.entrySet()) {
+            addSessionKeyValue(entry.getKey(), entry.getValue(), null);

Review comment:
       *THREAD_SAFETY_VIOLATION:*  Read/Write race. Non-private method `RocksDBSnap.serializeSessions(...)` indirectly reads without synchronization from `this.db`. Potentially races with write in method `RocksDBSnap.serialize(...)`.
    Reporting because another access to the same memory occurs on a background thread, although this access may not.
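
    serializeACL(...) in this same class is already declared
    synchronized, so one hedged reading of this report is that
    serializeSessions(...) simply needs the same treatment, putting both
    snapshot writers under serialize(...)'s monitor. A sketch of the
    amended method:

        public synchronized void serializeSessions(Map<Long, Integer> sessions) throws IOException {
            Map<Long, Integer> sessSnap = new HashMap<>(sessions);
            for (Map.Entry<Long, Integer> entry : sessSnap.entrySet()) {
                addSessionKeyValue(entry.getKey(), entry.getValue(), null);
            }
        }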

##########
File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/RocksDBSnap.java
##########
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.persistence;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.Index;
+import org.apache.zookeeper.common.Time;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.server.DataNode;
+import org.apache.zookeeper.server.DataTree;
+import org.apache.zookeeper.server.DataTree.ZxidDigest;
+import org.apache.zookeeper.server.ReferenceCountedACLCache;
+import org.apache.zookeeper.server.ServerMetrics;
+import org.apache.zookeeper.server.TransactionChangeRecord;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements the snapshot interface.
+ * It is responsible for storing, serializing
+ * and deserializing the right snapshot in RocksDB,
+ * and provides access to the snapshots.
+ */
+public class RocksDBSnap implements SnapShot {
+    File snapDir;
+    RocksDB db;
+    Options options;
+    WriteOptions writeOpts;
+    RocksIterator rocksIterator;
+
+    private volatile boolean close = false;
+
+    private static final boolean SYNC_WRITE = false;
+    private static final boolean DISABLE_WAL = true;
+
+    //VisibleForTesting
+    public static final String ROCKSDB_WRITE_BUFFER_SIZE = "zookeeper.rocksdbWriteBufferSize";
+
+    private static final int PREFIX_STARTING_INDEX = 0;
+    private static final int PREFIX_ENDING_INDEX = 3;
+
+    private static final String SESSION_KEY_PREFIX = "S::";
+    private static final String DATATREE_KEY_PREFIX = "T::";
+    private static final String ACL_KEY_PREFIX = "A::";
+
+    private static final String ZXID_KEY = "Zxid";
+    private static final String ZXIDDIGEST_KEY = "ZxidDigest";
+
+    private static final long DEFAULT_ROCKSDB_WRITE_BUFFER_SIZE = 4096 * 1024 * 1024;
+
+    private static final Logger LOG = LoggerFactory.getLogger(RocksDBSnap.class);
+
+    /**
+     * The constructor which takes the snapDir. This class is instantiated
+     * via SnapshotFactory
+     *
+     * @param snapDir the snapshot directory
+     */
+    public RocksDBSnap(File snapDir) throws IOException {
+        RocksDB.loadLibrary();
+        if (snapDir == null) {
+            throw new IllegalArgumentException("Snap Directory can't be null!");
+        }
+
+        this.snapDir = snapDir;
+
+        long rocksdbWriteBufferSize = Long.getLong(
+                ROCKSDB_WRITE_BUFFER_SIZE, DEFAULT_ROCKSDB_WRITE_BUFFER_SIZE);
+        this.options = new Options()
+                .setCreateIfMissing(true)
+                .setCreateMissingColumnFamilies(true)
+                .setDbWriteBufferSize(rocksdbWriteBufferSize);
+
+        try {
+            this.db = RocksDB.open(options, snapDir.getAbsolutePath());
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to open RocksDB. " + "error: " + e.getMessage(), e);
+        }
+        // setting Sync = true and DisableWAL = true will lead to writes failing
+        // and throwing an exception. So we set Sync = false here and let RocksDB
+        // flush after serialization.
+        this.writeOpts = new WriteOptions().setSync(SYNC_WRITE).setDisableWAL(DISABLE_WAL);
+    }
+
+    // VisibleForTesting
+    public void initializeIterator() {
+        rocksIterator = db.newIterator();
+        rocksIterator.seekToFirst();
+    }
+
+    // VisibleForTesting
+    public void closeIterator() {
+        rocksIterator.close();
+    }
+
+    public long deserialize(DataTree dt, Map<Long, Integer> sessions)
+            throws IOException {
+        File[] files = snapDir.listFiles();
+        if (files == null || files.length == 0) {
+            LOG.info("No snapshot found in {}", snapDir.getName());
+            return -1L;
+        }
+        long lastProcessedZxid;
+        long start = Time.currentElapsedTime();
+        try {
+            byte[] zxidBytes = db.get(ZXID_KEY.getBytes(StandardCharsets.UTF_8));
+            if (zxidBytes == null) {
+                // We didn't find zxid infomation in RocksDB, which means
+                // there is no RocksDB snapshot in the snapDir.
+                LOG.info("No snapshot found in {}", snapDir.getName());
+                return -1L;
+            }
+            lastProcessedZxid = Long.parseLong(new String(zxidBytes, StandardCharsets.UTF_8));
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to deserialize last processed zxid in RocksDB. " + "error: " + e.getMessage(), e);
+        }
+        LOG.info("RocksDB: Reading snapshot 0x{} from {}", Long.toHexString(lastProcessedZxid), snapDir);
+        dt.lastProcessedZxid = lastProcessedZxid;
+
+        rocksIterator = db.newIterator();
+        rocksIterator.seekToFirst();
+        ByteArrayInputStream bais;
+        BinaryInputArchive bia;
+        while (rocksIterator.isValid()) {
+            String key = new String(rocksIterator.key(), StandardCharsets.UTF_8);
+            String prefix = key.substring(PREFIX_STARTING_INDEX, PREFIX_ENDING_INDEX);
+            switch (prefix) {
+                case SESSION_KEY_PREFIX:
+                    deserializeSessions(sessions);
+                    break;
+                case ACL_KEY_PREFIX:
+                    deserializeACL(dt.getReferenceCountedAclCache());
+                    break;
+                case DATATREE_KEY_PREFIX:
+                    dt.deserialize(this, "tree");
+                    break;
+                default:
+                    // last processed zxid or zxid digest
+                    rocksIterator.next();
+                    break;
+            }
+        }
+        rocksIterator.close();
+
+        deserializeZxidDigest(dt);
+        if (dt.getDigestFromLoadedSnapshot() != null) {
+            dt.compareSnapshotDigests(lastProcessedZxid);
+        }
+        long elapsed = Time.currentElapsedTime() - start;
+        LOG.info("RocksDBSnap deserialization takes " + elapsed + " ms");
+        ServerMetrics.getMetrics().ROCKSDB_SNAPSHOT_DESERIALIZATION_TIME.add(elapsed);
+        return lastProcessedZxid;
+    }
+
+    public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions,
+                                       long lastZxid, boolean fsync) throws IOException {
+        if (close) {
+            return;
+        }
+        if (fsync) {
+            // take a full snapshot when snap sync with the leader
+
+            // close RocksDB for cleaning up the old snapshot,
+            // because destroyDB will fail if the RocksDB is open and locked
+            db.close();
+            // clean up the old snapshot
+            try {
+                RocksDB.destroyDB(snapDir.getAbsolutePath(), options);
+            } catch (RocksDBException e) {
+                throw new IOException("Failed to clean old data in RocksDB files: " + "error: " + e.getMessage(), e);
+            }
+            // re-open RocksDB for taking a new snapshot
+            try {
+                db = RocksDB.open(options, snapDir.getAbsolutePath());
+            } catch (RocksDBException e) {
+                throw new IOException("Failed to open RocksDB. " + "error: " + e.getMessage(), e);
+            }
+
+            LOG.info("RocksDB: Snapshotting: 0x{} to {}", Long.toHexString(lastZxid), snapDir);
+
+            updateLastProcessedZxid(lastZxid, null);
+            serializeSessions(sessions);
+            serializeACL(dt.getReferenceCountedAclCache());
+            dt.serialize(this, "tree");
+            serializeZxidDigest(dt);
+        }
+        flush();
+    }
+
+    public void flush() throws IOException {
+        long start = Time.currentElapsedTime();
+        try (final FlushOptions flushOptions = new FlushOptions().setWaitForFlush(true)) {
+            db.flush(flushOptions);
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to flush in RocksDB: " + "error: " + e.getMessage(), e);
+        }
+        long elapsed = Time.currentElapsedTime() - start;
+        ServerMetrics.getMetrics().ROCKSDB_FLUSH_TIME.add(elapsed);
+    }
+
+    public File findMostRecentSnapshot() throws IOException {
+        // In RocksDB, we always apply transactions to the current snapshot.
+        // So we only keep one single folder for the RocksDB snapshot. If
+        // this snapshot cannot be loaded because of corrupted data, we will
+        // sync with leader to get the latest data. Keeping multiple snapshots
+        // won't help here, since we still need to take snapshot syncing with
+        // the old snapshot, that's why we only keep one here.
+        return snapDir;
+    }
+
+    @SuppressWarnings("unchecked")
+    public void applyTxn(List<TransactionChangeRecord> changeList, long zxid) throws IOException {
+        // We use RocksDB WriteBatch to make atomic updates.
+        // We didn't let applying client's requests wait until flush finished because
+        // flushing 200MB of data in memtables takes nearly 1.5 seconds, flushing 500MB
+        // of data takes 4.5 seconds, and flushing 1GB of data takes more than 10 seconds.
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            // update sessions, ACL, DataTree and ZxidDigest in RocksDB
+            for (int i = 0; i < changeList.size(); i++) {
+                TransactionChangeRecord change = changeList.get(i);
+                switch (change.getType()) {
+                    case TransactionChangeRecord.DATANODE:
+                        String path = (String) change.getKey();
+                        DataNode node = (DataNode) change.getValue();
+                        String operation = change.getOperation();
+                        if (operation.equals(TransactionChangeRecord.ADD)
+                            || operation.equals(TransactionChangeRecord.UPDATE)) {
+                            addNode(path, node, writeBatch);
+                        } else {
+                            removeNode(path, writeBatch);
+                        }
+                        break;
+                    case TransactionChangeRecord.ACL:
+                        Long index = (Long) change.getKey();
+                        List<ACL> aclList = (List<ACL>) change.getValue();
+                        if (change.getOperation().equals(TransactionChangeRecord.ADD)) {
+                            addACLKeyValue(index, aclList, writeBatch);
+                        } else {
+                            removeACLKeyValue(index, writeBatch);
+                        }
+                        break;
+                    case TransactionChangeRecord.SESSION:
+                        Long id = (Long) change.getKey();
+                        Integer timeout = (Integer) change.getValue();
+                        if (change.getOperation().equals(TransactionChangeRecord.ADD)) {
+                            addSessionKeyValue(id, timeout, writeBatch);
+                        } else {
+                            removeSessionKeyValue(id, writeBatch);
+                        }
+                        break;
+                    case TransactionChangeRecord.ZXIDDIGEST:
+                        ZxidDigest zxidDigest = (ZxidDigest) change.getValue();
+                        if (change.getOperation().equals(TransactionChangeRecord.UPDATE)) {
+                            updateZxidDigest(zxidDigest, writeBatch);
+                        }
+                        break;
+                    default:
+                        LOG.warn("Unknown TransactionChangeRecord type {}", change);
+                        break;
+                }
+            }
+            // update the zxid in RocksDB
+            updateLastProcessedZxid(zxid, writeBatch);
+
+            // Because the batch is applied atomically, the data and the zxid
+            // digest stay consistent even if RocksDB's memtable is auto-flushed.
+            db.write(writeOpts, writeBatch);
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to apply txns to RocksDB " + "error: " + e.getMessage(), e);
+        }
+    }
+
+    private void updateLastProcessedZxid(long zxid, WriteBatch writeBatch) throws IOException {
+        try {
+            if (writeBatch != null) {
+                writeBatch.put(ZXID_KEY.getBytes(StandardCharsets.UTF_8),
+                        Long.toString(zxid).getBytes(StandardCharsets.UTF_8));
+            } else {
+                db.put(writeOpts, ZXID_KEY.getBytes(StandardCharsets.UTF_8),
+                        Long.toString(zxid).getBytes(StandardCharsets.UTF_8));
+            }
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to serialize last processed zxid in RocksDB. "
+                    + "error: " + e.getMessage(), e);
+        }
+    }
+
+    public void serializeSessions(Map<Long, Integer> sessions) throws IOException {
+        HashMap<Long, Integer> sessSnap = new HashMap<Long, Integer>(sessions);
+        for (Entry<Long, Integer> entry : sessSnap.entrySet()) {
+            addSessionKeyValue(entry.getKey(), entry.getValue(), null);
+        }
+    }
+
+    private void addSessionKeyValue(Long id, Integer timeout, WriteBatch writeBatch) throws IOException {
+        try {
+            String key = SESSION_KEY_PREFIX + id;
+            if (writeBatch != null) {
+                writeBatch.put(key.getBytes(StandardCharsets.UTF_8),
+                        timeout.toString().getBytes(StandardCharsets.UTF_8));
+            } else {
+                db.put(writeOpts, key.getBytes(StandardCharsets.UTF_8),
+                        timeout.toString().getBytes(StandardCharsets.UTF_8));
+            }
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to serialize sessions in RocksDB " + "error: " + e.getMessage(), e);
+        }
+    }
+
+    private void removeSessionKeyValue(Long id, WriteBatch writeBatch) throws IOException {
+        try {
+            String key = SESSION_KEY_PREFIX + id;
+            writeBatch.delete(key.getBytes(StandardCharsets.UTF_8));
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to delete the session in RocksDB " + "error: " + e.getMessage(), e);
+        }
+    }
+
+    public void deserializeSessions(Map<Long, Integer> sessions) throws IOException {
+        while (rocksIterator.isValid()) {
+            String key = new String(rocksIterator.key(), StandardCharsets.UTF_8);
+            if (!key.startsWith(SESSION_KEY_PREFIX)) {
+                break;
+            }
+            key = key.substring(PREFIX_ENDING_INDEX);
+            long id = Long.parseLong(key);
+            int to = Integer.parseInt(new String(rocksIterator.value(), StandardCharsets.UTF_8));
+            sessions.put(id, to);
+            rocksIterator.next();
+        }
+    }
+
+    public synchronized void serializeACL(ReferenceCountedACLCache aclCache) throws IOException {
+        Set<Map.Entry<Long, List<ACL>>> set = aclCache.getLongKeyMap().entrySet();
+        for (Map.Entry<Long, List<ACL>> val : set) {
+            addACLKeyValue(val.getKey(), val.getValue(), null);
+        }
+    }
+
+    private void addACLKeyValue(Long index, List<ACL> aclList, WriteBatch writeBatch) throws IOException {
+        String key = ACL_KEY_PREFIX + index;
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+        boa.startVector(aclList, "acls");
+        for (ACL acl : aclList) {
+            acl.serialize(boa, "acl");
+        }
+        boa.endVector(aclList, "acls");
+        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
+        try {
+            if (writeBatch != null) {
+                writeBatch.put(key.getBytes(StandardCharsets.UTF_8), bb.array());
+            } else {
+                db.put(writeOpts, key.getBytes(StandardCharsets.UTF_8), bb.array());
+            }
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to serialize ACL lists in RocksDB " + "error: " + e.getMessage(), e);
+        }
+    }
+
+    private void removeACLKeyValue(Long index, WriteBatch writeBatch) throws IOException {
+        try {
+            String key = ACL_KEY_PREFIX + index;
+            writeBatch.delete(key.getBytes(StandardCharsets.UTF_8));
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to delete the ACL list in RocksDB " + "error: " + e.getMessage(), e);
+        }
+    }
+
+    public synchronized void deserializeACL(ReferenceCountedACLCache aclCache) throws IOException {
+        aclCache.clear();
+        ByteArrayInputStream bais;
+        BinaryInputArchive bia;
+        while (rocksIterator.isValid()) {
+            String key = new String(rocksIterator.key(), StandardCharsets.UTF_8);
+            if (!key.startsWith(ACL_KEY_PREFIX)) {
+                break;
+            }
+            key = key.substring(PREFIX_ENDING_INDEX);
+            long val = Long.parseLong(key);
+            List<ACL> aclList = new ArrayList<ACL>();
+            bais = new ByteArrayInputStream(rocksIterator.value());
+            bia = BinaryInputArchive.getArchive(bais);
+            Index j = bia.startVector("acls");
+            if (j == null) {
+                throw new RuntimeException("Incorrect format of InputArchive when deserializing DataTree - missing acls");
+            }
+            while (!j.done()) {
+                ACL acl = new ACL();
+                acl.deserialize(bia, "acl");
+                aclList.add(acl);
+                j.incr();
+            }
+            aclCache.updateMaps(val, aclList);
+            rocksIterator.next();
+        }
+    }
+
+    public void writeNode(String pathString, DataNode node) throws IOException {
+        addNode(pathString, node, null);
+    }
+
+    private void addNode(String pathString, DataNode node, WriteBatch writeBatch) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+        boa.writeRecord(node, "node");
+        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
+        try {
+            pathString = DATATREE_KEY_PREFIX + pathString;
+            if (writeBatch != null) {
+                writeBatch.put(pathString.getBytes(StandardCharsets.UTF_8), bb.array());
+            } else {
+                db.put(writeOpts, pathString.getBytes(StandardCharsets.UTF_8), bb.array());
+            }
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to serialize data node in RocksDB " + "error: " + e.getMessage(), e);
+        }
+    }
+
+    private void removeNode(String pathString, WriteBatch writeBatch) throws IOException {
+        try {
+            pathString = DATATREE_KEY_PREFIX + pathString;
+            writeBatch.delete(pathString.getBytes(StandardCharsets.UTF_8));
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to delete the data node in RocksDB " + "error: " + e.getMessage(), e);
+        }
+    }
+
+    public void markEnd() throws IOException {
+        // nothing needs to be done here when taking a snapshot in RocksDB
+    }
+
+    public String readNode(DataNode node) throws IOException {
+        if (!rocksIterator.isValid()) {
+            // finished iterating over all data nodes in RocksDB snapshot
+            return "/";
+        }
+        String path = new String(rocksIterator.key(), StandardCharsets.UTF_8);
+        if (!path.startsWith(DATATREE_KEY_PREFIX)) {
+            // finished iterating over all data nodes in RocksDB snapshot
+            return "/";
+        }
+        path = path.substring(PREFIX_ENDING_INDEX);
+        ByteArrayInputStream bais = new ByteArrayInputStream(rocksIterator.value());
+        BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
+        bia.readRecord(node, "node");
+        rocksIterator.next();
+        return path;
+    }
+
+    public boolean serializeZxidDigest(DataTree dt) throws IOException {
+        if (dt.nodesDigestEnabled()) {
+            ZxidDigest zxidDigest = dt.getLastProcessedZxidDigest();
+            if (zxidDigest == null) {
+                zxidDigest = dt.getBlankDigest();
+            }
+            updateZxidDigest(zxidDigest, null);

Review comment:
       *THREAD_SAFETY_VIOLATION:*  Read/Write race. Non-private method `RocksDBSnap.serializeZxidDigest(...)` indirectly reads without synchronization from `this.db`. Potentially races with write in method `RocksDBSnap.serialize(...)`.
    Reporting because another access to the same memory occurs on a background thread, although this access may not.
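
One way to address this finding (a minimal sketch only, not code from this PR) is to funnel every use of the mutable `db` field through a single monitor, since `serialize(...)` closes and reassigns the handle during a snap sync. All names below are hypothetical:

    import java.io.IOException;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteOptions;

    class GuardedRocksHandle {
        private final Object dbLock = new Object(); // guards reassignment and use of `db`
        private RocksDB db;
        private final WriteOptions writeOpts;

        GuardedRocksHandle(RocksDB db, WriteOptions writeOpts) {
            this.db = db;
            this.writeOpts = writeOpts;
        }

        // Writers go through this helper, so a concurrent close-and-reopen
        // can never observe a half-swapped handle.
        void put(byte[] key, byte[] value) throws IOException {
            synchronized (dbLock) {
                try {
                    db.put(writeOpts, key, value);
                } catch (RocksDBException e) {
                    throw new IOException("RocksDB put failed: " + e.getMessage(), e);
                }
            }
        }

        // serialize(...) would swap the handle under the same lock.
        void swap(RocksDB fresh) {
            synchronized (dbLock) {
                db.close();
                db = fresh;
            }
        }
    }

The trade-off is that snapshot serialization and transaction application briefly contend on one lock; given how rarely the handle is swapped, that is usually acceptable.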

##########
File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/RocksDBSnap.java
##########
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.persistence;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.Index;
+import org.apache.zookeeper.common.Time;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.server.DataNode;
+import org.apache.zookeeper.server.DataTree;
+import org.apache.zookeeper.server.DataTree.ZxidDigest;
+import org.apache.zookeeper.server.ReferenceCountedACLCache;
+import org.apache.zookeeper.server.ServerMetrics;
+import org.apache.zookeeper.server.TransactionChangeRecord;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements the snapshot interface.
+ * It is responsible for storing, serializing
+ * and deserializing the right snapshot in RocksDB,
+ * and provides access to the snapshots.
+ */
+public class RocksDBSnap implements SnapShot {
+    File snapDir;
+    RocksDB db;
+    Options options;
+    WriteOptions writeOpts;
+    RocksIterator rocksIterator;
+
+    private volatile boolean close = false;
+
+    private static final boolean SYNC_WRITE = false;
+    private static final boolean DISABLE_WAL = true;
+
+    //VisibleForTesting
+    public static final String ROCKSDB_WRITE_BUFFER_SIZE = "zookeeper.rocksdbWriteBufferSize";
+
+    private static final int PREFIX_STARTING_INDEX = 0;
+    private static final int PREFIX_ENDING_INDEX = 3;
+
+    private static final String SESSION_KEY_PREFIX = "S::";
+    private static final String DATATREE_KEY_PREFIX = "T::";
+    private static final String ACL_KEY_PREFIX = "A::";
+
+    private static final String ZXID_KEY = "Zxid";
+    private static final String ZXIDDIGEST_KEY = "ZxidDigest";
+
+    private static final long DEFAULT_ROCKSDB_WRITE_BUFFER_SIZE = 4096L * 1024 * 1024; // 4 GiB; the L suffix avoids int overflow
+
+    private static final Logger LOG = LoggerFactory.getLogger(RocksDBSnap.class);
+
+    /**
+     * The constructor which takes the snapDir. This class is instantiated
+     * via SnapshotFactory.
+     *
+     * @param snapDir the snapshot directory
+     */
+    public RocksDBSnap(File snapDir) throws IOException {
+        RocksDB.loadLibrary();
+        if (snapDir == null) {
+            throw new IllegalArgumentException("Snap Directory can't be null!");
+        }
+
+        this.snapDir = snapDir;
+
+        long rocksdbWriteBufferSize = Long.getLong(
+                ROCKSDB_WRITE_BUFFER_SIZE, DEFAULT_ROCKSDB_WRITE_BUFFER_SIZE);
+        this.options = new Options()
+                .setCreateIfMissing(true)
+                .setCreateMissingColumnFamilies(true)
+                .setDbWriteBufferSize(rocksdbWriteBufferSize);
+
+        try {
+            this.db = RocksDB.open(options, snapDir.getAbsolutePath());
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to open RocksDB. " + "error: " + e.getMessage(), e);
+        }
+        // setting Sync = true and DisableWAL = true will lead to writes failing
+        // and throwing an exception. So we set Sync = false here and let RocksDB
+        // flush after serialization.
+        this.writeOpts = new WriteOptions().setSync(SYNC_WRITE).setDisableWAL(DISABLE_WAL);
+    }
+
+    // VisibleForTesting
+    public void initializeIterator() {
+        rocksIterator = db.newIterator();
+        rocksIterator.seekToFirst();
+    }
+
+    // VisibleForTesting
+    public void closeIterator() {
+        rocksIterator.close();
+    }
+
+    public long deserialize(DataTree dt, Map<Long, Integer> sessions)
+            throws IOException {
+        File[] files = snapDir.listFiles();
+        if (files == null || files.length == 0) {
+            LOG.info("No snapshot found in {}", snapDir.getName());
+            return -1L;
+        }
+        long lastProcessedZxid;
+        long start = Time.currentElapsedTime();
+        try {
+            byte[] zxidBytes = db.get(ZXID_KEY.getBytes(StandardCharsets.UTF_8));
+            if (zxidBytes == null) {
+                // We did not find zxid information in RocksDB, which means
+                // there is no RocksDB snapshot in the snapDir.
+                LOG.info("No snapshot found in {}", snapDir.getName());
+                return -1L;
+            }
+            lastProcessedZxid = Long.parseLong(new String(zxidBytes, StandardCharsets.UTF_8));
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to deserialize last processed zxid in RocksDB. " + "error: " + e.getMessage(), e);
+        }
+        LOG.info("RocksDB: Reading snapshot 0x{} from {}", Long.toHexString(lastProcessedZxid), snapDir);
+        dt.lastProcessedZxid = lastProcessedZxid;
+
+        rocksIterator = db.newIterator();
+        rocksIterator.seekToFirst();
+        ByteArrayInputStream bais;
+        BinaryInputArchive bia;
+        while (rocksIterator.isValid()) {
+            String key = new String(rocksIterator.key(), StandardCharsets.UTF_8);
+            String prefix = key.substring(PREFIX_STARTING_INDEX, PREFIX_ENDING_INDEX);
+            switch (prefix) {
+                case SESSION_KEY_PREFIX:
+                    deserializeSessions(sessions);
+                    break;
+                case ACL_KEY_PREFIX:
+                    deserializeACL(dt.getReferenceCountedAclCache());
+                    break;
+                case DATATREE_KEY_PREFIX:
+                    dt.deserialize(this, "tree");
+                    break;
+                default:
+                    // last processed zxid or zxid digest
+                    rocksIterator.next();
+                    break;
+            }
+        }
+        rocksIterator.close();
+
+        deserializeZxidDigest(dt);
+        if (dt.getDigestFromLoadedSnapshot() != null) {
+            dt.compareSnapshotDigests(lastProcessedZxid);
+        }
+        long elapsed = Time.currentElapsedTime() - start;
+        LOG.info("RocksDBSnap deserialization took {} ms", elapsed);
+        ServerMetrics.getMetrics().ROCKSDB_SNAPSHOT_DESERIALIZATION_TIME.add(elapsed);
+        return lastProcessedZxid;
+    }
+
+    public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions,
+                                       long lastZxid, boolean fsync) throws IOException {
+        if (close) {
+            return;
+        }
+        if (fsync) {
+            // take a full snapshot when snap sync with the leader
+
+            // close RocksDB for cleaning up the old snapshot,
+            // because destroyDB will fail if the RocksDB is open and locked
+            db.close();
+            // clean up the old snapshot
+            try {
+                RocksDB.destroyDB(snapDir.getAbsolutePath(), options);
+            } catch (RocksDBException e) {
+                throw new IOException("Failed to clean old data in RocksDB files: " + "error: " + e.getMessage(), e);
+            }
+            // re-open RocksDB for taking a new snapshot
+            try {
+                db = RocksDB.open(options, snapDir.getAbsolutePath());
+            } catch (RocksDBException e) {
+                throw new IOException("Failed to open RocksDB. " + "error: " + e.getMessage(), e);
+            }
+
+            LOG.info("RocksDB: Snapshotting: 0x{} to {}", Long.toHexString(lastZxid), snapDir);
+
+            updateLastProcessedZxid(lastZxid, null);
+            serializeSessions(sessions);
+            serializeACL(dt.getReferenceCountedAclCache());
+            dt.serialize(this, "tree");
+            serializeZxidDigest(dt);
+        }
+        flush();
+    }
+
+    public void flush() throws IOException {
+        long start = Time.currentElapsedTime();
+        try (final FlushOptions flushOptions = new FlushOptions().setWaitForFlush(true)) {
+            db.flush(flushOptions);
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to flush in RocksDB: " + "error: " + e.getMessage(), e);
+        }
+        long elapsed = Time.currentElapsedTime() - start;
+        ServerMetrics.getMetrics().ROCKSDB_FLUSH_TIME.add(elapsed);
+    }
+
+    public File findMostRecentSnapshot() throws IOException {
+        // In RocksDB, we always apply transactions to the current snapshot,
+        // so we keep only a single folder for the RocksDB snapshot. If this
+        // snapshot cannot be loaded because of corrupted data, we sync with
+        // the leader to get the latest data. Keeping multiple snapshots would
+        // not help here: even starting from an older snapshot, we would still
+        // need to sync with the leader and take a fresh snapshot anyway.
+        return snapDir;
+    }
+
+    @SuppressWarnings("unchecked")
+    public void applyTxn(List<TransactionChangeRecord> changeList, long zxid) throws IOException {
+        // We use a RocksDB WriteBatch to apply the updates atomically.
+        // Applying client requests does not wait for the flush to finish,
+        // because flushing 200MB of memtable data takes nearly 1.5 seconds,
+        // flushing 500MB takes 4.5 seconds, and flushing 1GB takes more than 10 seconds.
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            // update sessions, ACL, DataTree and ZxidDigest in RocksDB
+            for (int i = 0; i < changeList.size(); i++) {
+                TransactionChangeRecord change = changeList.get(i);
+                switch (change.getType()) {
+                    case TransactionChangeRecord.DATANODE:
+                        String path = (String) change.getKey();
+                        DataNode node = (DataNode) change.getValue();
+                        String operation = change.getOperation();
+                        if (operation.equals(TransactionChangeRecord.ADD)
+                            || operation.equals(TransactionChangeRecord.UPDATE)) {
+                            addNode(path, node, writeBatch);
+                        } else {
+                            removeNode(path, writeBatch);
+                        }
+                        break;
+                    case TransactionChangeRecord.ACL:
+                        Long index = (Long) change.getKey();
+                        List<ACL> aclList = (List<ACL>) change.getValue();
+                        if (change.getOperation().equals(TransactionChangeRecord.ADD)) {
+                            addACLKeyValue(index, aclList, writeBatch);
+                        } else {
+                            removeACLKeyValue(index, writeBatch);
+                        }
+                        break;
+                    case TransactionChangeRecord.SESSION:
+                        Long id = (Long) change.getKey();
+                        Integer timeout = (Integer) change.getValue();
+                        if (change.getOperation().equals(TransactionChangeRecord.ADD)) {
+                            addSessionKeyValue(id, timeout, writeBatch);
+                        } else {
+                            removeSessionKeyValue(id, writeBatch);
+                        }
+                        break;
+                    case TransactionChangeRecord.ZXIDDIGEST:
+                        ZxidDigest zxidDigest = (ZxidDigest) change.getValue();
+                        if (change.getOperation().equals(TransactionChangeRecord.UPDATE)) {
+                            updateZxidDigest(zxidDigest, writeBatch);
+                        }
+                        break;
+                    default:
+                        LOG.warn("Unknown TransactionChangeRecord type {}", change);
+                        break;
+                }
+            }
+            // update the zxid in RocksDB
+            updateLastProcessedZxid(zxid, writeBatch);
+
+            // Because the batch is applied atomically, the data and the zxid
+            // digest stay consistent even if RocksDB's memtable is auto-flushed.
+            db.write(writeOpts, writeBatch);
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to apply txns to RocksDB " + "error: " + e.getMessage(), e);
+        }
+    }
+
+    private void updateLastProcessedZxid(long zxid, WriteBatch writeBatch) throws IOException {
+        try {
+            if (writeBatch != null) {
+                writeBatch.put(ZXID_KEY.getBytes(StandardCharsets.UTF_8),
+                        Long.toString(zxid).getBytes(StandardCharsets.UTF_8));
+            } else {
+                db.put(writeOpts, ZXID_KEY.getBytes(StandardCharsets.UTF_8),
+                        Long.toString(zxid).getBytes(StandardCharsets.UTF_8));
+            }
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to serialize last processed zxid in RocksDB. "
+                    + "error: " + e.getMessage(), e);
+        }
+    }
+
+    public void serializeSessions(Map<Long, Integer> sessions) throws IOException {
+        HashMap<Long, Integer> sessSnap = new HashMap<Long, Integer>(sessions);
+        for (Entry<Long, Integer> entry : sessSnap.entrySet()) {
+            addSessionKeyValue(entry.getKey(), entry.getValue(), null);
+        }
+    }
+
+    private void addSessionKeyValue(Long id, Integer timeout, WriteBatch writeBatch) throws IOException {
+        try {
+            String key = SESSION_KEY_PREFIX + id;
+            if (writeBatch != null) {
+                writeBatch.put(key.getBytes(StandardCharsets.UTF_8),
+                        timeout.toString().getBytes(StandardCharsets.UTF_8));
+            } else {
+                db.put(writeOpts, key.getBytes(StandardCharsets.UTF_8),
+                        timeout.toString().getBytes(StandardCharsets.UTF_8));
+            }
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to serialize sessions in RocksDB " + "error: " + e.getMessage(), e);
+        }
+    }
+
+    private void removeSessionKeyValue(Long id, WriteBatch writeBatch) throws IOException {
+        try {
+            String key = SESSION_KEY_PREFIX + id;
+            writeBatch.delete(key.getBytes(StandardCharsets.UTF_8));
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to delete the session in RocksDB " + "error: " + e.getMessage(), e);
+        }
+    }
+
+    public void deserializeSessions(Map<Long, Integer> sessions) throws IOException {
+        while (rocksIterator.isValid()) {
+            String key = new String(rocksIterator.key(), StandardCharsets.UTF_8);
+            if (!key.startsWith(SESSION_KEY_PREFIX)) {
+                break;
+            }
+            key = key.substring(PREFIX_ENDING_INDEX);
+            long id = Long.parseLong(key);
+            int to = Integer.parseInt(new String(rocksIterator.value(), StandardCharsets.UTF_8));
+            sessions.put(id, to);
+            rocksIterator.next();
+        }
+    }
+
+    public synchronized void serializeACL(ReferenceCountedACLCache aclCache) throws IOException {
+        Set<Map.Entry<Long, List<ACL>>> set = aclCache.getLongKeyMap().entrySet();
+        for (Map.Entry<Long, List<ACL>> val : set) {
+            addACLKeyValue(val.getKey(), val.getValue(), null);
+        }
+    }
+
+    private void addACLKeyValue(Long index, List<ACL> aclList, WriteBatch writeBatch) throws IOException {
+        String key = ACL_KEY_PREFIX + index;
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+        boa.startVector(aclList, "acls");
+        for (ACL acl : aclList) {
+            acl.serialize(boa, "acl");
+        }
+        boa.endVector(aclList, "acls");
+        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
+        try {
+            if (writeBatch != null) {
+                writeBatch.put(key.getBytes(StandardCharsets.UTF_8), bb.array());
+            } else {
+                db.put(writeOpts, key.getBytes(StandardCharsets.UTF_8), bb.array());
+            }
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to serialize ACL lists in RocksDB " + "error: " + e.getMessage(), e);
+        }
+    }
+
+    private void removeACLKeyValue(Long index, WriteBatch writeBatch) throws IOException {
+        try {
+            String key = ACL_KEY_PREFIX + index;
+            writeBatch.delete(key.getBytes(StandardCharsets.UTF_8));
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to delete the ACL list in RocksDB " + "error: " + e.getMessage(), e);
+        }
+    }
+
+    public synchronized void deserializeACL(ReferenceCountedACLCache aclCache) throws IOException {
+        aclCache.clear();
+        ByteArrayInputStream bais;
+        BinaryInputArchive bia;
+        while (rocksIterator.isValid()) {
+            String key = new String(rocksIterator.key(), StandardCharsets.UTF_8);
+            if (!key.startsWith(ACL_KEY_PREFIX)) {
+                break;
+            }
+            key = key.substring(PREFIX_ENDING_INDEX);
+            long val = Long.parseLong(key);
+            List<ACL> aclList = new ArrayList<ACL>();
+            bais = new ByteArrayInputStream(rocksIterator.value());
+            bia = BinaryInputArchive.getArchive(bais);
+            Index j = bia.startVector("acls");
+            if (j == null) {
+                throw new RuntimeException("Incorrect format of InputArchive when deserializing DataTree - missing acls");
+            }
+            while (!j.done()) {
+                ACL acl = new ACL();
+                acl.deserialize(bia, "acl");
+                aclList.add(acl);
+                j.incr();
+            }
+            aclCache.updateMaps(val, aclList);
+            rocksIterator.next();
+        }
+    }
+
+    public void writeNode(String pathString, DataNode node) throws IOException {
+        addNode(pathString, node, null);

Review comment:
       *THREAD_SAFETY_VIOLATION:*  Read/Write race. Non-private method `RocksDBSnap.writeNode(...)` indirectly reads without synchronization from `this.db`. Potentially races with write in method `RocksDBSnap.serialize(...)`.
    Reporting because another access to the same memory occurs on a background thread, although this access may not.
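
The same race on `writeNode(...)` could instead be handled with a read-write lock (again only a sketch with hypothetical names): methods that merely use the current handle share the read lock, while the destroy-and-reopen path in `serialize(...)` takes the write lock:

    import java.io.IOException;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteOptions;

    class SwapAwareSnap {
        private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
        private final Options options;
        private final WriteOptions writeOpts;
        private final String dbPath;
        private RocksDB db;

        SwapAwareSnap(RocksDB db, Options options, WriteOptions writeOpts, String dbPath) {
            this.db = db;
            this.options = options;
            this.writeOpts = writeOpts;
            this.dbPath = dbPath;
        }

        // Many threads may use the current handle concurrently under the read lock;
        // RocksDB itself is safe for concurrent writers.
        void writeNode(byte[] key, byte[] value) throws IOException {
            rwLock.readLock().lock();
            try {
                db.put(writeOpts, key, value);
            } catch (RocksDBException e) {
                throw new IOException("RocksDB put failed: " + e.getMessage(), e);
            } finally {
                rwLock.readLock().unlock();
            }
        }

        // The full-snapshot path excludes all users while it swaps the handle.
        void destroyAndReopen() throws IOException {
            rwLock.writeLock().lock();
            try {
                db.close();
                RocksDB.destroyDB(dbPath, options);
                db = RocksDB.open(options, dbPath);
            } catch (RocksDBException e) {
                throw new IOException("RocksDB reopen failed: " + e.getMessage(), e);
            } finally {
                rwLock.writeLock().unlock();
            }
        }
    }

Unlike the single monitor sketched earlier, this keeps concurrent writers from serializing against each other and only blocks them during the rare handle swap.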

##########
File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
##########
@@ -127,12 +148,44 @@ public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOExcep
      * @throws IOException
      */
     public void deserialize(DataTree dt, Map<Long, Integer> sessions, InputArchive ia) throws IOException {
+        this.ia = ia;
         FileHeader header = new FileHeader();
         header.deserialize(ia, "fileheader");
         if (header.getMagic() != SNAP_MAGIC) {
             throw new IOException("mismatching magic headers " + header.getMagic() + " !=  " + FileSnap.SNAP_MAGIC);
         }
-        SerializeUtils.deserializeSnapshot(dt, ia, sessions);
+        deserializeSessions(sessions);
+        deserializeACL(dt.getReferenceCountedAclCache());
+        dt.deserialize(this, "tree");
+    }
+
+    public void serialize(DataTree dt, Map<Long, Integer> sessions, long lastZxid, boolean fsync)
+            throws IOException {
+        File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));

Review comment:
       *PATH_TRAVERSAL_IN:*  This API (java/io/File.<init>(Ljava/io/File;Ljava/lang/String;)V) reads a file whose location might be specified by user input [(details)](https://find-sec-bugs.github.io/bugs.htm#PATH_TRAVERSAL_IN)
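
Since `Util.makeSnapshotName(lastZxid)` derives the file name from a zxid rather than raw user input, this is likely a false positive, but a cheap canonical-path check (a hypothetical helper, sketched below) would both silence the finding and harden the code:

    import java.io.File;
    import java.io.IOException;

    final class SnapPaths {
        private SnapPaths() {
        }

        // Resolve a child file and reject any name that escapes the
        // snapshot directory after canonicalization.
        static File resolveInside(File snapDir, String name) throws IOException {
            File candidate = new File(snapDir, name);
            String root = snapDir.getCanonicalPath() + File.separator;
            if (!candidate.getCanonicalPath().startsWith(root)) {
                throw new IOException("snapshot name escapes snapDir: " + name);
            }
            return candidate;
        }
    }

The serialize path would then call `SnapPaths.resolveInside(snapDir, Util.makeSnapshotName(lastZxid))` instead of constructing the File directly.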




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org