Posted to commits@zookeeper.apache.org by an...@apache.org on 2019/05/14 15:11:22 UTC

[zookeeper] branch master updated: ZOOKEEPER-3244: Add option to snapshot based on log size

This is an automated email from the ASF dual-hosted git repository.

andor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 31e92e0  ZOOKEEPER-3244: Add option to snapshot based on log size
31e92e0 is described below

commit 31e92e0a4f287ca840e0c9418e3a9b706e9721db
Author: Brian Nixon <ni...@fb.com>
AuthorDate: Tue May 14 17:11:14 2019 +0200

    ZOOKEEPER-3244: Add option to snapshot based on log size
    
    Author: Brian Nixon <ni...@fb.com>
    
    Reviewers: fangmin@apache.org, nkalmar@apache.org, andor@apache.org
    
    Closes #770 from enixon/snap-on-size and squashes the following commits:
    
    3911f984a [Brian Nixon] allow admin to disable snapSizeLimitInKb feature
    315404a12 [Brian Nixon] Change property name from zookeeper.snapSize to zookeeper.snapSizeLimitInKb
    f13e7d327 [Brian Nixon] add documentation for zookeeper.snapSize and extend RestoreCommittedLogTest to support it
    0d4666d2d [Brian Nixon] address sync issue with FileTxnLog::getTotalLogSize exposed by findbugs
    20e9ccc44 [Brian Nixon] ZOOKEEPER-3244: Add option to snapshot based on log size
---
 .../src/main/resources/markdown/zookeeperAdmin.md  | 29 +++++++----
 .../zookeeper/server/SyncRequestProcessor.java     | 42 +++++++++++++---
 .../org/apache/zookeeper/server/ZKDatabase.java    | 30 ++++++++++++
 .../apache/zookeeper/server/ZooKeeperServer.java   |  8 ++++
 .../zookeeper/server/persistence/FileTxnLog.java   | 17 +++++++
 .../server/persistence/FileTxnSnapLog.java         | 10 +++-
 .../zookeeper/server/persistence/TxnLog.java       | 12 ++++-
 .../zookeeper/test/RestoreCommittedLogTest.java    | 56 ++++++++++++++++++++--
 8 files changed, 182 insertions(+), 22 deletions(-)

diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index 19468fb..9996db9 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -612,7 +612,7 @@ property, when available, is noted below.
     transaction log file in blocks of preAllocSize kilobytes. The
     default block size is 64M. One reason for changing the size of
     the blocks is to reduce the block size if snapshots are taken
-    more often. (Also, see **snapCount**).
+    more often. (Also, see **snapCount** and **snapSizeLimitInKb**).
 
 * *snapCount* :
     (Java system property: **zookeeper.snapCount**)
@@ -626,6 +626,19 @@ property, when available, is noted below.
     reaches a runtime generated random value in the \[snapCount/2+1, snapCount]
     range.The default snapCount is 100,000.
 
+* *snapSizeLimitInKb* :
+    (Java system property: **zookeeper.snapSizeLimitInKb**)
+    ZooKeeper records its transactions using snapshots and
+    a transaction log (think write-ahead log). The total size in kilobytes allowed
+    in the set of transactions recorded in the transaction log before a snapshot
+    can be taken (and the transaction log rolled) is determined
+    by snapSizeLimitInKb. In order to prevent all of the machines in the quorum
+    from taking a snapshot at the same time, each ZooKeeper server
+    will take a snapshot when the size of the set of transactions in the
+    transaction log reaches a runtime generated random value in the
+    \[snapSizeLimitInKb/2+1, snapSizeLimitInKb] range. The default snapSizeLimitInKb is 4,194,304 (4GB). A non-positive
+    value will disable the feature.
+
 * *txnLogSizeLimitInKb* :
     (Java system property: **zookeeper.txnLogSizeLimitInKb**)
     Zookeeper transaction log file can also be controlled more
@@ -633,14 +646,14 @@ property, when available, is noted below.
     slower follower syncs when sync is done using transaction log.
     This is because leader has to scan through the appropriate log
     file on disk to find the transaction to start sync from.
-    This feature is turned off by this default and snapCount is the
-    only value that limits transaction log size. When enabled
-    Zookeeper will roll the log when either of the limit is hit.
+    This feature is turned off by default and snapCount and snapSizeLimitInKb are the
+    only values that limit transaction log size. When enabled
+    Zookeeper will roll the log when any of the limits is hit.
     Please note that actual log size can exceed this value by the size
     of the serialized transaction. On the other hand, if this value is
     set too close to (or smaller than) **preAllocSize**,
-    it can cause Zookeeper to roll the log for every tranasaction. While
-    this is not a correctness issue, this may cause severly degraded
+    it can cause Zookeeper to roll the log for every transaction. While
+    this is not a correctness issue, this may cause severely degraded
     performance. To avoid this and to get most out of this feature, it is
     recommended to set the value to N * **preAllocSize**
     where N >= 2.
@@ -1759,8 +1772,8 @@ that represents the update is written to non-volatile storage. A new
 log file is started when the number of transactions written to the
 current log file reaches a (variable) threshold. The threshold is
 computed using the same parameter which influences the frequency of
-snapshotting (see snapCount above). The log file's suffix is the first
-zxid written to that log.
+snapshotting (see snapCount and snapSizeLimitInKb above). The log file's
+suffix is the first zxid written to that log.
 
 <a name="sc_filemanagement"></a>
 
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
index 9d661c4..044c456 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
@@ -60,6 +60,17 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
     /** The number of log entries to log before starting a snapshot */
     private static int snapCount = ZooKeeperServer.getSnapCount();
 
+    /**
+     * The total size of log entries before starting a snapshot
+     */
+    private static long snapSizeInBytes = ZooKeeperServer.getSnapSizeInBytes();
+
+    /**
+     * Random numbers used to vary snapshot timing
+     */
+    private int randRoll;
+    private long randSize;
+
     private final BlockingQueue<Request> queuedRequests =
         new LinkedBlockingQueue<Request>();
 
@@ -101,14 +112,33 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
         return snapCount;
     }
 
+    /**
+     * used by tests to check for changing
+     * snapshot size limits
+     * @param size the size limit in bytes
+     */
+    public static void setSnapSizeInBytes(long size) {
+        snapSizeInBytes = size;
+    }
+
+    private boolean shouldSnapshot() {
+        int logCount = zks.getZKDatabase().getTxnCount();
+        long logSize = zks.getZKDatabase().getTxnSize();
+        return (logCount > (snapCount / 2 + randRoll)) ||
+                (snapSizeInBytes > 0 && logSize > (snapSizeInBytes / 2 + randSize));
+    }
+
+    private void resetSnapshotStats() {
+        randRoll = ThreadLocalRandom.current().nextInt(snapCount/2);
+        randSize = snapSizeInBytes / 2 == 0 ? 0 : Math.abs(ThreadLocalRandom.current().nextLong() % (snapSizeInBytes/2));
+    }
+
     @Override
     public void run() {
         try {
-            int logCount = 0;
-
             // we do this in an attempt to ensure that not all of the servers
             // in the ensemble take a snapshot at the same time
-            int randRoll = ThreadLocalRandom.current().nextInt(snapCount / 2, snapCount);
+            resetSnapshotStats();
             while (true) {
                 Request si = queuedRequests.poll();
                 if (si == null) {
@@ -122,9 +152,8 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
 
                 // track the number of records written to the log
                 if (zks.getZKDatabase().append(si)) {
-                    logCount++;
-                    if (logCount > randRoll) {
-                        randRoll = ThreadLocalRandom.current().nextInt(snapCount / 2, snapCount);
+                    if (shouldSnapshot()) {
+                        resetSnapshotStats();
                         // roll the log
                         zks.getZKDatabase().rollLog();
                         // take a snapshot
@@ -143,7 +172,6 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
                                 }
                             }.start();
                         }
-                        logCount = 0;
                     }
                 } else if (toFlush.isEmpty()) {
                     // optimization for read heavy workloads
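
Editor's sketch (not part of the commit): the trigger introduced in SyncRequestProcessor can be modelled in isolation as below. The class name SnapshotTrigger and its constructor are hypothetical, and the guards against a zero bound are an assumption of this sketch, not something the patch itself contains. A snapshot is due once either the transaction count or the logged byte total passes half its limit plus a per-server random offset.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical stand-alone model of the snapshot trigger; not the ZooKeeper class. */
final class SnapshotTrigger {
    private final int snapCount;
    private final long snapSizeInBytes;
    private int randRoll;   // random offset added to snapCount / 2
    private long randSize;  // random offset added to snapSizeInBytes / 2

    SnapshotTrigger(int snapCount, long snapSizeInBytes) {
        this.snapCount = snapCount;
        this.snapSizeInBytes = snapSizeInBytes;
        reset();
    }

    /** Draws fresh offsets so servers in an ensemble do not snapshot in lockstep. */
    void reset() {
        // Assumption of this sketch: clamp the bound to 1 so nextInt never sees 0.
        randRoll = ThreadLocalRandom.current().nextInt(Math.max(1, snapCount / 2));
        long half = snapSizeInBytes / 2;
        // Assumption of this sketch: no offset when the size limit is disabled or tiny.
        randSize = half > 0 ? ThreadLocalRandom.current().nextLong(half) : 0L;
    }

    /** True when either the count limit or the (enabled) size limit is exceeded. */
    boolean shouldSnapshot(int txnCount, long txnBytes) {
        boolean byCount = txnCount > snapCount / 2 + randRoll;
        boolean bySize = snapSizeInBytes > 0
                && txnBytes > snapSizeInBytes / 2 + randSize;
        return byCount || bySize;
    }
}
```

A caller would invoke reset() after each snapshot, mirroring the resetSnapshotStats() call in the hunk above.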
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
index da08ba5..8ad79ca 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -92,6 +93,11 @@ public class ZKDatabase {
     volatile private boolean initialized = false;
 
     /**
+     * Number of txn since last snapshot;
+     */
+    private AtomicInteger txnCount = new AtomicInteger(0);
+
+    /**
      * the filetxnsnaplog that this zk database
      * maps to. There is a one to one relationship
      * between a filetxnsnaplog and zkdatabase.
@@ -595,6 +601,7 @@ public class ZKDatabase {
      * @return true if the append was succesfull and false if not
      */
     public boolean append(Request si) throws IOException {
+        txnCount.incrementAndGet();
         return this.snapLog.append(si);
     }
 
@@ -603,6 +610,7 @@ public class ZKDatabase {
      */
     public void rollLog() throws IOException {
         this.snapLog.rollLog();
+        resetTxnCount();
     }
 
     /**
@@ -675,4 +683,26 @@ public class ZKDatabase {
     public DataTree createDataTree() {
         return new DataTree();
     }
+
+    /**
+     * Reset the number of txn since last rollLog
+     */
+    public void resetTxnCount() {
+        txnCount.set(0);
+        snapLog.setTotalLogSize(0);
+    }
+
+    /**
+     * Get the number of txn since last snapshot
+     */
+    public int getTxnCount() {
+        return txnCount.get();
+    }
+
+    /**
+     * Get the size of txn since last snapshot
+     */
+    public long getTxnSize() {
+        return snapLog.getTotalLogSize();
+    }
 }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 262df5f..d714591 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -946,6 +946,14 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         return limit;
     }
 
+    public static long getSnapSizeInBytes() {
+        long size = Long.getLong("zookeeper.snapSizeLimitInKb", 4194304L); // 4GB by default
+        if (size <= 0) {
+            LOG.info("zookeeper.snapSizeLimitInKb set to a non-positive value {}; disabling feature", size);
+        }
+        return size * 1024; // Convert to bytes
+    }
+
     public void setServerCnxnFactory(ServerCnxnFactory factory) {
         serverCnxnFactory = factory;
     }
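
Editor's sketch (not part of the commit): the unit handling in getSnapSizeInBytes() can be tried outside the server roughly as follows. The class SnapSizeConfig and its method names are hypothetical; only the property name, the default of 4194304 KB, and the multiplication by 1024 come from the hunk above.

```java
/** Hypothetical helper mirroring the KB-to-bytes conversion shown in the patch. */
final class SnapSizeConfig {
    static final String PROPERTY = "zookeeper.snapSizeLimitInKb";
    static final long DEFAULT_KB = 4194304L; // 4 GB expressed in kilobytes

    /** Returns the limit in bytes; a non-positive result means the size trigger is off. */
    static long limitInBytes() {
        // Long.getLong falls back to the default when the property is unset or unparsable.
        long kb = Long.getLong(PROPERTY, DEFAULT_KB);
        return kb * 1024;
    }

    static boolean enabled() {
        return limitInBytes() > 0;
    }

    public static void main(String[] args) {
        System.out.println(PROPERTY + " -> " + limitInBytes() + " bytes, enabled=" + enabled());
    }
}
```

Running it with, for example, -Dzookeeper.snapSizeLimitInKb=0 on the java command line reports the feature as disabled.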
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnLog.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnLog.java
index 3f0047e..50cdf27 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnLog.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnLog.java
@@ -158,6 +158,12 @@ public class FileTxnLog implements TxnLog {
     private volatile long syncElapsedMS = -1L;
 
     /**
+     * A running total of all complete log files
+     * This does not include the current file being written to
+     */
+    private long prevLogsRunningTotal;
+
+    /**
      * constructor for FileTxnLog. Take the directory
      * where the txnlogs are stored
      * @param logDir the directory where the txnlogs are stored
@@ -202,6 +208,14 @@ public class FileTxnLog implements TxnLog {
         return 0;
     }
 
+    public synchronized void setTotalLogSize(long size) {
+        prevLogsRunningTotal = size;
+    }
+
+    public synchronized long getTotalLogSize() {
+        return prevLogsRunningTotal + getCurrentLogSize();
+    }
+
     /**
      * creates a checksum algorithm to be used
      * @return the checksum used for this txnlog
@@ -217,8 +231,11 @@ public class FileTxnLog implements TxnLog {
     public synchronized void rollLog() throws IOException {
         if (logStream != null) {
             this.logStream.flush();
+            prevLogsRunningTotal += getCurrentLogSize();
             this.logStream = null;
             oa = null;
+
+            // Roll over the current log file into the running total
         }
     }
 
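
Editor's sketch (not part of the commit): the running-total bookkeeping added to FileTxnLog can be pictured with the simplified model below. LogSizeTracker is a hypothetical class; it tracks the current file's size in a field, whereas the real code asks the open log file for its length, so treat the reset of that field on roll as an assumption of the model.

```java
/** Hypothetical in-memory model of the log size accounting; not the ZooKeeper class. */
final class LogSizeTracker {
    private long prevLogsRunningTotal; // bytes in log files that have been rolled
    private long currentLogSize;       // bytes in the file currently being written

    synchronized void append(long bytes) {
        currentLogSize += bytes;
    }

    /** Folds the current file into the running total and starts a fresh file. */
    synchronized void rollLog() {
        prevLogsRunningTotal += currentLogSize;
        currentLogSize = 0;
    }

    /** Overwrites the running total; the patch calls this with 0 after a snapshot roll. */
    synchronized void setTotalLogSize(long size) {
        prevLogsRunningTotal = size;
    }

    synchronized long getTotalLogSize() {
        return prevLogsRunningTotal + currentLogSize;
    }
}
```

In the patch, ZKDatabase.rollLog() rolls the log and then calls resetTxnCount(), which sets the total back to 0 so that the size is measured since the last snapshot.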
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
index e6d2998..7409af8 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
@@ -491,7 +491,7 @@ public class FileTxnSnapLog {
     /**
      * append the request to the transaction logs
      * @param si the request to be appended
-     * returns true iff something appended, otw false
+     * @return true iff something appended, otw false
      * @throws IOException
      */
     public boolean append(Request si) throws IOException {
@@ -554,4 +554,12 @@ public class FileTxnSnapLog {
             super(msg);
         }
     }
+
+    public void setTotalLogSize(long size) {
+        txnLog.setTotalLogSize(size);
+    }
+
+    public long getTotalLogSize() {
+        return txnLog.getTotalLogSize();
+    }
 }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/TxnLog.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/TxnLog.java
index 200c933..7753c98 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/TxnLog.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/TxnLog.java
@@ -47,7 +47,7 @@ public interface TxnLog extends Closeable {
      * Append a request to the transaction log
      * @param hdr the transaction header
      * @param r the transaction itself
-     * returns true iff something appended, otw false 
+     * @return true iff something appended, otw false
      * @throws IOException
      */
     boolean append(TxnHeader hdr, Record r) throws IOException;
@@ -98,6 +98,16 @@ public interface TxnLog extends Closeable {
     long getTxnLogSyncElapsedTime();
    
     /**
+     * Sets the total size of all log files
+     */
+    void setTotalLogSize(long size);
+
+    /**
+     * Gets the total size of all log files
+     */
+    long getTotalLogSize();
+
+    /**
      * an iterating interface for reading 
      * transaction logs. 
      */
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/RestoreCommittedLogTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/RestoreCommittedLogTest.java
index 951458e..e630b85 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/RestoreCommittedLogTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/RestoreCommittedLogTest.java
@@ -43,16 +43,56 @@ public class RestoreCommittedLogTest extends ZKTestCase{
     private static final Logger LOG = LoggerFactory.getLogger(RestoreCommittedLogTest.class);
     private static final String HOSTPORT = "127.0.0.1:" + PortAssignment.unique();
     private static final int CONNECTION_TIMEOUT = 3000;
+
+    /**
+     * Verify the logs can be used to restore when they are rolled
+     * based on the size of the transactions received
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testRestoreCommittedLogWithSnapSize() throws Exception {
+        final int minExpectedSnapshots = 5;
+        final int minTxnsToSnap = 40;
+        final int numTransactions = minExpectedSnapshots * minTxnsToSnap;
+        final StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < 4*1024; i++) {
+            sb.append("0");
+        }
+        final byte[] data = sb.toString().getBytes();
+
+        SyncRequestProcessor.setSnapCount(numTransactions * 1000);
+        SyncRequestProcessor.setSnapSizeInBytes(minTxnsToSnap * data.length);
+
+        testRestoreCommittedLog(numTransactions, data, minExpectedSnapshots);
+
+    }
+
+    /**
+     * Verify the logs can be used to restore when they are rolled
+     * based on the number of transactions received
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testRestoreCommittedLogWithSnapCount() throws Exception {
+        final int minExpectedSnapshots = 30;
+        final int snapCount = 100;
+
+        SyncRequestProcessor.setSnapCount(snapCount);
+        SyncRequestProcessor.setSnapSizeInBytes(4294967296L);
+
+        testRestoreCommittedLog(minExpectedSnapshots * snapCount, new byte[0], minExpectedSnapshots);
+    }
+
     /**
      * test the purge
      * @throws Exception an exception might be thrown here
      */
-    @Test
-    public void testRestoreCommittedLog() throws Exception {
+    private void testRestoreCommittedLog(int totalTransactions, byte[] data, int minExpectedSnapshots) throws Exception {
         File tmpDir = ClientBase.createTmpDir();
         ClientBase.setupTestEnv();
         ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
-        SyncRequestProcessor.setSnapCount(100);
         final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
         ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1);
         f.startup(zks);
@@ -60,18 +100,24 @@ public class RestoreCommittedLogTest extends ZKTestCase{
                 ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT));
         ZooKeeper zk = ClientBase.createZKClient(HOSTPORT);
         try {
-            for (int i = 0; i< 2000; i++) {
-                zk.create("/invalidsnap-" + i, new byte[0], Ids.OPEN_ACL_UNSAFE,
+            for (int i = 0; i< totalTransactions; i++) {
+                zk.create("/invalidsnap-" + i, data, Ids.OPEN_ACL_UNSAFE,
                         CreateMode.PERSISTENT);
             }
         } finally {
             zk.close();
         }
+        final int numSnaps = zks.getTxnLogFactory().findNRecentSnapshots(10 * minExpectedSnapshots).size();
+        LOG.info("number of snapshots taken {}", numSnaps);
+
         f.shutdown();
         zks.shutdown();
         Assert.assertTrue("waiting for server to shutdown",
                 ClientBase.waitForServerDown(HOSTPORT, CONNECTION_TIMEOUT));
 
+        Assert.assertTrue("too few snapshot files", numSnaps > minExpectedSnapshots);
+        Assert.assertTrue("too many snapshot files", numSnaps <= minExpectedSnapshots * 2);
+
         // start server again
         zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
         zks.startdata();