You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by an...@apache.org on 2020/01/30 08:43:16 UTC

[zookeeper] branch master updated: ZOOKEEPER-3231: Purge task may lost data when the recent snapshots are all invalid

This is an automated email from the ASF dual-hosted git repository.

andor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 2abdfbc  ZOOKEEPER-3231: Purge task may lost data when the recent snapshots are all invalid
2abdfbc is described below

commit 2abdfbc261d4ba2f9d17b51f2b46ab748a7d451f
Author: maoling <ma...@sina.com>
AuthorDate: Thu Jan 30 09:43:09 2020 +0100

    ZOOKEEPER-3231: Purge task may lost data when the recent snapshots are all invalid
    
    - The purge task uses `FileTxnSnapLog#findNRecentSnapshots`, which can lose data when the 3 most recent snapshots are all invalid (a new valid snapshot has not been generated yet) and, at the same time, the purge task (e.g. `./zkCleanup.sh -n 3`) starts a new round of work to clean up the preceding snapshots: in that case all the data is lost. This is a low-probability event, but it is reproducible.
    - The fix uses `FileTxnSnapLog#findNValidSnapshots` so that the purge task retains the N most recent valid snapshots (rather than the N most recent snapshots regardless of validity), avoiding the risk of data loss.
    - The unit tests do not use mocks, for the following reasons:
       - To test a `dataDir` in which some snapshots are valid and others are not, writing a small amount of data to each snapshot file to make it valid or invalid is more flexible.
       - Mocking would require too many code changes in `PurgeTxnTest.java` (to keep the original unit tests passing) and in `FileTxnSnapLog.java` (to add some hooks).
    - More details are in [ZOOKEEPER-3231](https://issues.apache.org/jira/browse/ZOOKEEPER-3231).
    
    Author: maoling <ma...@sina.com>
    
    Reviewers: enixon@apache.org, andor@apache.org
    
    Closes #1079 from maoling/ZOOKEEPER-3231 and squashes the following commits:
    
    674175bef [maoling] setUp() & tearDown().
    472dfd33c [maoling] ZOOKEEPER-3231:Purge task may lost data when the recent snapshots are all invalid
---
 .../org/apache/zookeeper/server/PurgeTxnLog.java   |   2 +-
 .../zookeeper/server/persistence/FileSnap.java     |   2 +-
 .../server/persistence/FileTxnSnapLog.java         |  12 +++
 .../org/apache/zookeeper/server/PurgeTxnTest.java  | 114 ++++++++++++++++-----
 4 files changed, 103 insertions(+), 27 deletions(-)

diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PurgeTxnLog.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PurgeTxnLog.java
index 0c92066..d152fab 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PurgeTxnLog.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PurgeTxnLog.java
@@ -79,7 +79,7 @@ public class PurgeTxnLog {
 
         FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);
 
-        List<File> snaps = txnLog.findNRecentSnapshots(num);
+        List<File> snaps = txnLog.findNValidSnapshots(num);
         int numSnaps = snaps.size();
         if (numSnaps > 0) {
             purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1));
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
index b608c21..4c09232 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
@@ -159,7 +159,7 @@ public class FileSnap implements SnapShot {
      * less than n in case enough snapshots are not available).
      * @throws IOException
      */
