You are viewing a plain-text version of this content. The canonical link for it ("here") is a hyperlink that is not preserved in this plain-text copy.
Posted to commits@zookeeper.apache.org by ar...@apache.org on 2022/02/08 15:49:51 UTC
[zookeeper] branch branch-3.5 updated: ZOOKEEPER-4433 : Backport ZOOKEEPER-2872 for branch-3.5
This is an automated email from the ASF dual-hosted git repository.
arshad pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new 6f6757b ZOOKEEPER-4433 : Backport ZOOKEEPER-2872 for branch-3.5
6f6757b is described below
commit 6f6757b2680af60bd8dd752d81480e71cb458def
Author: Ananya Singh <pu...@gmail.com>
AuthorDate: Tue Feb 8 21:12:00 2022 +0530
ZOOKEEPER-4433 : Backport ZOOKEEPER-2872 for branch-3.5
Author: Ananya Singh <pu...@gmail.com>
Reviewers: Brahma Reddy Battula <br...@apache.org>, Norbert Kalmar <nk...@apache.org>, Mohammad Arshad <ar...@apache.org>
Closes #1790 from AnanyaSingh2121/ZOOKEEPER-4433
---
.../java/org/apache/zookeeper/server/ZooKeeperServer.java | 8 ++++++--
.../org/apache/zookeeper/server/persistence/FileSnap.java | 12 ++++++++----
.../apache/zookeeper/server/persistence/FileTxnSnapLog.java | 8 +++++---
.../org/apache/zookeeper/server/persistence/SnapShot.java | 8 +++++---
.../java/org/apache/zookeeper/server/quorum/Learner.java | 8 ++++++--
.../java/org/apache/zookeeper/server/quorum/Zab1_0Test.java | 6 +++---
.../test/java/org/apache/zookeeper/test/TruncateTest.java | 2 +-
7 files changed, 34 insertions(+), 18 deletions(-)
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 67737c5..2bab937 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -338,9 +338,13 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
takeSnapshot();
}
- public void takeSnapshot(){
+ public void takeSnapshot() {
+ takeSnapshot(false);
+ }
+
+ public void takeSnapshot(boolean syncSnap){
try {
- txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts());
+ txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap);
} catch (IOException e) {
LOG.error("Severe unrecoverable error, exiting", e);
// This is a severe error that we cannot recover from,
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
index 6bce62d..9d9a3e2 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
@@ -37,6 +37,7 @@ import org.apache.jute.BinaryInputArchive;
import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.InputArchive;
import org.apache.jute.OutputArchive;
+import org.apache.zookeeper.common.AtomicFileOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.server.DataTree;
@@ -215,12 +216,15 @@ public class FileSnap implements SnapShot {
* @param dt the datatree to be serialized
* @param sessions the sessions to be serialized
* @param snapShot the file to store snapshot into
+ * @param fsync sync the file immediately after write
*/
- public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot)
+ public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot, boolean fsync)
throws IOException {
if (!close) {
- try (OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(snapShot));
- CheckedOutputStream crcOut = new CheckedOutputStream(sessOS, new Adler32())) {
+ try (CheckedOutputStream crcOut =
+ new CheckedOutputStream(new BufferedOutputStream(fsync ? new AtomicFileOutputStream(snapShot) :
+ new FileOutputStream(snapShot)),
+ new Adler32())) {
//CheckedOutputStream cout = new CheckedOutputStream()
OutputArchive oa = BinaryOutputArchive.getArchive(crcOut);
FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);
@@ -228,7 +232,7 @@ public class FileSnap implements SnapShot {
long val = crcOut.getChecksum().getValue();
oa.writeLong(val, "val");
oa.writeString("/", "path");
- sessOS.flush();
+ crcOut.flush();
}
} else {
throw new IOException("FileSnap has already been closed");
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
index 8508413..4881e28 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
@@ -245,7 +245,7 @@ public class FileTxnSnapLog {
}
/* TODO: (br33d) we should either put a ConcurrentHashMap on restore()
* or use Map on save() */
- save(dt, (ConcurrentHashMap<Long, Integer>)sessions);
+ save(dt, (ConcurrentHashMap<Long, Integer>)sessions, false);
/* return a zxid of zero, since we the database is empty */
return 0;
}
@@ -394,16 +394,18 @@ public class FileTxnSnapLog {
* @param dataTree the datatree to be serialized onto disk
* @param sessionsWithTimeouts the session timeouts to be
* serialized onto disk
+ * @param syncSnap sync the snapshot immediately after write
* @throws IOException
*/
public void save(DataTree dataTree,
- ConcurrentHashMap<Long, Integer> sessionsWithTimeouts)
+ ConcurrentHashMap<Long, Integer> sessionsWithTimeouts,
+ boolean syncSnap)
throws IOException {
long lastZxid = dataTree.lastProcessedZxid;
File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid),
snapshotFile);
- snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile);
+ snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap);
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapShot.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapShot.java
index c964afc..257c12d 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapShot.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapShot.java
@@ -44,11 +44,13 @@ public interface SnapShot {
/**
* persist the datatree and the sessions into a persistence storage
* @param dt the datatree to be serialized
- * @param sessions
+ * @param sessions the session timeouts to be serialized
+ * @param name the object name to store snapshot into
+ * @param fsync sync the snapshot immediately after write
* @throws IOException
*/
- void serialize(DataTree dt, Map<Long, Integer> sessions,
- File name)
+ void serialize(DataTree dt, Map<Long, Integer> sessions,
+ File name, boolean fsync)
throws IOException;
/**
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index 3f282ca..51b1038 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -383,6 +383,7 @@ public class Learner {
// In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot
// For SNAP and TRUNC the snapshot is needed to save that history
boolean snapshotNeeded = true;
+ boolean syncSnapshot = false;
readPacket(qp);
LinkedList<Long> packetsCommitted = new LinkedList<Long>();
LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
@@ -409,6 +410,9 @@ public class Learner {
throw new IOException("Missing signature");
}
zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
+
+ // immediately persist the latest snapshot when there is a txn log gap
+ syncSnapshot = true;
} else if (qp.getType() == Leader.TRUNC) {
//we need to truncate the log to the lastzxid of the leader
LOG.warn("Truncating log to get in sync with the leader 0x"
@@ -535,7 +539,7 @@ public class Learner {
}
}
if (isPreZAB1_0) {
- zk.takeSnapshot();
+ zk.takeSnapshot(syncSnapshot);
self.setCurrentEpoch(newEpoch);
}
self.setZooKeeperServer(zk);
@@ -555,7 +559,7 @@ public class Learner {
}
if (snapshotNeeded) {
- zk.takeSnapshot();
+ zk.takeSnapshot(syncSnapshot);
}
self.setCurrentEpoch(newEpoch);
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java
index 754e40b..4c24ba8 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java
@@ -366,7 +366,7 @@ public class Zab1_0Test extends ZKTestCase {
Assert.assertTrue(zxid > ZxidUtils.makeZxid(1, 0));
// Generate snapshot and close files.
- snapLog.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts());
+ snapLog.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), false);
snapLog.close();
QuorumPeer peer = createQuorumPeer(tmpDir);
@@ -649,7 +649,7 @@ public class Zab1_0Test extends ZKTestCase {
Assert.assertEquals(1, f.self.getAcceptedEpoch());
Assert.assertEquals(1, f.self.getCurrentEpoch());
//Make sure that we did take the snapshot now
- verify(f.zk).takeSnapshot();
+ verify(f.zk).takeSnapshot(true);
Assert.assertEquals(firstZxid, f.fzk.getLastProcessedZxid());
// Make sure the data was recorded in the filesystem ok
@@ -1246,7 +1246,7 @@ public class Zab1_0Test extends ZKTestCase {
FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpDir, tmpDir);
File version2 = new File(tmpDir, "version-2");
version2.mkdir();
- logFactory.save(new DataTree(), new ConcurrentHashMap<Long, Integer>());
+ logFactory.save(new DataTree(), new ConcurrentHashMap<Long, Integer>(), false);
long zxid = ZxidUtils.makeZxid(3, 3);
logFactory.append(new Request(1, 1, ZooDefs.OpCode.error,
new TxnHeader(1, 1, zxid, 1, ZooDefs.OpCode.error),
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/TruncateTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TruncateTest.java
index 6be1d36..211a293 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/TruncateTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TruncateTest.java
@@ -77,7 +77,7 @@ public class TruncateTest extends ZKTestCase {
ZKDatabase zkdb = new ZKDatabase(snaplog);
// make sure to snapshot, so that we have something there when
// truncateLog reloads the db
- snaplog.save(zkdb.getDataTree(), zkdb.getSessionWithTimeOuts());
+ snaplog.save(zkdb.getDataTree(), zkdb.getSessionWithTimeOuts(), false);
for (int i = 1; i <= 100; i++) {
append(zkdb, i);