You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ca...@apache.org on 2011/09/03 19:37:10 UTC

svn commit: r1164891 - in /zookeeper/branches/branch-3.3: ./ src/java/main/org/apache/zookeeper/server/persistence/ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/server/quorum/

Author: camille
Date: Sat Sep  3 17:37:10 2011
New Revision: 1164891

URL: http://svn.apache.org/viewvc?rev=1164891&view=rev
Log:
ZOOKEEPER-1154, ZOOKEEPER-1156:
ZOOKEEPER-1154: Data inconsistency when the node(s) with the highest zxid is not present at the time of leader election
ZOOKEEPER-1156: Log truncation truncating log too much - can cause data loss

Vishal Kathuria via camille

Modified:
    zookeeper/branches/branch-3.3/CHANGES.txt
    zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
    zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
    zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
    zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java

Modified: zookeeper/branches/branch-3.3/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/CHANGES.txt?rev=1164891&r1=1164890&r2=1164891&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/CHANGES.txt (original)
+++ zookeeper/branches/branch-3.3/CHANGES.txt Sat Sep  3 17:37:10 2011
@@ -13,6 +13,10 @@ BUGFIXES:
   ZOOKEEPER-1117. zookeeper 3.3.3 fails to build with gcc >= 4.6.1 on
   Debian/Ubuntu (James Page via mahadev)
 
