You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ar...@apache.org on 2022/02/08 15:49:51 UTC

[zookeeper] branch branch-3.5 updated: ZOOKEEPER-4433 : Backport ZOOKEEPER-2872 for branch-3.5

This is an automated email from the ASF dual-hosted git repository.

arshad pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 6f6757b  ZOOKEEPER-4433 : Backport ZOOKEEPER-2872 for branch-3.5
6f6757b is described below

commit 6f6757b2680af60bd8dd752d81480e71cb458def
Author: Ananya Singh <pu...@gmail.com>
AuthorDate: Tue Feb 8 21:12:00 2022 +0530

    ZOOKEEPER-4433 : Backport ZOOKEEPER-2872 for branch-3.5
    
    Author: Ananya Singh <pu...@gmail.com>
    
    Reviewers: Brahma Reddy Battula <br...@apache.org>, Norbert Kalmar <nk...@apache.org>, Mohammad Arshad <ar...@apache.org>
    
    Closes #1790 from AnanyaSingh2121/ZOOKEEPER-4433
---
 .../java/org/apache/zookeeper/server/ZooKeeperServer.java    |  8 ++++++--
 .../org/apache/zookeeper/server/persistence/FileSnap.java    | 12 ++++++++----
 .../apache/zookeeper/server/persistence/FileTxnSnapLog.java  |  8 +++++---
 .../org/apache/zookeeper/server/persistence/SnapShot.java    |  8 +++++---
 .../java/org/apache/zookeeper/server/quorum/Learner.java     |  8 ++++++--
 .../java/org/apache/zookeeper/server/quorum/Zab1_0Test.java  |  6 +++---
 .../test/java/org/apache/zookeeper/test/TruncateTest.java    |  2 +-
 7 files changed, 34 insertions(+), 18 deletions(-)

diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 67737c5..2bab937 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -338,9 +338,13 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         takeSnapshot();
     }
 
-    public void takeSnapshot(){
+    public void takeSnapshot() {
+        takeSnapshot(false);
+    }
+
+    public void takeSnapshot(boolean syncSnap){
         try {
-            txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts());
+            txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap);
         } catch (IOException e) {
             LOG.error("Severe unrecoverable error, exiting", e);
             // This is a severe error that we cannot recover from,
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
index 6bce62d..9d9a3e2 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
@@ -37,6 +37,7 @@ import org.apache.jute.BinaryInputArchive;
 import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.InputArchive;
 import org.apache.jute.OutputArchive;
+import org.apache.zookeeper.common.AtomicFileOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.server.DataTree;
@@ -215,12 +216,15 @@ public class FileSnap implements SnapShot {
      * @param dt the datatree to be serialized
      * @param sessions the sessions to be serialized
      * @param snapShot the file to store snapshot into
+     * @param fsync sync the file immediately after write
      */
-    public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot)
+    public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot, boolean fsync)
             throws IOException {
         if (!close) {
-            try (OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(snapShot));
-                 CheckedOutputStream crcOut = new CheckedOutputStream(sessOS, new Adler32())) {
+            try (CheckedOutputStream crcOut =
+                         new CheckedOutputStream(new BufferedOutputStream(fsync ? new AtomicFileOutputStream(snapShot) :
+                                                                                  new FileOutputStream(snapShot)),
+                                                 new Adler32())) {
                 //CheckedOutputStream cout = new CheckedOutputStream()
                 OutputArchive oa = BinaryOutputArchive.getArchive(crcOut);
                 FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);
@@ -228,7 +232,7 @@ public class FileSnap implements SnapShot {
                 long val = crcOut.getChecksum().getValue();
                 oa.writeLong(val, "val");
                 oa.writeString("/", "path");
-                sessOS.flush();
+                crcOut.flush();
             }
         } else {
             throw new IOException("FileSnap has already been closed");
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
index 8508413..4881e28 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
@@ -245,7 +245,7 @@ public class FileTxnSnapLog {
             }
             /* TODO: (br33d) we should either put a ConcurrentHashMap on restore()
              *       or use Map on save() */
-            save(dt, (ConcurrentHashMap<Long, Integer>)sessions);
+            save(dt, (ConcurrentHashMap<Long, Integer>)sessions, false);
             /* return a zxid of zero, since we the database is empty */
             return 0;
         }
@@ -394,16 +394,18 @@ public class FileTxnSnapLog {
      * @param dataTree the datatree to be serialized onto disk
      * @param sessionsWithTimeouts the session timeouts to be
      * serialized onto disk
+     * @param syncSnap sync the snapshot immediately after write
      * @throws IOException
      */
     public void save(DataTree dataTree,
-            ConcurrentHashMap<Long, Integer> sessionsWithTimeouts)
+                     ConcurrentHashMap<Long, Integer> sessionsWithTimeouts,
+                     boolean syncSnap)
         throws IOException {
         long lastZxid = dataTree.lastProcessedZxid;
         File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
         LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid),
                 snapshotFile);
