You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by th...@apache.org on 2013/10/07 21:16:04 UTC

svn commit: r1530035 - in /zookeeper/branches/branch-3.4: ./ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/server/quorum/

Author: thawan
Date: Mon Oct  7 19:16:04 2013
New Revision: 1530035

URL: http://svn.apache.org/r1530035
Log:
ZOOKEEPER-1551. Observers ignore txns that come after snapshot and UPTODATE (thawan, fpj via thawan)

Modified:
    zookeeper/branches/branch-3.4/CHANGES.txt
    zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
    zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
    zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
    zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java

Modified: zookeeper/branches/branch-3.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/CHANGES.txt?rev=1530035&r1=1530034&r2=1530035&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/CHANGES.txt (original)
+++ zookeeper/branches/branch-3.4/CHANGES.txt Mon Oct  7 19:16:04 2013
@@ -119,6 +119,9 @@ BUGFIXES:
 
   ZOOKEEPER-732. Improper translation of error into Python exception (Andrei Savu, Lei Zhang, fpj via fpj)
 
+  ZOOKEEPER-1551. Observers ignore txns that come after snapshot and UPTODATE
+  (thawan, fpj via thawan)
+
 IMPROVEMENTS:
 
   ZOOKEEPER-1564. Allow JUnit test build with IBM Java

Modified: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=1530035&r1=1530034&r2=1530035&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Mon Oct  7 19:16:04 2013
@@ -829,13 +829,16 @@ public class Leader {
                         .getZxid(), null, null);
                 handler.queuePacket(qp);
             }
