You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ph...@apache.org on 2011/09/17 02:36:32 UTC
svn commit: r1171879 - in /zookeeper/trunk: ./
src/java/main/org/apache/zookeeper/server/
src/java/main/org/apache/zookeeper/server/quorum/
src/java/test/org/apache/zookeeper/test/
Author: phunt
Date: Sat Sep 17 00:36:31 2011
New Revision: 1171879
URL: http://svn.apache.org/viewvc?rev=1171879&view=rev
Log:
ZOOKEEPER-1176. Remove dead code and basic cleanup in DataTree (Thomas Koch via phunt)
Modified:
zookeeper/trunk/CHANGES.txt
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataNode.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RestoreCommittedLogTest.java
Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1171879&r1=1171878&r2=1171879&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Sat Sep 17 00:36:31 2011
@@ -30,6 +30,9 @@ IMPROVEMENTS:
ZOOKEEPER-1184. jute generated files are not being cleaned up via "ant clean"
(Thomas Koch via phunt)
+ ZOOKEEPER-1176. Remove dead code and basic cleanup in DataTree
+ (Thomas Koch via phunt)
+
Release 3.4.0 -
Non-backward compatible changes:
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataNode.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataNode.java?rev=1171879&r1=1171878&r2=1171879&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataNode.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataNode.java Sat Sep 17 00:36:31 2011
@@ -127,6 +127,11 @@ public class DataNode implements Record
return children;
}
+ public synchronized long getApproximateDataSize() {
+ if(null==data) return 0;
+ return data.length;
+ }
+
synchronized public void copyStat(Stat to) {
to.setAversion(stat.getAversion());
to.setCtime(stat.getCtime());
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java?rev=1171879&r1=1171878&r2=1171879&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java Sat Sep 17 00:36:31 2011
@@ -121,22 +121,22 @@ public class DataTree {
* this is map from longs to acl's. It saves acl's being stored for each
* datanode.
*/
- public final Map<Long, List<ACL>> longKeyMap =
+ private final Map<Long, List<ACL>> longKeyMap =
new HashMap<Long, List<ACL>>();
/**
* this a map from acls to long.
*/
- public final Map<List<ACL>, Long> aclKeyMap =
+ private final Map<List<ACL>, Long> aclKeyMap =
new HashMap<List<ACL>, Long>();
/**
* these are the number of acls that we have in the datatree
*/
- protected long aclIndex = 0;
+ private long aclIndex = 0;
@SuppressWarnings("unchecked")
- public HashSet<String> getEphemerals(long sessionId) {
+ public Set<String> getEphemerals(long sessionId) {
HashSet<String> retv = ephemerals.get(sessionId);
if (retv == null) {
return new HashSet<String>();
@@ -152,6 +152,10 @@ public class DataTree {
return ephemerals;
}
+ int getAclSize() {
+ return longKeyMap.size();
+ }
+
private long incrementIndex() {
return ++aclIndex;
}
@@ -245,8 +249,7 @@ public class DataTree {
DataNode value = entry.getValue();
synchronized (value) {
result += entry.getKey().length();
- result += (value.data == null ? 0
- : value.data.length);
+ result += value.getApproximateDataSize();
}
}
return result;
@@ -445,16 +448,14 @@ public class DataTree {
}
synchronized (parent) {
Set<String> children = parent.getChildren();
- if (children != null) {
- if (children.contains(childName)) {
- throw new KeeperException.NodeExistsException();
- }
+ if (children != null && children.contains(childName)) {
+ throw new KeeperException.NodeExistsException();
}
-
+
if (parentCVersion == -1) {
parentCVersion = parent.stat.getCversion();
parentCVersion++;
- }
+ }
parent.stat.setCversion(parentCVersion);
parent.stat.setPzxid(zxid);
Long longval = convertAcls(acl);
@@ -486,8 +487,8 @@ public class DataTree {
}
}
// also check to update the quotas for this node
- String lastPrefix;
- if((lastPrefix = getMaxPrefixWithQuota(path)) != null) {
+ String lastPrefix = getMaxPrefixWithQuota(path);
+ if(lastPrefix != null) {
// ok we have some match and need to update
updateCount(lastPrefix, 1);
updateBytes(lastPrefix, data == null ? 0 : data.length);
@@ -534,18 +535,15 @@ public class DataTree {
}
}
}
- if (parentName.startsWith(procZookeeper)) {
+ if (parentName.startsWith(procZookeeper) && Quotas.limitNode.equals(childName)) {
// delete the node in the trie.
- if (Quotas.limitNode.equals(childName)) {
- // we need to update the trie
- // as well
- pTrie.deletePath(parentName.substring(quotaZookeeper.length()));
- }
+ // we need to update the trie as well
+ pTrie.deletePath(parentName.substring(quotaZookeeper.length()));
}
// also check to update the quotas for this node
- String lastPrefix;
- if((lastPrefix = getMaxPrefixWithQuota(path)) != null) {
+ String lastPrefix = getMaxPrefixWithQuota(path);
+ if(lastPrefix != null) {
// ok we have some match and need to update
updateCount(lastPrefix, -1);
int bytes = 0;
@@ -563,7 +561,7 @@ public class DataTree {
Set<Watcher> processed = dataWatches.triggerWatch(path,
EventType.NodeDeleted);
childWatches.triggerWatch(path, EventType.NodeDeleted, processed);
- childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
+ childWatches.triggerWatch("".equals(parentName) ? "/" : parentName,
EventType.NodeChildrenChanged);
}
@@ -584,8 +582,8 @@ public class DataTree {
n.copyStat(s);
}
// now update if the path is in a quota subtree.
- String lastPrefix;
- if((lastPrefix = getMaxPrefixWithQuota(path)) != null) {
+ String lastPrefix = getMaxPrefixWithQuota(path);
+ if(lastPrefix != null) {
this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
- (lastdata == null ? 0 : lastdata.length));
}
@@ -605,11 +603,11 @@ public class DataTree {
// root node for now.
String lastPrefix = pTrie.findMaxPrefix(path);
- if (!rootZookeeper.equals(lastPrefix) && !("".equals(lastPrefix))) {
- return lastPrefix;
+ if (rootZookeeper.equals(lastPrefix) || "".equals(lastPrefix)) {
+ return null;
}
else {
- return null;
+ return lastPrefix;
}
}
@@ -656,11 +654,10 @@ public class DataTree {
}
ArrayList<String> children;
Set<String> childs = n.getChildren();
- if (childs != null) {
- children = new ArrayList<String>(childs.size());
- children.addAll(childs);
- } else {
+ if (childs == null) {
children = new ArrayList<String>(0);
+ } else {
+ children = new ArrayList<String>(childs);
}
if (watcher != null) {
@@ -713,7 +710,7 @@ public class DataTree {
public Stat stat;
public List<ProcessTxnResult> multiResult;
-
+
/**
* Equality is defined as the clientId and the cxid being the same. This
* allows us to use hash tables to track completion of transactions.
@@ -801,7 +798,7 @@ public class DataTree {
case OpCode.check:
CheckVersionTxn checkTxn = (CheckVersionTxn) txn;
debug = "Check Version transaction for "
- + checkTxn.getPath()
+ + checkTxn.getPath()
+ " and version="
+ checkTxn.getVersion();
rc.path = checkTxn.getPath();
@@ -846,9 +843,9 @@ public class DataTree {
assert(record != null);
ByteBufferInputStream.byteBuffer2Record(bb, record);
-
+
if (failed && subtxn.getType() != OpCode.error){
- int ec = post_failed ? Code.RUNTIMEINCONSISTENCY.intValue()
+ int ec = post_failed ? Code.RUNTIMEINCONSISTENCY.intValue()
: Code.OK.intValue();
subtxn.setType(OpCode.error);
@@ -860,7 +857,7 @@ public class DataTree {
}
TxnHeader subHdr = new TxnHeader(header.getClientId(), header.getCxid(),
- header.getZxid(), header.getTime(),
+ header.getZxid(), header.getTime(),
subtxn.getType());
ProcessTxnResult subRc = processTxn(subHdr, record);
rc.multiResult.add(subRc);
@@ -891,7 +888,7 @@ public class DataTree {
* with the file.
*/
if (rc.zxid > lastProcessedZxid) {
- lastProcessedZxid = rc.zxid;
+ lastProcessedZxid = rc.zxid;
}
return rc;
}
@@ -1056,7 +1053,6 @@ public class DataTree {
}
String children[] = null;
synchronized (node) {
- scount++;
oa.writeString(pathString, "path");
oa.writeRecord(node, "node");
Set<String> childs = node.getChildren();
@@ -1078,10 +1074,6 @@ public class DataTree {
}
}
- int scount;
-
- public boolean initialized = false;
-
private void deserializeList(Map<Long, List<ACL>> longKeyMap,
InputArchive ia) throws IOException {
int i = ia.readInt("map");
@@ -1120,7 +1112,6 @@ public class DataTree {
}
public void serialize(OutputArchive oa, String tag) throws IOException {
- scount = 0;
serializeList(longKeyMap, oa);
serializeNode(oa, new StringBuilder(""));
// / marks end of stream
@@ -1134,7 +1125,7 @@ public class DataTree {
deserializeList(longKeyMap, ia);
nodes.clear();
String path = ia.readString("path");
- while (!path.equals("/")) {
+ while (!"/".equals(path)) {
DataNode node = new DataNode();
ia.readRecord(node, "node");
nodes.put(path, node);
@@ -1211,14 +1202,6 @@ public class DataTree {
childWatches.removeWatcher(watcher);
}
- public void clear() {
- root = null;
- nodes.clear();
- ephemerals.clear();
- // dataWatches = null;
- // childWatches = null;
- }
-
public void setWatches(long relativeZxid, List<String> dataWatches,
List<String> existWatches, List<String> childWatches,
Watcher watcher) {
@@ -1235,10 +1218,10 @@ public class DataTree {
e = new WatchedEvent(EventType.NodeDataChanged,
KeeperState.SyncConnected, path);
}
- if (e != null) {
- watcher.process(e);
- } else {
+ if (e == null) {
this.dataWatches.addWatch(path, watcher);
+ } else {
+ watcher.process(e);
}
}
for (String path : existWatches) {
@@ -1253,10 +1236,10 @@ public class DataTree {
e = new WatchedEvent(EventType.NodeCreated,
KeeperState.SyncConnected, path);
}
- if (e != null) {
- watcher.process(e);
- } else {
+ if (e == null) {
this.dataWatches.addWatch(path, watcher);
+ } else {
+ watcher.process(e);
}
}
for (String path : childWatches) {
@@ -1269,10 +1252,10 @@ public class DataTree {
e = new WatchedEvent(EventType.NodeChildrenChanged,
KeeperState.SyncConnected, path);
}
- if (e != null) {
- watcher.process(e);
- } else {
+ if (e == null) {
this.childWatches.addWatch(path, watcher);
+ } else {
+ watcher.process(e);
}
}
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java?rev=1171879&r1=1171878&r2=1171879&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java Sat Sep 17 00:36:31 2011
@@ -175,7 +175,7 @@ public class PrepRequestProcessor extend
/**
* Grab current pending change records for each op in a multi-op.
- *
+ *
* This is used inside MultiOp error code path to rollback in the event
* of a failed multi-op.
*
@@ -183,7 +183,7 @@ public class PrepRequestProcessor extend
*/
HashMap<String, ChangeRecord> getPendingChanges(MultiTransactionRecord multiRequest) {
HashMap<String, ChangeRecord> pendingChangeRecords = new HashMap<String, ChangeRecord>();
-
+
for(Op op: multiRequest) {
String path = op.getPath();
@@ -196,7 +196,7 @@ public class PrepRequestProcessor extend
// ignore this one
}
}
-
+
return pendingChangeRecords;
}
@@ -224,7 +224,7 @@ public class PrepRequestProcessor extend
break;
}
}
-
+
boolean empty = zks.outstandingChanges.isEmpty();
long firstZxid = 0;
if (!empty) {
@@ -234,7 +234,7 @@ public class PrepRequestProcessor extend
Iterator<ChangeRecord> priorIter = pendingChangeRecords.values().iterator();
while (priorIter.hasNext()) {
ChangeRecord c = priorIter.next();
-
+
/* Don't apply any prior change records less than firstZxid */
if (!empty && (c.zxid < firstZxid)) {
continue;
@@ -268,7 +268,7 @@ public class PrepRequestProcessor extend
AuthenticationProvider ap = ProviderRegistry.getProvider(id
.getScheme());
if (ap != null) {
- for (Id authId : ids) {
+ for (Id authId : ids) {
if (authId.getScheme().equals(id.getScheme())
&& ap.matches(authId.getId(), id.getId())) {
return;
@@ -433,7 +433,7 @@ public class PrepRequestProcessor extend
// queues up this operation without being the session owner.
// this request is the last of the session so it should be ok
//zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
- HashSet<String> es = zks.getZKDatabase()
+ Set<String> es = zks.getZKDatabase()
.getEphemerals(request.sessionId);
synchronized (zks.outstandingChanges) {
for (ChangeRecord c : zks.outstandingChanges) {
@@ -481,7 +481,7 @@ public class PrepRequestProcessor extend
// request.type + " id = 0x" + Long.toHexString(request.sessionId));
request.hdr = null;
request.txn = null;
-
+
try {
switch (request.type) {
case OpCode.create:
@@ -531,8 +531,8 @@ public class PrepRequestProcessor extend
if (ke != null) {
request.hdr.setType(OpCode.error);
request.txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
- }
-
+ }
+
/* Prep the request and convert to a Txn */
else {
try {
@@ -556,7 +556,7 @@ public class PrepRequestProcessor extend
}
//FIXME: I don't want to have to serialize it here and then
- // immediately deserialize in next processor. But I'm
+ // immediately deserialize in next processor. But I'm
// not sure how else to get the txn stored into our list.
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
@@ -568,7 +568,7 @@ public class PrepRequestProcessor extend
request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid, zks.getTime(), request.type);
request.txn = new MultiTxn(txns);
-
+
break;
//create/close session don't require request record
@@ -576,7 +576,7 @@ public class PrepRequestProcessor extend
case OpCode.closeSession:
pRequest2Txn(request.type, zks.getNextZxid(), request, null);
break;
-
+
//All the rest don't need to create a Txn - just verify session
case OpCode.sync:
case OpCode.exists:
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java?rev=1171879&r1=1171878&r2=1171879&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java Sat Sep 17 00:36:31 2011
@@ -22,9 +22,9 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Collection;
-import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
@@ -57,11 +57,11 @@ import org.apache.zookeeper.txn.TxnHeade
* and snapshots from the disk.
*/
public class ZKDatabase {
-
+
private static final Logger LOG = LoggerFactory.getLogger(ZKDatabase.class);
-
+
/**
- * make sure on a clear you take care of
+ * make sure on a clear you take care of
* all these members.
*/
protected DataTree dataTree;
@@ -73,7 +73,7 @@ public class ZKDatabase {
protected LinkedList<Proposal> committedLog = new LinkedList<Proposal>();
protected ReentrantReadWriteLock logLock = new ReentrantReadWriteLock();
volatile private boolean initialized = false;
-
+
/**
* the filetxnsnaplog that this zk database
* maps to. There is a one to one relationship
@@ -85,7 +85,7 @@ public class ZKDatabase {
sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
this.snapLog = snapLog;
}
-
+
/**
* checks to see if the zk database has been
* initialized or not.
@@ -94,23 +94,23 @@ public class ZKDatabase {
public boolean isInitialized() {
return initialized;
}
-
+
/**
- * clear the zkdatabase.
- * Note to developers - be careful to see that
+ * clear the zkdatabase.
+ * Note to developers - be careful to see that
* the clear method does clear out all the
* data structures in zkdatabase.
*/
public void clear() {
minCommittedLog = 0;
maxCommittedLog = 0;
- /* to be safe we just create a new
+ /* to be safe we just create a new
* datatree.
*/
dataTree = new DataTree();
sessionsWithTimeouts.clear();
WriteLock lock = logLock.writeLock();
- try {
+ try {
lock.lock();
committedLog.clear();
} finally {
@@ -118,7 +118,7 @@ public class ZKDatabase {
}
initialized = false;
}
-
+
/**
* the datatree for this zkdatabase
* @return the datatree for this zkdatabase
@@ -126,7 +126,7 @@ public class ZKDatabase {
public DataTree getDataTree() {
return this.dataTree;
}
-
+
/**
* the committed log for this zk database
* @return the committed log for this zkdatabase
@@ -134,8 +134,8 @@ public class ZKDatabase {
public long getmaxCommittedLog() {
return maxCommittedLog;
}
-
-
+
+
/**
* the minimum committed transaction log
* available in memory
@@ -153,9 +153,9 @@ public class ZKDatabase {
public ReentrantReadWriteLock getLogLock() {
return logLock;
}
-
- public synchronized LinkedList<Proposal> getCommittedLog() {
+
+ public synchronized List<Proposal> getCommittedLog() {
ReadLock rl = logLock.readLock();
// only make a copy if this thread isn't already holding a lock
if(logLock.getReadHoldCount() <=0) {
@@ -165,10 +165,10 @@ public class ZKDatabase {
} finally {
rl.unlock();
}
- }
+ }
return this.committedLog;
- }
-
+ }
+
/**
* get the last processed zxid from a datatree
* @return the last processed zxid of a datatree
@@ -176,15 +176,7 @@ public class ZKDatabase {
public long getDataTreeLastProcessedZxid() {
return dataTree.lastProcessedZxid;
}
-
- /**
- * set the datatree initialized or not
- * @param b set the datatree initialized to b
- */
- public void setDataTreeInit(boolean b) {
- dataTree.initialized = b;
- }
-
+
/**
* return the sessions in the datatree
* @return the data tree sessions
@@ -192,7 +184,7 @@ public class ZKDatabase {
public Collection<Long> getSessions() {
return dataTree.getSessions();
}
-
+
/**
* get sessions with timeouts
* @return the hashmap of sessions with timeouts
@@ -201,9 +193,9 @@ public class ZKDatabase {
return sessionsWithTimeouts;
}
-
+
/**
- * load the database from the disk onto memory and also add
+ * load the database from the disk onto memory and also add
* the transactions to the committedlog in memory.
* @return the last valid zxid on disk
* @throws IOException
@@ -219,12 +211,12 @@ public class ZKDatabase {
addCommittedProposal(r);
}
};
-
+
long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener);
initialized = true;
return zxid;
}
-
+
/**
* maintains a list of last <i>committedLog</i>
* or so committed requests. This is used for
@@ -239,7 +231,7 @@ public class ZKDatabase {
committedLog.removeFirst();
minCommittedLog = committedLog.getFirst().packet.getZxid();
}
- if (committedLog.size() == 0) {
+ if (committedLog.isEmpty()) {
minCommittedLog = request.zxid;
maxCommittedLog = request.zxid;
}
@@ -267,7 +259,7 @@ public class ZKDatabase {
}
}
-
+
/**
* remove a cnxn from the datatree
* @param cnxn the cnxn to remove from the datatree
@@ -302,11 +294,11 @@ public class ZKDatabase {
}
/**
- * the paths for ephemeral session id
- * @param sessionId the session id for which paths match to
+ * the paths for ephemeral session id
+ * @param sessionId the session id for which paths match to
* @return the paths for a session id
*/
- public HashSet<String> getEphemerals(long sessionId) {
+ public Set<String> getEphemerals(long sessionId) {
return dataTree.getEphemerals(sessionId);
}
@@ -330,7 +322,7 @@ public class ZKDatabase {
}
/**
- * stat the path
+ * stat the path
* @param path the path for which stat is to be done
* @param serverCnxn the servercnxn attached to this request
* @return the stat of this node
@@ -339,7 +331,7 @@ public class ZKDatabase {
public Stat statNode(String path, ServerCnxn serverCnxn) throws KeeperException.NoNodeException {
return dataTree.statNode(path, serverCnxn);
}
-
+
/**
* get the datanode for this path
* @param path the path to lookup
@@ -359,14 +351,14 @@ public class ZKDatabase {
}
/**
- * get data and stat for a path
+ * get data and stat for a path
* @param path the path being queried
* @param stat the stat for this path
* @param watcher the watcher function
* @return
* @throws KeeperException.NoNodeException
*/
- public byte[] getData(String path, Stat stat, Watcher watcher)
+ public byte[] getData(String path, Stat stat, Watcher watcher)
throws KeeperException.NoNodeException {
return dataTree.getData(path, stat, watcher);
}
@@ -383,7 +375,7 @@ public class ZKDatabase {
List<String> existWatches, List<String> childWatches, Watcher watcher) {
dataTree.setWatches(relativeZxid, dataWatches, existWatches, childWatches, watcher);
}
-
+
/**
* get acl for a path
* @param path the path to query for acl
@@ -422,7 +414,7 @@ public class ZKDatabase {
* @return the acl size of the datatree
*/
public int getAclSize() {
- return dataTree.longKeyMap.size();
+ return dataTree.getAclSize();
}
/**
@@ -437,9 +429,9 @@ public class ZKDatabase {
loadDataBase();
return truncated;
}
-
+
/**
- * deserialize a snapshot from an input archive
+ * deserialize a snapshot from an input archive
* @param ia the input archive you want to deserialize from
* @throws IOException
*/
@@ -447,8 +439,8 @@ public class ZKDatabase {
clear();
SerializeUtils.deserializeSnapshot(getDataTree(),ia,getSessionWithTimeOuts());
initialized = true;
- }
-
+ }
+
/**
* serialize the snapshot
* @param oa the output archive to which the snapshot needs to be serialized
@@ -461,7 +453,7 @@ public class ZKDatabase {
}
/**
- * append to the underlying transaction log
+ * append to the underlying transaction log
* @param si the request to append
* @return true if the append was succesfull and false if not
*/
@@ -483,7 +475,7 @@ public class ZKDatabase {
public void commit() throws IOException {
this.snapLog.commit();
}
-
+
/**
* close this database. free the resources
* @throws IOException
@@ -491,5 +483,5 @@ public class ZKDatabase {
public void close() throws IOException {
this.snapLog.close();
}
-
+
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java?rev=1171879&r1=1171878&r2=1171879&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java Sat Sep 17 00:36:31 2011
@@ -69,17 +69,17 @@ import javax.security.sasl.SaslException
*/
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
protected static final Logger LOG;
-
+
static {
LOG = LoggerFactory.getLogger(ZooKeeperServer.class);
-
+
Environment.logEnv("Server environment:", LOG);
}
protected ZooKeeperServerBean jmxServerBean;
protected DataTreeBean jmxDataTreeBean;
-
+
/**
* The server delegates loading of the tree to an instance of the interface
*/
@@ -119,7 +119,7 @@ public class ZooKeeperServer implements
// this data structure must be accessed under the outstandingChanges lock
final HashMap<String, ChangeRecord> outstandingChangesForPath =
new HashMap<String, ChangeRecord>();
-
+
private ServerCnxnFactory serverCnxnFactory;
private final ServerStats serverStats;
@@ -127,22 +127,22 @@ public class ZooKeeperServer implements
void removeCnxn(ServerCnxn cnxn) {
zkDb.removeCnxn(cnxn);
}
-
+
/**
* Creates a ZooKeeperServer instance. Nothing is setup, use the setX
- * methods to prepare the instance (eg datadir, datalogdir, ticktime,
+ * methods to prepare the instance (eg datadir, datalogdir, ticktime,
* builder, etc...)
- *
+ *
* @throws IOException
*/
public ZooKeeperServer() {
serverStats = new ServerStats(this);
}
-
+
/**
* Creates a ZooKeeperServer instance. It sets everything up, but doesn't
* actually start listening for clients until run() is invoked.
- *
+ *
* @param dataDir the directory to put the data
*/
public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
@@ -154,7 +154,7 @@ public class ZooKeeperServer implements
this.tickTime = tickTime;
this.minSessionTimeout = minSessionTimeout;
this.maxSessionTimeout = maxSessionTimeout;
-
+
LOG.info("Created server with tickTime " + tickTime
+ " minSessionTimeout " + getMinSessionTimeout()
+ " maxSessionTimeout " + getMaxSessionTimeout()
@@ -163,7 +163,7 @@ public class ZooKeeperServer implements
}
/**
- * creates a zookeeperserver instance.
+ * creates a zookeeperserver instance.
* @param txnLogFactory the file transaction snapshot logging class
* @param tickTime the ticktime for the server
* @param treeBuilder the datatree builder
@@ -174,7 +174,7 @@ public class ZooKeeperServer implements
this(txnLogFactory, tickTime, -1, -1, treeBuilder,
new ZKDatabase(txnLogFactory));
}
-
+
public ServerStats serverStats() {
return serverStats;
}
@@ -230,7 +230,7 @@ public class ZooKeeperServer implements
public ZKDatabase getZKDatabase() {
return this.zkDb;
}
-
+
/**
* set the zkdatabase for this zookeeper server
* @param zkDb
@@ -238,7 +238,7 @@ public class ZooKeeperServer implements
public void setZKDatabase(ZKDatabase zkDb) {
this.zkDb = zkDb;
}
-
+
/**
* Restore sessions and data
*/
@@ -252,7 +252,7 @@ public class ZooKeeperServer implements
deadSessions.add(session);
}
}
- zkDb.setDataTreeInit(true);
+
for (long session : deadSessions) {
// XXX: Is lastProcessedZxid really the best thing to use?
killSession(session, zkDb.getDataTreeLastProcessedZxid());
@@ -273,7 +273,7 @@ public class ZooKeeperServer implements
}
}
-
+
/**
* This should be called from a synchronized block on this!
*/
@@ -296,10 +296,10 @@ public class ZooKeeperServer implements
private void close(long sessionId) {
submitRequest(null, sessionId, OpCode.closeSession, 0, null, null);
}
-
+
public void closeSession(long sessionId) {
LOG.info("Closing session 0x" + Long.toHexString(sessionId));
-
+
// we do not want to wait for a session close. send it as soon as we
// detect it!
close(sessionId);
@@ -331,7 +331,7 @@ public class ZooKeeperServer implements
super(msg);
}
}
-
+
void touch(ServerCnxn cnxn) throws MissingSessionException {
if (cnxn == null) {
return;
@@ -350,7 +350,7 @@ public class ZooKeeperServer implements
try {
jmxServerBean = new ZooKeeperServerBean(this);
MBeanRegistry.getInstance().register(jmxServerBean, null);
-
+
try {
jmxDataTreeBean = new DataTreeBean(zkDb.getDataTree());
MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean);
@@ -363,19 +363,19 @@ public class ZooKeeperServer implements
jmxServerBean = null;
}
}
-
- public void startdata()
+
+ public void startdata()
throws IOException, InterruptedException {
//check to see if zkDb is not null
if (zkDb == null) {
zkDb = new ZKDatabase(this.txnLogFactory);
- }
+ }
if (!zkDb.isInitialized()) {
loadData();
}
}
-
- public void startup() {
+
+ public void startup() {
createSessionTracker();
setupRequestProcessors();
@@ -539,10 +539,10 @@ public class ZooKeeperServer implements
public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd,
int sessionTimeout) throws IOException {
- if (!checkPasswd(sessionId, passwd)) {
- finishSessionInit(cnxn, false);
- } else {
+ if (checkPasswd(sessionId, passwd)) {
revalidateSession(cnxn, sessionId, sessionTimeout);
+ } else {
+ finishSessionInit(cnxn, false);
}
}
@@ -572,30 +572,31 @@ public class ZooKeeperServer implements
baos.close();
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
bb.putInt(bb.remaining() - 4).rewind();
- cnxn.sendBuffer(bb);
+ cnxn.sendBuffer(bb);
+
+ if (valid) {
+ LOG.info("Established session 0x"
+ + Long.toHexString(cnxn.getSessionId())
+ + " with negotiated timeout " + cnxn.getSessionTimeout()
+ + " for client "
+ + cnxn.getRemoteSocketAddress());
+ } else {
- if (!valid) {
LOG.info("Invalid session 0x"
+ Long.toHexString(cnxn.getSessionId())
+ " for client "
+ cnxn.getRemoteSocketAddress()
+ ", probably expired");
cnxn.sendBuffer(ServerCnxnFactory.closeConn);
- } else {
- LOG.info("Established session 0x"
- + Long.toHexString(cnxn.getSessionId())
- + " with negotiated timeout " + cnxn.getSessionTimeout()
- + " for client "
- + cnxn.getRemoteSocketAddress());
}
-
+
cnxn.enableRecv();
} catch (Exception e) {
LOG.warn("Exception while establishing session, closing", e);
cnxn.close();
}
}
-
+
public void closeSession(ServerCnxn cnxn, RequestHeader requestHeader) {
closeSession(cnxn.getSessionId());
}
@@ -615,7 +616,7 @@ public class ZooKeeperServer implements
Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo);
submitRequest(si);
}
-
+
public void submitRequest(Request si) {
if (firstProcessor == null) {
synchronized (this) {
@@ -679,7 +680,7 @@ public class ZooKeeperServer implements
}
/**
- * return the last proceesed id from the
+ * return the last proceesed id from the
* datatree
*/
public long getLastProcessedZxid() {
@@ -688,7 +689,7 @@ public class ZooKeeperServer implements
/**
* return the outstanding requests
- * in the queue, which havent been
+ * in the queue, which havent been
* processed yet
*/
public long getOutstandingRequests() {
@@ -696,7 +697,7 @@ public class ZooKeeperServer implements
}
/**
- * trunccate the log to get in sync with others
+ * trunccate the log to get in sync with others
* if in a quorum
* @param zxid the zxid that it needs to get in sync
* with others
@@ -705,7 +706,7 @@ public class ZooKeeperServer implements
public void truncateLog(long zxid) throws IOException {
this.zkDb.truncateLog(zxid);
}
-
+
public int getTickTime() {
return tickTime;
}
@@ -740,7 +741,7 @@ public class ZooKeeperServer implements
public void setTxnLogFactory(FileTxnSnapLog txnLog) {
this.txnLogFactory = txnLog;
}
-
+
public FileTxnSnapLog getTxnLogFactory() {
return this.txnLogFactory;
}
@@ -750,9 +751,9 @@ public class ZooKeeperServer implements
}
public void dumpEphemerals(PrintWriter pwriter) {
- zkDb.dumpEphemerals(pwriter);
+ zkDb.dumpEphemerals(pwriter);
}
-
+
public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
ConnectRequest connReq = new ConnectRequest();
@@ -774,7 +775,7 @@ public class ZooKeeperServer implements
+ cnxn.getRemoteSocketAddress()
+ "; will be dropped if server is in r-o mode");
}
- if (readOnly == false && this instanceof ReadOnlyZooKeeperServer) {
+ if (!readOnly && this instanceof ReadOnlyZooKeeperServer) {
String msg = "Refusing session request for not-read-only client "
+ cnxn.getRemoteSocketAddress();
LOG.info(msg);
@@ -807,7 +808,11 @@ public class ZooKeeperServer implements
// session is setup
cnxn.disableRecv();
long sessionId = connReq.getSessionId();
- if (sessionId != 0) {
+ if (sessionId == 0) {
+ LOG.info("Client attempting to establish new session at "
+ + cnxn.getRemoteSocketAddress());
+ createSession(cnxn, passwd, sessionTimeout);
+ } else {
long clientSessionId = connReq.getSessionId();
LOG.info("Client attempting to renew session 0x"
+ Long.toHexString(clientSessionId)
@@ -815,10 +820,6 @@ public class ZooKeeperServer implements
serverCnxnFactory.closeSession(sessionId);
cnxn.setSessionId(sessionId);
reopenSession(cnxn, sessionId, passwd, sessionTimeout);
- } else {
- LOG.info("Client attempting to establish new session at "
- + cnxn.getRemoteSocketAddress());
- createSession(cnxn, passwd, sessionTimeout);
}
}
@@ -826,7 +827,7 @@ public class ZooKeeperServer implements
if (getGlobalOutstandingLimit() < getInProcess()) {
return outStandingCount > 0;
}
- return false;
+ return false;
}
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
@@ -851,10 +852,18 @@ public class ZooKeeperServer implements
authReturn = ap.handleAuthentication(cnxn, authPacket.getAuth());
} catch(RuntimeException e) {
LOG.warn("Caught runtime exception from AuthenticationProvider: " + scheme + " due to " + e);
- authReturn = KeeperException.Code.AUTHFAILED;
+ authReturn = KeeperException.Code.AUTHFAILED;
}
}
- if (authReturn!= KeeperException.Code.OK) {
+ if (authReturn == KeeperException.Code.OK) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Authentication succeeded for scheme: " + scheme);
+ }
+ LOG.info("auth success " + cnxn.getRemoteSocketAddress());
+ ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
+ KeeperException.Code.OK.intValue());
+ cnxn.sendResponse(rh, null, null);
+ } else {
if (ap == null) {
LOG.warn("No authentication provider for scheme: "
+ scheme + " has "
@@ -869,15 +878,6 @@ public class ZooKeeperServer implements
// ... and close connection
cnxn.sendBuffer(ServerCnxnFactory.closeConn);
cnxn.disableRecv();
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Authentication succeeded for scheme: "
- + scheme);
- }
- LOG.info("auth success " + cnxn.getRemoteSocketAddress());
- ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
- KeeperException.Code.OK.intValue());
- cnxn.sendResponse(rh, null, null);
}
return;
} else {
@@ -910,7 +910,7 @@ public class ZooKeeperServer implements
// if using the DIGEST-MD5 mechanism, clientToken will be empty at the beginning of the
// SASL negotiation process.
responseToken = saslServer.evaluateResponse(clientToken);
- if (saslServer.isComplete() == true) {
+ if (saslServer.isComplete()) {
String authorizationID = saslServer.getAuthorizationID();
LOG.info("adding SASL authorization for authorizationID: " + authorizationID);
cnxn.addAuthInfo(new Id("sasl",authorizationID));
@@ -937,6 +937,4 @@ public class ZooKeeperServer implements
// wrap SASL response token to client inside a Response object.
return new SetSASLResponse(responseToken);
}
-
-
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=1171879&r1=1171878&r2=1171879&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Sat Sep 17 00:36:31 2011
@@ -28,6 +28,7 @@ import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.LinkedList;
+import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
@@ -56,7 +57,7 @@ import org.apache.zookeeper.txn.TxnHeade
public class LearnerHandler extends Thread {
private static final Logger LOG = LoggerFactory.getLogger(LearnerHandler.class);
- protected final Socket sock;
+ protected final Socket sock;
public Socket getSocket() {
return sock;
@@ -65,22 +66,22 @@ public class LearnerHandler extends Thre
final Leader leader;
long tickOfLastAck;
-
+
/**
* ZooKeeper server identifier of this learner
*/
protected long sid = 0;
-
+
long getSid(){
return sid;
- }
+ }
protected int version = 0x1;
-
+
int getVersion() {
return version;
}
-
+
/**
* The packets to be sent to the learner
*/
@@ -99,7 +100,7 @@ public class LearnerHandler extends Thre
this.leader = leader;
leader.addLearnerHandler(this);
}
-
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@@ -171,7 +172,7 @@ public class LearnerHandler extends Thre
String type = null;
String mess = null;
Record txn = null;
-
+
switch (p.getType()) {
case Leader.ACK:
type = "ACK";
@@ -181,7 +182,7 @@ public class LearnerHandler extends Thre
break;
case Leader.FOLLOWERINFO:
type = "FOLLOWERINFO";
- break;
+ break;
case Leader.NEWLEADER:
type = "NEWLEADER";
break;
@@ -232,7 +233,7 @@ public class LearnerHandler extends Thre
*/
@Override
public void run() {
- try {
+ try {
sock.setSoTimeout(leader.self.getTickTime()*leader.self.getInitLimit());
ia = BinaryInputArchive.getArchive(new BufferedInputStream(sock
.getInputStream()));
@@ -263,18 +264,18 @@ public class LearnerHandler extends Thre
LOG.info("Follower sid: " + this.sid + " : info : "
+ leader.self.quorumPeers.get(this.sid));
-
+
if (qp.getType() == Leader.OBSERVERINFO) {
learnerType = LearnerType.OBSERVER;
- }
-
+ }
+
long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
-
+
long peerLastZxid;
StateSummary ss = null;
long zxid = qp.getZxid();
long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
-
+
if (this.getVersion() < 0x10000) {
// we are going to have to extrapolate the epoch information
long epoch = ZxidUtils.getEpochFromZxid(zxid);
@@ -299,21 +300,21 @@ public class LearnerHandler extends Thre
leader.waitForEpochAck(this.getSid(), ss);
}
peerLastZxid = ss.getLastZxid();
-
+
/* the default to send to the follower */
int packetToSend = Leader.SNAP;
long zxidToSend = 0;
long leaderLastZxid = 0;
/** the packets that the follower needs to get updates from **/
long updates = peerLastZxid;
-
- /* we are sending the diff check if we have proposals in memory to be able to
- * send a diff to the
- */
+
+ /* we are sending the diff check if we have proposals in memory to be able to
+ * send a diff to the
+ */
ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock();
ReadLock rl = lock.readLock();
try {
- rl.lock();
+ rl.lock();
final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();
LOG.info("Synchronizing with Follower sid: " + this.sid
@@ -321,7 +322,7 @@ public class LearnerHandler extends Thre
+" minCommittedLog = "+Long.toHexString(minCommittedLog)
+" peerLastZxid = "+Long.toHexString(peerLastZxid));
- LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();
+ List<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();
if (proposals.size() != 0) {
if ((maxCommittedLog >= peerLastZxid)
@@ -353,12 +354,12 @@ public class LearnerHandler extends Thre
LOG.info("Sending TRUNC");
zxidToSend = prevProposalZxid;
updates = zxidToSend;
- }
+ }
else {
// Just send the diff
packetToSend = Leader.DIFF;
LOG.info("Sending diff");
- zxidToSend = maxCommittedLog;
+ zxidToSend = maxCommittedLog;
}
}
@@ -375,7 +376,7 @@ public class LearnerHandler extends Thre
}
} else {
// just let the state transfer happen
- }
+ }
leaderLastZxid = leader.startForwarding(this, updates);
if (peerLastZxid == leaderLastZxid) {
@@ -401,21 +402,21 @@ public class LearnerHandler extends Thre
}
oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
bufferedOutput.flush();
-
+
/* if we are not truncating or sending a diff just send a snapshot */
if (packetToSend == Leader.SNAP) {
LOG.info("Sending snapshot last zxid of peer is 0x"
- + Long.toHexString(peerLastZxid) + " "
+ + Long.toHexString(peerLastZxid) + " "
+ " zxid of leader is 0x"
+ Long.toHexString(leaderLastZxid)
- + "sent zxid of db as 0x"
+ + "sent zxid of db as 0x"
+ Long.toHexString(zxidToSend));
// Dump data to peer
leader.zk.getZKDatabase().serializeSnapshot(oa);
oa.writeString("BenWasHere", "signature");
}
bufferedOutput.flush();
-
+
// Start sending packets
new Thread() {
public void run() {
@@ -428,9 +429,9 @@ public class LearnerHandler extends Thre
}
}
}.start();
-
+
/*
- * Have to wait for the first ACK, wait until
+ * Have to wait for the first ACK, wait until
* the leader is ready, and only then we can
* start processing messages.
*/
@@ -441,7 +442,7 @@ public class LearnerHandler extends Thre
return;
}
leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
-
+
/*
* Wait until leader starts up
*/
@@ -456,7 +457,7 @@ public class LearnerHandler extends Thre
//
queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
-
+
while (true) {
qp = new QuorumPacket();
ia.readRecord(qp, "packet");
@@ -525,7 +526,7 @@ public class LearnerHandler extends Thre
qp.setData(bos.toByteArray());
queuedPackets.add(qp);
break;
- case Leader.REQUEST:
+ case Leader.REQUEST:
bb = ByteBuffer.wrap(qp.getData());
sessionId = bb.getLong();
cxid = bb.getInt();
@@ -547,7 +548,7 @@ public class LearnerHandler extends Thre
if (sock != null && !sock.isClosed()) {
LOG.error("Unexpected exception causing shutdown while sock "
+ "still open", e);
- //close the socket to make sure the
+ //close the socket to make sure the
//other side can see it being close
try {
sock.close();
@@ -558,7 +559,7 @@ public class LearnerHandler extends Thre
} catch (InterruptedException e) {
LOG.error("Unexpected exception causing shutdown", e);
} finally {
- LOG.warn("******* GOODBYE "
+ LOG.warn("******* GOODBYE "
+ (sock != null ? sock.getRemoteSocketAddress() : "<null>")
+ " ********");
shutdown();
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java?rev=1171879&r1=1171878&r2=1171879&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java Sat Sep 17 00:36:31 2011
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTru
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -43,8 +44,7 @@ import org.junit.Test;
public class FollowerResyncConcurrencyTest extends ZKTestCase {
- volatile int counter = 0;
- volatile int errors = 0;
+ private volatile int counter = 0;
private static final Logger LOG = Logger.getLogger(FollowerResyncConcurrencyTest.class);
public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT;
@@ -64,7 +64,7 @@ public class FollowerResyncConcurrencyTe
* @throws KeeperException
*/
@Test
- public void testResyncBySnapThenDiffAfterFollowerCrashes ()
+ public void testResyncBySnapThenDiffAfterFollowerCrashes ()
throws IOException, InterruptedException, KeeperException, Throwable{
final Semaphore sem = new Semaphore(0);
@@ -80,7 +80,7 @@ public class FollowerResyncConcurrencyTe
Leader leader = qu.getPeer(index).peer.leader;
- assertNotNull(leader);
+ assertNotNull(leader);
/*
* Reusing the index variable to select a follower to connect to
*/
@@ -94,11 +94,11 @@ public class FollowerResyncConcurrencyTe
ZooKeeper zk = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(), 1000, watcher1);
ZooKeeper zk2 = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(), 1000, watcher2);
-
+
watcher1.waitForConnected(CONNECTION_TIMEOUT);
watcher2.waitForConnected(CONNECTION_TIMEOUT);
-
- zk.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ zk.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Thread t = new Thread(new Runnable() {
@Override
@@ -109,9 +109,6 @@ public class FollowerResyncConcurrencyTe
@Override
public void processResult(int rc, String path, Object ctx, String name) {
counter++;
- if (rc != 0) {
- errors++;
- }
if(counter == 14200){
sem.release();
}
@@ -131,35 +128,32 @@ public class FollowerResyncConcurrencyTe
}
});
-
+
for(int i = 0; i < 13000; i++) {
zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
counter++;
- if (rc != 0) {
- errors++;
- }
if(counter == 14200){
sem.release();
}
}
- }, null);
+ }, null);
if(i == 5000){
- qu.shutdown(index);
+ qu.shutdown(index);
LOG.info("Shutting down s1");
}
if(i == 12000){
//Restart off of snap, then get some txns for a log, then shut down
- qu.restart(index);
+ qu.restart(index);
Thread.sleep(300);
qu.shutdown(index);
t.start();
- Thread.sleep(300);
+ Thread.sleep(300);
qu.restart(index);
LOG.info("Setting up server: " + index);
}
@@ -173,9 +167,6 @@ public class FollowerResyncConcurrencyTe
@Override
public void processResult(int rc, String path, Object ctx, String name) {
counter++;
- if (rc != 0) {
- errors++;
- }
if(counter == 14200){
sem.release();
}
@@ -192,29 +183,29 @@ public class FollowerResyncConcurrencyTe
}
t.join(10000);
Thread.sleep(1000);
-
+
verifyState(qu, index, leader);
-
- }
-
+
+ }
+
/**
* This test:
* Starts up 3 ZKs. The non-leader ZKs are writing to cluster
- * Shut down one of the non-leader ZKs.
+ * Shut down one of the non-leader ZKs.
* Restart after sessions have expired but <500 txns have taken place (get a diff)
* Shut down immediately after restarting, start running separate thread with other transactions
* Restart to a diff while transactions are running in leader
- *
- *
+ *
+ *
* Before fixes for ZOOKEEPER-962, restarting off of diff could get an inconsistent view of data missing transactions that
* completed during diff syncing. Follower would also be considered "restarted" before all forwarded transactions
* were completely processed, so restarting would cause a snap file with a too-high zxid to be written, and transactions
* would be missed
- *
+ *
* This test should pretty reliably catch the failure of restarting the server before all diff messages have been processed,
* however, due to the transient nature of the system it may not catch failures due to concurrent processing of transactions
* during the leader's diff forwarding.
- *
+ *
* @throws IOException
* @throws InterruptedException
* @throws KeeperException
@@ -222,7 +213,7 @@ public class FollowerResyncConcurrencyTe
*/
@Test
- public void testResyncByDiffAfterFollowerCrashes ()
+ public void testResyncByDiffAfterFollowerCrashes ()
throws IOException, InterruptedException, KeeperException, Throwable{
final Semaphore sem = new Semaphore(0);
@@ -253,26 +244,23 @@ public class FollowerResyncConcurrencyTe
watcher1.waitForConnected(CONNECTION_TIMEOUT);
watcher2.waitForConnected(CONNECTION_TIMEOUT);
watcher3.waitForConnected(CONNECTION_TIMEOUT);
- zk.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk2.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
-
+
final AtomicBoolean runNow = new AtomicBoolean(false);
Thread t = new Thread(new Runnable() {
@Override
- public void run() {
+ public void run() {
int inSyncCounter = 0;
- while(inSyncCounter < 400) {
+ while(inSyncCounter < 400) {
if(runNow.get()) {
zk3.create("/mytestfoo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
counter++;
- if (rc != 0) {
- errors++;
- }
if(counter > 7300){
sem.release();
}
@@ -280,7 +268,7 @@ public class FollowerResyncConcurrencyTe
}
}, null);
-
+
try {
Thread.sleep(10);
} catch (Exception e) {
@@ -302,19 +290,16 @@ public class FollowerResyncConcurrencyTe
@Override
public void processResult(int rc, String path, Object ctx, String name) {
counter++;
- if (rc != 0) {
- errors++;
- }
if(counter > 7300){
sem.release();
}
}
- }, null);
+ }, null);
if(i == 1000){
- qu.shutdown(index);
+ qu.shutdown(index);
Thread.sleep(1100);
LOG.info("Shutting down s1");
@@ -322,14 +307,14 @@ public class FollowerResyncConcurrencyTe
if(i == 1100 || i == 1150 || i == 1200) {
Thread.sleep(1000);
}
-
+
if(i == 1200){
- qu.startThenShutdown(index);
+ qu.startThenShutdown(index);
runNow.set(true);
qu.restart(index);
LOG.info("Setting up server: " + index);
}
-
+
if(i>=1000 && i%2== 0) {
zk3.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
@@ -337,9 +322,6 @@ public class FollowerResyncConcurrencyTe
@Override
public void processResult(int rc, String path, Object ctx, String name) {
counter++;
- if (rc != 0) {
- errors++;
- }
if(counter > 7300){
sem.release();
}
@@ -360,45 +342,45 @@ public class FollowerResyncConcurrencyTe
t.join(10000);
Thread.sleep(1000);
// Verify that server is following and has the same epoch as the leader
-
+
verifyState(qu, index, leader);
-
+
}
private void verifyState(QuorumUtil qu, int index, Leader leader) {
assertTrue("Not following", qu.getPeer(index).peer.follower != null);
long epochF = (qu.getPeer(index).peer.getActiveServer().getZxid() >> 32L);
long epochL = (leader.getEpoch() >> 32L);
- assertTrue("Zxid: " + qu.getPeer(index).peer.getActiveServer().getZKDatabase().getDataTreeLastProcessedZxid() +
+ assertTrue("Zxid: " + qu.getPeer(index).peer.getActiveServer().getZKDatabase().getDataTreeLastProcessedZxid() +
"Current epoch: " + epochF, epochF == epochL);
- int leaderIndex = (index == 1) ? 2 : 1;
+ int leaderIndex = (index == 1) ? 2 : 1;
Collection<Long> sessionsRestarted = qu.getPeer(index).peer.getActiveServer().getZKDatabase().getSessions();
Collection<Long> sessionsNotRestarted = qu.getPeer(leaderIndex).peer.getActiveServer().getZKDatabase().getSessions();
-
+
for(Long l : sessionsRestarted) {
- assertTrue("Should have same set of sessions in both servers, did not expect: " + l, sessionsNotRestarted.contains(l));
- }
+ assertTrue("Should have same set of sessions in both servers, did not expect: " + l, sessionsNotRestarted.contains(l));
+ }
assertEquals("Should have same number of sessions", sessionsNotRestarted.size(), sessionsRestarted.size());
ZKDatabase restarted = qu.getPeer(index).peer.getActiveServer().getZKDatabase();
ZKDatabase clean = qu.getPeer(3).peer.getActiveServer().getZKDatabase();
ZKDatabase lead = qu.getPeer(leaderIndex).peer.getActiveServer().getZKDatabase();
for(Long l : sessionsRestarted) {
assertTrue("Should have same set of sessions in both servers, did not expect: " + l, sessionsNotRestarted.contains(l));
- HashSet ephemerals = restarted.getEphemerals(l);
- HashSet cleanEphemerals = clean.getEphemerals(l);
+ Set ephemerals = restarted.getEphemerals(l);
+ Set cleanEphemerals = clean.getEphemerals(l);
for(Object o : cleanEphemerals) {
if(!ephemerals.contains(o)) {
LOG.info("Restarted follower doesn't contain ephemeral " + o);
}
}
- HashSet leadEphemerals = lead.getEphemerals(l);
+ Set leadEphemerals = lead.getEphemerals(l);
for(Object o : leadEphemerals) {
if(!cleanEphemerals.contains(o)) {
LOG.info("Follower doesn't contain ephemeral from leader " + o);
}
}
- assertEquals("Should have same number of ephemerals in both followers", ephemerals.size(), cleanEphemerals.size());
+ assertEquals("Should have same number of ephemerals in both followers", ephemerals.size(), cleanEphemerals.size());
assertEquals("Leader should equal follower", lead.getEphemerals(l).size(), cleanEphemerals.size());
}
- }
+ }
}
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RestoreCommittedLogTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RestoreCommittedLogTest.java?rev=1171879&r1=1171878&r2=1171879&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RestoreCommittedLogTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RestoreCommittedLogTest.java Sat Sep 17 00:36:31 2011
@@ -20,7 +20,6 @@ package org.apache.zookeeper.test;
import java.io.File;
import java.util.List;
-import java.util.LinkedList;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
@@ -34,16 +33,15 @@ import org.apache.zookeeper.server.quoru
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.SyncRequestProcessor;
import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.junit.Assert;
import org.junit.Test;
-/** After a replica starts, it should load commits in its committedLog list.
+/** After a replica starts, it should load commits in its committedLog list.
* This test checks if committedLog != 0 after replica restarted.
*/
public class RestoreCommittedLogTest extends ZKTestCase implements Watcher {
private static final Logger LOG = Logger.getLogger(RestoreCommittedLogTest.class);
- private static String HOSTPORT = "127.0.0.1:" + PortAssignment.unique();
+ private static final String HOSTPORT = "127.0.0.1:" + PortAssignment.unique();
private static final int CONNECTION_TIMEOUT = 3000;
/**
* test the purge
@@ -76,7 +74,7 @@ public class RestoreCommittedLogTest ext
// start server again
zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
zks.startdata();
- LinkedList<Proposal> committedLog = zks.getZKDatabase().getCommittedLog();
+ List<Proposal> committedLog = zks.getZKDatabase().getCommittedLog();
int logsize = committedLog.size();
LOG.info("committedLog size = " + logsize);
Assert.assertTrue("log size != 0", (logsize != 0));