You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ma...@apache.org on 2010/07/29 23:10:36 UTC

svn commit: r980576 - in /hadoop/zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/server/ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/test/

Author: mahadev
Date: Thu Jul 29 21:10:36 2010
New Revision: 980576

URL: http://svn.apache.org/viewvc?rev=980576&view=rev
Log:
ZOOKEEPER-790.  Last processed zxid set prematurely while establishing leadership (flavio via mahadev)

Added:
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java
Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=980576&r1=980575&r2=980576&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Thu Jul 29 21:10:36 2010
@@ -72,6 +72,9 @@ BUGFIXES: 
   ZOOKEEPER-783. committedLog in ZKDatabase is not properly synchronized
   (henry via mahadev) 
 
+  ZOOKEEPER-790.  Last processed zxid set prematurely while establishing 
+  leadership (flavio via mahadev)
+
 IMPROVEMENTS:
   ZOOKEEPER-724. Improve junit test integration - log harness information 
   (phunt via mahadev)

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java?rev=980576&r1=980575&r2=980576&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java Thu Jul 29 21:10:36 2010
@@ -163,6 +163,7 @@ public class NIOServerCnxn implements Wa
         public void startup(ZooKeeperServer zks) throws IOException,
                 InterruptedException {
             start();
+            zks.startdata();
             zks.startup();
             setZooKeeperServer(zks);
         }

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java?rev=980576&r1=980575&r2=980576&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java Thu Jul 29 21:10:36 2010
@@ -357,14 +357,18 @@ public class ZooKeeperServer implements 
         }
     }
     
-    public void startup() throws IOException, InterruptedException {
+    public void startdata() 
+    throws IOException, InterruptedException {
         //check to see if zkDb is not null
         if (zkDb == null) {
             zkDb = new ZKDatabase(this.txnLogFactory);
-        }
+        }  
         if (!zkDb.isInitialized()) {
             loadData();
         }
+    }
+    
+    public void startup() {        
         createSessionTracker();
         setupRequestProcessors();
 

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=980576&r1=980575&r2=980576&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Thu Jul 29 21:10:36 2010
@@ -327,12 +327,6 @@ public class Leader {
                 self.tick++;
             }
             
-            if(LOG.isInfoEnabled()){
-                LOG.info("Have quorum of supporters; starting up and setting last processed zxid: " + zk.getZxid());
-            }
-            zk.startup();
-            zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
-            
             if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
                 self.cnxnFactory.setZooKeeperServer(zk);
             }
@@ -499,6 +493,11 @@ public class Leader {
                 return;
             } else {
                 lastCommitted = zxid;
+                if(LOG.isInfoEnabled()){
+                    LOG.info("Have quorum of supporters; starting up and setting last processed zxid: " + zk.getZxid());
+                }
+                zk.startup();
+                zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
             }
         }
     }

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=980576&r1=980575&r2=980576&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Thu Jul 29 21:10:36 2010
@@ -352,7 +352,29 @@ public class LearnerHandler extends Thre
                     }
                 }
             }.start();
-
+            
+            /*
+             * We have to wait for the first ACK and then wait until
+             * the leader is ready; only then can we start
+             * processing messages.
+             */
+            qp = new QuorumPacket();
+            ia.readRecord(qp, "packet");
+            if(qp.getType() != Leader.ACK){
+                LOG.error("Next packet was supposed to be an ACK");
+                return;
+            }
+            leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
+            
+            /*
+             * Wait until leader starts up
+             */
+            synchronized(leader.zk){
+                while(!leader.zk.isRunning() && !this.isInterrupted()){
+                    leader.zk.wait(20);
+                }
+            }
+            
             while (true) {
                 qp = new QuorumPacket();
                 ia.readRecord(qp, "packet");
@@ -475,6 +497,7 @@ public class LearnerHandler extends Thre
         } catch (IOException e) {
             LOG.warn("Ignoring unexpected exception during socket close", e);
         }
