You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by eo...@apache.org on 2019/12/27 21:33:53 UTC

[zookeeper] branch branch-3.6 updated: ZOOKEEPER-3512: Real time data consistency check during broadcast time

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-3.6
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/branch-3.6 by this push:
     new 034bcda  ZOOKEEPER-3512: Real time data consistency check during broadcast time
034bcda is described below

commit 034bcda589ae9d64ab3467b254179ed37f9b1635
Author: Fangmin Lyu <fa...@apache.org>
AuthorDate: Fri Dec 27 22:33:26 2019 +0100

    ZOOKEEPER-3512: Real time data consistency check during broadcast time
    
    This is the 2nd part of data consistency based on digest, it checks the DataTree digest on every txn during broadcast time.
    
    Author: Fangmin Lyu <fa...@apache.org>
    Author: Enrico Olivelli <eo...@apache.org>
    
    Reviewers: Enrico Olivelli <eo...@apache.org>, Andor Molnar <an...@apache.org>, Michael Han <ha...@apache.org>
    
    Closes #1059 from lvfangmin/ZOOKEEPER-3512
    
    (cherry picked from commit 2805e8982c0e8964e5f8d86f4b01a293d75d4aed)
    Signed-off-by: Enrico Olivelli <eo...@apache.org>
---
 pom.xml                                            |   6 +
 .../org/apache/zookeeper/graph/TxnLogSource.java   |  12 +-
 .../src/main/resources/markdown/zookeeperAdmin.md  |  59 ++++-
 zookeeper-jute/src/main/resources/zookeeper.jute   |   4 +
 zookeeper-server/pom.xml                           |   7 +-
 .../java/org/apache/zookeeper/server/DataNode.java |   2 +-
 .../java/org/apache/zookeeper/server/DataTree.java | 135 ++++++++--
 .../apache/zookeeper/server/DigestCalculator.java  |   6 +-
 .../org/apache/zookeeper/server/LogFormatter.java  |  13 +-
 .../org/apache/zookeeper/server/NodeHashMap.java   |   6 -
 .../apache/zookeeper/server/NodeHashMapImpl.java   |  30 ++-
 .../zookeeper/server/PrepRequestProcessor.java     | 193 +++++++++++++-
 .../java/org/apache/zookeeper/server/Request.java  |  12 +-
 .../apache/zookeeper/server/SnapshotFormatter.java |   5 +
 .../org/apache/zookeeper/server/TxnLogEntry.java   |  50 ++++
 .../zookeeper/server/TxnLogProposalIterator.java   |  17 +-
 .../org/apache/zookeeper/server/ZKDatabase.java    |  21 +-
 .../apache/zookeeper/server/ZooKeeperServer.java   |  28 ++-
 .../zookeeper/server/persistence/FileSnap.java     |  13 +-
 .../zookeeper/server/persistence/FileTxnLog.java   |  20 +-
 .../server/persistence/FileTxnSnapLog.java         |  10 +-
 .../zookeeper/server/persistence/TxnLog.java       |  28 +++
 .../server/persistence/TxnLogToolkit.java          |   6 +-
 .../apache/zookeeper/server/persistence/Util.java  |   9 +
 .../zookeeper/server/quorum/CommitProcessor.java   |   1 +
 .../apache/zookeeper/server/quorum/Follower.java   |  10 +-
 .../server/quorum/FollowerZooKeeperServer.java     |   4 +-
 .../org/apache/zookeeper/server/quorum/Leader.java |   2 +-
 .../server/quorum/LeaderZooKeeperServer.java       |   2 +-
 .../apache/zookeeper/server/quorum/Learner.java    |  24 +-
 .../zookeeper/server/quorum/LearnerHandler.java    |   9 -
 .../apache/zookeeper/server/quorum/Observer.java   |  21 +-
 .../apache/zookeeper/server/quorum/QuorumBean.java |  10 +
 .../zookeeper/server/quorum/QuorumMXBean.java      |   3 +
 .../org/apache/zookeeper/server/util/AdHash.java   |  14 +-
 .../apache/zookeeper/server/util/LogChopper.java   |   6 +-
 .../zookeeper/server/util/SerializeUtils.java      |  35 +--
 .../org/apache/zookeeper/server/DataTreeTest.java  |  35 ++-
 .../server/PrepRequestProcessorMetricsTest.java    |   7 +-
 .../zookeeper/server/PrepRequestProcessorTest.java |  64 ++++-
 .../apache/zookeeper/server/TxnLogDigestTest.java  | 277 +++++++++++++++++++++
 .../server/persistence/FileTxnSnapLogTest.java     |  46 +++-
 .../server/quorum/FuzzySnapshotRelatedTest.java    | 127 ++++++++++
 .../zookeeper/server/quorum/LeaderBeanTest.java    |  18 +-
 .../zookeeper/server/quorum/QuorumDigestTest.java  | 263 +++++++++++++++++++
 .../server/quorum/QuorumPeerTestBase.java          |  39 ++-
 .../apache/zookeeper/server/quorum/Zab1_0Test.java |  12 +-
 .../apache/zookeeper/server/util/AdHashTest.java   |   2 +-
 .../zookeeper/test/GetProposalFromTxnTest.java     |   7 +-
 .../java/org/apache/zookeeper/test/JMXEnv.java     |   1 +
 50 files changed, 1557 insertions(+), 174 deletions(-)

diff --git a/pom.xml b/pom.xml
index c501bcd..3f5d9bd 100755
--- a/pom.xml
+++ b/pom.xml
@@ -280,6 +280,7 @@
     <!-- dependency versions -->
     <slf4j.version>1.7.25</slf4j.version>
     <audience-annotations.version>0.5.0</audience-annotations.version>
+    <jmockit.version>1.48</jmockit.version>
     <junit.version>4.12</junit.version>
     <log4j.version>1.2.17</log4j.version>
     <mockito.version>2.27.0</mockito.version>
@@ -400,6 +401,11 @@
         <version>${log4j.version}</version>
       </dependency>
       <dependency>
+        <groupId>org.jmockit</groupId>
+        <artifactId>jmockit</artifactId>
+        <version>${jmockit.version}</version>
+      </dependency>
+      <dependency>
         <groupId>junit</groupId>
         <artifactId>junit</artifactId>
         <version>${junit.version}</version>
diff --git a/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/TxnLogSource.java b/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/TxnLogSource.java
index 809c455..ad2e258 100644
--- a/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/TxnLogSource.java
+++ b/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/TxnLogSource.java
@@ -31,6 +31,7 @@ import org.apache.jute.BinaryInputArchive;
 import org.apache.jute.InputArchive;
 import org.apache.jute.Record;
 import org.apache.zookeeper.server.TraceFormatter;
+import org.apache.zookeeper.server.TxnLogEntry;
 import org.apache.zookeeper.server.persistence.FileHeader;
 import org.apache.zookeeper.server.persistence.FileTxnLog;
 import org.apache.zookeeper.server.util.SerializeUtils;
@@ -180,8 +181,10 @@ public class TxnLogSource implements LogSource {
 		    throw new IOException("CRC doesn't match " + crcValue +
 					  " vs " + crc.getValue());
 		}
