You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by br...@apache.org on 2011/07/15 06:10:46 UTC

svn commit: r1146953 - in /zookeeper/trunk: CHANGES.txt src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java src/java/main/org/apache/zookeeper/ZooKeeper.java

Author: breed
Date: Fri Jul 15 04:10:45 2011
New Revision: 1146953

URL: http://svn.apache.org/viewvc?rev=1146953&view=rev
Log:
ZOOKEEPER-1063. Dubious synchronization in Zookeeper and ClientCnxnSocketNIO classes

Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1146953&r1=1146952&r2=1146953&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Fri Jul 15 04:10:45 2011
@@ -247,6 +247,8 @@ BUGFIXES: 
 
   ZOOKEEPER-782. Incorrect C API documentation for Watches. (mahadev via breed)
 
+  ZOOKEEPER-1063. Dubious synchronization in Zookeeper and ClientCnxnSocketNIO classes (Yanick Dufresne via breed)
+
 IMPROVEMENTS:
   ZOOKEEPER-724. Improve junit test integration - log harness information 
   (phunt via mahadev)

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java?rev=1146953&r1=1146952&r2=1146953&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java Fri Jul 15 04:10:45 2011
@@ -58,8 +58,7 @@ public class ClientCnxnSocketNIO extends
      * @throws InterruptedException
      * @throws IOException
      */
-    boolean doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue) throws InterruptedException, IOException {
-        boolean packetReceived = false;
+    void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue) throws InterruptedException, IOException {
         SocketChannel sock = (SocketChannel) sockKey.channel();
         if (sock == null) {
             throw new IOException("Socket is null!");
@@ -85,19 +84,21 @@ public class ClientCnxnSocketNIO extends
                     }
                     lenBuffer.clear();
                     incomingBuffer = lenBuffer;
-                    packetReceived = true;
+                    updateLastHeard();
                     initialized = true;
                 } else {
                     sendThread.readResponse(incomingBuffer);
                     lenBuffer.clear();
                     incomingBuffer = lenBuffer;
-                    packetReceived = true;
+                    updateLastHeard();
                 }
             }
         }
         if (sockKey.isWritable()) {
+            LinkedList<Packet> pending = new LinkedList<Packet>();
             synchronized (outgoingQueue) {
                 if (!outgoingQueue.isEmpty()) {
+                    updateLastSend();
                     ByteBuffer pbb = outgoingQueue.getFirst().bb;
                     sock.write(pbb);
                     if (!pbb.hasRemaining()) {
@@ -106,18 +107,15 @@ public class ClientCnxnSocketNIO extends
                         if (p.requestHeader != null
                                 && p.requestHeader.getType() != OpCode.ping
                                 && p.requestHeader.getType() != OpCode.auth) {
-                            pendingQueue.add(p);
+                            pending.add(p);
                         }
                     }
                 }
             }
+            synchronized(pendingQueue) {
+                pendingQueue.addAll(pending);
+            }
         }
-        if (outgoingQueue.isEmpty()) {
-            disableWrite();
-        } else {
-            enableWrite();
-        }
-        return packetReceived;
     }
 
     @Override
@@ -263,21 +261,16 @@ public class ClientCnxnSocketNIO extends
                     sendThread.primeConnection();
                 }
             } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
-                if (outgoingQueue.size() > 0) {
-                    // We have something to send so it's the same
-                    // as if we do the send now.
-                    updateLastSend();
-                }
-                if (doIO(pendingQueue, outgoingQueue)) {
-                    updateLastHeard();
-                }
+                doIO(pendingQueue, outgoingQueue);
             }
         }
         if (sendThread.getZkState().isConnected()) {
-            if (outgoingQueue.size() > 0) {
-                enableWrite();
-            } else {
-                disableWrite();
+            synchronized(outgoingQueue) {
+                if (!outgoingQueue.isEmpty()) {
+                    enableWrite();
+                } else {
+                    disableWrite();
+                }
             }
         }
         selected.clear();

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java?rev=1146953&r1=1146952&r2=1146953&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java Fri Jul 15 04:10:45 2011
@@ -90,16 +90,22 @@ public class ZooKeeper {
     private final ZKWatchManager watchManager = new ZKWatchManager();
 
     List<String> getDataWatches() {
-        List<String> rc = new ArrayList<String>(watchManager.dataWatches.keySet());
-        return rc;
+        synchronized(watchManager.dataWatches) {
+            List<String> rc = new ArrayList<String>(watchManager.dataWatches.keySet());
+            return rc;
+        }
     }
     List<String> getExistWatches() {
-        List<String> rc =  new ArrayList<String>(watchManager.existWatches.keySet());
-        return rc;
+        synchronized(watchManager.existWatches) {
+            List<String> rc =  new ArrayList<String>(watchManager.existWatches.keySet());
+            return rc;
+        }
     }
     List<String> getChildWatches() {
-        List<String> rc = new ArrayList<String>(watchManager.childWatches.keySet());
-        return rc;
+        synchronized(watchManager.childWatches) {
+            List<String> rc = new ArrayList<String>(watchManager.childWatches.keySet());
+            return rc;
+        }
     }
 
 /**
@@ -138,27 +144,32 @@ public class ZooKeeper {
             switch (type) {
             case None:
                 result.add(defaultWatcher);
-                for(Set<Watcher> ws: dataWatches.values()) {
-                    result.addAll(ws);
-                }
-                for(Set<Watcher> ws: existWatches.values()) {
-                    result.addAll(ws);
-                }
-                for(Set<Watcher> ws: childWatches.values()) {
-                    result.addAll(ws);
-                }
+                boolean clear = ClientCnxn.getDisableAutoResetWatch() &&
+                        state != Watcher.Event.KeeperState.SyncConnected;
 
-                // clear the watches if auto watch reset is not enabled
-                if (ClientCnxn.getDisableAutoResetWatch() &&
-                        state != Watcher.Event.KeeperState.SyncConnected)
-                {
-                    synchronized(dataWatches) {
+                synchronized(dataWatches) {
+                    for(Set<Watcher> ws: dataWatches.values()) {
+                        result.addAll(ws);
+                    }
+                    if (clear) {
                         dataWatches.clear();
                     }
-                    synchronized(existWatches) {
+                }
+
+                synchronized(existWatches) {
+                    for(Set<Watcher> ws: existWatches.values()) {
+                        result.addAll(ws);
+                    }
+                    if (clear) {
                         existWatches.clear();
                     }
-                    synchronized(childWatches) {
+                }
+
+                synchronized(childWatches) {
+                    for(Set<Watcher> ws: childWatches.values()) {
+                        result.addAll(ws);
+                    }
+                    if (clear) {
                         childWatches.clear();
                     }
                 }