You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@zookeeper.apache.org by ma...@apache.org on 2009/06/30 22:47:09 UTC

svn commit: r789943 - in /hadoop/zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/test/

Author: mahadev
Date: Tue Jun 30 20:47:09 2009
New Revision: 789943

URL: http://svn.apache.org/viewvc?rev=789943&view=rev
Log:
ZOOKEEPER-453. Worker is not removed in QuorumCnxManager upon crash. (flavio via mahadev)

Added:
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLERestartTest.java
Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=789943&r1=789942&r2=789943&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Tue Jun 30 20:47:09 2009
@@ -148,6 +148,9 @@
   ZOOKEEPER-450. emphemeral cleanup not happening with session timeout. (breed
 via mahadev)
 
+  ZOOKEEPER-453. Worker is not removed in QuorumCnxManager upon crash. (flavio
+via mahadev)
+
 IMPROVEMENTS:
   ZOOKEEPER-308. improve the atomic broadcast performance 3x.
   (breed via mahadev)

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=789943&r1=789942&r2=789943&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Tue Jun 30 20:47:09 2009
@@ -402,6 +402,7 @@
     }
     
     public void shutdown(){
+        LOG.debug("Shutting down connection manager");
         manager.halt();
     }
 

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java?rev=789943&r1=789942&r2=789943&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java Tue Jun 30 20:47:09 2009
@@ -156,7 +156,7 @@
                         sock.connect(addr, self.tickTime * self.syncLimit);
                         sock.setTcpNoDelay(nodelay);
                         break;
