You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that did not survive the plain-text conversion.
Posted to commits@zookeeper.apache.org by ma...@apache.org on 2010/07/29 23:10:36 UTC
svn commit: r980576 - in /hadoop/zookeeper/trunk: ./
src/java/main/org/apache/zookeeper/server/
src/java/main/org/apache/zookeeper/server/quorum/
src/java/test/org/apache/zookeeper/test/
Author: mahadev
Date: Thu Jul 29 21:10:36 2010
New Revision: 980576
URL: http://svn.apache.org/viewvc?rev=980576&view=rev
Log:
ZOOKEEPER-790. Last processed zxid set prematurely while establishing leadership (flavio via mahadev)
Added:
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java
Modified:
hadoop/zookeeper/trunk/CHANGES.txt
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java
Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=980576&r1=980575&r2=980576&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Thu Jul 29 21:10:36 2010
@@ -72,6 +72,9 @@ BUGFIXES:
ZOOKEEPER-783. committedLog in ZKDatabase is not properly synchronized
(henry via mahadev)
+ ZOOKEEPER-790. Last processed zxid set prematurely while establishing
+ leadership (flavio via mahadev)
+
IMPROVEMENTS:
ZOOKEEPER-724. Improve junit test integration - log harness information
(phunt via mahadev)
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java?rev=980576&r1=980575&r2=980576&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java Thu Jul 29 21:10:36 2010
@@ -163,6 +163,7 @@ public class NIOServerCnxn implements Wa
public void startup(ZooKeeperServer zks) throws IOException,
InterruptedException {
start();
+ zks.startdata(); // create the ZKDatabase if needed and load its data before startup()
zks.startup();
setZooKeeperServer(zks);
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java?rev=980576&r1=980575&r2=980576&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java Thu Jul 29 21:10:36 2010
@@ -357,14 +357,18 @@ public class ZooKeeperServer implements
}
}
- public void startup() throws IOException, InterruptedException {
+ public void startdata()
+ throws IOException, InterruptedException {
//check to see if zkDb is not null
if (zkDb == null) {
zkDb = new ZKDatabase(this.txnLogFactory);
- }
+ }
if (!zkDb.isInitialized()) {
loadData();
}
+ }
+
+ public void startup() {
createSessionTracker();
setupRequestProcessors();
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=980576&r1=980575&r2=980576&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Thu Jul 29 21:10:36 2010
@@ -327,12 +327,6 @@ public class Leader {
self.tick++;
}
- if(LOG.isInfoEnabled()){
- LOG.info("Have quorum of supporters; starting up and setting last processed zxid: " + zk.getZxid());
- }
- zk.startup();
- zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
-
if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
self.cnxnFactory.setZooKeeperServer(zk);
}
@@ -499,6 +493,11 @@ public class Leader {
return;
} else {
lastCommitted = zxid;
+ if(LOG.isInfoEnabled()){
+ LOG.info("Have quorum of supporters; starting up and setting last processed zxid: " + zk.getZxid());
+ }
+ zk.startup();
+ zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
}
}
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=980576&r1=980575&r2=980576&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Thu Jul 29 21:10:36 2010
@@ -352,7 +352,29 @@ public class LearnerHandler extends Thre
}
}
}.start();
-
+
+ /*
+ * Have to wait for the first ACK, wait until
+ * the leader is ready, and only then we can
+ * start processing messages.
+ */
+ qp = new QuorumPacket();
+ ia.readRecord(qp, "packet");
+ if(qp.getType() != Leader.ACK){
+ LOG.error("Next packet was supposed to be an ACK");
+ return;
+ }
+ leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
+
+ /*
+ * Wait until leader starts up
+ */
+ synchronized(leader.zk){
+ while(!leader.zk.isRunning() && !this.isInterrupted()){
+ leader.zk.wait(20);
+ }
+ }
+
while (true) {
qp = new QuorumPacket();
ia.readRecord(qp, "packet");
@@ -475,6 +497,7 @@ public class LearnerHandler extends Thre
} catch (IOException e) {
LOG.warn("Ignoring unexpected exception during socket close", e);
}
+ this.interrupt();
leader.removeLearnerHandler(this);
}
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java?rev=980576&r1=980575&r2=980576&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java Thu Jul 29 21:10:36 2010
@@ -36,6 +36,7 @@ import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.quorum.Leader;
import org.apache.zookeeper.server.quorum.LearnerHandler;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -267,7 +268,42 @@ public class QuorumTest extends QuorumBa
}
zk.close();
}
-
+
+    /**
+     * Regression test for ZOOKEEPER-790: after the leader of a two-peer quorum
+     * restarts, a client of the surviving follower must still be able to write.
+     */
+    @Test
+    public void testFollowersStartAfterLeader() throws Exception {
+        QuorumUtil qu = new QuorumUtil(1);
+        CountdownWatcher watcher = new CountdownWatcher();
+        qu.startQuorum();
+        ZooKeeper zk = null;
+        try {
+            // only peers 1 and 2 were started, so one of them must be leading
+            int index = (qu.getPeer(1).peer.leader != null) ? 1 : 2;
+            Assert.assertNotNull("no leader elected", qu.getPeer(index).peer.leader);
+            zk = new ZooKeeper(
+                    "127.0.0.1:" + qu.getPeer((index == 1) ? 2 : 1).peer.getClientPort(),
+                    ClientBase.CONNECTION_TIMEOUT, watcher);
+            watcher.waitForConnected(CONNECTION_TIMEOUT);
+            qu.shutdown(index); // break the quorum...
+            qu.start(index); // ...and try to reestablish it
+            Assert.assertTrue("quorum reestablishment failed",
+                    QuorumBase.waitForServerUp("127.0.0.1:" + qu.getPeer(2).clientPort,
+                            CONNECTION_TIMEOUT));
+            Thread.sleep(1000); // zk should have reconnected by the time this returns
+            zk.create("/test", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+        } finally {
+            if (zk != null) {
+                zk.close();
+            }
+            qu.shutdown(1); // stop the peers startQuorum() launched so they
+            qu.shutdown(2); // do not outlive this test
+        }
+    }
+
/**
* Tests if closeSession can be logged before a leader gets established, which
* could lead to a locked-out follower (see ZOOKEEPER-790).
@@ -293,21 +329,24 @@ public class QuorumTest extends QuorumBa
throws IOException, InterruptedException, KeeperException{
final Semaphore sem = new Semaphore(0);
- Leader leader = qb.s1.leader;
- if (leader == null) leader = qb.s2.leader;
- if (leader == null) leader = qb.s3.leader;
- if (leader == null) leader = qb.s4.leader;
- if (leader == null) leader = qb.s5.leader;
+ QuorumUtil qu = new QuorumUtil(2);
+ qu.startQuorum();
+
- Assert.assertNotNull(leader);
+ int index = 1;
+ while(qu.getPeer(index).peer.leader == null)
+ index++;
+ Leader leader = qu.getPeer(index).peer.leader;
- int serverPort = qb.s1.getClientPort();
- if(qb.s1.leader != null){
- serverPort = qb.s2.getClientPort();
- }
+ Assert.assertNotNull(leader);
+
+ /*
+ * Reusing the index variable to select a follower to connect to
+ */
+ index = (index == 1) ? 2 : 1;
- ZooKeeper zk = new DisconnectableZooKeeper("127.0.0.1:" + serverPort, 1000, new Watcher() {
+ ZooKeeper zk = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(), 1000, new Watcher() {
public void process(WatchedEvent event) {
}});
@@ -328,13 +367,12 @@ public class QuorumTest extends QuorumBa
}, null);
if(i == 5000){
- qb.shutdown(qb.s1);
+ qu.shutdown(index);
LOG.info("Shutting down s1");
}
if(i == 12000){
- qb.setupServer(1);
- qb.s1.start();
- LOG.info("Setting up s1");
+ qu.start(index);
+ LOG.info("Setting up server: " + index);
}
if((i % 1000) == 0){
Thread.sleep(500);
@@ -345,10 +383,10 @@ public class QuorumTest extends QuorumBa
sem.tryAcquire(15000, TimeUnit.MILLISECONDS);
// Verify that server is following and has the same epoch as the leader
- Assert.assertTrue("Not following", qb.s1.follower != null);
- long epochF = (qb.s1.getActiveServer().getZxid() >> 32L);
+ Assert.assertTrue("Not following", qu.getPeer(index).peer.follower != null);
+ long epochF = (qu.getPeer(index).peer.getActiveServer().getZxid() >> 32L);
long epochL = (leader.getEpoch() >> 32L);
- Assert.assertTrue("Zxid: " + qb.s1.getActiveServer().getZxid() +
+ Assert.assertTrue("Zxid: " + qu.getPeer(index).peer.getActiveServer().getZxid() +
"Current epoch: " + epochF, epochF == epochL);
}
Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java?rev=980576&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java (added)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java Thu Jul 29 21:10:36 2010
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.OperatingSystemMXBean;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.server.quorum.Election;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.junit.Assert;
+
+import com.sun.management.UnixOperatingSystemMXBean;
+
+/**
+ * Utility for quorum testing. Sets up 2n+1 peers and allows starting and
+ * stopping all peers, a particular peer, a quorum of n+1 peers, etc.
+ */
+public class QuorumUtil {
+
+    // TODO partitioning of peers and clients
+
+    // TODO refactor QuorumBase to be special case of this
+
+    private static final Logger LOG = Logger.getLogger(QuorumUtil.class);
+
+    /**
+     * State of one peer: its id, the current QuorumPeer instance (replaced on
+     * every {@link QuorumUtil#start(int)}), its data directory and client port.
+     * Static, so instances do not hold a hidden reference to a QuorumUtil.
+     */
+    public static class PeerStruct {
+        public int id;
+        public QuorumPeer peer;
+        public File dataDir;
+        public int clientPort;
+    }
+
+    /** Ensemble membership passed to every peer, keyed by server id. */
+    private final Map<Long, QuorumServer> peersView = new HashMap<Long, QuorumServer>();
+
+    /** Per-peer state keyed by peer id (1..ALL). */
+    private final Map<Integer, PeerStruct> peers = new HashMap<Integer, PeerStruct>();
+
+    private final int N;
+
+    private final int ALL;
+
+    /** Comma-separated "127.0.0.1:clientPort" entries, one per peer. */
+    private final String hostPort;
+
+    private final int tickTime = 2000;
+
+    private final int initLimit = 3;
+
+    private final int syncLimit = 3;
+
+    private final int electionAlg = 3;
+
+    /**
+     * Initializes 2n+1 quorum peers which will form a ZooKeeper ensemble.
+     * The peers are created here but not started.
+     *
+     * @param n
+     *            number of peers in the ensemble will be 2n+1
+     * @throws IllegalArgumentException if n is negative
+     * @throws RuntimeException wrapping any exception raised while setting
+     *             up the test environment or creating the peers
+     */
+    public QuorumUtil(int n) throws RuntimeException {
+        if (n < 0) {
+            throw new IllegalArgumentException("n must be non-negative: " + n);
+        }
+        N = n;
+        ALL = 2 * N + 1;
+        try {
+            ClientBase.setupTestEnv();
+            JMXEnv.setUp();
+
+            StringBuilder connString = new StringBuilder();
+            for (int i = 1; i <= ALL; ++i) {
+                PeerStruct ps = new PeerStruct();
+                ps.id = i;
+                ps.dataDir = ClientBase.createTmpDir();
+                ps.clientPort = PortAssignment.unique();
+                peers.put(i, ps);
+
+                // Draw the quorum and election ports from the allocator as well;
+                // deriving them as port + 1000 is not coordinated with
+                // PortAssignment and may collide with a port it hands out.
+                peersView.put(Long.valueOf(i), new QuorumServer(i,
+                        new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
+                        new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
+                        LearnerType.PARTICIPANT));
+                if (i > 1) {
+                    connString.append(',');
+                }
+                connString.append("127.0.0.1:").append(ps.clientPort);
+            }
+            hostPort = connString.toString();
+
+            for (int i = 1; i <= ALL; ++i) {
+                PeerStruct ps = peers.get(i);
+                LOG.info("Creating QuorumPeer " + i + "; public port " + ps.clientPort);
+                ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort,
+                        electionAlg, ps.id, tickTime, initLimit, syncLimit);
+                Assert.assertEquals(ps.clientPort, ps.peer.getClientPort());
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * @return the state of peer {@code id}, or null if there is no such peer
+     */
+    public PeerStruct getPeer(int id) {
+        return peers.get(id);
+    }
+
+    /** Returns the state of peer {@code id}, failing fast on an unknown id. */
+    private PeerStruct requirePeer(int id) {
+        PeerStruct ps = peers.get(id);
+        if (ps == null) {
+            throw new IllegalArgumentException("No such peer: " + id
+                    + " (valid ids are 1.." + ALL + ")");
+        }
+        return ps;
+    }
+
+    /**
+     * Starts every peer, waits for each one to accept client connections, and
+     * then asks JMXEnv to verify the expected bean names are registered.
+     */
+    public void startAll() throws IOException {
+        for (int i = 1; i <= ALL; ++i) {
+            start(i);
+            LOG.info("Started QuorumPeer " + i);
+        }
+
+        LOG.info("Checking ports " + hostPort);
+        for (String hp : hostPort.split(",")) {
+            Assert.assertTrue("waiting for server up", ClientBase.waitForServerUp(hp,
+                    ClientBase.CONNECTION_TIMEOUT));
+            LOG.info(hp + " is accepting client connections");
+        }
+
+        // interesting to see what's there...
+        try {
+            JMXEnv.dump();
+            // make sure we have all servers listed; the set keeps a single
+            // "InMemoryDataTree" entry however many peers there are
+            Set<String> ensureNames = new LinkedHashSet<String>();
+            ensureNames.add("InMemoryDataTree");
+            for (int i = 1; i <= ALL; ++i) {
+                ensureNames.add("name0=ReplicatedServer_id" + i + ",name1=replica." + i
+                        + ",name2=");
+            }
+            for (int i = 1; i <= ALL; ++i) {
+                for (int j = 1; j <= ALL; ++j) {
+                    ensureNames.add("name0=ReplicatedServer_id" + i + ",name1=replica." + j);
+                }
+            }
+            for (int i = 1; i <= ALL; ++i) {
+                ensureNames.add("name0=ReplicatedServer_id" + i);
+            }
+            JMXEnv.ensureAll(ensureNames.toArray(new String[ensureNames.size()]));
+        } catch (IOException e) {
+            LOG.warn("IOException during JMXEnv operation", e);
+        }
+    }
+
+    /**
+     * Shuts every peer down, then starts the first N+1 peers (a majority of
+     * the 2N+1) and waits for each of them to accept client connections.
+     */
+    public void startQuorum() throws IOException {
+        shutdownAll();
+        for (int i = 1; i <= N + 1; ++i) {
+            start(i);
+        }
+        for (int i = 1; i <= N + 1; ++i) {
+            Assert.assertTrue("Waiting for server up", ClientBase.waitForServerUp("127.0.0.1:"
+                    + getPeer(i).clientPort, ClientBase.CONNECTION_TIMEOUT));
+        }
+    }
+
+    /**
+     * Creates a fresh QuorumPeer for peer {@code id}, reusing its data
+     * directory and client port, and starts it. The previous instance is not
+     * shut down here; since the client port is reused, presumably it must
+     * already have been stopped with {@link #shutdown(int)}.
+     *
+     * @throws IllegalArgumentException if there is no peer with that id
+     */
+    public void start(int id) throws IOException {
+        PeerStruct ps = requirePeer(id);
+        LOG.info("Creating QuorumPeer " + ps.id + "; public port " + ps.clientPort);
+        ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg,
+                ps.id, tickTime, initLimit, syncLimit);
+        Assert.assertEquals(ps.clientPort, ps.peer.getClientPort());
+
+        ps.peer.start();
+    }
+
+    /**
+     * Shuts every peer down and asserts each client port stops answering.
+     */
+    public void shutdownAll() {
+        for (int i = 1; i <= ALL; ++i) {
+            shutdown(i);
+        }
+        for (String hp : hostPort.split(",")) {
+            Assert.assertTrue("Waiting for server down", ClientBase.waitForServerDown(hp,
+                    ClientBase.CONNECTION_TIMEOUT));
+            LOG.info(hp + " is no longer accepting client connections");
+        }
+    }
+
+    /**
+     * Shuts peer {@code id} and its leader election down, then waits up to
+     * 30 seconds for the peer thread to exit, failing the test if it is still
+     * alive. If the calling thread is interrupted while waiting, its interrupt
+     * status is restored rather than silently dropped.
+     *
+     * @throws IllegalArgumentException if there is no peer with that id
+     */
+    public void shutdown(int id) {
+        QuorumPeer qp = requirePeer(id).peer;
+        try {
+            LOG.info("Shutting down quorum peer " + qp.getName());
+            qp.shutdown();
+            Election e = qp.getElectionAlg();
+            if (e != null) {
+                LOG.info("Shutting down leader election " + qp.getName());
+                e.shutdown();
+            } else {
+                LOG.info("No election available to shutdown " + qp.getName());
+            }
+            LOG.info("Waiting for " + qp.getName() + " to exit thread");
+            qp.join(30000);
+            if (qp.isAlive()) {
+                Assert.fail("QP failed to shutdown in 30 seconds: " + qp.getName());
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOG.debug("QP interrupted: " + qp.getName(), e);
+        }
+    }
+
+    /**
+     * @return comma-separated "127.0.0.1:clientPort" addresses of all peers
+     */
+    public String getConnString() {
+        return hostPort;
+    }
+
+    /**
+     * Logs the open file descriptor count when the platform bean exposes it,
+     * then shuts every peer down and tears the JMX environment down.
+     */
+    public void tearDown() throws Exception {
+        LOG.info("TearDown started");
+
+        OperatingSystemMXBean osMbean = ManagementFactory.getOperatingSystemMXBean();
+        // instanceof is false for null, so no separate null check is needed
+        if (osMbean instanceof UnixOperatingSystemMXBean) {
+            UnixOperatingSystemMXBean unixos = (UnixOperatingSystemMXBean) osMbean;
+            LOG.info("fdcount after test is: " + unixos.getOpenFileDescriptorCount());
+        }
+
+        shutdownAll();
+        JMXEnv.tearDown();
+    }
+}