-            List<Long>zxids = new ArrayList<Long>(outstandingProposals.keySet());
-            Collections.sort(zxids);
-            for (Long zxid: zxids) {
-                if (zxid <= lastSeenZxid) {
-                    continue;
+            // Only participants need to get outstanding proposals
+            if (handler.getLearnerType() == LearnerType.PARTICIPANT) {
+                List<Long>zxids = new ArrayList<Long>(outstandingProposals.keySet());
+                Collections.sort(zxids);
+                for (Long zxid: zxids) {
+                    if (zxid <= lastSeenZxid) {
+                        continue;
+                    }
+                    handler.queuePacket(outstandingProposals.get(zxid).packet);
                 }
-                handler.queuePacket(outstandingProposals.get(zxid).packet);
             }
         }
         if (handler.getLearnerType() == LearnerType.PARTICIPANT) {

Modified: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Learner.java?rev=1530035&r1=1530034&r2=1530035&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Learner.java (original)
+++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Learner.java Mon Oct  7 19:16:04 2013
@@ -392,9 +392,28 @@ public class Learner {       
                     }
                     break;
                 case Leader.INFORM:
-                    TxnHeader hdr = new TxnHeader();
-                    Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
-                    zk.processTxn(hdr, txn);
+                    /*
+                     * Only observers get this type of packet. We treat this
+                     * as receiving both a PROPOSAL and a COMMIT.
+                     */
+                    PacketInFlight packet = new PacketInFlight();
+                    packet.hdr = new TxnHeader();
+                    packet.rec = SerializeUtils.deserializeTxn(qp.getData(), packet.hdr);
+                    // Log warning message if txn comes out-of-order
+                    if (packet.hdr.getZxid() != lastQueued + 1) {
+                        LOG.warn("Got zxid 0x"
+                                + Long.toHexString(packet.hdr.getZxid())
+                                + " expected 0x"
+                                + Long.toHexString(lastQueued + 1));
+                    }
+                    lastQueued = packet.hdr.getZxid();
+                    if (!snapshotTaken) {
+                        // Apply to db directly if we haven't taken the snapshot
+                        zk.processTxn(packet.hdr, packet.rec);
+                    } else {
+                        packetsNotCommitted.add(packet);
+                        packetsCommitted.add(qp.getZxid());
+                    }
                     break;
                 case Leader.UPTODATE:
                     if (!snapshotTaken) { // true for the pre v1.0 case
@@ -425,6 +444,30 @@ public class Learner {       
             for(Long zxid: packetsCommitted) {
                 fzk.commit(zxid);
             }
+        } else if (zk instanceof ObserverZooKeeperServer) {
+            // Similar to follower, we need to log requests between the snapshot
+            // and UPTODATE
+            ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
+            for (PacketInFlight p : packetsNotCommitted) {
+                Long zxid = packetsCommitted.peekFirst();
+                if (p.hdr.getZxid() != zxid) {
+                    // Log a warning if there is no matching commit, e.g. when an
+                    // old leader sent outstanding proposals to this observer.
+                    LOG.warn("Committing " + Long.toHexString(zxid)
+                            + ", but next proposal is "
+                            + Long.toHexString(p.hdr.getZxid()));
+                    continue;
+                }
+                packetsCommitted.remove();
+                Request request = new Request(null, p.hdr.getClientId(),
+                        p.hdr.getCxid(), p.hdr.getType(), null, null);
+                request.txn = p.rec;
+                request.hdr = p.hdr;
+                ozk.commitRequest(request);
+            }
+        } else {
+            // New server types need to handle in-flight packets
+            throw new UnsupportedOperationException("Unknown server type");
         }
     }
     

Modified: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=1530035&r1=1530034&r2=1530035&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Mon Oct  7 19:16:04 2013
@@ -247,7 +247,7 @@ public class QuorumPeer extends Thread i
      * Enables/Disables sync request processor. This option is enabled
      * by default and is to be used with observers.
      */
-    protected boolean syncEnabled;
+    protected boolean syncEnabled = true;
 
     /**
      * The current tick

Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java?rev=1530035&r1=1530034&r2=1530035&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java (original)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java Mon Oct  7 19:16:04 2013
@@ -52,6 +52,7 @@ import org.apache.zookeeper.server.Serve
 import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooKeeperServer.DataTreeBuilder;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.persistence.Util;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
@@ -332,6 +333,10 @@ public class Zab1_0Test {
         void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) throws Exception;
     }
     
+    static public interface ObserverConversation {
+        void converseWithObserver(InputArchive ia, OutputArchive oa, Observer o) throws Exception;
+    }
+
     public void testLeaderConversation(LeaderConversation conversation) throws Exception {
         Socket pair[] = getSocketPair();
         Socket leaderSocket = pair[0];
@@ -496,6 +501,57 @@ public class Zab1_0Test {
         }
     }
 
+    public void testObserverConversation(ObserverConversation conversation) throws Exception {
+        File tmpDir = File.createTempFile("test", "dir");
+        tmpDir.delete();
+        tmpDir.mkdir();
+        Thread observerThread = null;
+        ConversableObserver observer = null;
+        QuorumPeer peer = null;
+        try {
+            peer = createQuorumPeer(tmpDir);
+            peer.setSyncEnabled(true);
+            observer = createObserver(tmpDir, peer);
+            peer.observer = observer;
+
+            ServerSocket ss = new ServerSocket();
+            ss.bind(null);
+            observer.setLeaderSocketAddress((InetSocketAddress)ss.getLocalSocketAddress());
+            final Observer observerForThread = observer;
+
+            observerThread = new Thread() {
+                public void run() {
+                    try {
+                        observerForThread.observeLeader();
+                    } catch(Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            };
+            observerThread.start();
+            Socket leaderSocket = ss.accept();
+
+            InputArchive ia = BinaryInputArchive.getArchive(leaderSocket
+                    .getInputStream());
+            OutputArchive oa = BinaryOutputArchive.getArchive(leaderSocket
+                    .getOutputStream());
+
+            conversation.converseWithObserver(ia, oa, observer);
+        } finally {
+            if (observer != null) {
+                observer.shutdown();
+            }
+            if (observerThread != null) {
+                observerThread.interrupt();
+                observerThread.join();
+            }
+            if (peer != null) {
+                peer.shutdown();
+            }
+            recursiveDelete(tmpDir);
+        }
+    }
+
     @Test
     public void testUnnecessarySnap() throws Exception {
         testPopulatedLeaderConversation(new PopulatedLeaderConversation() {
@@ -535,7 +591,31 @@ public class Zab1_0Test {
            }
        }, 2);
     }
-    
+
+    // We want to track the change with a callback rather than depending on timing
+    class TrackerWatcher implements Watcher {
+        boolean changed;
+        synchronized void waitForChange() throws InterruptedException {
+            while(!changed) {
+                wait();
+            }
+        }
+
+        @Override
+        public void process(WatchedEvent event) {
+            if (event.getType() == EventType.NodeDataChanged) {
+                synchronized(this) {
+                    changed = true;
+                    notifyAll();
+                }
+            }
+        }
+        synchronized public boolean changed() {
+            return changed;
+        } 
+    };
+
+
     @Test
     public void testNormalFollowerRun() throws Exception {
         testFollowerConversation(new FollowerConversation() {
@@ -613,28 +693,6 @@ public class Zab1_0Test {
                     proposeSetData(qp, proposalZxid, "data2", 2);
                     oa.writeRecord(qp, null);
                     
-                    // We want to track the change with a callback rather than depending on timing
-                    class TrackerWatcher implements Watcher {
-                        boolean changed;
-                        synchronized void waitForChange() throws InterruptedException {
-                            while(!changed) {
-                                wait();
-                            }
-                        }
-                        @Override
-                        public void process(WatchedEvent event) {
-                            if (event.getType() == EventType.NodeDataChanged) {
-                                synchronized(this) {
-                                    changed = true;
-                                    notifyAll();
-                                }
-                            }
-                        }
-                        synchronized public boolean changed() {
-                            return changed;
-                        }
-                        
-                    };
                     TrackerWatcher watcher = new TrackerWatcher();
                     
                     // The change should not have happened yet, since we haven't committed
@@ -925,6 +983,157 @@ public class Zab1_0Test {
     }
 
     @Test
+    public void testNormalObserverRun() throws Exception {
+        testObserverConversation(new ObserverConversation() {
+            @Override
+            public void converseWithObserver(InputArchive ia, OutputArchive oa,
+                    Observer o) throws Exception {
+                File tmpDir = File.createTempFile("test", "dir");
+                tmpDir.delete();
+                tmpDir.mkdir();
+                File logDir = o.zk.getTxnLogFactory().getDataDir().getParentFile();
+                File snapDir = o.zk.getTxnLogFactory().getSnapDir().getParentFile();
+                try {
+                    Assert.assertEquals(0, o.self.getAcceptedEpoch());
+                    Assert.assertEquals(0, o.self.getCurrentEpoch());
+
+                    // Setup a database with a single /foo node
+                    ZKDatabase zkDb = new ZKDatabase(new FileTxnSnapLog(tmpDir, tmpDir));
+                    final long foo1Zxid = ZxidUtils.makeZxid(1, 1);
+                    final long foo2Zxid = ZxidUtils.makeZxid(1, 2);
+                    zkDb.processTxn(new TxnHeader(13, 1313, foo1Zxid, 33,
+                            ZooDefs.OpCode.create), new CreateTxn("/foo1",
+                            "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            false, 1));
+                    zkDb.processTxn(new TxnHeader(13, 1313, foo2Zxid, 33,
+                            ZooDefs.OpCode.create), new CreateTxn("/foo2",
+                            "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            false, 1));
+                    Stat stat = new Stat();
+                    Assert.assertEquals("data1",
+                            new String(zkDb.getData("/foo1", stat, null)));
+                    Assert.assertEquals("data1",
+                            new String(zkDb.getData("/foo2", stat, null)));
+
+                    QuorumPacket qp = new QuorumPacket();
+                    readPacketSkippingPing(ia, qp);
+                    Assert.assertEquals(Leader.OBSERVERINFO, qp.getType());
+                    Assert.assertEquals(qp.getZxid(), 0);
+                    LearnerInfo learnInfo = new LearnerInfo();
+                    ByteBufferInputStream.byteBuffer2Record(
+                            ByteBuffer.wrap(qp.getData()), learnInfo);
+                    Assert.assertEquals(learnInfo.getProtocolVersion(), 0x10000);
+                    Assert.assertEquals(learnInfo.getServerid(), 0);
+
+                    // We are simulating an established leader, so the epoch is 1
+                    qp.setType(Leader.LEADERINFO);
+                    qp.setZxid(ZxidUtils.makeZxid(1, 0));
+                    byte protoBytes[] = new byte[4];
+                    ByteBuffer.wrap(protoBytes).putInt(0x10000);
+                    qp.setData(protoBytes);
+                    oa.writeRecord(qp, null);
+
+                    readPacketSkippingPing(ia, qp);
+                    Assert.assertEquals(Leader.ACKEPOCH, qp.getType());
+                    Assert.assertEquals(0, qp.getZxid());
+                    Assert.assertEquals(ZxidUtils.makeZxid(0, 0), ByteBuffer
+                            .wrap(qp.getData()).getInt());
+                    Assert.assertEquals(1, o.self.getAcceptedEpoch());
+                    Assert.assertEquals(0, o.self.getCurrentEpoch());
+
+                    // Send the snapshot we created earlier
+                    qp.setType(Leader.SNAP);
+                    qp.setData(new byte[0]);
+                    qp.setZxid(zkDb.getDataTreeLastProcessedZxid());
+                    oa.writeRecord(qp, null);
+                    zkDb.serializeSnapshot(oa);
+                    oa.writeString("BenWasHere", null);
+                    qp.setType(Leader.NEWLEADER);
+                    qp.setZxid(ZxidUtils.makeZxid(1, 0));
+                    oa.writeRecord(qp, null);
+
+                    // Get the ack of the new leader
+                    readPacketSkippingPing(ia, qp);
+                    Assert.assertEquals(Leader.ACK, qp.getType());
+                    Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+                    Assert.assertEquals(1, o.self.getAcceptedEpoch());
+                    Assert.assertEquals(1, o.self.getCurrentEpoch());
+
+                    Assert.assertEquals(foo2Zxid, o.zk.getLastProcessedZxid());
+
+                    // Make sure the data was recorded in the filesystem ok
+                    ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(
+                            logDir, snapDir));
+                    long lastZxid = zkDb2.loadDataBase();
+                    Assert.assertEquals("data1",
+                            new String(zkDb2.getData("/foo1", stat, null)));
+                    Assert.assertEquals(foo2Zxid, lastZxid);
+
+                    // Register watch
+                    TrackerWatcher watcher = new TrackerWatcher();
+                    Assert.assertEquals("data1", new String(o.zk
+                            .getZKDatabase().getData("/foo2", stat, watcher)));
+
+                    // Propose /foo1 update
+                    long proposalZxid = ZxidUtils.makeZxid(1, 1000);
+                    proposeSetData(qp, "/foo1", proposalZxid, "data2", 2);
+                    oa.writeRecord(qp, null);
+
+                    // Commit /foo1 update
+                    qp.setType(Leader.COMMIT);
+                    qp.setZxid(proposalZxid);
+                    oa.writeRecord(qp, null);
+
+                    // Inform /foo2 update
+                    long informZxid = ZxidUtils.makeZxid(1, 1001);
+                    proposeSetData(qp, "/foo2", informZxid, "data2", 2);
+                    qp.setType(Leader.INFORM);
+                    oa.writeRecord(qp, null);
+
+                    qp.setType(Leader.UPTODATE);
+                    qp.setZxid(0);
+                    oa.writeRecord(qp, null);
+
+                    // Read the uptodate ack
+                    readPacketSkippingPing(ia, qp);
+                    Assert.assertEquals(Leader.ACK, qp.getType());
+                    Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+
+                    // Data should get updated
+                    watcher.waitForChange();
+                    Assert.assertEquals("data2", new String(o.zk
+                            .getZKDatabase().getData("/foo1", stat, null)));
+                    Assert.assertEquals("data2", new String(o.zk
+                            .getZKDatabase().getData("/foo2", stat, null)));
+
+                    zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
+                    lastZxid = zkDb2.loadDataBase();
+                    Assert.assertEquals("data2", new String(zkDb2.getData("/foo1", stat, null)));
+                    Assert.assertEquals("data2", new String(zkDb2.getData("/foo2", stat, null)));
+                    Assert.assertEquals(informZxid, lastZxid);
+                } finally {
+                    recursiveDelete(tmpDir);
+                }
+
+            }
+
+            private void proposeSetData(QuorumPacket qp, String path,
+                    long zxid, String data, int version) throws IOException {
+                qp.setType(Leader.PROPOSAL);
+                qp.setZxid(zxid);
+                TxnHeader hdr = new TxnHeader(4, 1414, qp.getZxid(), 55,
+                        ZooDefs.OpCode.setData);
+                SetDataTxn sdt = new SetDataTxn(path, data.getBytes(), version);
+                ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                OutputArchive boa = BinaryOutputArchive.getArchive(baos);
+                boa.writeRecord(hdr, null);
+                boa.writeRecord(sdt, null);
+                qp.setData(baos.toByteArray());
+            }
+        });
+    }
+
+    @Test
     public void testLeaderBehind() throws Exception {
         testLeaderConversation(new LeaderConversation() {
             public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l)
@@ -1057,6 +1266,35 @@ public class Zab1_0Test {
         return new ConversableFollower(peer, zk);
     }
 
+    static class ConversableObserver extends Observer {
+        
+        ConversableObserver(QuorumPeer self, ObserverZooKeeperServer zk) {
+            super(self, zk);
+        }
+        
+        InetSocketAddress leaderAddr;
+        public void setLeaderSocketAddress(InetSocketAddress addr) {
+            leaderAddr = addr;
+        }
+        
+        @Override
+        protected InetSocketAddress findLeader() {
+            return leaderAddr;
+        }
+    }
+        
+    private ConversableObserver createObserver(File tmpDir, QuorumPeer peer)
+            throws IOException {
+        FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpDir, tmpDir);
+        peer.setTxnFactory(logFactory);
+        DataTreeBuilder treeBuilder = new ZooKeeperServer.BasicDataTreeBuilder();
+        ZKDatabase zkDb = new ZKDatabase(logFactory);
+        ObserverZooKeeperServer zk = new ObserverZooKeeperServer(logFactory, peer, treeBuilder, zkDb);
+        peer.setZKDatabase(zkDb);
+        return new ConversableObserver(peer, zk);
+    }
+        
+    
     private QuorumPeer createQuorumPeer(File tmpDir) throws IOException,
             FileNotFoundException {
         QuorumPeer peer = new QuorumPeer();