You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@zookeeper.apache.org by ma...@apache.org on 2009/06/30 22:47:09 UTC
svn commit: r789943 - in /hadoop/zookeeper/trunk: ./
src/java/main/org/apache/zookeeper/server/quorum/
src/java/test/org/apache/zookeeper/test/
Author: mahadev
Date: Tue Jun 30 20:47:09 2009
New Revision: 789943
URL: http://svn.apache.org/viewvc?rev=789943&view=rev
Log:
ZOOKEEPER-453. Worker is not removed in QuorumCnxManager upon crash. (flavio via mahadev)
Added:
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLERestartTest.java
Modified:
hadoop/zookeeper/trunk/CHANGES.txt
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=789943&r1=789942&r2=789943&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Tue Jun 30 20:47:09 2009
@@ -148,6 +148,9 @@
ZOOKEEPER-450. emphemeral cleanup not happening with session timeout. (breed
via mahadev)
+ ZOOKEEPER-453. Worker is not removed in QuorumCnxManager upon crash. (flavio
+via mahadev)
+
IMPROVEMENTS:
ZOOKEEPER-308. improve the atomic broadcast performance 3x.
(breed via mahadev)
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=789943&r1=789942&r2=789943&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Tue Jun 30 20:47:09 2009
@@ -402,6 +402,7 @@
}
+ /**
+  * Shuts down this election instance: logs at debug level and then halts
+  * the connection manager via manager.halt().
+  */
public void shutdown(){
+ LOG.debug("Shutting down connection manager");
manager.halt();
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java?rev=789943&r1=789942&r2=789943&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java Tue Jun 30 20:47:09 2009
@@ -156,7 +156,7 @@
sock.connect(addr, self.tickTime * self.syncLimit);
sock.setTcpNoDelay(nodelay);
break;
- } catch (ConnectException e) {
+ } catch (IOException e) {
if (tries == 4) {
LOG.error("Unexpected exception",e);
throw e;
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=789943&r1=789942&r2=789943&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Tue Jun 30 20:47:09 2009
@@ -155,6 +155,8 @@
if(vsw != null)
vsw.finish();
else LOG.error("No SendWorker for this identifier (" + sid + ")");
+ } else {
+ LOG.error("Cannot open channel to server " + sid);
}
if (!queueSendMap.containsKey(sid)) {
@@ -201,9 +203,19 @@
//If wins the challenge, then close the new connection.
if (sid < self.getId()) {
try {
+ /*
+ * This replica might still believe that the connection to sid
+ * is up, so we have to shut down the workers before trying to
+ * open a new connection.
+ */
SendWorker sw = senderWorkerMap.get(sid);
-
- LOG.info("Create new connection to server: " + sid);
+ if(sw != null)
+ sw.finish();
+
+ /*
+ * Now we start a new connection
+ */
+ LOG.debug("Create new connection to server: " + sid);
s.socket().close();
connectOne(sid);
@@ -297,6 +309,7 @@
if ((senderWorkerMap.get(sid) == null)) {
SocketChannel channel;
try {
+ LOG.debug("Opening channel to server " + sid);
channel = SocketChannel
.open(self.quorumPeers.get(sid).electionAddr);
channel.socket().setTcpNoDelay(true);
@@ -304,6 +317,8 @@
} catch (IOException e) {
LOG.warn("Cannot open channel to " + sid, e);
}
+ } else {
+ LOG.error("There is a connection for server " + sid);
}
}
@@ -412,7 +427,7 @@
Long sid;
SocketChannel channel;
RecvWorker recvWorker;
- boolean running = true;
+ volatile boolean running = true;
SendWorker(SocketChannel channel, Long sid) {
this.sid = sid;
@@ -426,7 +441,7 @@
this.recvWorker = recvWorker;
}
- boolean finish() {
+ synchronized boolean finish() {
running = false;
LOG.debug("Calling finish");
@@ -473,23 +488,20 @@
* message back to the beginning of the queue and leave.
*/
LOG.warn("Exception when using channel: " + sid, e);
- running = false;
- synchronized (senderWorkerMap) {
- recvWorker.finish();
- recvWorker = null;
+ finish();
+ recvWorker.finish();
+ recvWorker = null;
- senderWorkerMap.remove(sid);
- ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
- if(bq != null){
- if (bq.size() == 0) {
- boolean ret = bq.offer(b);
- if (!ret) {
- // to appease findbugs
- LOG.error("Not able to add to a quue of size 0");
- }
+ ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
+ if(bq != null){
+ if (bq.size() == 0) {
+ boolean ret = bq.offer(b);
+ if (!ret) {
+ // to appease findbugs
+ LOG.error("Not able to add to a quue of size 0");
}
- } else LOG.error("No queue for server " + sid);
- }
+ }
+ } else LOG.error("No queue for server " + sid);
}
}
LOG.warn("Send worker leaving thread");
@@ -503,14 +515,14 @@
class RecvWorker extends Thread {
Long sid;
SocketChannel channel;
- boolean running = true;
+ volatile boolean running = true;
RecvWorker(SocketChannel channel, Long sid) {
this.sid = sid;
this.channel = channel;
}
- boolean finish() {
+ synchronized boolean finish() {
running = false;
this.interrupt();
return running;
@@ -555,7 +567,6 @@
} catch (IOException e) {
LOG.warn("Connection broken: ", e);
-
} catch (InterruptedException e) {
LOG.warn("Interrupted while trying to add new "
+ "message to the reception queue", e);
Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLERestartTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLERestartTest.java?rev=789943&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLERestartTest.java (added)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLERestartTest.java Tue Jun 30 20:47:09 2009
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.concurrent.Semaphore;
+
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.server.quorum.FastLeaderElection;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.Vote;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.junit.Test;
+
+public class FLERestartTest extends TestCase {
+ protected static final Logger LOG = Logger.getLogger(FLETest.class);
+
+ static class TestVote {
+ TestVote(int id, long leader) {
+ this.leader = leader;
+ }
+
+ long leader;
+ }
+
+ int countVotes(HashSet<TestVote> hs, long id) {
+ int counter = 0;
+ for(TestVote v : hs){
+ if(v.leader == id) counter++;
+ }
+
+ return counter;
+ }
+
+ int count;
+ int baseport;
+ int baseLEport;
+ HashMap<Long,QuorumServer> peers;
+ ArrayList<FLERestartThread> restartThreads;
+ HashMap<Integer, HashSet<TestVote> > voteMap;
+ File tmpdir[];
+ int port[];
+ int successCount;
+ Semaphore finish;
+
+ volatile Vote votes[];
+ volatile boolean leaderDies;
+ volatile long leader = -1;
+ //volatile int round = 1;
+ Random rand = new Random();
+
+ @Override
+ public void setUp() throws Exception {
+ count = 3;
+ baseport= 33003;
+ baseLEport = 43003;
+
+ peers = new HashMap<Long,QuorumServer>(count);
+ restartThreads = new ArrayList<FLERestartThread>(count);
+ voteMap = new HashMap<Integer, HashSet<TestVote> >();
+ votes = new Vote[count];
+ tmpdir = new File[count];
+ port = new int[count];
+ successCount = 0;
+ finish = new Semaphore(0);
+
+ LOG.info("SetUp " + getName());
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ for(int i = 0; i < restartThreads.size(); i++) {
+ ((FastLeaderElection) restartThreads.get(i).peer.getElectionAlg()).shutdown();
+ }
+ LOG.info("FINISHED " + getName());
+ }
+
+ class FLERestartThread extends Thread {
+ int i;
+ QuorumPeer peer;
+ int peerRound = 0;
+
+ FLERestartThread(QuorumPeer peer, int i) {
+ this.i = i;
+ this.peer = peer;
+ LOG.info("Constructor: " + getName());
+ }
+ public void run() {
+ try {
+ Vote v = null;
+ while(true) {
+ peer.setPeerState(ServerState.LOOKING);
+ LOG.info("Going to call leader election again.");
+ v = peer.getElectionAlg().lookForLeader();
+ if(v == null){
+ LOG.info("Thread " + i + " got a null vote");
+ break;
+ }
+
+ /*
+ * A real zookeeper would take care of setting the current vote. Here
+ * we do it manually.
+ */
+ peer.setCurrentVote(v);
+
+ LOG.info("Finished election: " + i + ", " + v.id);
+ //votes[i] = v;
+
+ switch(i){
+ case 0:
+ if(peerRound == 0){
+ LOG.info("First peer, shutting it down");
+ peer.shutdown();
+ ((FastLeaderElection) restartThreads.get(i).peer.getElectionAlg()).shutdown();
+
+ peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 2, 2, 2);
+ peer.startLeaderElection();
+ peerRound++;
+ } else {
+ finish.release(2);
+ return;
+ }
+
+ break;
+ case 1:
+ LOG.info("Second entering case");
+ finish.acquire();
+ //if(threads.get(0).peer.getPeerState() == ServerState.LEADING ){
+ LOG.info("Release");
+
+ return;
+ case 2:
+ LOG.info("First peer, do nothing, just join");
+ finish.acquire();
+ //if(threads.get(0).peer.getPeerState() == ServerState.LEADING ){
+ LOG.info("Release");
+
+ return;
+ }
+ }
+ } catch (Exception e){
+ e.printStackTrace();
+ }
+ }
+ }
+
+
+ @Test
+ public void testLERestart() throws Exception {
+
+ FastLeaderElection le[] = new FastLeaderElection[count];
+ leaderDies = true;
+ boolean allowOneBadLeader = leaderDies;
+
+ LOG.info("TestLE: " + getName()+ ", " + count);
+ for(int i = 0; i < count; i++) {
+ peers.put(Long.valueOf(i), new QuorumServer(i, new InetSocketAddress(baseport+100+i),
+ new InetSocketAddress(baseLEport+100+i)));
+ tmpdir[i] = ClientBase.createTmpDir();
+ port[i] = baseport+i;
+ }
+
+ for(int i = 0; i < count; i++) {
+ QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 2, 2, 2);
+ peer.startLeaderElection();
+ FLERestartThread thread = new FLERestartThread(peer, i);
+ thread.start();
+ restartThreads.add(thread);
+ }
+ LOG.info("Started threads " + getName());
+ for(int i = 0; i < restartThreads.size(); i++) {
+ restartThreads.get(i).join(10000);
+ if (restartThreads.get(i).isAlive()) {
+ fail("Threads didn't join");
+ }
+
+ }
+ }
+}
\ No newline at end of file