You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by th...@apache.org on 2013/10/07 21:08:56 UTC
svn commit: r1530029 - in /zookeeper/trunk: ./
src/java/main/org/apache/zookeeper/server/quorum/
src/java/test/org/apache/zookeeper/server/quorum/
Author: thawan
Date: Mon Oct 7 19:08:55 2013
New Revision: 1530029
URL: http://svn.apache.org/r1530029
Log:
ZOOKEEPER-1551. Observers ignore txns that come after snapshot and UPTODATE (thawan, fpj via thawan)
Modified:
zookeeper/trunk/CHANGES.txt
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1530029&r1=1530028&r2=1530029&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Mon Oct 7 19:08:55 2013
@@ -426,6 +426,9 @@ BUGFIXES:
ZOOKEEPER-1778. Use static final Logger objects (Rakesh R via michim)
+ ZOOKEEPER-1551. Observers ignore txns that come after snapshot and UPTODATE
+ (thawan, fpj via thawan)
+
IMPROVEMENTS:
ZOOKEEPER-1170. Fix compiler (eclipse) warnings: unused imports,
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=1530029&r1=1530028&r2=1530029&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Mon Oct 7 19:08:55 2013
@@ -1110,13 +1110,16 @@ public class Leader {
.getZxid(), null, null);
handler.queuePacket(qp);
}
- List<Long>zxids = new ArrayList<Long>(outstandingProposals.keySet());
- Collections.sort(zxids);
- for (Long zxid: zxids) {
- if (zxid <= lastSeenZxid) {
- continue;
+ // Only participants need to get outstanding proposals
+ if (handler.getLearnerType() == LearnerType.PARTICIPANT) {
+ List<Long>zxids = new ArrayList<Long>(outstandingProposals.keySet());
+ Collections.sort(zxids);
+ for (Long zxid: zxids) {
+ if (zxid <= lastSeenZxid) {
+ continue;
+ }
+ handler.queuePacket(outstandingProposals.get(zxid).packet);
}
- handler.queuePacket(outstandingProposals.get(zxid).packet);
}
}
if (handler.getLearnerType() == LearnerType.PARTICIPANT) {
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java?rev=1530029&r1=1530028&r2=1530029&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java Mon Oct 7 19:08:55 2013
@@ -418,25 +418,42 @@ public class Learner {
}
break;
case Leader.INFORM:
- case Leader.INFORMANDACTIVATE:
- TxnHeader hdr = new TxnHeader();
- Record txn;
+ case Leader.INFORMANDACTIVATE:
+ PacketInFlight packet = new PacketInFlight();
+ packet.hdr = new TxnHeader();
+
if (qp.getType() == Leader.COMMITANDACTIVATE) {
- ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
- long suggestedLeaderId = buffer.getLong();
+ ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
+ long suggestedLeaderId = buffer.getLong();
byte[] remainingdata = new byte[buffer.remaining()];
buffer.get(remainingdata);
- txn = SerializeUtils.deserializeTxn(remainingdata, hdr);
- QuorumVerifier qv = self.configFromString(new String(((SetDataTxn)txn).getData()));
- boolean majorChange =
- self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
+ packet.rec = SerializeUtils.deserializeTxn(remainingdata, packet.hdr);
+ QuorumVerifier qv = self.configFromString(new String(((SetDataTxn)packet.rec).getData()));
+ boolean majorChange =
+ self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
if (majorChange) {
- throw new Exception("changes proposed in reconfig");
+ throw new Exception("changes proposed in reconfig");
+ }
+ } else {
+ packet.rec = SerializeUtils.deserializeTxn(qp.getData(), packet.hdr);
+ // Log warning message if txn comes out-of-order
+ if (packet.hdr.getZxid() != lastQueued + 1) {
+ LOG.warn("Got zxid 0x"
+ + Long.toHexString(packet.hdr.getZxid())
+ + " expected 0x"
+ + Long.toHexString(lastQueued + 1));
}
+ lastQueued = packet.hdr.getZxid();
+ }
+
+ if (!snapshotTaken) {
+ // Apply to db directly if we haven't taken the snapshot
+ zk.processTxn(packet.hdr, packet.rec);
} else {
- txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
+ packetsNotCommitted.add(packet);
+ packetsCommitted.add(qp.getZxid());
}
- zk.processTxn(hdr, txn);
+
break;
case Leader.UPTODATE:
LOG.info("Learner received UPTODATE message");
@@ -486,6 +503,30 @@ public class Learner {
for(Long zxid: packetsCommitted) {
fzk.commit(zxid);
}
+ } else if (zk instanceof ObserverZooKeeperServer) {
+ // Similar to follower, we need to log requests between the snapshot
+ // and UPTODATE
+ ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
+ for (PacketInFlight p : packetsNotCommitted) {
+ Long zxid = packetsCommitted.peekFirst();
+ if (p.hdr.getZxid() != zxid) {
+ // Log a warning message if there is no matching commit
+ // (an old leader sends outstanding proposals to observers)
+ LOG.warn("Committing " + Long.toHexString(zxid)
+ + ", but next proposal is "
+ + Long.toHexString(p.hdr.getZxid()));
+ continue;
+ }
+ packetsCommitted.remove();
+ Request request = new Request(null, p.hdr.getClientId(),
+ p.hdr.getCxid(), p.hdr.getType(), null, null);
+ request.setTxn(p.rec);
+ request.setHdr(p.hdr);
+ ozk.commitRequest(request);
+ }
+ } else {
+ // New server types need to handle in-flight packets
+ throw new UnsupportedOperationException("Unknown server type");
}
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=1530029&r1=1530028&r2=1530029&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Mon Oct 7 19:08:55 2013
@@ -407,7 +407,7 @@ public class QuorumPeer extends Thread i
* Enables/Disables sync request processor. This option is enabled
* by default and is to be used with observers.
*/
- protected boolean syncEnabled;
+ protected boolean syncEnabled = true;
/**
* The current tick
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java?rev=1530029&r1=1530028&r2=1530029&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java Mon Oct 7 19:08:55 2013
@@ -336,6 +336,10 @@ public class Zab1_0Test {
void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) throws Exception;
}
+ static public interface ObserverConversation {
+ void converseWithObserver(InputArchive ia, OutputArchive oa, Observer o) throws Exception;
+ }
+
public void testLeaderConversation(LeaderConversation conversation) throws Exception {
Socket pair[] = getSocketPair();
Socket leaderSocket = pair[0];
@@ -500,6 +504,57 @@ public class Zab1_0Test {
}
}
+ public void testObserverConversation(ObserverConversation conversation) throws Exception {
+ File tmpDir = File.createTempFile("test", "dir");
+ tmpDir.delete();
+ tmpDir.mkdir();
+ Thread observerThread = null;
+ ConversableObserver observer = null;
+ QuorumPeer peer = null;
+ try {
+ peer = createQuorumPeer(tmpDir);
+ peer.setSyncEnabled(true);
+ observer = createObserver(tmpDir, peer);
+ peer.observer = observer;
+
+ ServerSocket ss = new ServerSocket();
+ ss.bind(null);
+ observer.setLeaderSocketAddress((InetSocketAddress)ss.getLocalSocketAddress());
+ final Observer observerForThread = observer;
+
+ observerThread = new Thread() {
+ public void run() {
+ try {
+ observerForThread.observeLeader();
+ } catch(Exception e) {
+ e.printStackTrace();
+ }
+ }
+ };
+ observerThread.start();
+ Socket leaderSocket = ss.accept();
+
+ InputArchive ia = BinaryInputArchive.getArchive(leaderSocket
+ .getInputStream());
+ OutputArchive oa = BinaryOutputArchive.getArchive(leaderSocket
+ .getOutputStream());
+
+ conversation.converseWithObserver(ia, oa, observer);
+ } finally {
+ if (observer != null) {
+ observer.shutdown();
+ }
+ if (observerThread != null) {
+ observerThread.interrupt();
+ observerThread.join();
+ }
+ if (peer != null) {
+ peer.shutdown();
+ }
+ recursiveDelete(tmpDir);
+ }
+ }
+
@Test
public void testUnnecessarySnap() throws Exception {
testPopulatedLeaderConversation(new PopulatedLeaderConversation() {
@@ -540,6 +595,29 @@ public class Zab1_0Test {
}, 2);
}
+ // We want to track the change with a callback rather than depending on timing
+ class TrackerWatcher implements Watcher {
+ boolean changed;
+ synchronized void waitForChange() throws InterruptedException {
+ while(!changed) {
+ wait();
+ }
+ }
+ @Override
+ public void process(WatchedEvent event) {
+ if (event.getType() == EventType.NodeDataChanged) {
+ synchronized(this) {
+ changed = true;
+ notifyAll();
+ }
+ }
+ }
+ synchronized public boolean changed() {
+ return changed;
+ }
+
+ };
+
@Test
public void testNormalFollowerRun() throws Exception {
testFollowerConversation(new FollowerConversation() {
@@ -617,28 +695,6 @@ public class Zab1_0Test {
proposeSetData(qp, proposalZxid, "data2", 2);
oa.writeRecord(qp, null);
- // We want to track the change with a callback rather than depending on timing
- class TrackerWatcher implements Watcher {
- boolean changed;
- synchronized void waitForChange() throws InterruptedException {
- while(!changed) {
- wait();
- }
- }
- @Override
- public void process(WatchedEvent event) {
- if (event.getType() == EventType.NodeDataChanged) {
- synchronized(this) {
- changed = true;
- notifyAll();
- }
- }
- }
- synchronized public boolean changed() {
- return changed;
- }
-
- };
TrackerWatcher watcher = new TrackerWatcher();
// The change should not have happened yet, since we haven't committed
@@ -916,6 +972,157 @@ public class Zab1_0Test {
}
@Test
+ public void testNormalObserverRun() throws Exception {
+ testObserverConversation(new ObserverConversation() {
+ @Override
+ public void converseWithObserver(InputArchive ia, OutputArchive oa,
+ Observer o) throws Exception {
+ File tmpDir = File.createTempFile("test", "dir");
+ tmpDir.delete();
+ tmpDir.mkdir();
+ File logDir = o.zk.getTxnLogFactory().getDataDir().getParentFile();
+ File snapDir = o.zk.getTxnLogFactory().getSnapDir().getParentFile();
+ try {
+ Assert.assertEquals(0, o.self.getAcceptedEpoch());
+ Assert.assertEquals(0, o.self.getCurrentEpoch());
+
+ // Setup a database with a single /foo node
+ ZKDatabase zkDb = new ZKDatabase(new FileTxnSnapLog(tmpDir, tmpDir));
+ final long foo1Zxid = ZxidUtils.makeZxid(1, 1);
+ final long foo2Zxid = ZxidUtils.makeZxid(1, 2);
+ zkDb.processTxn(new TxnHeader(13, 1313, foo1Zxid, 33,
+ ZooDefs.OpCode.create), new CreateTxn("/foo1",
+ "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ false, 1));
+ zkDb.processTxn(new TxnHeader(13, 1313, foo2Zxid, 33,
+ ZooDefs.OpCode.create), new CreateTxn("/foo2",
+ "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ false, 1));
+ Stat stat = new Stat();
+ Assert.assertEquals("data1",
+ new String(zkDb.getData("/foo1", stat, null)));
+ Assert.assertEquals("data1",
+ new String(zkDb.getData("/foo2", stat, null)));
+
+ QuorumPacket qp = new QuorumPacket();
+ readPacketSkippingPing(ia, qp);
+ Assert.assertEquals(Leader.OBSERVERINFO, qp.getType());
+ Assert.assertEquals(qp.getZxid(), 0);
+ LearnerInfo learnInfo = new LearnerInfo();
+ ByteBufferInputStream.byteBuffer2Record(
+ ByteBuffer.wrap(qp.getData()), learnInfo);
+ Assert.assertEquals(learnInfo.getProtocolVersion(), 0x10000);
+ Assert.assertEquals(learnInfo.getServerid(), 0);
+
+ // We are simulating an established leader, so the epoch is 1
+ qp.setType(Leader.LEADERINFO);
+ qp.setZxid(ZxidUtils.makeZxid(1, 0));
+ byte protoBytes[] = new byte[4];
+ ByteBuffer.wrap(protoBytes).putInt(0x10000);
+ qp.setData(protoBytes);
+ oa.writeRecord(qp, null);
+
+ readPacketSkippingPing(ia, qp);
+ Assert.assertEquals(Leader.ACKEPOCH, qp.getType());
+ Assert.assertEquals(0, qp.getZxid());
+ Assert.assertEquals(ZxidUtils.makeZxid(0, 0), ByteBuffer
+ .wrap(qp.getData()).getInt());
+ Assert.assertEquals(1, o.self.getAcceptedEpoch());
+ Assert.assertEquals(0, o.self.getCurrentEpoch());
+
+ // Send the snapshot we created earlier
+ qp.setType(Leader.SNAP);
+ qp.setData(new byte[0]);
+ qp.setZxid(zkDb.getDataTreeLastProcessedZxid());
+ oa.writeRecord(qp, null);
+ zkDb.serializeSnapshot(oa);
+ oa.writeString("BenWasHere", null);
+ qp.setType(Leader.NEWLEADER);
+ qp.setZxid(ZxidUtils.makeZxid(1, 0));
+ oa.writeRecord(qp, null);
+
+ // Get the ack of the new leader
+ readPacketSkippingPing(ia, qp);
+ Assert.assertEquals(Leader.ACK, qp.getType());
+ Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+ Assert.assertEquals(1, o.self.getAcceptedEpoch());
+ Assert.assertEquals(1, o.self.getCurrentEpoch());
+
+ Assert.assertEquals(foo2Zxid, o.zk.getLastProcessedZxid());
+
+ // Make sure the data was recorded in the filesystem ok
+ ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(
+ logDir, snapDir));
+ long lastZxid = zkDb2.loadDataBase();
+ Assert.assertEquals("data1",
+ new String(zkDb2.getData("/foo1", stat, null)));
+ Assert.assertEquals(foo2Zxid, lastZxid);
+
+ // Register watch
+ TrackerWatcher watcher = new TrackerWatcher();
+ Assert.assertEquals("data1", new String(o.zk
+ .getZKDatabase().getData("/foo2", stat, watcher)));
+
+ // Propose /foo1 update
+ long proposalZxid = ZxidUtils.makeZxid(1, 1000);
+ proposeSetData(qp, "/foo1", proposalZxid, "data2", 2);
+ oa.writeRecord(qp, null);
+
+ // Commit /foo1 update
+ qp.setType(Leader.COMMIT);
+ qp.setZxid(proposalZxid);
+ oa.writeRecord(qp, null);
+
+ // Inform /foo2 update
+ long informZxid = ZxidUtils.makeZxid(1, 1001);
+ proposeSetData(qp, "/foo2", informZxid, "data2", 2);
+ qp.setType(Leader.INFORM);
+ oa.writeRecord(qp, null);
+
+ qp.setType(Leader.UPTODATE);
+ qp.setZxid(0);
+ oa.writeRecord(qp, null);
+
+ // Read the uptodate ack
+ readPacketSkippingPing(ia, qp);
+ Assert.assertEquals(Leader.ACK, qp.getType());
+ Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+
+ // Data should get updated
+ watcher.waitForChange();
+ Assert.assertEquals("data2", new String(o.zk
+ .getZKDatabase().getData("/foo1", stat, null)));
+ Assert.assertEquals("data2", new String(o.zk
+ .getZKDatabase().getData("/foo2", stat, null)));
+
+ zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
+ lastZxid = zkDb2.loadDataBase();
+ Assert.assertEquals("data2", new String(zkDb2.getData("/foo1", stat, null)));
+ Assert.assertEquals("data2", new String(zkDb2.getData("/foo2", stat, null)));
+ Assert.assertEquals(informZxid, lastZxid);
+ } finally {
+ recursiveDelete(tmpDir);
+ }
+
+ }
+
+ private void proposeSetData(QuorumPacket qp, String path,
+ long zxid, String data, int version) throws IOException {
+ qp.setType(Leader.PROPOSAL);
+ qp.setZxid(zxid);
+ TxnHeader hdr = new TxnHeader(4, 1414, qp.getZxid(), 55,
+ ZooDefs.OpCode.setData);
+ SetDataTxn sdt = new SetDataTxn(path, data.getBytes(), version);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ OutputArchive boa = BinaryOutputArchive.getArchive(baos);
+ boa.writeRecord(hdr, null);
+ boa.writeRecord(sdt, null);
+ qp.setData(baos.toByteArray());
+ }
+ });
+ }
+
+ @Test
public void testLeaderBehind() throws Exception {
testLeaderConversation(new LeaderConversation() {
public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l)
@@ -1048,6 +1255,33 @@ public class Zab1_0Test {
return new ConversableFollower(peer, zk);
}
+ static class ConversableObserver extends Observer {
+
+ ConversableObserver(QuorumPeer self, ObserverZooKeeperServer zk) {
+ super(self, zk);
+ }
+
+ InetSocketAddress leaderAddr;
+ public void setLeaderSocketAddress(InetSocketAddress addr) {
+ leaderAddr = addr;
+ }
+
+ @Override
+ protected InetSocketAddress findLeader() {
+ return leaderAddr;
+ }
+ }
+
+ private ConversableObserver createObserver(File tmpDir, QuorumPeer peer)
+ throws IOException {
+ FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpDir, tmpDir);
+ peer.setTxnFactory(logFactory);
+ ZKDatabase zkDb = new ZKDatabase(logFactory);
+ ObserverZooKeeperServer zk = new ObserverZooKeeperServer(logFactory, peer, zkDb);
+ peer.setZKDatabase(zkDb);
+ return new ConversableObserver(peer, zk);
+ }
+
private QuorumPeer createQuorumPeer(File tmpDir) throws IOException, FileNotFoundException {
HashMap<Long, QuorumServer> peers = new HashMap<Long, QuorumServer>();
QuorumPeer peer = new QuorumPeer();