-		TxnHeader hdr = new TxnHeader();
-		Record r = SerializeUtils.deserializeTxn(bytes, hdr);
+    
+		TxnLogEntry logEntry = SerializeUtils.deserializeTxn(bytes);
+		TxnHeader hdr = logEntry.getHeader();
+		Record r = logEntry.getTxn();
 
 		switch (hdr.getType()) {
 		case OpCode.createSession: {
@@ -327,8 +330,9 @@ public class TxnLogSource implements LogSource {
 		if (logStream.readByte("EOR") != 'B') {
 		    throw new EOFException("Last transaction was partial.");
 		}
-		TxnHeader hdr = new TxnHeader();
-		Record r = SerializeUtils.deserializeTxn(bytes, hdr);
+		TxnLogEntry logEntry = SerializeUtils.deserializeTxn(bytes);
+		TxnHeader hdr = logEntry.getHeader();
+		Record r = logEntry.getTxn();
 		
 		if (starttime == 0) {
 		    starttime = hdr.getTime();
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index 8e0574c..fb7d789 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -964,8 +964,63 @@ property, when available, is noted below.
 * *digest.enabled* :
     (Java system property only: **zookeeper.digest.enabled**)
     **New in 3.6.0:**
-    The digest feature is added to self-verify the correctness inside
-    ZooKeeper when loading database from disk, and syncing with leader.
+    The digest feature is added to detect data inconsistency inside
+    ZooKeeper when loading the database from disk, catching up with, and
+    following the leader. It performs an incremental hash check of the
+    DataTree based on the AdHash paper mentioned in
+
+        https://cseweb.ucsd.edu/~daniele/papers/IncHash.pdf
+
+    The idea is simple: the hash value of the DataTree is updated incrementally
+    based on the changes to the set of data. When the leader is preparing the txn,
+    it pre-calculates the hash of the tree from those changes using the
+    formula:
+
+        current_hash = current_hash + hash(new node data) - hash(old node data)
+
+    If it’s creating a new node, the hash(old node data) will be 0, and if it’s a 
+    delete node op, the hash(new node data) will be 0.
+
+    This hash is associated with each txn to represent the expected hash value
+    after applying the txn to the data tree; it is sent to followers with the
+    original proposals. A learner compares the actual hash value with the one in
+    the txn after applying the txn to the data tree, and reports a mismatch if
+    they are not the same.
+
+    These digest values are also persisted with each txn and snapshot on disk,
+    so when a server restarts and loads data from disk, it compares them to see if
+    there is a hash mismatch, which helps detect data loss issues on disk.
+
+    For the actual hash function, we use CRC internally. It is not a collisionless
+    hash function, but it is more efficient than a collisionless hash, and
+    collisions are rare enough that it already meets our needs here.
+
+    This feature is backward and forward compatible, so it is safe to do a rolling
+    upgrade or downgrade, or to enable and later disable it, without any compatibility
+    issue. Here are the scenarios that have been covered and tested:
+
+    1. When the leader runs new code while a follower runs old code, the digest is
+       appended to the end of each txn. The follower reads only the header and txn data,
+       and the digest value in the txn is ignored. This does not affect how the follower
+       reads and processes the next txn.
+    2. When leader runs with old code while follower runs with new one, the digest won't
+       be sent with txn, when follower tries to read the digest, it will throw EOF which 
+       is caught and handled gracefully with digest value set to null.
+    3. When loading an old snapshot with new code, an IOException is thrown when trying to
+       read the non-existent digest value; the exception is caught and the digest is
+       set to null, which means the digest is not compared when loading this snapshot.
+       This is expected to happen during a rolling upgrade.
+    4. When loading a new snapshot with old code, loading finishes successfully after
+       deserializing the data tree; the digest value at the end of the snapshot file is ignored.
+    5. Rolling restarts with flag changes are similar to the 1st and 2nd scenarios
+       discussed above: if the leader has the feature enabled but a follower does not, the
+       digest value is ignored and the follower won't compare the digest at runtime; if the leader
+       has it disabled but a follower has it enabled, the follower gets an EOF exception, which is handled gracefully.
+
+    Note: the current digest calculation excludes nodes under /zookeeper
+    due to the potential inconsistency in the /zookeeper/quota stat node;
+    they can be included once that issue is fixed.
+
     By default, this feautre is disabled, set "true" to enable it.
 
 * *snapshot.trust.empty* :
diff --git a/zookeeper-jute/src/main/resources/zookeeper.jute b/zookeeper-jute/src/main/resources/zookeeper.jute
index 6d55365..898838f 100644
--- a/zookeeper-jute/src/main/resources/zookeeper.jute
+++ b/zookeeper-jute/src/main/resources/zookeeper.jute
@@ -278,6 +278,10 @@ module org.apache.zookeeper.server.persistence {
 }
 
 module org.apache.zookeeper.txn {
+    class TxnDigest {
+        int version;
+        long treeDigest;
+    }
     class TxnHeader {
         long clientId;
         int cxid;
diff --git a/zookeeper-server/pom.xml b/zookeeper-server/pom.xml
index 307e650..7a72b37 100755
--- a/zookeeper-server/pom.xml
+++ b/zookeeper-server/pom.xml
@@ -149,6 +149,11 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.jmockit</groupId>
+      <artifactId>jmockit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
@@ -281,7 +286,7 @@
           </includes>
           <forkCount>${surefire-forkcount}</forkCount>
           <reuseForks>false</reuseForks>
-          <argLine>-Xmx512m -Dtest.junit.threads=${surefire-forkcount} -Dzookeeper.junit.threadid=${surefire.forkNumber}</argLine>
+          <argLine>-Xmx512m -Dtest.junit.threads=${surefire-forkcount} -Dzookeeper.junit.threadid=${surefire.forkNumber} -javaagent:${org.jmockit:jmockit:jar}</argLine>
           <basedir>${project.basedir}</basedir>
           <redirectTestOutputToFile>true</redirectTestOutputToFile>
           <systemPropertyVariables>
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataNode.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataNode.java
index e35482c..8ac8f61 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataNode.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataNode.java
@@ -204,7 +204,7 @@ public class DataNode implements Record {
         this.digest = digest;
     }
 
-    public byte[] getData() {
+    public synchronized byte[] getData() {
         return data;
     }
 
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
index 0dc5458..d3529cf 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
@@ -77,6 +77,7 @@ import org.apache.zookeeper.txn.MultiTxn;
 import org.apache.zookeeper.txn.SetACLTxn;
 import org.apache.zookeeper.txn.SetDataTxn;
 import org.apache.zookeeper.txn.Txn;
+import org.apache.zookeeper.txn.TxnDigest;
 import org.apache.zookeeper.txn.TxnHeader;
 import org.apache.zookeeper.util.ServiceUtils;
 import org.slf4j.Logger;
@@ -178,6 +179,8 @@ public class DataTree {
     // The digest associated with the highest zxid in the data tree.
     private volatile ZxidDigest lastProcessedZxidDigest;
 
+    private boolean firstMismatchTxn = true;
+
     // Will be notified when digest mismatch event triggered.
     private final List<DigestWatcher> digestWatchers = new ArrayList<>();
 
@@ -465,15 +468,7 @@ public class DataTree {
         int lastSlash = path.lastIndexOf('/');
         String parentName = path.substring(0, lastSlash);
         String childName = path.substring(lastSlash + 1);
-        StatPersisted stat = new StatPersisted();
-        stat.setCtime(time);
-        stat.setMtime(time);
-        stat.setCzxid(zxid);
-        stat.setMzxid(zxid);
-        stat.setPzxid(zxid);
-        stat.setVersion(0);
-        stat.setAversion(0);
-        stat.setEphemeralOwner(ephemeralOwner);
+        StatPersisted stat = createStat(zxid, time, ephemeralOwner);
         DataNode parent = nodes.get(parentName);
         if (parent == null) {
             throw new KeeperException.NoNodeException();
@@ -777,7 +772,7 @@ public class DataTree {
             return nodes.size() - 2;
         }
 
-        return (int) nodes.keySet().parallelStream().filter(key -> key.startsWith(path + "/")).count();
+        return (int) nodes.entrySet().parallelStream().filter(entry -> entry.getKey().startsWith(path + "/")).count();
     }
 
     public Stat setACL(String path, List<ACL> acl, int version) throws KeeperException.NoNodeException {
@@ -868,6 +863,12 @@ public class DataTree {
 
     public volatile long lastProcessedZxid = 0;
 
+    public ProcessTxnResult processTxn(TxnHeader header, Record txn, TxnDigest digest) {
+        ProcessTxnResult result = processTxn(header, txn);
+        compareDigest(header, txn, digest);
+        return result;
+    }
+
     public ProcessTxnResult processTxn(TxnHeader header, Record txn) {
         return this.processTxn(header, txn, false);
     }
@@ -1713,9 +1714,10 @@ public class DataTree {
      * digestFromLoadedSnapshot.
      *
      * @param ia the input stream to read from
+     * @param startZxidOfSnapshot the zxid of snapshot file
      * @return the true if it deserialized successfully
      */
-    public boolean deserializeZxidDigest(InputArchive ia) throws IOException {
+    public boolean deserializeZxidDigest(InputArchive ia, long startZxidOfSnapshot) throws IOException {
         if (!ZooKeeperServer.isDigestEnabled()) {
             return false;
         }
@@ -1725,7 +1727,41 @@ public class DataTree {
             zxidDigest.deserialize(ia);
             if (zxidDigest.zxid > 0) {
                 digestFromLoadedSnapshot = zxidDigest;
+                LOG.info("The digest in the snapshot has digest version of {}, "
+                        + ", with zxid as 0x{}, and digest value as {}",
+                        digestFromLoadedSnapshot.digestVersion,
+                        Long.toHexString(digestFromLoadedSnapshot.zxid),
+                        digestFromLoadedSnapshot.digest);
+            } else {
+                digestFromLoadedSnapshot = null;
+                LOG.info("The digest value is empty in snapshot");
+            }
+
+            // There is possibility that the start zxid of a snapshot might
+            // be larger than the digest zxid in snapshot.
+            //
+            // Known cases:
+            //
+            // The new leader set the last processed zxid to be the new
+            // epoch + 0, which is not mapping to any txn, and it uses
+            // this to take snapshot, which is possible if we don't
+            // clean database before switching to LOOKING. In this case
+            // the currentZxidDigest will be the zxid of last epoch and
+            // it's smaller than the zxid of the snapshot file.
+            //
+            // It's safe to reset the targetZxidDigest to null and start
+            // to compare digest when replaying the first txn, since it's
+            // a non fuzzy snapshot.
+            if (digestFromLoadedSnapshot != null && digestFromLoadedSnapshot.zxid < startZxidOfSnapshot) {
+                LOG.info("The zxid of snapshot digest 0x{} is smaller "
+                        + "than the known snapshot highest zxid, the snapshot "
+                        + "started with zxid 0x{}. It will be invalid to use "
+                        + "this snapshot digest associated with this zxid, will "
+                        + "ignore comparing it.", Long.toHexString(digestFromLoadedSnapshot.zxid),
+                        Long.toHexString(startZxidOfSnapshot));
+                digestFromLoadedSnapshot = null;
             }
+
             return true;
         } catch (EOFException e) {
             LOG.warn("Got EOF exception while reading the digest, likely due to the reading an older snapshot.");
@@ -1754,9 +1790,56 @@ public class DataTree {
             }
             digestFromLoadedSnapshot = null;
         } else if (digestFromLoadedSnapshot.zxid != 0 && zxid > digestFromLoadedSnapshot.zxid) {
-            LOG.error(
-                "Watching for zxid 0x{} during snapshot recovery, but it wasn't found.",
-                Long.toHexString(digestFromLoadedSnapshot.zxid));
+            RATE_LOGGER.rateLimitLog("The txn 0x{} of snapshot digest does not "
+                    + "exist.", Long.toHexString(digestFromLoadedSnapshot.zxid));
+        }
+    }
+
+    /**
+     * Compares the digest of the tree with the digest present in transaction digest.
+     * If there is any error, logs and alerts the watchers.
+     *
+     * @param header transaction header being applied
+     * @param txn    transaction
+     * @param digest transaction digest
+     *
+     * @return false if digest in the txn doesn't match what we have now in
+     *               the data tree
+     */
+    public boolean compareDigest(TxnHeader header, Record txn, TxnDigest digest) {
+        long zxid = header.getZxid();
+
+        if (!ZooKeeperServer.isDigestEnabled() || digest == null) {
+            return true;
+        }
+        // do not compare digest if we're still in fuzzy state
+        if (digestFromLoadedSnapshot != null) {
+            return true;
+        }
+        // do not compare digest if there is digest version change
+        if (digestCalculator.getDigestVersion() != digest.getVersion()) {
+            RATE_LOGGER.rateLimitLog("Digest version not the same on zxid.",
+                    String.valueOf(zxid));
+            return true;
+        }
+
+        long logDigest = digest.getTreeDigest();
+        long actualDigest = getTreeDigest();
+        if (logDigest != actualDigest) {
+            reportDigestMismatch(zxid);
+            LOG.debug("Digest in log: {}, actual tree: {}", logDigest, actualDigest);
+            if (firstMismatchTxn) {
+                LOG.error("First digest mismatch on txn: {}, {}, "
+                        + "expected digest is {}, actual digest is {}, ",
+                        header, txn, digest, actualDigest);
+                firstMismatchTxn = false;
+            }
+            return false;
+        } else {
+            RATE_LOGGER.flush();
+            LOG.debug("Digests are matching for Zxid: {}, Digest in log "
+                    + "and actual tree: {}", Long.toHexString(zxid), logDigest);
+            return true;
         }
     }
 
@@ -1838,7 +1921,7 @@ public class DataTree {
             if (digestVersion < 2) {
                 String d = ia.readString("digest");
                 if (d != null) {
-                    digest = Long.parseLong(d);
+                    digest = Long.parseLong(d, 16);
                 }
             } else {
                 digest = ia.readLong("digest");
@@ -1853,10 +1936,30 @@ public class DataTree {
             return digestVersion;
         }
 
-        public Long getDigest() {
+        public long getDigest() {
             return digest;
         }
 
     }
 
+    /**
+     * Create a node stat from the given params.
+     *
+     * @param zxid the zxid associated with the txn
+     * @param time the time when the txn is created
+     * @param ephemeralOwner the owner if the node is an ephemeral
+     * @return the stat
+     */
+    public static StatPersisted createStat(long zxid, long time, long ephemeralOwner) {
+        StatPersisted stat = new StatPersisted();
+        stat.setCtime(time);
+        stat.setMtime(time);
+        stat.setCzxid(zxid);
+        stat.setMzxid(zxid);
+        stat.setPzxid(zxid);
+        stat.setVersion(0);
+        stat.setAversion(0);
+        stat.setEphemeralOwner(ephemeralOwner);
+        return stat;
+    }
 }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DigestCalculator.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DigestCalculator.java
index ca5041f..1209aba 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DigestCalculator.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DigestCalculator.java
@@ -22,15 +22,11 @@ import java.nio.ByteBuffer;
 import java.util.zip.CRC32;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.StatPersisted;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Defines how to calculate the digest for a given node.
  */
-class DigestCalculator {
-
-    private static final Logger LOG = LoggerFactory.getLogger(DigestCalculator.class);
+public class DigestCalculator {
 
     // The hardcoded digest version, should bump up this version whenever
     // we changed the digest method or fields.
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/LogFormatter.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/LogFormatter.java
index e02a63d..81392df 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/LogFormatter.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/LogFormatter.java
@@ -32,6 +32,7 @@ import org.apache.zookeeper.ZKUtil;
 import org.apache.zookeeper.server.persistence.FileHeader;
 import org.apache.zookeeper.server.persistence.FileTxnLog;
 import org.apache.zookeeper.server.util.SerializeUtils;
+import org.apache.zookeeper.txn.TxnDigest;
 import org.apache.zookeeper.txn.TxnHeader;
 import org.apache.zookeeper.util.ServiceUtils;
 import org.slf4j.Logger;
@@ -75,6 +76,9 @@ public class LogFormatter {
                            + " txnlog format version "
                            + fhdr.getVersion());
 
+        // enable digest
+        ZooKeeperServer.setDigestEnabled(true);
+
         int count = 0;
         while (true) {
             long crcValue;
@@ -98,15 +102,18 @@ public class LogFormatter {
             if (crcValue != crc.getValue()) {
                 throw new IOException("CRC doesn't match " + crcValue + " vs " + crc.getValue());
             }
-            TxnHeader hdr = new TxnHeader();
-            Record txn = SerializeUtils.deserializeTxn(bytes, hdr);
+            TxnLogEntry entry = SerializeUtils.deserializeTxn(bytes);
+            TxnHeader hdr = entry.getHeader();
+            Record txn = entry.getTxn();
+            TxnDigest digest = entry.getDigest();
             System.out.println(
                 DateFormat.getDateTimeInstance(DateFormat.SHORT, DateFormat.LONG).format(new Date(hdr.getTime()))
                 + " session 0x" + Long.toHexString(hdr.getClientId())
                 + " cxid 0x" + Long.toHexString(hdr.getCxid())
                 + " zxid 0x" + Long.toHexString(hdr.getZxid())
                 + " " + Request.op2String(hdr.getType())
-                + " " + txn);
+                + " " + txn
+                + " " + digest);
             if (logStream.readByte("EOR") != 'B') {
                 LOG.error("Last transaction was partial.");
                 throw new EOFException("Last transaction was partial.");
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NodeHashMap.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NodeHashMap.java
index f19bbfb..136ff7b 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NodeHashMap.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NodeHashMap.java
@@ -20,7 +20,6 @@ package org.apache.zookeeper.server;
 
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * The interface defined to manage the hash based on the entries in the
@@ -60,11 +59,6 @@ public interface NodeHashMap {
     DataNode remove(String path);
 
     /**
-     * Return all key set view inside this map.
-     */
-    ConcurrentHashMap.KeySetView<String, DataNode> keySet();
-
-    /**
      * Return all the entries inside this map.
      */
     Set<Map.Entry<String, DataNode>> entrySet();
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NodeHashMapImpl.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NodeHashMapImpl.java
index b28130b..eb56e90 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NodeHashMapImpl.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NodeHashMapImpl.java
@@ -21,6 +21,7 @@ package org.apache.zookeeper.server;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.server.util.AdHash;
 
 /**
@@ -29,13 +30,17 @@ import org.apache.zookeeper.server.util.AdHash;
  */
 public class NodeHashMapImpl implements NodeHashMap {
 
-    private final ConcurrentHashMap<String, DataNode> nodes = new ConcurrentHashMap<String, DataNode>();
-
-    private AdHash hash = new AdHash();
+    private final ConcurrentHashMap<String, DataNode> nodes;
+    private final boolean digestEnabled;
     private final DigestCalculator digestCalculator;
 
+    private final AdHash hash;
+
     public NodeHashMapImpl(DigestCalculator digestCalculator) {
         this.digestCalculator = digestCalculator;
+        nodes = new ConcurrentHashMap<>();
+        hash = new AdHash();
+        digestEnabled = ZooKeeperServer.isDigestEnabled();
     }
 
     @Override
@@ -68,11 +73,6 @@ public class NodeHashMapImpl implements NodeHashMap {
     }
 
     @Override
-    public ConcurrentHashMap.KeySetView<String, DataNode> keySet() {
-        return nodes.keySet();
-    }
-
-    @Override
     public Set<Map.Entry<String, DataNode>> entrySet() {
         return nodes.entrySet();
     }
@@ -80,7 +80,7 @@ public class NodeHashMapImpl implements NodeHashMap {
     @Override
     public void clear() {
         nodes.clear();
-        hash = new AdHash();
+        hash.clear();
     }
 
     @Override
@@ -102,13 +102,21 @@ public class NodeHashMapImpl implements NodeHashMap {
     }
 
     private void addDigest(String path, DataNode node) {
-        if (ZooKeeperServer.isDigestEnabled()) {
+        // Excluding everything under '/zookeeper/' for digest calculation.
+        if (path.startsWith(ZooDefs.ZOOKEEPER_NODE_SUBTREE)) {
+            return;
+        }
+        if (digestEnabled) {
             hash.addDigest(digestCalculator.calculateDigest(path, node));
         }
     }
 
     private void removeDigest(String path, DataNode node) {
-        if (ZooKeeperServer.isDigestEnabled()) {
+        // Excluding everything under '/zookeeper/' for digest calculation.
+        if (path.startsWith(ZooDefs.ZOOKEEPER_NODE_SUBTREE)) {
+            return;
+        }
+        if (digestEnabled) {
             hash.removeDigest(digestCalculator.calculateDigest(path, node));
         }
     }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
index 70d989a..74720ed 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
@@ -56,6 +56,7 @@ import org.apache.zookeeper.proto.ReconfigRequest;
 import org.apache.zookeeper.proto.SetACLRequest;
 import org.apache.zookeeper.proto.SetDataRequest;
 import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
+import org.apache.zookeeper.server.ZooKeeperServer.PrecalculatedDigest;
 import org.apache.zookeeper.server.auth.ProviderRegistry;
 import org.apache.zookeeper.server.auth.ServerAuthenticationProvider;
 import org.apache.zookeeper.server.quorum.LeaderZooKeeperServer;
@@ -76,6 +77,7 @@ import org.apache.zookeeper.txn.MultiTxn;
 import org.apache.zookeeper.txn.SetACLTxn;
 import org.apache.zookeeper.txn.SetDataTxn;
 import org.apache.zookeeper.txn.Txn;
+import org.apache.zookeeper.txn.TxnDigest;
 import org.apache.zookeeper.txn.TxnHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -100,9 +102,15 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
     LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();
 
     private final RequestProcessor nextProcessor;
+    private final boolean digestEnabled;
+    private DigestCalculator digestCalculator;
 
     ZooKeeperServer zks;
 
+    public enum DigestOpCode {
+        NOOP, ADD, REMOVE, UPDATE;
+    }
+
     public PrepRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) {
         super(
             "ProcessThread(sid:" + zks.getServerId()
@@ -110,6 +118,10 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
             + "):", zks.getZooKeeperServerListener());
         this.nextProcessor = nextProcessor;
         this.zks = zks;
+        this.digestEnabled = ZooKeeperServer.isDigestEnabled();
+        if (this.digestEnabled) {
+            this.digestCalculator = new DigestCalculator();
+        }
     }
 
     /**
@@ -159,6 +171,12 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
                         children = n.getChildren();
                     }
                     lastChange = new ChangeRecord(-1, path, n.stat, children.size(), zks.getZKDatabase().aclForNode(n));
+
+                    if (digestEnabled) {
+                        lastChange.precalculatedDigest = new PrecalculatedDigest(
+                                digestCalculator.calculateDigest(path, n), 0);
+                    }
+                    lastChange.data = n.getData();
                 }
             }
         }
@@ -297,8 +315,12 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
      * @param record
      */
     protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) throws KeeperException, IOException, RequestProcessorException {
-        request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type));
+        if (request.getHdr() == null) {
+            request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
+                    Time.currentWallTime(), type));
+        }
 
+        PrecalculatedDigest precalculatedDigest;
         switch (type) {
         case OpCode.create:
         case OpCode.create2:
@@ -321,8 +343,15 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
             request.setTxn(new DeleteTxn(path));
             parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
             parentRecord.childCount--;
+            parentRecord.stat.setPzxid(request.getHdr().getZxid());
+            parentRecord.precalculatedDigest = precalculateDigest(
+                    DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
             addChangeRecord(parentRecord);
-            addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, null, -1, null));
+
+            nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path, null, -1, null);
+            nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.REMOVE, path);
+            setTxnDigest(request, nodeRecord.precalculatedDigest);
+            addChangeRecord(nodeRecord);
             break;
         }
         case OpCode.delete:
@@ -343,8 +372,15 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
             request.setTxn(new DeleteTxn(path));
             parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
             parentRecord.childCount--;
+            parentRecord.stat.setPzxid(request.getHdr().getZxid());
+            parentRecord.precalculatedDigest = precalculateDigest(
+                    DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
             addChangeRecord(parentRecord);
-            addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, null, -1, null));
+
+            nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path, null, -1, null);
+            nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.REMOVE, path);
+            setTxnDigest(request, nodeRecord.precalculatedDigest);
+            addChangeRecord(nodeRecord);
             break;
         case OpCode.setData:
             zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
@@ -360,6 +396,12 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
             request.setTxn(new SetDataTxn(path, setDataRequest.getData(), newVersion));
             nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
             nodeRecord.stat.setVersion(newVersion);
+            nodeRecord.stat.setMtime(request.getHdr().getTime());
+            nodeRecord.stat.setMzxid(zxid);
+            nodeRecord.data = setDataRequest.getData();
+            nodeRecord.precalculatedDigest = precalculateDigest(
+                    DigestOpCode.UPDATE, path, nodeRecord.data, nodeRecord.stat);
+            setTxnDigest(request, nodeRecord.precalculatedDigest);
             addChangeRecord(nodeRecord);
             break;
         case OpCode.reconfig:
@@ -490,10 +532,20 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
 
             nodeRecord = getRecordForPath(ZooDefs.CONFIG_NODE);
             zks.checkACL(request.cnxn, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo, null, null);
-            request.setTxn(new SetDataTxn(ZooDefs.CONFIG_NODE, request.qv.toString().getBytes(), -1));
+            SetDataTxn setDataTxn = new SetDataTxn(ZooDefs.CONFIG_NODE, request.qv.toString().getBytes(), -1);
+            request.setTxn(setDataTxn);
             nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
             nodeRecord.stat.setVersion(-1);
+            nodeRecord.stat.setMtime(request.getHdr().getTime());
+            nodeRecord.stat.setMzxid(zxid);
+            nodeRecord.data = setDataTxn.getData();
+            // Reconfig is currently a noop from digest computation
+            // perspective since config node is not covered by the digests.
+            nodeRecord.precalculatedDigest = precalculateDigest(
+                    DigestOpCode.NOOP, ZooDefs.CONFIG_NODE, nodeRecord.data, nodeRecord.stat);
+            setTxnDigest(request, nodeRecord.precalculatedDigest);
             addChangeRecord(nodeRecord);
+
             break;
         case OpCode.setACL:
             zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
@@ -510,6 +562,9 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
             request.setTxn(new SetACLTxn(path, listACL, newVersion));
             nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
             nodeRecord.stat.setAversion(newVersion);
+            nodeRecord.precalculatedDigest = precalculateDigest(
+                    DigestOpCode.UPDATE, path, nodeRecord.data, nodeRecord.stat);
+            setTxnDigest(request, nodeRecord.precalculatedDigest);
             addChangeRecord(nodeRecord);
             break;
         case OpCode.createSession:
@@ -542,7 +597,20 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
                     }
                 }
                 for (String path2Delete : es) {
-                    addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path2Delete, null, 0, null));
+                    if (digestEnabled) {
+                        parentPath = getParentPathAndValidate(path2Delete);
+                        parentRecord = getRecordForPath(parentPath);
+                        parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
+                        parentRecord.stat.setPzxid(request.getHdr().getZxid());
+                        parentRecord.precalculatedDigest = precalculateDigest(
+                                DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
+                        addChangeRecord(parentRecord);
+                    }
+                    nodeRecord = new ChangeRecord(
+                            request.getHdr().getZxid(), path2Delete, null, 0, null);
+                    nodeRecord.precalculatedDigest = precalculateDigest(
+                            DigestOpCode.REMOVE, path2Delete);
+                    addChangeRecord(nodeRecord);
                 }
                 if (ZooKeeperServer.isCloseSessionTxnEnabled()) {
                     request.setTxn(new CloseSessionTxn(new ArrayList<String>(es)));
@@ -569,6 +637,12 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
             LOG.warn("unknown type {}", type);
             break;
         }
+
+        // If the txn is not going to mutate anything, like createSession,
+        // we just set the current tree digest in it
+        if (request.getTxnDigest() == null && digestEnabled) {
+            setTxnDigest(request);
+        }
     }
 
     private void pRequest2TxnCreate(int type, Request request, Record record, boolean deserialize) throws IOException, KeeperException {
@@ -628,15 +702,31 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
         } else {
             request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(), newCversion));
         }
-        StatPersisted s = new StatPersisted();
-        if (createMode.isEphemeral()) {
-            s.setEphemeralOwner(request.sessionId);
+
+        TxnHeader hdr = request.getHdr();
+        long ephemeralOwner = 0;
+        if (createMode.isContainer()) {
+            ephemeralOwner = EphemeralType.CONTAINER_EPHEMERAL_OWNER;
+        } else if (createMode.isTTL()) {
+            ephemeralOwner = EphemeralType.TTL.toEphemeralOwner(ttl);
+        } else if (createMode.isEphemeral()) {
+            ephemeralOwner = request.sessionId;
         }
+        StatPersisted s = DataTree.createStat(hdr.getZxid(), hdr.getTime(), ephemeralOwner);
         parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
         parentRecord.childCount++;
         parentRecord.stat.setCversion(newCversion);
+        parentRecord.stat.setPzxid(request.getHdr().getZxid());
+        parentRecord.precalculatedDigest = precalculateDigest(
+                DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
         addChangeRecord(parentRecord);
-        addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL));
+        ChangeRecord nodeRecord = new ChangeRecord(
+                request.getHdr().getZxid(), path, s, 0, listACL);
+        nodeRecord.data = data;
+        nodeRecord.precalculatedDigest = precalculateDigest(
+                DigestOpCode.ADD, path, nodeRecord.data, s);
+        setTxnDigest(request, nodeRecord.precalculatedDigest);
+        addChangeRecord(nodeRecord);
     }
 
     private void validatePath(String path, long sessionId) throws BadArgumentsException {
@@ -724,6 +814,8 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
 
                 //Store off current pending change records in case we need to rollback
                 Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);
+                request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
+                        Time.currentWallTime(), request.type));
 
                 for (Op op : multiRequest) {
                     Record subrequest = op.toRequestRecord();
@@ -741,7 +833,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
                         /* Prep the request and convert to a Txn */
                         try {
                             pRequest2Txn(op.getType(), zxid, request, subrequest, false);
-                            type = request.getHdr().getType();
+                            type = op.getType();
                             txn = request.getTxn();
                         } catch (KeeperException e) {
                             ke = e;
@@ -774,8 +866,10 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
                     }
                 }
 
-                request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), request.type));
                 request.setTxn(new MultiTxn(txns));