+  ZOOKEEPER-1154. Data inconsistency when the node(s) with the highest zxid is not present at the time of leader election. (Vishal Kathuria via camille)
+  
+  ZOOKEEPER-1156. Log truncation truncating log too much - can cause data loss. (Vishal Kathuria via camille)
+
 Release 3.3.3 - 2011-02-23
 Backward compatible changes:
 

Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java?rev=1164891&r1=1164890&r2=1164891&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java Sat Sep  3 17:37:10 2011
@@ -400,21 +400,32 @@ public class FileTxnLog implements TxnLo
         long position;
         protected PositionInputStream(InputStream in) {
             super(in);
+            position = 0;
         }
         
         @Override
         public int read() throws IOException {
             int rc = super.read();
-            if (rc > 0) {
+            if (rc > -1) {
                 position++;
             }
             return rc;
         }
+
+        public int read(byte[] b) throws IOException {
+            int rc = super.read(b);
+            if (rc > 0) {
+                position += rc;
+            }
+            return rc;            
+        }
         
         @Override
         public int read(byte[] b, int off, int len) throws IOException {
             int rc = super.read(b, off, len);
-            position += rc;
+            if (rc > 0) {
+                position += rc;
+            }
             return rc;
         }
         
@@ -429,6 +440,21 @@ public class FileTxnLog implements TxnLo
         public long getPosition() {
             return position;
         }
+
+        @Override
+        public boolean markSupported() {
+            return false;
+        }
+
+        @Override
+        public void mark(int readLimit) {
+            throw new UnsupportedOperationException("mark");
+        }
+
+        @Override
+        public void reset() {
+            throw new UnsupportedOperationException("reset");
+        }
     }
     
     /**
@@ -504,7 +530,7 @@ public class FileTxnLog implements TxnLo
         }
 
         /**
-         * read the header fomr the inputarchive
+         * read the header from the inputarchive
          * @param ia the inputarchive to be read from
          * @param is the inputstream
          * @throws IOException

Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=1164891&r1=1164890&r2=1164891&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Sat Sep  3 17:37:10 2011
@@ -270,14 +270,52 @@ public class LearnerHandler extends Thre
                 rl.lock();        
                 final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
                 final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();
+                LOG.info("Synchronizing with Follower sid: " + this.sid
+                        +" maxCommittedLog ="+Long.toHexString(maxCommittedLog)
+                        +" minCommittedLog = "+Long.toHexString(minCommittedLog)
+                        +" peerLastZxid = "+Long.toHexString(peerLastZxid));
+
                 LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();
+
                 if (proposals.size() != 0) {
                     if ((maxCommittedLog >= peerLastZxid)
                             && (minCommittedLog <= peerLastZxid)) {
-                        packetToSend = Leader.DIFF;
-                        zxidToSend = maxCommittedLog;
+
+                        // as we look through proposals, this variable keeps track of previous
+                        // proposal Id.
+                        long prevProposalZxid = minCommittedLog;
+
+                        // Keep track of whether we are about to send the first packet.
+                        // Before sending the first packet, we have to tell the learner
+                        // whether to expect a trunc or a diff
+                        boolean firstPacket=true;
+
                         for (Proposal propose: proposals) {
-                            if (propose.packet.getZxid() > peerLastZxid) {
+                            // skip the proposals the peer already has
+                            if (propose.packet.getZxid() <= peerLastZxid) {
+                                prevProposalZxid = propose.packet.getZxid();
+                                continue;
+                            } else {
+                                // If we are sending the first packet, figure out whether to trunc
+                                // in case the follower has some proposals that the leader doesn't
+                                if (firstPacket) {
+                                    firstPacket = false;
+                                    // Does the peer have some proposals that the leader hasn't seen yet?
+                                    if (prevProposalZxid < peerLastZxid) {
+                                        // send a trunc message before sending the diff
+                                        packetToSend = Leader.TRUNC;
+                                        LOG.info("Sending TRUNC");
+                                        zxidToSend = prevProposalZxid;
+                                        updates = zxidToSend;
+                                    } 
+                                    else {
+                                        // Just send the diff
+                                        packetToSend = Leader.DIFF;
+                                        LOG.info("Sending diff");
+                                        zxidToSend = maxCommittedLog;        
+                                    }
+
+                                }
                                 queuePacket(propose.packet);
                                 QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
                                         null, null);
@@ -292,7 +330,7 @@ public class LearnerHandler extends Thre
                 } else {
                     // just let the state transfer happen
                 }               
-                
+
                 leaderLastZxid = leader.startForwarding(this, updates);
                 if (peerLastZxid == leaderLastZxid) {
                     // We are in sync so we'll do an empty diff

Modified: zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java?rev=1164891&r1=1164890&r2=1164891&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java (original)
+++ zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java Sat Sep  3 17:37:10 2011
@@ -21,6 +21,7 @@ package org.apache.zookeeper.server.quor
 import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
 
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.io.LineNumberReader;
 import java.io.StringReader;
 import java.net.InetSocketAddress;
@@ -36,7 +37,10 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.PortAssignment;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.server.quorum.QuorumPeerTestBase.MainThread;
 import org.apache.zookeeper.test.ClientBase;
+import org.junit.Assert;
 import org.junit.Test;
 
 
@@ -286,4 +290,165 @@ public class QuorumPeerMainTest extends 
         assertTrue("fastleaderelection used", found);
     }
    
+    /**
+     * Test the case where the server with the highest zxid is not present at leader election and joins later.
+     * This test case reproduces the issues described in ZOOKEEPER-1154 and ZOOKEEPER-1156
+     * and verifies that they are fixed.
+     */
+    @Test
+    public void testHighestZxidJoinLate() throws Exception {
+        int numServers = 3;
+        Servers svrs = LaunchServers(numServers);
+        String path = "/hzxidtest";
+        int leader=-1;
+
+        // find the leader
+        for (int i=0; i < numServers; i++) {
+            if (svrs.mt[i].main.quorumPeer.leader != null) {
+                leader = i;
+            }
+        }
+        
+        // make sure there is a leader
+        Assert.assertTrue("There should be a leader", leader >=0);
+
+        int nonleader = (leader+1)%numServers;
+
+        byte[] input = new byte[1];
+        input[0] = 1;
+        byte[] output;
+
+        // Create a couple of nodes
+        svrs.zk[leader].create(path+leader, input, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        svrs.zk[leader].create(path+nonleader, input, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        
+        // make sure the updates were indeed committed. If they were not,
+        // the following statement will throw.
+        output = svrs.zk[leader].getData(path+nonleader, false, null);
+        
+        // Shut down everyone except the leader
+        for (int i=0; i < numServers; i++) {
+            if (i != leader) {
+                svrs.mt[i].shutdown();
+            }
+        }
+
+        input[0] = 2;
+
+        // Update the node on the leader
+        svrs.zk[leader].setData(path+leader, input, -1, null, null);     
+        
+        // wait some time to let this get written to disk
+        Thread.sleep(500);
+
+        // shut the leader down
+        svrs.mt[leader].shutdown();
+
+        System.gc();
+
+        waitForAll(svrs.zk, States.CONNECTING);
+
+        // Start everyone but the leader
+        for (int i=0; i < numServers; i++) {
+            if (i != leader) {
+                svrs.mt[i].start();
+            }
+        }
+
+        // wait to connect to one of these
+        waitForOne(svrs.zk[nonleader], States.CONNECTED);
+
+        // validate that the old value is there and not the new one
+        output = svrs.zk[nonleader].getData(path+leader, false, null);
+
+        Assert.assertEquals(
+                "Expecting old value 1 since 2 isn't committed yet",
+                output[0], 1);
+
+        // Do some other update, so we bump the maxCommittedZxid
+        // by setting the value to 2
+        svrs.zk[nonleader].setData(path+nonleader, input, -1);
+
+        // start the old leader 
+        svrs.mt[leader].start();
+
+        // connect to it
+        waitForOne(svrs.zk[leader], States.CONNECTED);
+
+        // make sure it doesn't have the new value that it alone had logged
+        output = svrs.zk[leader].getData(path+leader, false, null);
+        Assert.assertEquals(
+                "Validating that the deposed leader has rolled back that change it had written",
+                output[0], 1);
+        
+        // make sure the leader has the subsequent changes that were made while it was offline
+        output = svrs.zk[leader].getData(path+nonleader, false, null);
+        Assert.assertEquals(
+                "Validating that the deposed leader caught up on changes it missed",
+                output[0], 2);
+    }
+
+    // This class holds the servers and clients for those servers
+    private class Servers {
+        MainThread mt[];
+        ZooKeeper zk[];
+    }
+    
+    /**
+     * This is a helper function for launching a set of servers
+     *  
+     * @param numServers
+     * @return
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    private Servers LaunchServers(int numServers) throws IOException, InterruptedException {
+        int SERVER_COUNT = numServers;
+        Servers svrs = new Servers();
+        final int clientPorts[] = new int[SERVER_COUNT];
+        StringBuilder sb = new StringBuilder();
+        for(int i = 0; i < SERVER_COUNT; i++) {
+            clientPorts[i] = PortAssignment.unique();
+            sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+"\n");
+        }
+        String quorumCfgSection = sb.toString();
+  
+        MainThread mt[] = new MainThread[SERVER_COUNT];
+        ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
+        for(int i = 0; i < SERVER_COUNT; i++) {
+            mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection);
+            mt[i].start();
+            zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+        }
+  
+        waitForAll(zk, States.CONNECTED);
+  
+        svrs.mt = mt;
+        svrs.zk = zk;
+        return svrs;
+    }
+    
+    private void waitForOne(ZooKeeper zk, States state) throws InterruptedException {
+        while(zk.getState() != state) {
+            Thread.sleep(500);
+        }
+    }
+    
+    private void waitForAll(ZooKeeper[] zks, States state) throws InterruptedException {
+        int iterations = 10;
+        boolean someoneNotConnected = true;
+        while(someoneNotConnected) {
+            if (iterations-- == 0) {
+                throw new RuntimeException("Waiting too long");
+            }
+            
+            someoneNotConnected = false;
+            for(ZooKeeper zk: zks) {
+                if (zk.getState() != state) {
+                    someoneNotConnected = true;
+                }
+            }
+            Thread.sleep(1000);
+        }
+    }
 }

Modified: zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java?rev=1164891&r1=1164890&r2=1164891&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java (original)
+++ zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java Sat Sep  3 17:37:10 2011
@@ -40,7 +40,7 @@ import org.apache.zookeeper.test.QuorumB
 public class QuorumPeerTestBase extends TestCase implements Watcher {
     protected static final Logger LOG =
         Logger.getLogger(QuorumPeerTestBase.class);
-    
+
     public void process(WatchedEvent event) {
         // ignore for this test
     }
@@ -56,7 +56,7 @@ public class QuorumPeerTestBase extends 
 
     public static class MainThread extends Thread {
         final File confFile;
-        final TestQPMain main;
+        volatile TestQPMain main;
 
         public MainThread(int myid, int clientPort, String quorumCfgSection)
             throws IOException
@@ -83,7 +83,7 @@ public class QuorumPeerTestBase extends 
                 dir = dir.replace('\\', '/');
             }
             fwriter.write("dataDir=" + dir + "\n");
-            
+
             fwriter.write("clientPort=" + clientPort + "\n");
             fwriter.write(quorumCfgSection + "\n");
             fwriter.flush();
@@ -98,6 +98,12 @@ public class QuorumPeerTestBase extends 
             main = new TestQPMain();
         }
 
+        Thread currentThread;
+        synchronized public void start() {
+            main = new TestQPMain();
+            currentThread = new Thread(this);
+            currentThread.start();
+        }
         public void run() {
             String args[] = new String[1];
             args[0] = confFile.toString();
@@ -106,11 +112,18 @@ public class QuorumPeerTestBase extends 
             } catch (Exception e) {
                 // test will still fail even though we just log/ignore
                 LOG.error("unexpected exception in run", e);
+            } finally {
+                currentThread = null;
             }
         }
 
-        public void shutdown() {
-            main.shutdown();
+        public void shutdown() throws InterruptedException {
+            Thread t = currentThread;
+            if (t != null && t.isAlive()) {
+                main.shutdown();
+                t.join(500);
+            }
         }
+
     }
 }