-    private List<File> findNValidSnapshots(int n) throws IOException {
+    protected List<File> findNValidSnapshots(int n) throws IOException {
         List<File> files = Util.sortDataDir(snapDir.listFiles(), SNAPSHOT_FILE_PREFIX, false);
         int count = 0;
         List<File> list = new ArrayList<File>();
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
index 0ec12d8..eddeae8 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
@@ -548,6 +548,18 @@ public class FileTxnSnapLog {
     }
 
     /**
+     * the n recent valid snapshots
+     * @param n the number of recent valid snapshots
+     * @return the list of n recent valid snapshots, with
+     * the most recent in front
+     * @throws IOException
+     */
+    public List<File> findNValidSnapshots(int n) throws IOException {
+        FileSnap snaplog = new FileSnap(snapDir);
+        return snaplog.findNValidSnapshots(n);
+    }
+
+    /**
      * get the snapshot logs which may contain transactions newer than the given zxid.
      * This includes logs with starting zxid greater than given zxid, as well as the
      * newest transaction log with starting zxid less than given zxid.  The latter log
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PurgeTxnTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PurgeTxnTest.java
index 52d3360..29c1027 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PurgeTxnTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PurgeTxnTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import java.io.File;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -32,16 +33,23 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.CheckedOutputStream;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.OutputArchive;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.PortAssignment;
 import org.apache.zookeeper.ZKTestCase;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.persistence.FileHeader;
+import org.apache.zookeeper.server.persistence.FileSnap;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.persistence.SnapStream;
 import org.apache.zookeeper.server.persistence.Util;
 import org.apache.zookeeper.test.ClientBase;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,6 +62,11 @@ public class PurgeTxnTest extends ZKTestCase {
     private static final long OP_TIMEOUT_IN_MILLIS = 90000;
     private File tmpDir;
 
+    @Before
+    public void setUp() throws Exception {
+        tmpDir = ClientBase.createTmpDir();
+    }
+
     @After
     public void teardown() {
         if (null != tmpDir) {
@@ -67,7 +80,6 @@ public class PurgeTxnTest extends ZKTestCase {
      */
     @Test
     public void testPurge() throws Exception {
-        tmpDir = ClientBase.createTmpDir();
         ClientBase.setupTestEnv();
         ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
         SyncRequestProcessor.setSnapCount(100);
@@ -89,7 +101,7 @@ public class PurgeTxnTest extends ZKTestCase {
         // now corrupt the snapshot
         PurgeTxnLog.purge(tmpDir, tmpDir, 3);
         FileTxnSnapLog snaplog = new FileTxnSnapLog(tmpDir, tmpDir);
-        List<File> listLogs = snaplog.findNRecentSnapshots(4);
+        List<File> listLogs = snaplog.findNValidSnapshots(4);
         int numSnaps = 0;
         for (File ff : listLogs) {
             if (ff.getName().startsWith("snapshot")) {
@@ -111,7 +123,6 @@ public class PurgeTxnTest extends ZKTestCase {
      */
     @Test
     public void testPurgeWhenLogRollingInProgress() throws Exception {
-        tmpDir = ClientBase.createTmpDir();
         ClientBase.setupTestEnv();
         ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
         SyncRequestProcessor.setSnapCount(30);
@@ -158,21 +169,20 @@ public class PurgeTxnTest extends ZKTestCase {
     }
 
     /**
-     * Tests finding n recent snapshots from set of snapshots and data logs
+     * Tests finding n recent valid snapshots from set of snapshots and data logs
      */
     @Test
-    public void testFindNRecentSnapshots() throws Exception {
+    public void testFindNValidSnapshots() throws Exception {
         int nRecentSnap = 4; // n recent snap shots
         int nRecentCount = 30;
         int offset = 0;
 
-        tmpDir = ClientBase.createTmpDir();
         File version2 = new File(tmpDir.toString(), "version-2");
         assertTrue("Failed to create version_2 dir:" + version2.toString(), version2.mkdir());
 
-        // Test that with no snaps, findNRecentSnapshots returns empty list
+        // Test that with no snaps, findNValidSnapshots returns empty list
         FileTxnSnapLog txnLog = new FileTxnSnapLog(tmpDir, tmpDir);
-        List<File> foundSnaps = txnLog.findNRecentSnapshots(1);
+        List<File> foundSnaps = txnLog.findNValidSnapshots(1);
         assertEquals(0, foundSnaps.size());
 
         List<File> expectedNRecentSnapFiles = new ArrayList<File>();
@@ -184,6 +194,7 @@ public class PurgeTxnTest extends ZKTestCase {
             // simulate snapshot file
             File snapFile = new File(version2 + "/snapshot." + Long.toHexString(--counter));
             assertTrue("Failed to create snap File:" + snapFile.toString(), snapFile.createNewFile());
+            makeValidSnapshot(snapFile);
             // add the n recent snap files for assertion
             if (i < nRecentSnap) {
                 expectedNRecentSnapFiles.add(snapFile);
@@ -192,17 +203,17 @@ public class PurgeTxnTest extends ZKTestCase {
 
         // Test that when we ask for recent snaps we get the number we asked for and
         // the files we expected
-        List<File> nRecentSnapFiles = txnLog.findNRecentSnapshots(nRecentSnap);
-        assertEquals("exactly 4 snapshots ", 4, nRecentSnapFiles.size());
-        expectedNRecentSnapFiles.removeAll(nRecentSnapFiles);
+        List<File> nRecentValidSnapFiles = txnLog.findNValidSnapshots(nRecentSnap);
+        assertEquals("exactly 4 snapshots ", 4, nRecentValidSnapFiles.size());
+        expectedNRecentSnapFiles.removeAll(nRecentValidSnapFiles);
         assertEquals("Didn't get the recent snap files", 0, expectedNRecentSnapFiles.size());
 
         // Test that when asking for more snaps than we created, we still only get snaps
         // not logs or anything else (per ZOOKEEPER-2420)
-        nRecentSnapFiles = txnLog.findNRecentSnapshots(nRecentCount + 5);
-        assertEquals(nRecentCount, nRecentSnapFiles.size());
-        for (File f : nRecentSnapFiles) {
-            assertTrue("findNRecentSnapshots() returned a non-snapshot: "
+        nRecentValidSnapFiles = txnLog.findNValidSnapshots(nRecentCount + 5);
+        assertEquals(nRecentCount, nRecentValidSnapFiles.size());
+        for (File f : nRecentValidSnapFiles) {
+            assertTrue("findNValidSnapshots() returned a non-snapshot: "
                                       + f.getPath(), (Util.getZxidFromName(f.getName(), "snapshot") != -1));
         }
 
@@ -220,7 +231,6 @@ public class PurgeTxnTest extends ZKTestCase {
         int fileAboveRecentCount = 4;
         int fileToPurgeCount = 2;
         AtomicInteger offset = new AtomicInteger(0);
-        tmpDir = ClientBase.createTmpDir();
         File version2 = new File(tmpDir.toString(), "version-2");
         assertTrue("Failed to create version_2 dir:" + version2.toString(), version2.mkdir());
         List<File> snapsToPurge = new ArrayList<File>();
@@ -272,7 +282,6 @@ public class PurgeTxnTest extends ZKTestCase {
     public void internalTestSnapFilesEqualsToRetain(boolean testWithPrecedingLogFile) throws Exception {
         int nRecentCount = 3;
         AtomicInteger offset = new AtomicInteger(0);
-        tmpDir = ClientBase.createTmpDir();
         File version2 = new File(tmpDir.toString(), "version-2");
         assertTrue("Failed to create version_2 dir:" + version2.toString(), version2.mkdir());
         List<File> snaps = new ArrayList<File>();
@@ -295,7 +304,6 @@ public class PurgeTxnTest extends ZKTestCase {
         int nRecentCount = 4;
         int fileToPurgeCount = 2;
         AtomicInteger offset = new AtomicInteger(0);
-        tmpDir = ClientBase.createTmpDir();
         File version2 = new File(tmpDir.toString(), "version-2");
         assertTrue("Failed to create version_2 dir:" + version2.toString(), version2.mkdir());
         List<File> snapsToPurge = new ArrayList<File>();
@@ -327,7 +335,6 @@ public class PurgeTxnTest extends ZKTestCase {
      */
     @Test
     public void testPurgeTxnLogWithDataDir() throws Exception {
-        tmpDir = ClientBase.createTmpDir();
         File dataDir = new File(tmpDir, "dataDir");
         File dataLogDir = new File(tmpDir, "dataLogDir");
 
@@ -348,6 +355,7 @@ public class PurgeTxnTest extends ZKTestCase {
             // simulate snapshot file
             File snapFile = new File(dataDirVersion2, "snapshot." + Long.toHexString(i));
             snapFile.createNewFile();
+            makeValidSnapshot(snapFile);
         }
 
         int numberOfSnapFilesToKeep = 10;
@@ -358,8 +366,6 @@ public class PurgeTxnTest extends ZKTestCase {
         assertEquals(numberOfSnapFilesToKeep, dataDirVersion2.listFiles().length);
         // Since for each snapshot we have a log file with same zxid, expect same # logs as snaps to be kept
         assertEquals(numberOfSnapFilesToKeep, dataLogDirVersion2.listFiles().length);
-        ClientBase.recursiveDelete(tmpDir);
-
     }
 
     /**
@@ -368,7 +374,6 @@ public class PurgeTxnTest extends ZKTestCase {
      */
     @Test
     public void testPurgeTxnLogWithoutDataDir() throws Exception {
-        tmpDir = ClientBase.createTmpDir();
         File dataDir = new File(tmpDir, "dataDir");
         File dataLogDir = new File(tmpDir, "dataLogDir");
 
@@ -388,6 +393,7 @@ public class PurgeTxnTest extends ZKTestCase {
             // simulate snapshot file
             File snapFile = new File(dataLogDirVersion2, "snapshot." + Long.toHexString(i));
             snapFile.createNewFile();
+            makeValidSnapshot(snapFile);
         }
 
         int numberOfSnapFilesToKeep = 10;
@@ -398,8 +404,6 @@ public class PurgeTxnTest extends ZKTestCase {
                 numberOfSnapFilesToKeep
                         * 2, // Since for each snapshot we have a log file with same zxid, expect same # logs as snaps to be kept
                 dataLogDirVersion2.listFiles().length);
-        ClientBase.recursiveDelete(tmpDir);
-
     }
 
     /**
@@ -423,7 +427,6 @@ public class PurgeTxnTest extends ZKTestCase {
         SyncRequestProcessor.setSnapCount(SNAP_RETAIN_COUNT * NUM_ZNODES_PER_SNAPSHOT * 10);
 
         // Create Zookeeper and connect to it.
-        tmpDir = ClientBase.createTmpDir();
         ClientBase.setupTestEnv();
         ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
         final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
@@ -484,6 +487,45 @@ public class PurgeTxnTest extends ZKTestCase {
         zks.shutdown();
     }
 
+    @Test
+    public void testPurgeTxnLogWhenRecentSnapshotsAreAllInvalid() throws Exception {
+        File dataDir = new File(tmpDir, "dataDir");
+        File dataLogDir = new File(tmpDir, "dataLogDir");
+
+        File dataDirVersion2 = new File(dataDir, "version-2");
+        dataDirVersion2.mkdirs();
+        File dataLogDirVersion2 = new File(dataLogDir, "version-2");
+        dataLogDirVersion2.mkdirs();
+
+        // create dummy log and transaction file
+        int totalFiles = 10;
+        int numberOfSnapFilesToKeep = 3;
+
+        // create transaction and snapshot files in different-different
+        // directories
+        for (int i = 0; i < totalFiles; i++) {
+            // simulate log file
+            File logFile = new File(dataLogDirVersion2, "log." + Long.toHexString(i));
+            logFile.createNewFile();
+            // simulate snapshot file
+            File snapFile = new File(dataDirVersion2, "snapshot." + Long.toHexString(i));
+            snapFile.createNewFile();
+            if (i < (totalFiles - numberOfSnapFilesToKeep)) {
+                makeValidSnapshot(snapFile);
+            } else {
+                makeInvalidSnapshot(snapFile);
+            }
+        }
+
+        // scenario where four parameter are passed
+        String[] args = new String[]{dataLogDir.getAbsolutePath(), dataDir.getAbsolutePath(), "-n", Integer.toString(numberOfSnapFilesToKeep)};
+        PurgeTxnLog.main(args);
+        //Since the recent 3 snapshots are all invalid,when purging, we can assert that 6 snapshot files are retained(3 invalid snapshots and 3 retained valid snapshots)
+        assertEquals(numberOfSnapFilesToKeep + numberOfSnapFilesToKeep, dataDirVersion2.listFiles().length);
+        // Since for each snapshot we have a log file with same zxid, expect same # logs as snaps to be kept
+        assertEquals(numberOfSnapFilesToKeep + numberOfSnapFilesToKeep, dataLogDirVersion2.listFiles().length);
+    }
+
     private File createDataDirLogFile(File version_2, int Zxid) throws IOException {
         File logFile = new File(version_2 + "/log." + Long.toHexString(Zxid));
         assertTrue("Failed to create log File:" + logFile.toString(), logFile.createNewFile());
@@ -553,4 +595,26 @@ public class PurgeTxnTest extends ZKTestCase {
         return znodes;
     }
 
+    private void makeValidSnapshot(File snapFile) throws IOException {
+        SnapStream.setStreamMode(SnapStream.StreamMode.CHECKED);
+        CheckedOutputStream os = SnapStream.getOutputStream(snapFile);
+        OutputArchive oa = BinaryOutputArchive.getArchive(os);
+        FileHeader header = new FileHeader(FileSnap.SNAP_MAGIC, 2, 1);
+        header.serialize(oa, "fileheader");
+        SnapStream.sealStream(os, oa);
+        os.flush();
+        os.close();
+
+        assertTrue(SnapStream.isValidSnapshot(snapFile));
+    }
+
+    private void makeInvalidSnapshot(File snapFile) throws IOException {
+        SnapStream.setStreamMode(SnapStream.StreamMode.CHECKED);
+        OutputStream os = SnapStream.getOutputStream(snapFile);
+        os.write(1);
+        os.flush();
+        os.close();
+
+        assertFalse(SnapStream.isValidSnapshot(snapFile));
+    }
 }