+                if (digestEnabled) {
+                    setTxnDigest(request);
+                }
 
                 break;
 
@@ -956,4 +1050,81 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
         nextProcessor.shutdown();
     }
 
+    /**
+     * Calculate the node digest and the tree digest after the change.
+     *
+     * @param type the kind of digest change: NOOP, ADD, REMOVE or UPDATE
+     * @param path the path of the node
+     * @param data the data of the node, only read for ADD and UPDATE
+     * @param s the stat of the node, only read for ADD and UPDATE
+     *
+     * @return the new node digest and resulting tree digest, or null if digest is disabled
+     */
+    private PrecalculatedDigest precalculateDigest(DigestOpCode type, String path,
+            byte[] data, StatPersisted s) throws KeeperException.NoNodeException {
+
+        if (!digestEnabled) {
+            return null;
+        }
+
+        long prevNodeDigest;
+        long newNodeDigest;
+
+        switch (type) {
+            case ADD:
+                prevNodeDigest = 0; // nothing to subtract for a new node
+                newNodeDigest = digestCalculator.calculateDigest(path, data, s);
+                break;
+            case REMOVE:
+                prevNodeDigest = getRecordForPath(path).precalculatedDigest.nodeDigest;
+                newNodeDigest = 0; // nothing to add back for a removed node
+                break;
+            case UPDATE:
+                prevNodeDigest = getRecordForPath(path).precalculatedDigest.nodeDigest;
+                newNodeDigest = digestCalculator.calculateDigest(path, data, s);
+                break;
+            case NOOP:
+                newNodeDigest = prevNodeDigest = 0; // tree digest is left unchanged
+                break;
+            default:
+                return null;
+        }
+        long treeDigest = getCurrentTreeDigest() - prevNodeDigest + newNodeDigest;
+        return new PrecalculatedDigest(newNodeDigest, treeDigest);
+    }
+
+    private PrecalculatedDigest precalculateDigest(
+            DigestOpCode type, String path) throws KeeperException.NoNodeException {
+        return precalculateDigest(type, path, null, null); // no data or stat supplied
+    }
+
+    /**
+     * Get the tree digest of the last outstanding change, or of the DataTree if there is none.
+     *
+     * @return current tree digest
+     */
+    private long getCurrentTreeDigest() {
+        long digest;
+        synchronized (zks.outstandingChanges) {
+            if (zks.outstandingChanges.isEmpty()) {
+                digest = zks.getZKDatabase().getDataTree().getTreeDigest();
+                LOG.debug("Digest got from data tree is: {}", digest);
+            } else {
+                digest = zks.outstandingChanges.peekLast().precalculatedDigest.treeDigest;
+                LOG.debug("Digest got from outstandingChanges is: {}", digest);
+            }
+        }
+        return digest;
+    }
+
+    private void setTxnDigest(Request request) { // requires digestEnabled; digestCalculator is unset otherwise
+        request.setTxnDigest(new TxnDigest(digestCalculator.getDigestVersion(), getCurrentTreeDigest()));
+    }
+
+    private void setTxnDigest(Request request, PrecalculatedDigest preCalculatedDigest) {
+        if (preCalculatedDigest == null) {
+            return; // no digest was precalculated, leave the request untouched
+        }
+        request.setTxnDigest(new TxnDigest(digestCalculator.getDigestVersion(), preCalculatedDigest.treeDigest));
+    }
 }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
index 63cc30b..d0fb7da 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
@@ -29,6 +29,7 @@ import org.apache.zookeeper.metrics.Summary;
 import org.apache.zookeeper.metrics.SummarySet;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.util.AuthUtil;
+import org.apache.zookeeper.txn.TxnDigest;
 import org.apache.zookeeper.txn.TxnHeader;
 
 /**
@@ -105,6 +106,8 @@ public class Request {
 
     public QuorumVerifier qv = null;
 
+    private TxnDigest txnDigest;
+
     /**
      * If this is a create or close request for a local-only session.
      */
@@ -435,7 +438,6 @@ public class Request {
         logLatency(metric, key, Time.currentWallTime());
     }
 
-
     /**
      * Returns comma separated list of users authenticated in the current
      * session
@@ -462,4 +464,12 @@ public class Request {
         }
         return users.toString();
     }
+
+    public TxnDigest getTxnDigest() {
+        return txnDigest;
+    }
+
+    public void setTxnDigest(TxnDigest txnDigest) {
+        this.txnDigest = txnDigest;
+    }
 }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SnapshotFormatter.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SnapshotFormatter.java
index a276ef3..5fce206 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SnapshotFormatter.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SnapshotFormatter.java
@@ -115,6 +115,11 @@ public class SnapshotFormatter {
     private void printDetails(DataTree dataTree, Map<Long, Integer> sessions, boolean dumpData, long fileNameZxid) {
         long dtZxid = printZnodeDetails(dataTree, dumpData);
         printSessionDetails(dataTree, sessions);
+        DataTree.ZxidDigest targetZxidDigest = dataTree.getDigestFromLoadedSnapshot();
+        if (targetZxidDigest != null) {
+            System.out.println(String.format("Target zxid digest is: %s, %s",
+                    Long.toHexString(targetZxidDigest.zxid), targetZxidDigest.digest));
+        }
         System.out.println(String.format("----%nLast zxid: 0x%s", Long.toHexString(Math.max(fileNameZxid, dtZxid))));
     }
 
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogEntry.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogEntry.java
new file mode 100644
index 0000000..352eb81
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogEntry.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import org.apache.jute.Record;
+import org.apache.zookeeper.txn.TxnDigest;
+import org.apache.zookeeper.txn.TxnHeader;
+
+/**
+ * A holder for one txn entry: the txn record, its header and its digest.
+ */
+public final class TxnLogEntry {
+    private final Record txn;
+    private final TxnHeader header;
+    private final TxnDigest digest;
+
+    public TxnLogEntry(Record txn, TxnHeader header, TxnDigest digest) {
+        this.txn = txn;
+        this.header = header;
+        this.digest = digest;
+    }
+
+    public Record getTxn() {
+        return txn;
+    }
+
+    public TxnHeader getHeader() {
+        return header;
+    }
+
+    public TxnDigest getDigest() {
+        return digest;
+    }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogProposalIterator.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogProposalIterator.java
index 5e080e1..847e3b2 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogProposalIterator.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogProposalIterator.java
@@ -18,16 +18,13 @@
 
 package org.apache.zookeeper.server;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.Iterator;
-import org.apache.jute.BinaryOutputArchive;
-import org.apache.jute.Record;
 import org.apache.zookeeper.server.persistence.TxnLog.TxnIterator;
+import org.apache.zookeeper.server.persistence.Util;
 import org.apache.zookeeper.server.quorum.Leader;
 import org.apache.zookeeper.server.quorum.Leader.Proposal;
 import org.apache.zookeeper.server.quorum.QuorumPacket;
-import org.apache.zookeeper.txn.TxnHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,19 +58,11 @@ public class TxnLogProposalIterator implements Iterator<Proposal> {
     @Override
     public Proposal next() {
 
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
         Proposal p = new Proposal();
         try {
-            TxnHeader hdr = itr.getHeader();
-            Record txn = itr.getTxn();
-            hdr.serialize(boa, "hdr");
-            if (txn != null) {
-                txn.serialize(boa, "txn");
-            }
-            baos.close();
+            byte[] serializedData = Util.marshallTxnEntry(itr.getHeader(), itr.getTxn(), itr.getDigest());
 
-            QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, itr.getHeader().getZxid(), baos.toByteArray(), null);
+            QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, itr.getHeader().getZxid(), serializedData, null);
             p.packet = pp;
             p.request = null;
 
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
index c9c6d54..f758f5d 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
@@ -55,6 +55,7 @@ import org.apache.zookeeper.server.quorum.Leader.Proposal;
 import org.apache.zookeeper.server.quorum.QuorumPacket;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.util.SerializeUtils;
+import org.apache.zookeeper.txn.TxnDigest;
 import org.apache.zookeeper.txn.TxnHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -268,8 +269,8 @@ public class ZKDatabase {
     }
 
     private final PlayBackListener commitProposalPlaybackListener = new PlayBackListener() {
-        public void onTxnLoaded(TxnHeader hdr, Record txn) {
-            addCommittedProposal(hdr, txn);
+        public void onTxnLoaded(TxnHeader hdr, Record txn, TxnDigest digest) {
+            addCommittedProposal(hdr, txn, digest);
         }
     };
 
@@ -285,7 +286,8 @@ public class ZKDatabase {
         initialized = true;
         long loadTime = Time.currentElapsedTime() - startTime;
         ServerMetrics.getMetrics().DB_INIT_TIME.add(loadTime);
-        LOG.info("Snapshot loaded in {} ms", loadTime);
+        LOG.info("Snapshot loaded in {} ms, highest zxid is 0x{}, digest is {}",
+                loadTime, Long.toHexString(zxid), dataTree.getTreeDigest());
         return zxid;
     }
 
@@ -300,8 +302,9 @@ public class ZKDatabase {
         return zxid;
     }
 
-    private void addCommittedProposal(TxnHeader hdr, Record txn) {
+    private void addCommittedProposal(TxnHeader hdr, Record txn, TxnDigest digest) {
         Request r = new Request(0, hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
+        r.setTxnDigest(digest);
         addCommittedProposal(r);
     }
 
@@ -468,14 +471,15 @@ public class ZKDatabase {
     }
 
     /**
-     * the process txn on the data
+     * the process txn on the data and perform digest comparison.
      * @param hdr the txnheader for the txn
      * @param txn the transaction that needs to be processed
+     * @param digest the expected digest. A null value would skip the check
      * @return the result of processing the transaction on this
      * datatree/zkdatabase
      */
-    public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
-        return dataTree.processTxn(hdr, txn);
+    public ProcessTxnResult processTxn(TxnHeader hdr, Record txn, TxnDigest digest) {
+        return dataTree.processTxn(hdr, txn, digest);
     }
 
     /**
@@ -745,4 +749,7 @@ public class ZKDatabase {
         return snapLog.getTotalLogSize();
     }
 
+    public boolean compareDigest(TxnHeader header, Record txn, TxnDigest digest) {
+        return dataTree.compareDigest(header, txn, digest);
+    }
 }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 1c9dda7..1a2d9a7 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -79,6 +79,7 @@ import org.apache.zookeeper.server.util.JvmPauseMonitor;
 import org.apache.zookeeper.server.util.OSMXBean;
 import org.apache.zookeeper.server.util.RequestPathMetricsCollector;
 import org.apache.zookeeper.txn.CreateSessionTxn;
+import org.apache.zookeeper.txn.TxnDigest;
 import org.apache.zookeeper.txn.TxnHeader;
 import org.apache.zookeeper.util.ServiceUtils;
 import org.slf4j.Logger;
@@ -875,11 +876,24 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         return 0;
     }
 
+    static class PrecalculatedDigest {
+        final long nodeDigest; // digest of a single node
+        final long treeDigest; // digest of the whole tree
+
+        PrecalculatedDigest(long nodeDigest, long treeDigest) {
+            this.nodeDigest = nodeDigest;
+            this.treeDigest = treeDigest;
+        }
+    }
+
+
     /**
      * This structure is used to facilitate information sharing between PrepRP
      * and FinalRP.
      */
     static class ChangeRecord {
+        PrecalculatedDigest precalculatedDigest;
+        byte[] data;
 
         ChangeRecord(long zxid, String path, StatPersisted stat, int childCount, List<ACL> acl) {
             this.zxid = zxid;
@@ -904,7 +918,11 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
             if (this.stat != null) {
                 DataTree.copyStatPersisted(this.stat, stat);
             }
-            return new ChangeRecord(zxid, path, stat, childCount, acl == null ? new ArrayList<>() : new ArrayList<>(acl));
+            ChangeRecord changeRecord = new ChangeRecord(zxid, path, stat, childCount,
+                    acl == null ? new ArrayList<>() : new ArrayList<>(acl));
+            changeRecord.precalculatedDigest = precalculatedDigest;
+            changeRecord.data = data;
+            return changeRecord;
         }
 
     }
@@ -1677,7 +1695,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     // entry point for quorum/Learner.java
     public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
         processTxnForSessionEvents(null, hdr, txn);
-        return processTxnInDB(hdr, txn);
+        return processTxnInDB(hdr, txn, null);
     }
 
     // entry point for FinalRequestProcessor.java
@@ -1693,7 +1711,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
             return new ProcessTxnResult();
         }
         synchronized (outstandingChanges) {
-            ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn());
+            ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest());
 
             // request.hdr is set for write requests, which are the only ones
             // that add to outstandingChanges.
@@ -1739,11 +1757,11 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         }
     }
 