-                    } catch (ConnectException e) {
+                    } catch (IOException e) {
                         if (tries == 4) {
                             LOG.error("Unexpected exception",e);
                             throw e;

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=789943&r1=789942&r2=789943&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Tue Jun 30 20:47:09 2009
@@ -155,6 +155,8 @@
         	    if(vsw != null)
         	        vsw.finish();
         	    else LOG.error("No SendWorker for this identifier (" + sid + ")");
+        	} else {
+        	    LOG.error("Cannot open channel to server "  + sid);
         	}
 
         	if (!queueSendMap.containsKey(sid)) {
@@ -201,9 +203,19 @@
         //If wins the challenge, then close the new connection.
         if (sid < self.getId()) {
             try {
+                /*
+                 * This replica might still believe that the connection to sid
+                 * is up, so we have to shut down the workers before trying to
+                 * open a new connection.
+                 */
                 SendWorker sw = senderWorkerMap.get(sid);
-
-                LOG.info("Create new connection to server: " + sid);
+                if(sw != null)
+                    sw.finish();
+                
+                /*
+                 * Now we start a new connection
+                 */
+                LOG.debug("Create new connection to server: " + sid);
                 s.socket().close();
                 connectOne(sid);
                 
@@ -297,6 +309,7 @@
         if ((senderWorkerMap.get(sid) == null)) {
             SocketChannel channel;
             try {
+                LOG.debug("Opening channel to server "  + sid);
                 channel = SocketChannel
                         .open(self.quorumPeers.get(sid).electionAddr);
                 channel.socket().setTcpNoDelay(true);
@@ -304,6 +317,8 @@
             } catch (IOException e) {
                 LOG.warn("Cannot open channel to " + sid, e);
             }
+        } else {
+            LOG.error("There is a connection for server " + sid);
         }
     }
     
@@ -412,7 +427,7 @@
         Long sid;
         SocketChannel channel;
         RecvWorker recvWorker;
-        boolean running = true;
+        volatile boolean running = true;
 
         SendWorker(SocketChannel channel, Long sid) {
             this.sid = sid;
@@ -426,7 +441,7 @@
             this.recvWorker = recvWorker;
         }
 
-        boolean finish() {
+        synchronized boolean finish() {
             running = false;
 
             LOG.debug("Calling finish");
@@ -473,23 +488,20 @@
                      * message back to the beginning of the queue and leave.
                      */
                     LOG.warn("Exception when using channel: " + sid, e);
-                    running = false;
-                    synchronized (senderWorkerMap) {
-                        recvWorker.finish();
-                        recvWorker = null;
+                    finish();
+                    recvWorker.finish();
+                    recvWorker = null;
                     
-                        senderWorkerMap.remove(sid);
-                        ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
-                        if(bq != null){
-                            if (bq.size() == 0) {
-                                boolean ret = bq.offer(b);
-                                if (!ret) {
-                                    // to appease findbugs
-                                    LOG.error("Not able to add to a quue of size 0");
-                                }
+                    ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
+                    if(bq != null){
+                        if (bq.size() == 0) {
+                            boolean ret = bq.offer(b);
+                            if (!ret) {
+                                // to appease findbugs
+                                LOG.error("Not able to add to a quue of size 0");
                             }
-                        } else LOG.error("No queue for server " + sid);
-                    }
+                        }
+                    } else LOG.error("No queue for server " + sid);
                 }
             }
             LOG.warn("Send worker leaving thread");
@@ -503,14 +515,14 @@
     class RecvWorker extends Thread {
         Long sid;
         SocketChannel channel;
-        boolean running = true;
+        volatile boolean running = true;
 
         RecvWorker(SocketChannel channel, Long sid) {
             this.sid = sid;
             this.channel = channel;
         }
 
-        boolean finish() {
+        synchronized boolean finish() {
             running = false;
             this.interrupt();
             return running;
@@ -555,7 +567,6 @@
 
             } catch (IOException e) {
                 LOG.warn("Connection broken: ", e);
-
             } catch (InterruptedException e) {
                 LOG.warn("Interrupted while trying to add new "
                         + "message to the reception queue", e);

Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLERestartTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLERestartTest.java?rev=789943&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLERestartTest.java (added)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLERestartTest.java Tue Jun 30 20:47:09 2009
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.concurrent.Semaphore;
+
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.server.quorum.FastLeaderElection;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.Vote;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.junit.Test;
+
+/**
+ * Shuts down and re-creates peer 0 after its first leader election, then checks
+ * that every election thread finishes without error within its join timeout.
+ */
+public class FLERestartTest extends TestCase {
+    protected static final Logger LOG = Logger.getLogger(FLERestartTest.class);
+
+    static class TestVote {
+        TestVote(int id, long leader) {
+            this.leader = leader;
+        }
+
+        long leader;
+    }
+
+    int countVotes(HashSet<TestVote> hs, long id) {
+        int counter = 0;
+        for(TestVote v : hs){
+            if(v.leader == id) counter++;
+        }
+
+        return counter;
+    }
+
+    int count;
+    int baseport;
+    int baseLEport;
+    HashMap<Long,QuorumServer> peers;
+    ArrayList<FLERestartThread> restartThreads;
+    HashMap<Integer, HashSet<TestVote> > voteMap;
+    File tmpdir[];
+    int port[];
+    int successCount;
+    Semaphore finish;
+
+    volatile Vote votes[];
+    volatile boolean leaderDies;
+    volatile long leader = -1;
+    /** Exception that ended a FLERestartThread, if any; rethrown by the test. */
+    volatile Exception failure;
+    Random rand = new Random();
+
+    @Override
+    public void setUp() throws Exception {
+        count = 3;
+        baseport= 33003;
+        baseLEport = 43003;
+
+        peers = new HashMap<Long,QuorumServer>(count);
+        restartThreads = new ArrayList<FLERestartThread>(count);
+        voteMap = new HashMap<Integer, HashSet<TestVote> >();
+        votes = new Vote[count];
+        tmpdir = new File[count];
+        port = new int[count];
+        successCount = 0;
+        finish = new Semaphore(0);
+
+        LOG.info("SetUp " + getName());
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        for(int i = 0; i < restartThreads.size(); i++) {
+            ((FastLeaderElection) restartThreads.get(i).peer.getElectionAlg()).shutdown();
+        }
+        LOG.info("FINISHED " + getName());
+    }
+
+    /**
+     * Runs leader election for peer {@code i} in a loop. Peer 0 is re-created
+     * after its first election and, after its next one, releases peers 1 and 2,
+     * which wait for that release and then exit.
+     */
+    class FLERestartThread extends Thread {
+        int i;
+        // Volatile: peer 0's thread replaces it and tearDown() reads it.
+        volatile QuorumPeer peer;
+        int peerRound = 0;
+
+        FLERestartThread(QuorumPeer peer, int i) {
+            this.i = i;
+            this.peer = peer;
+            LOG.info("Constructor: " + getName());
+        }
+
+        @Override
+        public void run() {
+            try {
+                while(true) {
+                    peer.setPeerState(ServerState.LOOKING);
+                    LOG.info("Going to call leader election again.");
+                    Vote v = peer.getElectionAlg().lookForLeader();
+                    if(v == null){
+                        LOG.info("Thread " + i + " got a null vote");
+                        break;
+                    }
+
+                    /*
+                     * A real zookeeper would take care of setting the current vote. Here
+                     * we do it manually.
+                     */
+                    peer.setCurrentVote(v);
+
+                    LOG.info("Finished election: " + i + ", " + v.id);
+
+                    switch(i){
+                    case 0:
+                        if(peerRound == 0){
+                            LOG.info("First peer, shutting it down");
+                            peer.shutdown();
+                            ((FastLeaderElection) peer.getElectionAlg()).shutdown();
+
+                            // Re-create peer i with the same arguments it was first built with.
+                            peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 2, 2, 2);
+                            peer.startLeaderElection();
+                            peerRound++;
+                        } else {
+                            // Election after the restart is done: let peers 1 and 2 exit.
+                            finish.release(2);
+                            return;
+                        }
+                        break;
+                    case 1:
+                        LOG.info("Second entering case");
+                        // Wait for peer 0 to finish an election after its restart.
+                        finish.acquire();
+                        LOG.info("Release");
+                        return;
+                    case 2:
+                        LOG.info("Third peer, do nothing, just join");
+                        // Wait for peer 0 to finish an election after its restart.
+                        finish.acquire();
+                        LOG.info("Release");
+                        return;
+                    }
+                }
+            } catch (Exception e){
+                LOG.error("Thread " + i + " failed", e);
+                failure = e;
+            }
+        }
+    }
+
+    /**
+     * Starts {@code count} peers, each driven by a FLERestartThread, and fails
+     * if any thread throws or is still alive after its 10-second join.
+     */
+    @Test
+    public void testLERestart() throws Exception {
+        leaderDies = true;
+        LOG.info("TestLE: " + getName()+ ", " + count);
+        for(int i = 0; i < count; i++) {
+            peers.put(Long.valueOf(i), new QuorumServer(i, new InetSocketAddress(baseport+100+i),
+                    new InetSocketAddress(baseLEport+100+i)));
+            tmpdir[i] = ClientBase.createTmpDir();
+            port[i] = baseport+i;
+        }
+
+        for(int i = 0; i < count; i++) {
+            QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 2, 2, 2);
+            peer.startLeaderElection();
+            FLERestartThread thread = new FLERestartThread(peer, i);
+            // Register before start() so tearDown() sees every started thread.
+            restartThreads.add(thread);
+            thread.start();
+        }
+        LOG.info("Started threads " + getName());
+        for(int i = 0; i < restartThreads.size(); i++) {
+            restartThreads.get(i).join(10000);
+            // Report a thread's own exception before the generic join failure hides it.
+            if (failure != null) {
+                throw failure;
+            }
+            if (restartThreads.get(i).isAlive()) {
+                fail("Threads didn't join");
+            }
+        }
+    }
+}
\ No newline at end of file