-        snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile);
+        snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap);
 
     }
 
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapShot.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapShot.java
index c964afc..257c12d 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapShot.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapShot.java
@@ -44,11 +44,13 @@ public interface SnapShot {
     /**
      * persist the datatree and the sessions into a persistence storage
      * @param dt the datatree to be serialized
-     * @param sessions 
+     * @param sessions the session timeouts to be serialized
+     * @param name the object name to store snapshot into
+     * @param fsync sync the snapshot immediately after write
      * @throws IOException
      */
-    void serialize(DataTree dt, Map<Long, Integer> sessions, 
-            File name) 
+    void serialize(DataTree dt, Map<Long, Integer> sessions,
+                   File name, boolean fsync)
         throws IOException;
     
     /**
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index 3f282ca..51b1038 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -383,6 +383,7 @@ public class Learner {
         // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot
         // For SNAP and TRUNC the snapshot is needed to save that history
         boolean snapshotNeeded = true;
+        boolean syncSnapshot = false;
         readPacket(qp);
         LinkedList<Long> packetsCommitted = new LinkedList<Long>();
         LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
@@ -409,6 +410,9 @@ public class Learner {
                     throw new IOException("Missing signature");                   
                 }
                 zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
+
+                // immediately persist the latest snapshot when there is a txn log gap
+                syncSnapshot = true;
             } else if (qp.getType() == Leader.TRUNC) {
                 //we need to truncate the log to the lastzxid of the leader
                 LOG.warn("Truncating log to get in sync with the leader 0x"
@@ -535,7 +539,7 @@ public class Learner {
                        }
                     }
                     if (isPreZAB1_0) {
-                        zk.takeSnapshot();
+                        zk.takeSnapshot(syncSnapshot);
                         self.setCurrentEpoch(newEpoch);
                     }
                     self.setZooKeeperServer(zk);
@@ -555,7 +559,7 @@ public class Learner {
                    }
 
                    if (snapshotNeeded) {
-                       zk.takeSnapshot();
+                       zk.takeSnapshot(syncSnapshot);
                    }
                    
                     self.setCurrentEpoch(newEpoch);
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java
index 754e40b..4c24ba8 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java
@@ -366,7 +366,7 @@ public class Zab1_0Test extends ZKTestCase {
             Assert.assertTrue(zxid > ZxidUtils.makeZxid(1, 0));
             
             // Generate snapshot and close files.
-            snapLog.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts());
+            snapLog.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), false);
             snapLog.close();
             
             QuorumPeer peer = createQuorumPeer(tmpDir);
@@ -649,7 +649,7 @@ public class Zab1_0Test extends ZKTestCase {
                     Assert.assertEquals(1, f.self.getAcceptedEpoch());
                     Assert.assertEquals(1, f.self.getCurrentEpoch());
                     //Make sure that we did take the snapshot now
-                    verify(f.zk).takeSnapshot();
+                    verify(f.zk).takeSnapshot(true);
                     Assert.assertEquals(firstZxid, f.fzk.getLastProcessedZxid());
                     
                     // Make sure the data was recorded in the filesystem ok
@@ -1246,7 +1246,7 @@ public class Zab1_0Test extends ZKTestCase {
             FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpDir, tmpDir);
             File version2 = new File(tmpDir, "version-2");
             version2.mkdir();
-            logFactory.save(new DataTree(), new ConcurrentHashMap<Long, Integer>());
+            logFactory.save(new DataTree(), new ConcurrentHashMap<Long, Integer>(), false);
             long zxid = ZxidUtils.makeZxid(3, 3);
             logFactory.append(new Request(1, 1, ZooDefs.OpCode.error,
                     new TxnHeader(1, 1, zxid, 1, ZooDefs.OpCode.error),
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/TruncateTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TruncateTest.java
index 6be1d36..211a293 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/TruncateTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TruncateTest.java
@@ -77,7 +77,7 @@ public class TruncateTest extends ZKTestCase {
         ZKDatabase zkdb = new ZKDatabase(snaplog);
         // make sure to snapshot, so that we have something there when
         // truncateLog reloads the db
-        snaplog.save(zkdb.getDataTree(), zkdb.getSessionWithTimeOuts());
+        snaplog.save(zkdb.getDataTree(), zkdb.getSessionWithTimeOuts(), false);
 
         for (int i = 1; i <= 100; i++) {
             append(zkdb, i);