-    private ProcessTxnResult processTxnInDB(TxnHeader hdr, Record txn) {
+    private ProcessTxnResult processTxnInDB(TxnHeader hdr, Record txn, TxnDigest digest) {
         if (hdr == null) {
             return new ProcessTxnResult();
         } else {
-            return getZKDatabase().processTxn(hdr, txn);
+            return getZKDatabase().processTxn(hdr, txn, digest);
         }
     }
 
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
index 9cc7118..fde577a 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
@@ -78,15 +78,24 @@ public class FileSnap implements SnapShot {
             return -1L;
         }
         File snap = null;
+        long snapZxid = -1;
         boolean foundValid = false;
         for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
             snap = snapList.get(i);
             LOG.info("Reading snapshot {}", snap);
+            snapZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
             try (CheckedInputStream snapIS = SnapStream.getInputStream(snap)) {
                 InputArchive ia = BinaryInputArchive.getArchive(snapIS);
                 deserialize(dt, sessions, ia);
                 SnapStream.checkSealIntegrity(snapIS, ia);
-                if (dt.deserializeZxidDigest(ia)) {
+
+                // The digest was added after the CRC to stay backward
+                // compatible: older code can still read snapshots which
+                // include a digest.
+                //
+                // To verify the digest is intact, a second CRC check follows
+                // it.
+                if (dt.deserializeZxidDigest(ia, snapZxid)) {
                     SnapStream.checkSealIntegrity(snapIS, ia);
                 }
 
@@ -99,7 +108,7 @@ public class FileSnap implements SnapShot {
         if (!foundValid) {
             throw new IOException("Not able to find valid snapshots in " + snapDir);
         }
-        dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
+        dt.lastProcessedZxid = snapZxid;
         lastSnapshotInfo = new SnapshotInfo(dt.lastProcessedZxid, snap.lastModified() / 1000);
 
         // compare the digest if this is not a fuzzy snapshot, we want to compare
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnLog.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnLog.java
index 4d35e83..973e741 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnLog.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnLog.java
@@ -44,7 +44,9 @@ import org.apache.jute.OutputArchive;
 import org.apache.jute.Record;
 import org.apache.zookeeper.server.ServerMetrics;
 import org.apache.zookeeper.server.ServerStats;
+import org.apache.zookeeper.server.TxnLogEntry;
 import org.apache.zookeeper.server.util.SerializeUtils;
+import org.apache.zookeeper.txn.TxnDigest;
 import org.apache.zookeeper.txn.TxnHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -260,6 +262,11 @@ public class FileTxnLog implements TxnLog {
      * returns true iff something appended, otw false
      */
     public synchronized boolean append(TxnHeader hdr, Record txn) throws IOException {
+        return append(hdr, txn, null); // delegate with no digest
+    }
+
+    @Override
+    public synchronized boolean append(TxnHeader hdr, Record txn, TxnDigest digest) throws IOException {
         if (hdr == null) {
             return false;
         }
@@ -287,7 +294,7 @@ public class FileTxnLog implements TxnLog {
             streamsToFlush.add(fos);
         }
         filePadding.padFile(fos.getChannel());
-        byte[] buf = Util.marshallTxnEntry(hdr, txn);
+        byte[] buf = Util.marshallTxnEntry(hdr, txn, digest);
         if (buf == null || buf.length == 0) {
             throw new IOException("Faulty serialization for header " + "and txn");
         }
@@ -612,6 +619,7 @@ public class FileTxnLog implements TxnLog {
         long zxid;
         TxnHeader hdr;
         Record record;
+        TxnDigest digest;
         File logFile;
         InputArchive ia;
         static final String CRC_ERROR = "CRC check failed";
@@ -768,8 +776,10 @@ public class FileTxnLog implements TxnLog {
                 if (crcValue != crc.getValue()) {
                     throw new IOException(CRC_ERROR);
                 }
-                hdr = new TxnHeader();
-                record = SerializeUtils.deserializeTxn(bytes, hdr);
+                TxnLogEntry logEntry = SerializeUtils.deserializeTxn(bytes);
+                hdr = logEntry.getHeader();
+                record = logEntry.getTxn();
+                digest = logEntry.getDigest();
             } catch (EOFException e) {
                 LOG.debug("EOF exception", e);
                 inputStream.close();
@@ -808,6 +818,10 @@ public class FileTxnLog implements TxnLog {
             return record;
         }
 
+        public TxnDigest getDigest() {
+            return digest;
+        }
+
         /**
          * close the iterator
          * and release the resources.
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
index 51f2dbb..661beb2 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
@@ -38,6 +38,7 @@ import org.apache.zookeeper.server.ServerStats;
 import org.apache.zookeeper.server.ZooTrace;
 import org.apache.zookeeper.server.persistence.TxnLog.TxnIterator;
 import org.apache.zookeeper.txn.CreateSessionTxn;
+import org.apache.zookeeper.txn.TxnDigest;
 import org.apache.zookeeper.txn.TxnHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -85,7 +86,9 @@ public class FileTxnSnapLog {
      * restored.
      */
     public interface PlayBackListener {
-        void onTxnLoaded(TxnHeader hdr, Record rec);
+
+        void onTxnLoaded(TxnHeader hdr, Record rec, TxnDigest digest);
+
     }
 
     /**
@@ -336,6 +339,7 @@ public class FileTxnSnapLog {
                 }
                 try {
                     processTransaction(hdr, dt, sessions, itr.getTxn());
+                    dt.compareDigest(hdr, itr.getTxn(), itr.getDigest());
                     txnLoaded++;
                 } catch (KeeperException.NoNodeException e) {
                     throw new IOException("Failed to process transaction type: "
@@ -344,7 +348,7 @@ public class FileTxnSnapLog {
                                           + e.getMessage(),
                                           e);
                 }
-                listener.onTxnLoaded(hdr, itr.getTxn());
+                listener.onTxnLoaded(hdr, itr.getTxn(), itr.getDigest());
                 if (!itr.next()) {
                     break;
                 }
@@ -558,7 +562,7 @@ public class FileTxnSnapLog {
      * @throws IOException
      */
     public boolean append(Request si) throws IOException {
-        return txnLog.append(si.getHdr(), si.getTxn());
+        return txnLog.append(si.getHdr(), si.getTxn(), si.getTxnDigest());
     }
 
     /**
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/TxnLog.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/TxnLog.java
index af2a684..b557208 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/TxnLog.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/TxnLog.java
@@ -22,6 +22,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import org.apache.jute.Record;
 import org.apache.zookeeper.server.ServerStats;
+import org.apache.zookeeper.txn.TxnDigest;
 import org.apache.zookeeper.txn.TxnHeader;
 
 /**
@@ -52,6 +53,16 @@ public interface TxnLog extends Closeable {
     boolean append(TxnHeader hdr, Record r) throws IOException;
 
     /**
+     * Append a request to the transaction log with a digest
+     * @param hdr the transaction header
+     * @param r the transaction itself
+     * @param digest transaction digest
+     * @return true iff something appended, otherwise false
+     * @throws IOException
+     */
+    boolean append(TxnHeader hdr, Record r, TxnDigest digest) throws IOException;
+
+    /**
      * Start reading the transaction logs
      * from a given zxid
      * @param zxid
@@ -97,6 +108,11 @@ public interface TxnLog extends Closeable {
     long getTxnLogSyncElapsedTime();
 
     /**
+     * close the transaction logs
+     */
+    void close() throws IOException;
+
+    /**
      * Sets the total size of all log files
      */
     void setTotalLogSize(long size);
@@ -125,12 +141,24 @@ public interface TxnLog extends Closeable {
         Record getTxn();
 
         /**
+         * @return the digest associated with the transaction.
+         */
+        TxnDigest getDigest();
+
+        /**
          * go to the next transaction record.
          * @throws IOException
          */
         boolean next() throws IOException;
 
         /**
+         * close files and release the
+         * resources
+         * @throws IOException
+         */
+        void close() throws IOException;
+
+        /**
          * Get an estimated storage space used to store transaction records
          * that will return by this iterator
          * @throws IOException
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/TxnLogToolkit.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/TxnLogToolkit.java
index 984fb03..389caea 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/TxnLogToolkit.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/TxnLogToolkit.java
@@ -49,6 +49,7 @@ import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.Record;
 import org.apache.zookeeper.server.ExitCode;
 import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.TxnLogEntry;
 import org.apache.zookeeper.server.util.LogChopper;
 import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.txn.CreateContainerTxn;
@@ -283,8 +284,9 @@ public class TxnLogToolkit implements Closeable {
     }
 
     private void printTxn(byte[] bytes, String prefix) throws IOException {
-        TxnHeader hdr = new TxnHeader();
-        Record txn = SerializeUtils.deserializeTxn(bytes, hdr);
+        TxnLogEntry logEntry = SerializeUtils.deserializeTxn(bytes);
+        TxnHeader hdr = logEntry.getHeader();
+        Record txn = logEntry.getTxn();
         String txnStr = getFormattedTxnStr(txn);
         String txns = String.format(
             "%s session 0x%s cxid 0x%s zxid 0x%s %s %s",
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/Util.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/Util.java
index 0a35293..a4d8344 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/Util.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/Util.java
@@ -34,6 +34,7 @@ import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.InputArchive;
 import org.apache.jute.OutputArchive;
 import org.apache.jute.Record;
+import org.apache.zookeeper.txn.TxnDigest;
 import org.apache.zookeeper.txn.TxnHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -180,6 +181,11 @@ public class Util {
      * @throws IOException
      */
     public static byte[] marshallTxnEntry(TxnHeader hdr, Record txn) throws IOException {
+        return marshallTxnEntry(hdr, txn, null);
+    }
+
+    public static byte[] marshallTxnEntry(TxnHeader hdr, Record txn, TxnDigest digest)
+            throws IOException {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         OutputArchive boa = BinaryOutputArchive.getArchive(baos);
 
@@ -187,6 +193,9 @@ public class Util {
         if (txn != null) {
             txn.serialize(boa, "txn");
         }
+        if (digest != null) {
+            digest.serialize(boa, "digest");
+        }
         return baos.toByteArray();
     }
 
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java
index 4e52187..01f9f0d 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java
@@ -345,6 +345,7 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements RequestP
                                  */
                                 topPending.setHdr(request.getHdr());
                                 topPending.setTxn(request.getTxn());
+                                topPending.setTxnDigest(request.getTxnDigest());
                                 topPending.zxid = request.zxid;
                                 topPending.commitRecvTime = request.commitRecvTime;
                                 request = topPending;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
index 4d80d93..eb6742f 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
@@ -27,11 +27,13 @@ import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.common.Time;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.ServerMetrics;
+import org.apache.zookeeper.server.TxnLogEntry;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.server.util.ZxidUtils;
 import org.apache.zookeeper.txn.SetDataTxn;
+import org.apache.zookeeper.txn.TxnDigest;
 import org.apache.zookeeper.txn.TxnHeader;
 
 /**
@@ -160,8 +162,10 @@ public class Follower extends Learner {
             break;
         case Leader.PROPOSAL:
             ServerMetrics.getMetrics().LEARNER_PROPOSAL_RECEIVED_COUNT.add(1);
-            TxnHeader hdr = new TxnHeader();
-            Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
+            TxnLogEntry logEntry = SerializeUtils.deserializeTxn(qp.getData());
+            TxnHeader hdr = logEntry.getHeader();
+            Record txn = logEntry.getTxn();
+            TxnDigest digest = logEntry.getDigest();
             if (hdr.getZxid() != lastQueued + 1) {
                 LOG.warn(
                     "Got zxid 0x{} expected 0x{}",
@@ -176,7 +180,7 @@ public class Follower extends Learner {
                 self.setLastSeenQuorumVerifier(qv, true);
             }
 
-            fzk.logRequest(hdr, txn);
+            fzk.logRequest(hdr, txn, digest);
             if (hdr != null) {
                 /*
                  * Request header is created only by the leader, so this is only set
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
index 12e552f..8d371ae 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
@@ -33,6 +33,7 @@ import org.apache.zookeeper.server.ServerMetrics;
 import org.apache.zookeeper.server.SyncRequestProcessor;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.txn.TxnDigest;
 import org.apache.zookeeper.txn.TxnHeader;
 import org.apache.zookeeper.util.ServiceUtils;
 import org.slf4j.Logger;
@@ -78,8 +79,9 @@ public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
 
     LinkedBlockingQueue<Request> pendingTxns = new LinkedBlockingQueue<Request>();
 
-    public void logRequest(TxnHeader hdr, Record txn) {
+    public void logRequest(TxnHeader hdr, Record txn, TxnDigest digest) {
         Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
+        request.setTxnDigest(digest);
         if ((request.zxid & 0xffffffffL) != 0) {
             pendingTxns.add(request);
         }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
index f0eca51..5371bea 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
@@ -279,7 +279,7 @@ public class Leader extends LearnerMaster {
 
     private final List<ServerSocket> serverSockets = new LinkedList<>();
 
-    Leader(QuorumPeer self, LeaderZooKeeperServer zk) throws IOException {
+    public Leader(QuorumPeer self, LeaderZooKeeperServer zk) throws IOException {
         this.self = self;
         this.proposalStats = new BufferStats();
 
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
index 27de451..8834b5f 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
@@ -53,7 +53,7 @@ public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
     /**
      * @throws IOException
      */
-    LeaderZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException {
+    public LeaderZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException {
         super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, self.clientPortListenBacklog, zkDb, self);
     }
 
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index 3d6ff1b..ad9b0bb 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -50,6 +50,7 @@ import org.apache.zookeeper.common.X509Exception;
 import org.apache.zookeeper.server.ExitCode;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.TxnLogEntry;
 import org.apache.zookeeper.server.ZooTrace;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
@@ -57,6 +58,7 @@ import org.apache.zookeeper.server.util.MessageTracker;
 import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.server.util.ZxidUtils;
 import org.apache.zookeeper.txn.SetDataTxn;
+import org.apache.zookeeper.txn.TxnDigest;
 import org.apache.zookeeper.txn.TxnHeader;
 import org.apache.zookeeper.util.ServiceUtils;
 import org.slf4j.Logger;
@@ -73,6 +75,7 @@ public class Learner {
 
         TxnHeader hdr;
         Record rec;
+        TxnDigest digest;
 
     }
 
@@ -551,6 +554,7 @@ public class Learner {
             //If we are not going to take the snapshot be sure the transactions are not applied in memory
             // but written out to the transaction log
             boolean writeToTxnLog = !snapshotNeeded;
+            TxnLogEntry logEntry;
             // we are now going to start getting transactions to apply followed by an UPTODATE
             outerLoop:
             while (self.isRunning()) {
@@ -558,8 +562,10 @@ public class Learner {
                 switch (qp.getType()) {
                 case Leader.PROPOSAL:
                     PacketInFlight pif = new PacketInFlight();
-                    pif.hdr = new TxnHeader();
-                    pif.rec = SerializeUtils.deserializeTxn(qp.getData(), pif.hdr);
+                    logEntry = SerializeUtils.deserializeTxn(qp.getData());
+                    pif.hdr = logEntry.getHeader();
+                    pif.rec = logEntry.getTxn();
+                    pif.digest = logEntry.getDigest();
                     if (pif.hdr.getZxid() != lastQueued + 1) {
                         LOG.warn(
                             "Got zxid 0x{} expected 0x{}",
@@ -606,21 +612,26 @@ public class Learner {
                 case Leader.INFORM:
                 case Leader.INFORMANDACTIVATE:
                     PacketInFlight packet = new PacketInFlight();
-                    packet.hdr = new TxnHeader();
 
                     if (qp.getType() == Leader.INFORMANDACTIVATE) {
                         ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
                         long suggestedLeaderId = buffer.getLong();
                         byte[] remainingdata = new byte[buffer.remaining()];
                         buffer.get(remainingdata);
-                        packet.rec = SerializeUtils.deserializeTxn(remainingdata, packet.hdr);
+                        logEntry = SerializeUtils.deserializeTxn(remainingdata);
+                        packet.hdr = logEntry.getHeader();
+                        packet.rec = logEntry.getTxn();
+                        packet.digest = logEntry.getDigest();
                         QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) packet.rec).getData()));
                         boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
                         if (majorChange) {
                             throw new Exception("changes proposed in reconfig");
                         }
                     } else {
-                        packet.rec = SerializeUtils.deserializeTxn(qp.getData(), packet.hdr);
+                        logEntry = SerializeUtils.deserializeTxn(qp.getData());
+                        packet.rec = logEntry.getTxn();
+                        packet.hdr = logEntry.getHeader();
+                        packet.digest = logEntry.getDigest();
                         // Log warning message if txn comes out-of-order
                         if (packet.hdr.getZxid() != lastQueued + 1) {
                             LOG.warn(
@@ -697,7 +708,7 @@ public class Learner {
         if (zk instanceof FollowerZooKeeperServer) {
             FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
             for (PacketInFlight p : packetsNotCommitted) {
-                fzk.logRequest(p.hdr, p.rec);
+                fzk.logRequest(p.hdr, p.rec, p.digest);
             }
             for (Long zxid : packetsCommitted) {
                 fzk.commit(zxid);
@@ -721,6 +732,7 @@ public class Learner {
                 Request request = new Request(null, p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), null, null);
                 request.setTxn(p.rec);
                 request.setHdr(p.hdr);
+                request.setTxnDigest(p.digest);
                 ozk.commitRequest(request);
             }
         } else {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
index e7d1d40..3bab398 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
@@ -50,9 +50,7 @@ import org.apache.zookeeper.server.quorum.Leader.Proposal;
 import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
 import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
 import org.apache.zookeeper.server.util.MessageTracker;
-import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.server.util.ZxidUtils;
-import org.apache.zookeeper.txn.TxnHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -397,13 +395,6 @@ public class LearnerHandler extends ZooKeeperThread {
             break;
         case Leader.PROPOSAL:
             type = "PROPOSAL";
-            TxnHeader hdr = new TxnHeader();
-            try {
-                SerializeUtils.deserializeTxn(p.getData(), hdr);
-                // mess = "transaction: " + txn.toString();
-            } catch (IOException e) {
-                LOG.warn("Unexpected exception", e);
-            }
             break;
         case Leader.REQUEST:
             type = "REQUEST";
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java
index 02683e4..47c58b9 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java
@@ -24,10 +24,12 @@ import org.apache.jute.Record;
 import org.apache.zookeeper.server.ObserverBean;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.ServerMetrics;
+import org.apache.zookeeper.server.TxnLogEntry;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.txn.SetDataTxn;
+import org.apache.zookeeper.txn.TxnDigest;
 import org.apache.zookeeper.txn.TxnHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -168,6 +170,10 @@ public class Observer extends Learner {
      * @throws Exception
      */
     protected void processPacket(QuorumPacket qp) throws Exception {
+        TxnLogEntry logEntry;
+        TxnHeader hdr;
+        TxnDigest digest;
+        Record txn;
         switch (qp.getType()) {
         case Leader.PING:
             ping(qp);
@@ -189,26 +195,31 @@ public class Observer extends Learner {
             break;
         case Leader.INFORM:
             ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
-            TxnHeader hdr = new TxnHeader();
-            Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
+            logEntry = SerializeUtils.deserializeTxn(qp.getData());
+            hdr = logEntry.getHeader();
+            txn = logEntry.getTxn();
+            digest = logEntry.getDigest();
             Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, 0);
             request.logLatency(ServerMetrics.getMetrics().COMMIT_PROPAGATION_LATENCY);
+            request.setTxnDigest(digest);
             ObserverZooKeeperServer obs = (ObserverZooKeeperServer) zk;
             obs.commitRequest(request);
             break;
         case Leader.INFORMANDACTIVATE:
-            hdr = new TxnHeader();
-
             // get new designated leader from (current) leader's message
             ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
             long suggestedLeaderId = buffer.getLong();
 
             byte[] remainingdata = new byte[buffer.remaining()];
             buffer.get(remainingdata);
-            txn = SerializeUtils.deserializeTxn(remainingdata, hdr);
+            logEntry = SerializeUtils.deserializeTxn(remainingdata);
+            hdr = logEntry.getHeader();
+            txn = logEntry.getTxn();
+            digest = logEntry.getDigest();
             QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) txn).getData()));
 
             request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, 0);
+            request.setTxnDigest(digest);
             obs = (ObserverZooKeeperServer) zk;
 
             boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumBean.java
index 456bb57..aae4552 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumBean.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumBean.java
@@ -19,6 +19,7 @@
 package org.apache.zookeeper.server.quorum;
 
 import org.apache.zookeeper.jmx.ZKMBeanInfo;
+import org.apache.zookeeper.server.ZooKeeperServer;
 
 public class QuorumBean implements QuorumMXBean, ZKMBeanInfo {
 
@@ -81,4 +82,13 @@ public class QuorumBean implements QuorumMXBean, ZKMBeanInfo {
         Observer.setObserverElectionDelayMs(delayMS);
     }
 
+    @Override
+    public boolean getDigestEnabled() {
+        return ZooKeeperServer.isDigestEnabled();
+    }
+
+    @Override
+    public void disableDigest() {
+        ZooKeeperServer.setDigestEnabled(false);
+    }
 }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumMXBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumMXBean.java
index 33dac06..a7c4506 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumMXBean.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumMXBean.java
@@ -73,4 +73,7 @@ public interface QuorumMXBean {
      */
     void setObserverElectionDelayMS(long delayMS);
 
+    boolean getDigestEnabled();
+
+    void disableDigest();
 }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/AdHash.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/AdHash.java
index e74cb05..f3de65e 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/AdHash.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/AdHash.java
@@ -29,7 +29,7 @@ package org.apache.zookeeper.server.util;
 public class AdHash {
 
     /* we use 64 bits so that we can be fast an efficient */
-    private long hash;
+    private volatile long hash;
 
     /**
      * Add new digest to the hash value maintained in this class.
@@ -54,13 +54,6 @@ public class AdHash {
     }
 
     /**
-     * Return hex string of the hash value.
-     */
-    public String toHexString() {
-        return Long.toHexString(hash);
-    }
-
-    /**
      * Return the long value of the hash.
      */
     public long getHash() {
@@ -79,7 +72,10 @@ public class AdHash {
 
     @Override
     public String toString() {
-        return toHexString();
+        return Long.toHexString(hash);
     }
 
+    public void clear() {
+        hash = 0;
+    }
 }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/LogChopper.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/LogChopper.java
index e0e52a9..ccc8733 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/LogChopper.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/LogChopper.java
@@ -33,6 +33,7 @@ import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.Record;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.server.ExitCode;
+import org.apache.zookeeper.server.TxnLogEntry;
 import org.apache.zookeeper.server.persistence.FileHeader;
 import org.apache.zookeeper.server.persistence.FileTxnLog;
 import org.apache.zookeeper.txn.TxnHeader;
@@ -111,8 +112,9 @@ public class LogChopper {
             if (crcValue != crc.getValue()) {
                 throw new IOException("CRC doesn't match " + crcValue + " vs " + crc.getValue());
             }
-            TxnHeader hdr = new TxnHeader();
-            Record txn = SerializeUtils.deserializeTxn(bytes, hdr);
+            TxnLogEntry entry = SerializeUtils.deserializeTxn(bytes);
+            TxnHeader hdr = entry.getHeader();
+            Record txn = entry.getTxn();
             if (logStream.readByte("EOR") != 'B') {
                 System.out.println("Last transaction was partial.");
                 throw new EOFException("Last transaction was partial.");
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/SerializeUtils.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/SerializeUtils.java
index 2454c43..fcc5c8f 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/SerializeUtils.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/SerializeUtils.java
@@ -19,23 +19,22 @@
 package org.apache.zookeeper.server.util;
 
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import org.apache.jute.BinaryInputArchive;
-import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.InputArchive;
 import org.apache.jute.OutputArchive;
 import org.apache.jute.Record;
 import org.apache.zookeeper.ZooDefs.OpCode;
-import org.apache.zookeeper.common.IOUtils;
 import org.apache.zookeeper.server.DataTree;
 import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.TxnLogEntry;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.ZooTrace;
+import org.apache.zookeeper.server.persistence.Util;
 import org.apache.zookeeper.txn.CloseSessionTxn;
 import org.apache.zookeeper.txn.CreateContainerTxn;
 import org.apache.zookeeper.txn.CreateSessionTxn;
@@ -47,6 +46,7 @@ import org.apache.zookeeper.txn.ErrorTxn;
 import org.apache.zookeeper.txn.MultiTxn;
 import org.apache.zookeeper.txn.SetACLTxn;
 import org.apache.zookeeper.txn.SetDataTxn;
+import org.apache.zookeeper.txn.TxnDigest;
 import org.apache.zookeeper.txn.TxnHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,7 +55,8 @@ public class SerializeUtils {
 
     private static final Logger LOG = LoggerFactory.getLogger(SerializeUtils.class);
 
-    public static Record deserializeTxn(byte[] txnBytes, TxnHeader hdr) throws IOException {
+    public static TxnLogEntry deserializeTxn(byte[] txnBytes) throws IOException {
+        TxnHeader hdr = new TxnHeader();
         final ByteArrayInputStream bais = new ByteArrayInputStream(txnBytes);
         InputArchive ia = BinaryInputArchive.getArchive(bais);
 
@@ -128,7 +129,19 @@ public class SerializeUtils {
                 }
             }
         }
-        return txn;
+        TxnDigest digest = null;
+
+        if (ZooKeeperServer.isDigestEnabled()) {
+            digest = new TxnDigest();
+            try {
+                digest.deserialize(ia, "digest");
+            } catch (EOFException exception) {
+                // may not have digest in the txn
+                digest = null;
+            }
+        }
+
+        return new TxnLogEntry(txn, hdr, digest);
     }
 
     public static void deserializeSnapshot(DataTree dt, InputArchive ia, Map<Long, Integer> sessions) throws IOException {
@@ -162,19 +175,13 @@ public class SerializeUtils {
         if (request == null || request.getHdr() == null) {
             return null;
         }
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+        byte[] data = new byte[32];
         try {
-            request.getHdr().serialize(boa, "hdr");
-            if (request.getTxn() != null) {
-                request.getTxn().serialize(boa, "txn");
-            }
+            data = Util.marshallTxnEntry(request.getHdr(), request.getTxn(), request.getTxnDigest());
         } catch (IOException e) {
             LOG.error("This really should be impossible", e);
-        } finally {
-            IOUtils.cleanup(LOG, baos);
         }
-        return baos.toByteArray();
+        return data;
     }
 
 }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java
index 11eb836..da9a12a 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java
@@ -21,6 +21,8 @@ package org.apache.zookeeper.server;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -37,6 +39,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.jute.BinaryInputArchive;
 import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.InputArchive;
 import org.apache.jute.Record;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
@@ -217,7 +220,7 @@ public class DataTreeTest extends ZKTestCase {
             dt.processTxn(new TxnHeader(13, 1000, 1, 30, ZooDefs.OpCode.create), new CreateTxn("/foo", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 2));
 
             // check the current digest value
-            assertEquals(dt.getTreeDigest(), dt.getLastProcessedZxidDigest().digest);
+            assertEquals(dt.getTreeDigest(), dt.getLastProcessedZxidDigest().getDigest());
         } finally {
             ZooKeeperServer.setDigestEnabled(false);
         }
@@ -481,6 +484,36 @@ public class DataTreeTest extends ZKTestCase {
     }
 
     @Test
+    public void testDeserializeZxidDigest() throws Exception {
+        try {
+            ZooKeeperServer.setDigestEnabled(true);
+            DataTree dt = new DataTree();
+            dt.processTxn(new TxnHeader(13, 1000, 1, 30, ZooDefs.OpCode.create),
+                    new CreateTxn("/foo", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1), null);
+
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            BinaryOutputArchive oa = BinaryOutputArchive.getArchive(baos);
+            dt.serializeZxidDigest(oa);
+            baos.flush();
+
+            DataTree.ZxidDigest zd = dt.getLastProcessedZxidDigest();
+            assertNotNull(zd);
+
+            // deserialize data tree
+            InputArchive ia = BinaryInputArchive.getArchive(
+                    new ByteArrayInputStream(baos.toByteArray()));
+            dt.deserializeZxidDigest(ia, zd.getZxid());
+            assertNotNull(dt.getDigestFromLoadedSnapshot());
+
+            ia = BinaryInputArchive.getArchive(new ByteArrayInputStream(baos.toByteArray()));
+            dt.deserializeZxidDigest(ia, zd.getZxid() + 1);
+            assertNull(dt.getDigestFromLoadedSnapshot());
+        } finally {
+            ZooKeeperServer.setDigestEnabled(false);
+        }
+    }
+
+    @Test
     public void testDataTreeMetrics() throws Exception {
         ServerMetrics.getMetrics().resetAll();
 
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorMetricsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorMetricsTest.java
index 7aa74d3..77dc6c7 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorMetricsTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorMetricsTest.java
@@ -76,6 +76,9 @@ public class PrepRequestProcessorMetricsTest extends ZKTestCase {
         DataNode node = new DataNode(new byte[1], null, mock(StatPersisted.class));
         when(db.getNode(anyString())).thenReturn(node);
 
+        DataTree dataTree = mock(DataTree.class);
+        when(db.getDataTree()).thenReturn(dataTree);
+
         Set<String> ephemerals = new HashSet<>();
         ephemerals.add("/crystalmountain");
         ephemerals.add("/stevenspass");
@@ -158,7 +161,9 @@ public class PrepRequestProcessorMetricsTest extends ZKTestCase {
         assertEquals(1L, values.get("cnt_close_session_prep_time"));
         assertThat((long) values.get("max_close_session_prep_time"), greaterThanOrEqualTo(0L));
 
-        assertEquals(5L, values.get("outstanding_changes_queued"));
+        // With the digest feature, two more OUTSTANDING_CHANGES_QUEUED are recorded than without it,
+        // so the expected count is 7 here; it was 5 before the digest feature was upstreamed.
+        assertEquals(7L, values.get("outstanding_changes_queued"));
     }
 
     private class SimpleWatcher implements Watcher {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java
index 264601d..48e5890 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
@@ -36,6 +38,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.Record;
+import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.SessionExpiredException;
 import org.apache.zookeeper.KeeperException.SessionMovedException;
@@ -45,9 +48,18 @@ import org.apache.zookeeper.PortAssignment;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.proto.ReconfigRequest;
 import org.apache.zookeeper.proto.RequestHeader;
 import org.apache.zookeeper.proto.SetDataRequest;
 import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.Leader;
+import org.apache.zookeeper.server.quorum.LeaderBeanTest;
+import org.apache.zookeeper.server.quorum.LeaderZooKeeperServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.test.ClientBase;
 import org.apache.zookeeper.txn.ErrorTxn;
 import org.junit.After;
@@ -56,6 +68,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 public class PrepRequestProcessorTest extends ClientBase {
 
     private static final Logger LOG = LoggerFactory.getLogger(PrepRequestProcessorTest.class);
@@ -108,13 +121,21 @@ public class PrepRequestProcessorTest extends ClientBase {
     }
 
     private Request createRequest(Record record, int opCode, long sessionId) throws IOException {
+        return createRequest(record, opCode, sessionId, false);
+    }
+
+    private Request createRequest(Record record, int opCode, boolean admin) throws IOException {
+        return createRequest(record, opCode, 1L, admin);
+    }
+
+    private Request createRequest(Record record, int opCode, long sessionId, boolean admin) throws IOException {
         // encoding
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
         record.serialize(boa, "request");
         baos.close();
         // Id
-        List<Id> ids = Arrays.asList(Ids.ANYONE_ID_UNSAFE);
+        List<Id> ids = Arrays.asList(admin ? new Id("super", "super user") : Ids.ANYONE_ID_UNSAFE);
         return new Request(null, sessionId, 0, opCode, ByteBuffer.wrap(baos.toByteArray()), ids);
     }
 
@@ -123,7 +144,7 @@ public class PrepRequestProcessorTest extends ClientBase {
         processor = new PrepRequestProcessor(zks, new MyRequestProcessor());
 
         Record record = new MultiOperationRecord(ops);
-        Request req = createRequest(record, OpCode.multi);
+        Request req = createRequest(record, OpCode.multi, false);
 
         processor.pRequest(req);
         assertTrue("request hasn't been processed in chain", pLatch.await(5, TimeUnit.SECONDS));
@@ -156,6 +177,43 @@ public class PrepRequestProcessorTest extends ClientBase {
         assertEquals("Record zxid wasn't set correctly", 2, cr.zxid);
     }
 
+    @Test
+    public void testReconfigWithAnotherOutstandingChange() throws Exception {
+        QuorumPeer qp = new QuorumPeer();
+        QuorumVerifier quorumVerifierMock = mock(QuorumVerifier.class);
+        when(quorumVerifierMock.getAllMembers()).thenReturn(LeaderBeanTest.getMockedPeerViews(qp.getId()));
+
+        qp.setQuorumVerifier(quorumVerifierMock, false);
+        FileTxnSnapLog snapLog = new FileTxnSnapLog(tmpDir, tmpDir);
+        LeaderZooKeeperServer lzks = new LeaderZooKeeperServer(snapLog, qp, new ZKDatabase(snapLog));
+        qp.leader = new Leader(qp, lzks);
+        lzks.sessionTracker = new MySessionTracker();
+        ZooKeeperServer.setDigestEnabled(true);
+        processor = new PrepRequestProcessor(lzks, new MyRequestProcessor());
+
+        Record record = new CreateRequest("/foo", "data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT.toFlag());
+        pLatch = new CountDownLatch(1);
+        processor.pRequest(createRequest(record, OpCode.create, false));
+        assertTrue("request hasn't been processed in chain", pLatch.await(5, TimeUnit.SECONDS));
+
+        boolean isReconfigEnabledPreviously = QuorumPeerConfig.isReconfigEnabled();
+        boolean isStandaloneEnabledPreviously = QuorumPeerConfig.isStandaloneEnabled();
+        QuorumPeerConfig.setReconfigEnabled(true);
+        QuorumPeerConfig.setStandaloneEnabled(false);
+        try {
+            String newMember = "server.0=localhost:" + PortAssignment.unique()  + ":" + PortAssignment.unique() + ":participant";
+            record = new ReconfigRequest(null, null, newMember, 0);
+            pLatch = new CountDownLatch(1);
+            processor.pRequest(createRequest(record, OpCode.reconfig, true));
+            assertTrue("request hasn't been processed in chain", pLatch.await(5, TimeUnit.SECONDS));
+            assertEquals(outcome.getHdr().getType(), OpCode.reconfig);   // Verifies that there was no error.
+        } finally {
+            // reset the reconfig option
+            QuorumPeerConfig.setReconfigEnabled(isReconfigEnabledPreviously);
+            QuorumPeerConfig.setStandaloneEnabled(isStandaloneEnabledPreviously);
+        }
+    }
+
     /**
      * ZOOKEEPER-2052:
      * This test checks that if a multi operation aborted, and during the multi there is side effect
@@ -234,7 +292,7 @@ public class PrepRequestProcessorTest extends ClientBase {
         processor = new PrepRequestProcessor(zks, new MyRequestProcessor());
 
         SetDataRequest record = new SetDataRequest("", new byte[0], -1);
-        Request req = createRequest(record, OpCode.setData);
+        Request req = createRequest(record, OpCode.setData, false);
         processor.pRequest(req);
         pLatch.await();
         assertEquals(outcome.getHdr().getType(), OpCode.error);
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/TxnLogDigestTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/TxnLogDigestTest.java
new file mode 100644
index 0000000..ae914af
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/TxnLogDigestTest.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertThat;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import mockit.Invocation;
+import mockit.Mock;
+import mockit.MockUp;
+import org.apache.jute.Record;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.server.metric.SimpleCounter;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+import org.apache.zookeeper.server.persistence.TxnLog.TxnIterator;
+import org.apache.zookeeper.server.quorum.QuorumPeerMainTest;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.txn.TxnDigest;
+import org.apache.zookeeper.txn.TxnHeader;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TxnLogDigestTest extends ClientBase {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(TxnLogDigestTest.class);
+
+    private ZooKeeper zk;
+    private ZooKeeperServer server;
+
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        server = serverFactory.getZooKeeperServer();
+        zk = createClient();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        // server will be closed in super.tearDown
+        super.tearDown();
+
+        if (zk != null) {
+            zk.close();
+        }
+        MockedFileTxnLog.reset();
+    }
+
+    @Override
+    public void setupCustomizedEnv() {
+        ZooKeeperServer.setDigestEnabled(true);
+    }
+
+    @Override
+    public void cleanUpCustomizedEnv() {
+        ZooKeeperServer.setDigestEnabled(false);
+    }
+
+    @BeforeClass
+    public static void applyMockUps() {
+        new MockedFileTxnLog();
+    }
+
+    /**
+     * Check that the digest stored with the last txn read back from the txn
+     * log matches the digest calculated from the DataTree.
+     */
+    @Test
+    public void digestFromTxnLogsMatchesTree() throws Exception {
+        // reset the digest mismatch counter before issuing any writes
+        SimpleCounter digestMistachesCount = (SimpleCounter) ServerMetrics.getMetrics().DIGEST_MISMATCHES_COUNT;
+        digestMistachesCount.reset();
+
+        // trigger some write ops
+        performOperations(createClient(), "/digestFromTxnLogsMatchesTree");
+
+        // make sure there is no digest mismatch
+        Assert.assertEquals(0, digestMistachesCount.get());
+
+        // verify that the digest is written to disk together with the txn
+        TxnDigest lastDigest = getLastTxnLogDigest();
+        Assert.assertNotNull(lastDigest);
+        Assert.assertEquals(server.getZKDatabase().getDataTree().getTreeDigest(),
+                lastDigest.getTreeDigest());
+    }
+
+    /**
+     * Test compatibility when the digest feature is enabled/disabled:
+     *
+     * * check that txns which were written with digest can be read when
+     *   digest is disabled
+     * * check that txns which were written without digest can be read
+     *   when digest is enabled.
+     */
+    @Test
+    public void checkTxnCompatibleWithAndWithoutDigest() throws Exception {
+        // 1. start server with digest disabled
+        restartServerWithDigestFlag(false);
+
+        // trigger some write ops
+        Map<String, String> expectedNodes = performOperations(createClient(), "/p1");
+
+        // reset the mismatch metrics
+        SimpleCounter digestMistachesCount = (SimpleCounter) ServerMetrics.getMetrics().DIGEST_MISMATCHES_COUNT;
+        digestMistachesCount.reset();
+
+        // 2. restart server with digest enabled
+        restartServerWithDigestFlag(true);
+
+        // make sure the data written while digest was disabled can be
+        // successfully read
+        checkNodes(expectedNodes);
+
+        Map<String, String> expectedNodes1 = performOperations(createClient(), "/p2");
+
+        // make sure there is no digest mismatch
+        Assert.assertEquals(0, digestMistachesCount.get());
+
+        // 3. disable the digest again and make sure everything is fine
+        restartServerWithDigestFlag(false);
+
+        checkNodes(expectedNodes);
+        checkNodes(expectedNodes1);
+    }
+
+    /**
+     * Simulate the scenario where a txn is missing from the txn log, and
+     * make sure the digest check catches it when the log is replayed.
+     */
+    @Test
+    public void testTxnMissing() throws Exception {
+        // make MockedFileTxnLog skip appending the txn with this zxid
+        MockedFileTxnLog.skipAppendZxid = 3;
+
+        // trigger some write operations
+        performOperations(createClient(), "/testTxnMissing");
+
+        // reset the counter, then restart the server to load the corrupted txn file
+        SimpleCounter digestMistachesCount = (SimpleCounter) ServerMetrics.getMetrics().DIGEST_MISMATCHES_COUNT;
+        digestMistachesCount.reset();
+
+        restartServerWithDigestFlag(true);
+
+        // check that digest mismatch is reported
+        assertThat("mismatch should be reported", digestMistachesCount.get(), greaterThan(0L));
+
+        // restart server with digest disabled
+        digestMistachesCount.reset();
+        restartServerWithDigestFlag(false);
+
+        // check that no digest mismatch is reported
+        Assert.assertEquals(0, digestMistachesCount.get());
+    }
+
+    private void restartServerWithDigestFlag(boolean digestEnabled)
+            throws Exception {
+        stopServer();
+        QuorumPeerMainTest.waitForOne(zk, States.CONNECTING);
+
+        ZooKeeperServer.setDigestEnabled(digestEnabled);
+
+        startServer();
+        QuorumPeerMainTest.waitForOne(zk, States.CONNECTED);
+    }
+
+    private TxnDigest getLastTxnLogDigest() throws IOException {
+        TxnIterator itr = new FileTxnLog(new File(tmpDir, "version-2")).read(1);
+        TxnDigest lastDigest = null;
+        while (itr.next()) {
+            lastDigest = itr.getDigest();
+        }
+        return lastDigest;
+    }
+
+    public static void create(ZooKeeper client, String path, CreateMode mode)
+              throws Exception {
+         client.create(path, path.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
+    }
+
+    /**
+     * Helper method to trigger various write ops inside ZK.
+     */
+    public static Map<String, String> performOperations(
+            ZooKeeper client, String prefix) throws Exception {
+        Map<String, String> nodes = new HashMap<>();
+
+        String path = prefix;
+        create(client, path, CreateMode.PERSISTENT);
+        nodes.put(path, path);
+
+        path = prefix + "/child1";
+        create(client, path, CreateMode.PERSISTENT);
+        nodes.put(path, path);
+
+        path = prefix + "/child2";
+        create(client, path, CreateMode.PERSISTENT);
+        client.delete(prefix + "/child2", -1);
+
+        path = prefix + "/child1/leaf";
+        create(client, path, CreateMode.PERSISTENT);
+        String updatedData = "updated data";
+        client.setData(path, updatedData.getBytes(), -1);
+        nodes.put(path, updatedData);
+
+        List<Op> subTxns = new ArrayList<Op>();
+        for (int i = 0; i < 3; i++) {
+            path = prefix + "/m" + i;
+            subTxns.add(Op.create(path, path.getBytes(),
+                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
+            nodes.put(path, path);
+        }
+        client.multi(subTxns);
+        client.close();
+
+        return nodes;
+    }
+
+    private void checkNodes(Map<String, String> expectedNodes) throws Exception {
+        ZooKeeper client = createClient();
+        try {
+            for (Map.Entry<String, String> entry: expectedNodes.entrySet()) {
+                Assert.assertEquals(entry.getValue(),
+                        new String(client.getData(entry.getKey(), false, null)));
+            }
+        } finally {
+            client.close();
+        }
+    }
+
+    public static final class MockedFileTxnLog extends MockUp<FileTxnLog> {
+        static long skipAppendZxid = -1;
+
+        @Mock
+        public synchronized boolean append(Invocation invocation, TxnHeader hdr,
+                Record txn, TxnDigest digest) throws IOException {
+            if (hdr != null && hdr.getZxid() == skipAppendZxid) {
+                LOG.info("skipping txn {}", skipAppendZxid);
+                return true;
+            }
+            return invocation.proceed(hdr, txn, digest);
+        }
+
+        public static void reset() {
+            skipAppendZxid = -1;
+        }
+    };
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/persistence/FileTxnSnapLogTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/persistence/FileTxnSnapLogTest.java
index 1412a93..34d57f4 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/persistence/FileTxnSnapLogTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/persistence/FileTxnSnapLogTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import java.io.File;
@@ -39,10 +40,12 @@ import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.server.DataNode;
 import org.apache.zookeeper.server.DataTree;
 import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.test.ClientBase;
 import org.apache.zookeeper.test.TestUtils;
 import org.apache.zookeeper.txn.CreateTxn;
 import org.apache.zookeeper.txn.SetDataTxn;
+import org.apache.zookeeper.txn.TxnDigest;
 import org.apache.zookeeper.txn.TxnHeader;
 import org.junit.After;
 import org.junit.Before;
@@ -209,7 +212,7 @@ public class FileTxnSnapLogTest {
 
         long zxid = fileTxnSnapLog.restore(new DataTree(), sessions, new FileTxnSnapLog.PlayBackListener() {
             @Override
-            public void onTxnLoaded(TxnHeader hdr, Record rec) {
+            public void onTxnLoaded(TxnHeader hdr, Record rec, TxnDigest digest) {
                 // empty by default
             }
         });
@@ -353,4 +356,45 @@ public class FileTxnSnapLogTest {
         assertEquals(ZooDefs.Ids.CREATOR_ALL_ACL, followerDataTree.getACL(a1));
     }
 
+    @Test
+    public void testEmptySnapshotSerialization() throws IOException {
+        File dataDir = ClientBase.createEmptyTestDir();
+        FileTxnSnapLog snaplog = new FileTxnSnapLog(dataDir, dataDir);
+        DataTree dataTree = new DataTree();
+        ConcurrentHashMap<Long, Integer> sessions = new ConcurrentHashMap<>();
+
+        ZooKeeperServer.setDigestEnabled(true);
+        snaplog.save(dataTree, sessions, true);
+        snaplog.restore(dataTree, sessions, (hdr, rec, digest) -> {  });
+
+        assertNull(dataTree.getDigestFromLoadedSnapshot());
+    }
+
+    @Test
+    public void testSnapshotSerializationCompatibility() throws IOException {
+        testSnapshotSerializationCompatibility(true, false);
+        testSnapshotSerializationCompatibility(false, false);
+        testSnapshotSerializationCompatibility(true, true);
+        testSnapshotSerializationCompatibility(false, true);
+    }
+
+    void testSnapshotSerializationCompatibility(Boolean digestEnabled, Boolean snappyEnabled) throws IOException {
+        File dataDir = ClientBase.createEmptyTestDir();
+        FileTxnSnapLog snaplog = new FileTxnSnapLog(dataDir, dataDir);
+        DataTree dataTree = new DataTree();
+        ConcurrentHashMap<Long, Integer> sessions = new ConcurrentHashMap<>();
+        SnapStream.setStreamMode(snappyEnabled ? SnapStream.StreamMode.SNAPPY : SnapStream.StreamMode.DEFAULT_MODE);
+
+        ZooKeeperServer.setDigestEnabled(digestEnabled);
+        TxnHeader txnHeader = new TxnHeader(1, 1, 1, 1 + 1, ZooDefs.OpCode.create);
+        CreateTxn txn = new CreateTxn("/" + 1, "data".getBytes(), null, false, 1);
+        Request request = new Request(1, 1, 1, txnHeader, txn, 1);
+        dataTree.processTxn(request.getHdr(), request.getTxn());
+        snaplog.save(dataTree, sessions, true);
+
+        int expectedNodeCount = dataTree.getNodeCount();
+        ZooKeeperServer.setDigestEnabled(!digestEnabled);
+        snaplog.restore(dataTree, sessions, (hdr, rec, digest) -> {  });
+        assertEquals(expectedNodeCount, dataTree.getNodeCount());
+    }
 }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java
index 21a9d27..fd3374d 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java
@@ -19,27 +19,34 @@
 package org.apache.zookeeper.server.quorum;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.security.sasl.SaslException;
 import org.apache.jute.OutputArchive;
+import org.apache.zookeeper.AsyncCallback.MultiCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
 import org.apache.zookeeper.PortAssignment;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooKeeper.States;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.metrics.MetricsUtils;
 import org.apache.zookeeper.server.DataNode;
 import org.apache.zookeeper.server.DataTree;
+import org.apache.zookeeper.server.ServerMetrics;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
@@ -65,6 +72,8 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
 
     @Before
     public void setup() throws Exception {
+        ZooKeeperServer.setDigestEnabled(true);
+
         LOG.info("Start up a 3 server quorum");
         final int ENSEMBLE_SERVERS = 3;
         clientPorts = new int[ENSEMBLE_SERVERS];
@@ -108,6 +117,8 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
 
     @After
     public void tearDown() throws Exception {
+        ZooKeeperServer.setDigestEnabled(false);
+
         if (mt != null) {
             for (MainThread t : mt) {
                 t.shutdown();
@@ -233,6 +244,82 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
         compareStat(parent, leaderId, followerA);
     }
 
+    @Test
+    public void testMultiOpDigestConsistentDuringSnapshot() throws Exception {
+        ServerMetrics.getMetrics().resetAll();
+
+        LOG.info("Create some txns");
+        final String path = "/testMultiOpDigestConsistentDuringSnapshot";
+        createEmptyNode(zk[followerA], path, CreateMode.PERSISTENT);
+
+        CustomDataTree dt =
+                (CustomDataTree) mt[followerA].main.quorumPeer.getZkDb().getDataTree();
+        final CountDownLatch setDataLatch = new CountDownLatch(1);
+        final CountDownLatch continueSetDataLatch = new CountDownLatch(1);
+        final ZooKeeper followerZk = zk[followerA];
+        dt.setDigestSerializeListener(new DigestSerializeListener() {
+            @Override
+            public void process() {
+                LOG.info("Trigger a multi op in async");
+                followerZk.multi(Arrays.asList(
+                        Op.create("/multi0", "/multi0".getBytes(),
+                                Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
+                        Op.setData(path, "new data".getBytes(), -1)
+                ), new MultiCallback() {
+                    @Override
+                    public void processResult(int rc, String path, Object ctx,
+                            List<OpResult> opResults) {}
+                }, null);
+
+                LOG.info("Wait for the signal to continue");
+                try {
+                    setDataLatch.await(3, TimeUnit.SECONDS);
+                } catch (Exception e) {
+                    LOG.error("Error while waiting for set data txn", e);
+                }
+            }
+
+            @Override
+            public void finished() {
+                LOG.info("Finished writing digest out, continue");
+                continueSetDataLatch.countDown();
+            }
+        });
+
+        dt.setDataListener(new SetDataTxnListener() {
+            @Override
+            public void process() {
+                setDataLatch.countDown();
+                try {
+                    continueSetDataLatch.await(3, TimeUnit.SECONDS);
+                } catch (Exception e) {
+                    LOG.error("Error while waiting for continue signal", e);
+                }
+            }
+        });
+
+        LOG.info("Trigger a snapshot");
+        ZooKeeperServer zkServer = mt[followerA].main.quorumPeer.getActiveServer();
+        zkServer.takeSnapshot(true);
+        checkNoMismatchReported();
+
+        LOG.info("Restart the server to load the snapshot again");
+        mt[followerA].shutdown();
+        QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTING);
+        mt[followerA].start();
+        QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTED);
+
+        LOG.info("Make sure there is nothing caught in the digest mismatch");
+        checkNoMismatchReported();
+
+    }
+
+    private void checkNoMismatchReported() {
+        long mismatch = (long) MetricsUtils.currentServerMetrics().get("digest_mismatches_count");
+
+        assertFalse("The mismatch count should be zero but is: " + mismatch, mismatch > 0);
+    }
+
     private void addSerializeListener(int sid, String parent, String child) {
         final ZooKeeper zkClient = zk[sid];
         CustomDataTree dt = (CustomDataTree) mt[sid].main.quorumPeer.getZkDb().getDataTree();
@@ -323,10 +410,22 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
 
     }
 
+    /**
+     * Test hook used by {@link CustomDataTree#serializeZxidDigest}: {@code process()} is invoked
+     * right before the zxid digest is serialized and {@code finished()} right after.
+     */
+    interface DigestSerializeListener {
+        /** Called before the zxid digest is written out. */
+        void process();
+
+        /** Called after the zxid digest has been written out. */
+        void finished();
+    }
+
+    /**
+     * Test hook used by {@link CustomDataTree#setData}: invoked before the call is delegated
+     * to the real DataTree implementation.
+     */
+    interface SetDataTxnListener {
+        /** Called before the node data is updated. */
+        void process();
+    }
+
     static class CustomDataTree extends DataTree {
 
         Map<String, NodeCreateListener> nodeCreateListeners = new HashMap<String, NodeCreateListener>();
         Map<String, NodeSerializeListener> listeners = new HashMap<String, NodeSerializeListener>();
+        DigestSerializeListener digestListener;
+        SetDataTxnListener setListener;
 
         @Override
         public void serializeNodeData(OutputArchive oa, String path, DataNode node) throws IOException {
@@ -362,6 +461,34 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
             nodeCreateListeners.put(path, listener);
         }
 
+        /**
+         * Registers the listener notified around {@link #serializeZxidDigest}; a null value
+         * disables the notifications.
+         */
+        public void setDigestSerializeListener(DigestSerializeListener listener) {
+            this.digestListener = listener;
+        }
+
+        /**
+         * Registers the listener notified at the start of {@link #setData}; a null value
+         * disables the notification.
+         */
+        public void setDataListener(SetDataTxnListener listener) {
+            this.setListener = listener;
+        }
+
+        /**
+         * Serializes the zxid digest through the parent implementation, notifying the registered
+         * {@link DigestSerializeListener} (if any) immediately before and after the write.
+         */
+        @Override
+        public boolean serializeZxidDigest(OutputArchive oa) throws IOException {
+            // let the test hook run before anything is written
+            if (null != digestListener) {
+                digestListener.process();
+            }
+
+            final boolean written = super.serializeZxidDigest(oa);
+
+            // tell the test hook that the digest is out
+            if (null != digestListener) {
+                digestListener.finished();
+            }
+            return written;
+        }
+
+        /**
+         * Notifies the registered {@link SetDataTxnListener} (if any) and then delegates the
+         * update to the parent DataTree.
+         *
+         * @return the Stat returned by the parent implementation
+         * @throws NoNodeException propagated from the parent implementation
+         */
+        @Override
+        public Stat setData(String path, byte[] data, int version, long zxid,
+                long time) throws NoNodeException {
+            if (setListener != null) {
+                // the hook runs before the node is touched, so a test can interleave other work
+                setListener.process();
+            }
+
+            return super.setData(path, data, version, zxid, time);
+        }
     }
 
     interface NodeSerializeListener {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderBeanTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderBeanTest.java
index 7ce583d..0c6938b 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderBeanTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderBeanTest.java
@@ -60,19 +60,23 @@ public class LeaderBeanTest {
     private QuorumPeer qp;
     private QuorumVerifier quorumVerifierMock;
 
-    @Before
-    public void setUp() throws IOException, X509Exception {
-        qp = new QuorumPeer();
-        long myId = qp.getId();
-
+    public static Map<Long, QuorumServer> getMockedPeerViews(long myId) {
         int clientPort = PortAssignment.unique();
         Map<Long, QuorumServer> peersView = new HashMap<Long, QuorumServer>();
         InetAddress clientIP = InetAddress.getLoopbackAddress();
 
-        peersView.put(Long.valueOf(myId), new QuorumServer(myId, new InetSocketAddress(clientIP, PortAssignment.unique()), new InetSocketAddress(clientIP, PortAssignment.unique()), new InetSocketAddress(clientIP, clientPort), LearnerType.PARTICIPANT));
+        peersView.put(Long.valueOf(myId),
+                new QuorumServer(myId, new InetSocketAddress(clientIP, PortAssignment.unique()),
+                        new InetSocketAddress(clientIP, PortAssignment.unique()),
+                        new InetSocketAddress(clientIP, clientPort), LearnerType.PARTICIPANT));
+        return peersView;
+    }
 
+    @Before
+    public void setUp() throws IOException, X509Exception {
+        qp = new QuorumPeer();
         quorumVerifierMock = mock(QuorumVerifier.class);
-        when(quorumVerifierMock.getAllMembers()).thenReturn(peersView);
+        when(quorumVerifierMock.getAllMembers()).thenReturn(getMockedPeerViews(qp.getId()));
 
         qp.setQuorumVerifier(quorumVerifierMock, false);
         File tmpDir = ClientBase.createEmptyTestDir();
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumDigestTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumDigestTest.java
new file mode 100644
index 0000000..691b455
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumDigestTest.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import mockit.Invocation;
+import mockit.Mock;
+import mockit.MockUp;
+import org.apache.jute.Record;
+import org.apache.zookeeper.AsyncCallback.StringCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.DataTree;
+import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
+import org.apache.zookeeper.server.ServerMetrics;
+import org.apache.zookeeper.server.TxnLogDigestTest;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.metric.SimpleCounter;
+import org.apache.zookeeper.txn.TxnDigest;
+import org.apache.zookeeper.txn.TxnHeader;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class QuorumDigestTest extends QuorumPeerTestBase {
+
+    private static final Logger LOG =
+          LoggerFactory.getLogger(QuorumDigestTest.class);
+
+    private Servers servers;
+    private String forceSnapSyncValue;
+
+    /**
+     * Creates the {@link DataTreeMock} once before any test of this class runs.
+     * NOTE(review): relies on JMockit applying a MockUp when it is instantiated - confirm
+     * against the JMockit version in use.
+     */
+    @BeforeClass
+    public static void applyMockUps() {
+        new DataTreeMock();
+    }
+
+    /**
+     * Remembers the current force-snap-sync property, enables digest checking, resets the
+     * mismatch counter and launches an ensemble of 3 participants plus 1 observer.
+     */
+    @Before
+    public void setup() throws Exception {
+        forceSnapSyncValue = System.getProperty(LearnerHandler.FORCE_SNAP_SYNC);
+        ZooKeeperServer.setDigestEnabled(true);
+
+        final SimpleCounter mismatchCounter =
+                (SimpleCounter) ServerMetrics.getMetrics().DIGEST_MISMATCHES_COUNT;
+        mismatchCounter.reset();
+
+        servers = LaunchServers(3, 1, null);
+    }
+
+    /**
+     * Shuts the ensemble down and restores the global state touched by the tests: digest
+     * checking, the force-snap-sync system property and the {@link DataTreeMock} settings.
+     * The global state is restored even when shutting the servers down fails.
+     */
+    @After
+    public void tearDown() throws Exception {
+        try {
+            if (servers != null) {
+                servers.shutDownAllServers();
+            }
+        } finally {
+            ZooKeeperServer.setDigestEnabled(false);
+            // put back the value captured in setup() instead of unconditionally clearing it
+            if (forceSnapSyncValue == null) {
+                System.clearProperty(LearnerHandler.FORCE_SNAP_SYNC);
+            } else {
+                System.setProperty(LearnerHandler.FORCE_SNAP_SYNC, forceSnapSyncValue);
+            }
+            DataTreeMock.reset();
+        }
+    }
+
+    /**
+     * Positive case: a follower and an observer are restarted after extra writes, without
+     * forcing a snap sync, and no digest mismatch must be reported.
+     */
+    @Test
+    public void testDigestMatchesDuringDiffSync() throws Exception {
+        triggerSync(false);
+    }
+
+    /**
+     * Positive case: same scenario as the diff sync test but with snap sync forced, followed
+     * by a few more txns; no digest mismatch must be reported.
+     */
+    @Test
+    public void testDigestMatchesDuringSnapSync() throws Exception {
+        triggerSync(true);
+
+        // issue some more txns through the leader once the learners are back
+        final ZooKeeper leaderClient = servers.zk[servers.findLeader()];
+        TxnLogDigestTest.performOperations(leaderClient, "/testDigestMatchesDuringSnapSync");
+        Assert.assertEquals(0L, getMismatchDigestCount());
+    }
+
+    /**
+     * Keeps asynchronous create requests flowing through the leader's client while first a
+     * follower and an observer, and then the leader itself, are restarted. No digest mismatch
+     * must be reported after either restart.
+     */
+    @Test
+    public void testDigestMatchesWithAsyncRequests() throws Exception {
+        int leader = servers.findLeader();
+
+        final ZooKeeper client = servers.zk[leader];
+        // has to start as false, otherwise the traffic loop below exits before sending anything
+        final AtomicBoolean stopped = new AtomicBoolean(false);
+        final String prefix = "/testDigestMatchesWithAsyncRequests";
+
+        // start a thread to send requests asynchronously
+        Thread createTrafficThread = new Thread() {
+            @Override
+            public void run() {
+                int i = 0;
+                while (!stopped.get()) {
+                    // advance the counter so that every request targets a distinct path
+                    String path = prefix + "-" + (i++);
+                    client.create(path, path.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT, new StringCallback() {
+                        @Override
+                        public void processResult(int rc, String path,
+                                Object ctx, String name) {
+                            // ignore the result
+                        }
+                    }, null);
+                    try {
+                        Thread.sleep(10);
+                    } catch (InterruptedException e) { /* ignore */ }
+                }
+            }
+        };
+        createTrafficThread.start();
+
+        try {
+            // shutdown a follower and observer
+            List<Integer> targets = Arrays.asList(
+                    servers.findAnyFollower(), servers.findAnyObserver());
+            stopServers(targets);
+
+            // start the follower and observer again so that they sync with the leader
+            startServers(targets);
+
+            // make sure there is no digest mismatch
+            Assert.assertEquals(0L, getMismatchDigestCount());
+
+            // restart the leader
+            targets = Arrays.asList(leader);
+            stopServers(targets);
+            startServers(targets);
+
+            // make sure there is no digest mismatch
+            Assert.assertEquals(0L, getMismatchDigestCount());
+        } finally {
+            // always stop the traffic thread, even when an assertion above fails
+            stopped.set(true);
+        }
+    }
+
+    /**
+     * Negative case: a txn is dropped by the mocked DataTree while a follower and an observer
+     * come back, so digest mismatches must be reported and keep growing with later txns.
+     */
+    @Test
+    public void testDigestMismatchesWhenTxnLost() throws Exception {
+        // nothing should be reported right after the ensemble has started
+        Assert.assertEquals(0L, getMismatchDigestCount());
+
+        // take one follower and one observer offline
+        final List<Integer> learners = Arrays.asList(
+                servers.findAnyFollower(), servers.findAnyObserver());
+        stopServers(learners);
+
+        final int leader = servers.findLeader();
+        triggerOps(leader, "/p1");
+        Assert.assertEquals(0L, getMismatchDigestCount());
+
+        // from now on the mocked processTxn skips the txn whose hex zxid equals this value
+        DataTreeMock.skipTxnZxid = "100000006";
+
+        // bring the follower and observer back
+        startServers(learners);
+
+        final long countAfterSync = getMismatchDigestCount();
+        Assert.assertNotEquals(0L, countAfterSync);
+
+        // further txns are expected to change the counter again
+        triggerOps(leader, "/p2");
+        Assert.assertNotEquals(countAfterSync, getMismatchDigestCount());
+    }
+
+    /**
+     * Shuts down each listed server and waits until its client reports CONNECTING.
+     * Ids equal to -1 are ignored.
+     */
+    private void stopServers(List<Integer> sids) throws InterruptedException {
+        for (int sid : sids) {
+            if (sid == -1) {
+                continue;
+            }
+            servers.mt[sid].shutdown();
+            waitForOne(servers.zk[sid], States.CONNECTING);
+        }
+    }
+
+    /**
+     * Starts each listed server and waits until its client reports CONNECTED.
+     * Ids equal to -1 (what findAnyFollower/findAnyObserver return when nothing matched) are
+     * ignored, exactly as in stopServers, instead of failing with an out-of-bounds index.
+     */
+    private void startServers(List<Integer> sids) throws InterruptedException {
+        for (int sid : sids) {
+            if (sid != -1) {
+                servers.mt[sid].start();
+                waitForOne(servers.zk[sid], States.CONNECTED);
+            }
+        }
+    }
+
+    /**
+     * Runs TxnLogDigestTest.performOperations against server {@code sid} using {@code prefix},
+     * then restarts that server's client and waits for it to be CONNECTED.
+     */
+    private void triggerOps(int sid, String prefix) throws Exception {
+        final ZooKeeper client = servers.zk[sid];
+        TxnLogDigestTest.performOperations(client, prefix);
+
+        // servers.zk[sid] is looked up again only after restartClient has run
+        servers.restartClient(sid, null);
+        waitForOne(servers.zk[sid], States.CONNECTED);
+    }
+
+    /**
+     * Writes through the leader, takes one follower and one observer down, writes some more and
+     * brings them back, asserting along the way that no digest mismatch is reported.
+     *
+     * @param snapSync when true, the LearnerHandler.FORCE_SNAP_SYNC system property is set to
+     *                 "true" before anything else happens
+     */
+    private void triggerSync(boolean snapSync) throws Exception {
+        if (snapSync) {
+            System.setProperty(LearnerHandler.FORCE_SNAP_SYNC, "true");
+        }
+
+        // nothing should be reported right after the ensemble has started
+        Assert.assertEquals(0L, getMismatchDigestCount());
+
+        final int leader = servers.findLeader();
+        triggerOps(leader, "/p1");
+        Assert.assertEquals(0L, getMismatchDigestCount());
+
+        // take one follower and one observer offline
+        final List<Integer> learners = Arrays.asList(
+                servers.findAnyFollower(), servers.findAnyObserver());
+        stopServers(learners);
+
+        // write some more while they are down
+        triggerOps(leader, "/p2");
+
+        // bring them back so that they have to sync with the leader
+        startServers(learners);
+        Assert.assertEquals(0L, getMismatchDigestCount());
+    }
+
+    /**
+     * @return the current value of the DIGEST_MISMATCHES_COUNT server metric
+     */
+    public static long getMismatchDigestCount() {
+        final SimpleCounter mismatchCounter =
+                (SimpleCounter) ServerMetrics.getMetrics().DIGEST_MISMATCHES_COUNT;
+        return mismatchCounter.get();
+    }
+
+    /**
+     * Fake for {@link DataTree} whose processTxn drops the txn with a configured zxid and lets
+     * every other txn go through to the real implementation.
+     */
+    public static final class DataTreeMock extends MockUp<DataTree> {
+
+        // hex form of the zxid to drop; the empty string never matches a zxid
+        static String skipTxnZxid = "";
+
+        @Mock
+        public ProcessTxnResult processTxn(Invocation invocation,
+                TxnHeader header, Record txn, TxnDigest digest) {
+            final boolean skip = header != null
+                    && Long.toHexString(header.getZxid()).equals(skipTxnZxid);
+            if (!skip) {
+                return invocation.proceed(header, txn, digest);
+            }
+
+            // pretend the txn was applied: hand back an empty result without touching the tree
+            LOG.info("skip process txn {}", header.getZxid());
+            final ProcessTxnResult emptyResult = new ProcessTxnResult();
+            emptyResult.path = "";
+            emptyResult.stat = new Stat();
+            emptyResult.multiResult = new ArrayList<ProcessTxnResult>();
+            return emptyResult;
+        }
+
+        /** Clears the configured zxid so that no txn is dropped any more. */
+        public static void reset() {
+            skipTxnZxid = "";
+        }
+    }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
index d2e5822..aa75218 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
@@ -438,34 +438,65 @@ public class QuorumPeerTestBase extends ZKTestCase implements Watcher {
         public int findLeader() {
             for (int i = 0; i < mt.length; i++) {
                 if (mt[i].main.quorumPeer.leader != null) {
+                    LOG.info("Leader is {}", i);
                     return i;
                 }
             }
+            LOG.info("Cannot find Leader");
             return -1;
         }
 
+        /**
+         * @return the index of the first server whose quorum peer has a non-null follower,
+         *         or -1 when there is none
+         */
+        public int findAnyFollower() {
+            int sid = 0;
+            while (sid < mt.length) {
+                if (mt[sid].main.quorumPeer.follower != null) {
+                    LOG.info("Follower is {}", sid);
+                    return sid;
+                }
+                sid++;
+            }
+            LOG.info("Cannot find any follower");
+            return -1;
+        }
+
+        /**
+         * @return the index of the first server whose quorum peer has a non-null observer,
+         *         or -1 when there is none
+         */
+        public int findAnyObserver() {
+            int sid = 0;
+            while (sid < mt.length) {
+                if (mt[sid].main.quorumPeer.observer != null) {
+                    LOG.info("Observer is {}", sid);
+                    return sid;
+                }
+                sid++;
+            }
+            LOG.info("Cannot find any observer");
+            return -1;
+        }
     }
 
     protected Servers LaunchServers(int numServers) throws IOException, InterruptedException {
         return LaunchServers(numServers, null);
     }
 
+    /**
+     * Launches {@code numServers} participant servers and no observers.
+     *
+     * @param numServers the number of participant servers
+     * @param tickTime A ticktime to pass to MainThread
+     */
+    protected Servers LaunchServers(int numServers, Integer tickTime) throws IOException, InterruptedException {
+        return LaunchServers(numServers, 0, tickTime);
+    }
+
     /** * This is a helper function for launching a set of servers
      *
-     * @param numServers the number of servers
+     * @param numServers the number of participant servers
+     * @param numObservers the number of observer servers
      * @param tickTime A ticktime to pass to MainThread
      * @return
      * @throws IOException
      * @throws InterruptedException
      */
-    protected Servers LaunchServers(int numServers, Integer tickTime) throws IOException, InterruptedException {
-        int SERVER_COUNT = numServers;
+    protected Servers LaunchServers(int numServers, int numObservers, Integer tickTime) throws IOException, InterruptedException {
+        int SERVER_COUNT = numServers + numObservers;
         QuorumPeerMainTest.Servers svrs = new QuorumPeerMainTest.Servers();
         svrs.clientPorts = new int[SERVER_COUNT];
         StringBuilder sb = new StringBuilder();
         for (int i = 0; i < SERVER_COUNT; i++) {
             svrs.clientPorts[i] = PortAssignment.unique();
-            sb.append("server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + svrs.clientPorts[i] + "\n");
+            String role = i < numServers ? "participant" : "observer";
+            sb.append(String.format("server.%d=127.0.0.1:%d:%d:%s;127.0.0.1:%d\n",
+                    i, PortAssignment.unique(), PortAssignment.unique(), role,
+                    svrs.clientPorts[i]));
         }
         String quorumCfgSection = sb.toString();
 
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java
index 5302416..c106f4c 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java
@@ -360,8 +360,8 @@ public class Zab1_0Test extends ZKTestCase {
             for (int i = 1; i <= ops; i++) {
                 zxid = ZxidUtils.makeZxid(1, i);
                 String path = "/foo-" + i;
-                zkDb.processTxn(new TxnHeader(13, 1000 + i, zxid, 30
-                                                                          + i, ZooDefs.OpCode.create), new CreateTxn(path, "fpjwasalsohere".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1));
+                zkDb.processTxn(new TxnHeader(13, 1000 + i, zxid, 30 + i, ZooDefs.OpCode.create),
+                        new CreateTxn(path, "fpjwasalsohere".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1), null);
                 Stat stat = new Stat();
                 assertEquals("fpjwasalsohere", new String(zkDb.getData(path, stat, null)));
             }
@@ -585,7 +585,7 @@ public class Zab1_0Test extends ZKTestCase {
                     // Setup a database with a single /foo node
                     ZKDatabase zkDb = new ZKDatabase(new FileTxnSnapLog(tmpDir, tmpDir));
                     final long firstZxid = ZxidUtils.makeZxid(1, 1);
-                    zkDb.processTxn(new TxnHeader(13, 1313, firstZxid, 33, ZooDefs.OpCode.create), new CreateTxn("/foo", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1));
+                    zkDb.processTxn(new TxnHeader(13, 1313, firstZxid, 33, ZooDefs.OpCode.create), new CreateTxn("/foo", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1), null);
                     Stat stat = new Stat();
                     assertEquals("data1", new String(zkDb.getData("/foo", stat, null)));
 
@@ -719,7 +719,7 @@ public class Zab1_0Test extends ZKTestCase {
                     // Setup a database with a single /foo node
                     ZKDatabase zkDb = new ZKDatabase(new FileTxnSnapLog(tmpDir, tmpDir));
                     final long firstZxid = ZxidUtils.makeZxid(1, 1);
-                    zkDb.processTxn(new TxnHeader(13, 1313, firstZxid, 33, ZooDefs.OpCode.create), new CreateTxn("/foo", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1));
+                    zkDb.processTxn(new TxnHeader(13, 1313, firstZxid, 33, ZooDefs.OpCode.create), new CreateTxn("/foo", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1), null);
                     Stat stat = new Stat();
                     assertEquals("data1", new String(zkDb.getData("/foo", stat, null)));
 
@@ -949,8 +949,8 @@ public class Zab1_0Test extends ZKTestCase {
                     ZKDatabase zkDb = new ZKDatabase(new FileTxnSnapLog(tmpDir, tmpDir));
                     final long foo1Zxid = ZxidUtils.makeZxid(1, 1);
                     final long foo2Zxid = ZxidUtils.makeZxid(1, 2);
-                    zkDb.processTxn(new TxnHeader(13, 1313, foo1Zxid, 33, ZooDefs.OpCode.create), new CreateTxn("/foo1", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1));
-                    zkDb.processTxn(new TxnHeader(13, 1313, foo2Zxid, 33, ZooDefs.OpCode.create), new CreateTxn("/foo2", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1));
+                    zkDb.processTxn(new TxnHeader(13, 1313, foo1Zxid, 33, ZooDefs.OpCode.create), new CreateTxn("/foo1", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1), null);
+                    zkDb.processTxn(new TxnHeader(13, 1313, foo2Zxid, 33, ZooDefs.OpCode.create), new CreateTxn("/foo2", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1), null);
                     Stat stat = new Stat();
                     assertEquals("data1", new String(zkDb.getData("/foo1", stat, null)));
                     assertEquals("data1", new String(zkDb.getData("/foo2", stat, null)));
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/AdHashTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/AdHashTest.java
index e883011..671d484 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/AdHashTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/AdHashTest.java
@@ -87,7 +87,7 @@ public class AdHashTest extends ZKTestCase {
         removeListOfDigests(hashall, bucket3);
         removeListOfDigests(hashall, bucket2);
         removeListOfDigests(hashall, bucket1);
-        assertEquals("empty hashall's digest should be 0", hashall.toHexString(), "0");
+        assertEquals("empty hashall's digest should be 0", hashall.toString(), "0");
 
         AdHash hash45 = new AdHash();
         addListOfDigests(hash45, bucket4);
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/GetProposalFromTxnTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/GetProposalFromTxnTest.java
index 4110991..207caf5 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/GetProposalFromTxnTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/GetProposalFromTxnTest.java
@@ -34,6 +34,7 @@ import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.SyncRequestProcessor;
+import org.apache.zookeeper.server.TxnLogEntry;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.quorum.Leader.Proposal;
@@ -105,8 +106,10 @@ public class GetProposalFromTxnTest extends ZKTestCase {
         // Get zxid of create requests
         while (itr.hasNext()) {
             Proposal proposal = itr.next();
-            TxnHeader hdr = new TxnHeader();
-            Record rec = SerializeUtils.deserializeTxn(proposal.packet.getData(), hdr);
+            TxnLogEntry logEntry = SerializeUtils.deserializeTxn(
+                    proposal.packet.getData());
+            TxnHeader hdr = logEntry.getHeader();
+            Record rec = logEntry.getTxn();
             if (hdr.getType() == OpCode.create) {
                 retrievedZxids.add(hdr.getZxid());
                 createCount++;
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/JMXEnv.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/JMXEnv.java
index 6fbb620..76845ed 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/JMXEnv.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/JMXEnv.java
@@ -55,6 +55,7 @@ public class JMXEnv {
         cs.start();
 
         JMXServiceURL addr = cs.getAddress();
+        LOG.info("connecting to addr {}", addr);
 
         cc = JMXConnectorFactory.connect(addr);
     }