You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by br...@apache.org on 2011/07/15 06:10:46 UTC
svn commit: r1146953 - in /zookeeper/trunk: CHANGES.txt
src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
src/java/main/org/apache/zookeeper/ZooKeeper.java
Author: breed
Date: Fri Jul 15 04:10:45 2011
New Revision: 1146953
URL: http://svn.apache.org/viewvc?rev=1146953&view=rev
Log:
ZOOKEEPER-1063. Dubious synchronization in Zookeeper and ClientCnxnSocketNIO classes
Modified:
zookeeper/trunk/CHANGES.txt
zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java
Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1146953&r1=1146952&r2=1146953&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Fri Jul 15 04:10:45 2011
@@ -247,6 +247,8 @@ BUGFIXES:
ZOOKEEPER-782. Incorrect C API documentation for Watches. (mahadev via breed)
+ ZOOKEEPER-1063. Dubious synchronization in Zookeeper and ClientCnxnSocketNIO classes (Yanick Dufresne via breed)
+
IMPROVEMENTS:
ZOOKEEPER-724. Improve junit test integration - log harness information
(phunt via mahadev)
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java?rev=1146953&r1=1146952&r2=1146953&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java Fri Jul 15 04:10:45 2011
@@ -58,8 +58,7 @@ public class ClientCnxnSocketNIO extends
* @throws InterruptedException
* @throws IOException
*/
- boolean doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue) throws InterruptedException, IOException {
- boolean packetReceived = false;
+ void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue) throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
@@ -85,19 +84,21 @@ public class ClientCnxnSocketNIO extends
}
lenBuffer.clear();
incomingBuffer = lenBuffer;
- packetReceived = true;
+ updateLastHeard();
initialized = true;
} else {
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
- packetReceived = true;
+ updateLastHeard();
}
}
}
if (sockKey.isWritable()) {
+ LinkedList<Packet> pending = new LinkedList<Packet>();
synchronized (outgoingQueue) {
if (!outgoingQueue.isEmpty()) {
+ updateLastSend();
ByteBuffer pbb = outgoingQueue.getFirst().bb;
sock.write(pbb);
if (!pbb.hasRemaining()) {
@@ -106,18 +107,15 @@ public class ClientCnxnSocketNIO extends
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
- pendingQueue.add(p);
+ pending.add(p);
}
}
}
}
+ synchronized(pendingQueue) {
+ pendingQueue.addAll(pending);
+ }
}
- if (outgoingQueue.isEmpty()) {
- disableWrite();
- } else {
- enableWrite();
- }
- return packetReceived;
}
@Override
@@ -263,21 +261,16 @@ public class ClientCnxnSocketNIO extends
sendThread.primeConnection();
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
- if (outgoingQueue.size() > 0) {
- // We have something to send so it's the same
- // as if we do the send now.
- updateLastSend();
- }
- if (doIO(pendingQueue, outgoingQueue)) {
- updateLastHeard();
- }
+ doIO(pendingQueue, outgoingQueue);
}
}
if (sendThread.getZkState().isConnected()) {
- if (outgoingQueue.size() > 0) {
- enableWrite();
- } else {
- disableWrite();
+ synchronized(outgoingQueue) {
+ if (!outgoingQueue.isEmpty()) {
+ enableWrite();
+ } else {
+ disableWrite();
+ }
}
}
selected.clear();
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java?rev=1146953&r1=1146952&r2=1146953&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java Fri Jul 15 04:10:45 2011
@@ -90,16 +90,22 @@ public class ZooKeeper {
private final ZKWatchManager watchManager = new ZKWatchManager();
List<String> getDataWatches() {
- List<String> rc = new ArrayList<String>(watchManager.dataWatches.keySet());
- return rc;
+ synchronized(watchManager.dataWatches) {
+ List<String> rc = new ArrayList<String>(watchManager.dataWatches.keySet());
+ return rc;
+ }
}
List<String> getExistWatches() {
- List<String> rc = new ArrayList<String>(watchManager.existWatches.keySet());
- return rc;
+ synchronized(watchManager.existWatches) {
+ List<String> rc = new ArrayList<String>(watchManager.existWatches.keySet());
+ return rc;
+ }
}
List<String> getChildWatches() {
- List<String> rc = new ArrayList<String>(watchManager.childWatches.keySet());
- return rc;
+ synchronized(watchManager.childWatches) {
+ List<String> rc = new ArrayList<String>(watchManager.childWatches.keySet());
+ return rc;
+ }
}
/**
@@ -138,27 +144,32 @@ public class ZooKeeper {
switch (type) {
case None:
result.add(defaultWatcher);
- for(Set<Watcher> ws: dataWatches.values()) {
- result.addAll(ws);
- }
- for(Set<Watcher> ws: existWatches.values()) {
- result.addAll(ws);
- }
- for(Set<Watcher> ws: childWatches.values()) {
- result.addAll(ws);
- }
+ boolean clear = ClientCnxn.getDisableAutoResetWatch() &&
+ state != Watcher.Event.KeeperState.SyncConnected;
- // clear the watches if auto watch reset is not enabled
- if (ClientCnxn.getDisableAutoResetWatch() &&
- state != Watcher.Event.KeeperState.SyncConnected)
- {
- synchronized(dataWatches) {
+ synchronized(dataWatches) {
+ for(Set<Watcher> ws: dataWatches.values()) {
+ result.addAll(ws);
+ }
+ if (clear) {
dataWatches.clear();
}
- synchronized(existWatches) {
+ }
+
+ synchronized(existWatches) {
+ for(Set<Watcher> ws: existWatches.values()) {
+ result.addAll(ws);
+ }
+ if (clear) {
existWatches.clear();
}
- synchronized(childWatches) {
+ }
+
+ synchronized(childWatches) {
+ for(Set<Watcher> ws: childWatches.values()) {
+ result.addAll(ws);
+ }
+ if (clear) {
childWatches.clear();
}
}