+        this.interrupt();
         leader.removeLearnerHandler(this);
     }
 

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java?rev=980576&r1=980575&r2=980576&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java Thu Jul 29 21:10:36 2010
@@ -36,6 +36,7 @@ import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.quorum.Leader;
 import org.apache.zookeeper.server.quorum.LearnerHandler;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -267,7 +268,42 @@ public class QuorumTest extends QuorumBa
         }
         zk.close();
     }
-    
+
+    /** 
+     * See ZOOKEEPER-790 for details 
+     * */
+    @Test
+    public void testFollowersStartAfterLeader() throws Exception {
+        QuorumUtil qu = new QuorumUtil(1);
+        CountdownWatcher watcher = new CountdownWatcher();
+        qu.startQuorum();
+        
+        int index = 1;
+        while(qu.getPeer(index).peer.leader == null)
+            index++;
+        
+        ZooKeeper zk = new ZooKeeper(
+                "127.0.0.1:" + qu.getPeer((index == 1)?2:1).peer.getClientPort(),
+                ClientBase.CONNECTION_TIMEOUT, watcher);
+        watcher.waitForConnected(CONNECTION_TIMEOUT);
+        
+        // break the quorum
+        qu.shutdown(index);
+
+        // try to reestablish the quorum
+        qu.start(index);
+        Assert.assertTrue("quorum reestablishment failed",
+                QuorumBase.waitForServerUp(
+                        "127.0.0.1:" + qu.getPeer(2).clientPort,
+                        CONNECTION_TIMEOUT));
+        Thread.sleep(1000);
+
+        // zk should have reconnected already
+        zk.create("/test", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        zk.close();
+    }
+
     /**
      * Tests if closeSession can be logged before a leader gets established, which
      * could lead to a locked-out follower (see ZOOKEEPER-790). 
@@ -293,21 +329,24 @@ public class QuorumTest extends QuorumBa
     throws IOException, InterruptedException, KeeperException{
         final Semaphore sem = new Semaphore(0);
                 
-        Leader leader = qb.s1.leader;
-        if (leader == null) leader = qb.s2.leader;
-        if (leader == null) leader = qb.s3.leader;
-        if (leader == null) leader = qb.s4.leader;
-        if (leader == null) leader = qb.s5.leader;
+        QuorumUtil qu = new QuorumUtil(2);
+        qu.startQuorum();
+                
         
-        Assert.assertNotNull(leader);
+        int index = 1;
+        while(qu.getPeer(index).peer.leader == null)
+            index++;
         
+        Leader leader = qu.getPeer(index).peer.leader;
         
-        int serverPort = qb.s1.getClientPort();
-        if(qb.s1.leader != null){
-            serverPort = qb.s2.getClientPort();
-        }
+        Assert.assertNotNull(leader);
+  
+        /*
+         * Reusing the index variable to select a follower to connect to
+         */
+        index = (index == 1) ? 2 : 1;
         
-        ZooKeeper zk = new DisconnectableZooKeeper("127.0.0.1:" + serverPort, 1000, new Watcher() {
+        ZooKeeper zk = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(), 1000, new Watcher() {
             public void process(WatchedEvent event) {
         }});
 
@@ -328,13 +367,12 @@ public class QuorumTest extends QuorumBa
             }, null);
             
             if(i == 5000){
-                qb.shutdown(qb.s1);
+                qu.shutdown(index);
                 LOG.info("Shutting down s1");
             }
             if(i == 12000){
-                qb.setupServer(1);
-                qb.s1.start();
-                LOG.info("Setting up s1");
+                qu.start(index);
+                LOG.info("Setting up server: " + index);
             }
             if((i % 1000) == 0){
                 Thread.sleep(500);
@@ -345,10 +383,10 @@ public class QuorumTest extends QuorumBa
         sem.tryAcquire(15000, TimeUnit.MILLISECONDS);
         
         // Verify that server is following and has the same epoch as the leader
-        Assert.assertTrue("Not following", qb.s1.follower != null);
-        long epochF = (qb.s1.getActiveServer().getZxid() >> 32L);
+        Assert.assertTrue("Not following", qu.getPeer(index).peer.follower != null);
+        long epochF = (qu.getPeer(index).peer.getActiveServer().getZxid() >> 32L);
         long epochL = (leader.getEpoch() >> 32L);
-        Assert.assertTrue("Zxid: " + qb.s1.getActiveServer().getZxid() + 
+        Assert.assertTrue("Zxid: " + qu.getPeer(index).peer.getActiveServer().getZxid() + 
                 "Current epoch: " + epochF, epochF == epochL);
         
     }

Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java?rev=980576&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java (added)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java Thu Jul 29 21:10:36 2010
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.OperatingSystemMXBean;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.server.quorum.Election;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.junit.Assert;
+
+import com.sun.management.UnixOperatingSystemMXBean;
+
+/**
+ * Utility for quorum testing. Sets up 2n+1 peers and allows starting/stopping
+ * all peers, a particular peer, n peers, etc.
+ */
+public class QuorumUtil {
+
+    // TODO partitioning of peers and clients
+
+    // TODO refactor QuorumBase to be special case of this
+
+    private static final Logger LOG = Logger.getLogger(QuorumUtil.class);
+
+    public class PeerStruct {
+        public int id;
+        public QuorumPeer peer;
+        public File dataDir;
+        public int clientPort;
+    }
+
+    private final Map<Long, QuorumServer> peersView = new HashMap<Long, QuorumServer>();
+
+    private final Map<Integer, PeerStruct> peers = new HashMap<Integer, PeerStruct>();
+
+    private final int N;
+
+    private final int ALL;
+
+    private String hostPort;
+
+    private int tickTime;
+
+    private int initLimit;
+
+    private int syncLimit;
+
+    private int electionAlg;
+
+    /**
+     * Initializes 2n+1 quorum peers which will form a ZooKeeper ensemble.
+     *
+     * @param n
+     *            number of peers in the ensemble will be 2n+1
+     */
+    public QuorumUtil(int n) throws RuntimeException {
+        try {
+            ClientBase.setupTestEnv();
+            JMXEnv.setUp();
+
+            N = n;
+            ALL = 2 * N + 1;
+            tickTime = 2000;
+            initLimit = 3;
+            syncLimit = 3;
+            electionAlg = 3;
+            hostPort = "";
+
+            for (int i = 1; i <= ALL; ++i) {
+                PeerStruct ps = new PeerStruct();
+                ps.id = i;
+                ps.dataDir = ClientBase.createTmpDir();
+                ps.clientPort = PortAssignment.unique();
+                peers.put(i, ps);
+
+                peersView.put(Long.valueOf(i), new QuorumServer(i, new InetSocketAddress(
+                        "127.0.0.1", ps.clientPort + 1000), new InetSocketAddress("127.0.0.1",
+                        PortAssignment.unique() + 1000), LearnerType.PARTICIPANT));
+                hostPort += "127.0.0.1:" + ps.clientPort + ((i == ALL) ? "" : ",");
+            }
+            for (int i = 1; i <= ALL; ++i) {
+                PeerStruct ps = peers.get(i);
+                LOG.info("Creating QuorumPeer " + i + "; public port " + ps.clientPort);
+                ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort,
+                        electionAlg, ps.id, tickTime, initLimit, syncLimit);
+                Assert.assertEquals(ps.clientPort, ps.peer.getClientPort());
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public PeerStruct getPeer(int id) {
+        return peers.get(id);
+    }
+
+    public void startAll() throws IOException {
+        for (int i = 1; i <= ALL; ++i) {
+            start(i);
+            LOG.info("Started QuorumPeer " + i);
+        }
+
+        LOG.info("Checking ports " + hostPort);
+        for (String hp : hostPort.split(",")) {
+            Assert.assertTrue("waiting for server up", ClientBase.waitForServerUp(hp,
+                    ClientBase.CONNECTION_TIMEOUT));
+            LOG.info(hp + " is accepting client connections");
+        }
+
+        // interesting to see what's there...
+        try {
+            JMXEnv.dump();
+            // make sure we have all servers listed
+            Set<String> ensureNames = new LinkedHashSet<String>();
+            for (int i = 1; i <= ALL; ++i) {
+                ensureNames.add("InMemoryDataTree");
+            }
+            for (int i = 1; i <= ALL; ++i) {
+                ensureNames
+                        .add("name0=ReplicatedServer_id" + i + ",name1=replica." + i + ",name2=");
+            }
+            for (int i = 1; i <= ALL; ++i) {
+                for (int j = 1; j <= ALL; ++j) {
+                    ensureNames.add("name0=ReplicatedServer_id" + i + ",name1=replica." + j);
+                }
+            }
+            for (int i = 1; i <= ALL; ++i) {
+                ensureNames.add("name0=ReplicatedServer_id" + i);
+            }
+            JMXEnv.ensureAll(ensureNames.toArray(new String[ensureNames.size()]));
+        } catch (IOException e) {
+            LOG.warn("IOException during JMXEnv operation", e);
+        }
+    }
+
+    /**
+     * Shuts down all peers, then starts the first N+1 peers and waits for each to come up.
+     */
+    public void startQuorum() throws IOException {
+        shutdownAll();
+        for (int i = 1; i <= N + 1; ++i) {
+            start(i);
+        }
+        for (int i = 1; i <= N + 1; ++i) {
+            Assert.assertTrue("Waiting for server up", ClientBase.waitForServerUp("127.0.0.1:"
+                    + getPeer(i).clientPort, ClientBase.CONNECTION_TIMEOUT));
+        }
+    }
+
+    public void start(int id) throws IOException {
+        PeerStruct ps = getPeer(id);
+        LOG.info("Creating QuorumPeer " + ps.id + "; public port " + ps.clientPort);
+        ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg,
+                ps.id, tickTime, initLimit, syncLimit);
+        Assert.assertEquals(ps.clientPort, ps.peer.getClientPort());
+
+        ps.peer.start();
+    }
+
+    public void shutdownAll() {
+        for (int i = 1; i <= ALL; ++i) {
+            shutdown(i);
+        }
+        for (String hp : hostPort.split(",")) {
+            Assert.assertTrue("Waiting for server down", ClientBase.waitForServerDown(hp,
+                    ClientBase.CONNECTION_TIMEOUT));
+            LOG.info(hp + " is no longer accepting client connections");
+        }
+    }
+
+    public void shutdown(int id) {
+        QuorumPeer qp = getPeer(id).peer;
+        try {
+            LOG.info("Shutting down quorum peer " + qp.getName());
+            qp.shutdown();
+            Election e = qp.getElectionAlg();
+            if (e != null) {
+                LOG.info("Shutting down leader election " + qp.getName());
+                e.shutdown();
+            } else {
+                LOG.info("No election available to shutdown " + qp.getName());
+            }
+            LOG.info("Waiting for " + qp.getName() + " to exit thread");
+            qp.join(30000);
+            if (qp.isAlive()) {
+                Assert.fail("QP failed to shutdown in 30 seconds: " + qp.getName());
+            }
+        } catch (InterruptedException e) {
+            LOG.debug("QP interrupted: " + qp.getName(), e);
+        }
+    }
+
+    public String getConnString() {
+        return hostPort;
+    }
+
+    public void tearDown() throws Exception {
+        LOG.info("TearDown started");
+
+        OperatingSystemMXBean osMbean = ManagementFactory.getOperatingSystemMXBean();
+        if (osMbean != null && osMbean instanceof UnixOperatingSystemMXBean) {
+            UnixOperatingSystemMXBean unixos = (UnixOperatingSystemMXBean) osMbean;
+            LOG.info("fdcount after test is: " + unixos.getOpenFileDescriptorCount());
+        }
+
+        shutdownAll();
+        JMXEnv.tearDown();
+    }
+}