You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/06/23 19:55:39 UTC

svn commit: r787765 - in /incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net: SelectionKeyHandler.java TcpConnection.java UdpConnection.java http/HttpConnection.java

Author: jbellis
Date: Tue Jun 23 17:55:39 2009
New Revision: 787765

URL: http://svn.apache.org/viewvc?rev=787765&view=rev
Log:
Under heavy load with large column values, we still saw lockups in TCP connections. Here is the problem. The
following code that sets the interest ops seems innocent, but it is the source of the problem. The reason is
that this operation is not atomic: it reads the current ops and then writes back a modified value. Another
thread can sneak in between the reading of the ops and the setting of them. As a result, some wrong bits can be set.
   key_.interestOps(key_.interestOps() | SelectionKey.OP_READ)

This is a sequence that demonstrates how we can lose the OP_READ bit forever and thus jam the read channel:
1. Thread 1: we want to write a message, and in write(Message) we are about to turn on OP_WRITE because the message can't be written in one shot.
2. Thread 2: a read comes in, and in read(SelectionKey) we turn off OP_READ and submit the read request to ReadWorkItem in Thread 3.
3. Thread 1: reads interestOps and sees OP_READ as off.
4. Thread 3: finishes processing the read request and turns OP_READ on.
5. Thread 1: resumes and turns on OP_WRITE. However, because it writes back the stale value it read in step 3, it also turns off OP_READ. The read channel is thus blocked forever after this.

patch by Jun Rao; reviewed by jbellis for CASSANDRA-220

Modified:
    incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/SelectionKeyHandler.java
    incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/TcpConnection.java
    incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/UdpConnection.java
    incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/http/HttpConnection.java

Modified: incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/SelectionKeyHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/SelectionKeyHandler.java?rev=787765&r1=787764&r2=787765&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/SelectionKeyHandler.java (original)
+++ incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/SelectionKeyHandler.java Tue Jun 23 17:55:39 2009
@@ -65,4 +65,20 @@
     {
         throw new UnsupportedOperationException("write() cannot be called on " + getClass().getName() + "!");
     }
+    
+    protected static void turnOnInterestOps(SelectionKey key, int ops)
+    {
+        synchronized(key)
+        {
+            key.interestOps(key.interestOps() | ops);
+        }
+    }
+    
+    protected static void turnOffInterestOps(SelectionKey key, int ops)
+    {
+        synchronized(key)
+        {
+            key.interestOps(key.interestOps() & (~ops) );
+        }
+    }
 }

Modified: incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/TcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/TcpConnection.java?rev=787765&r1=787764&r2=787765&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/TcpConnection.java (original)
+++ incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/TcpConnection.java Tue Jun 23 17:55:39 2009
@@ -183,7 +183,7 @@
                 if (buffer.remaining() > 0) 
                 {                   
                     pendingWrites_.add(buffer);
-                    key_.interestOps(key_.interestOps() | SelectionKey.OP_WRITE);
+                    turnOnInterestOps(key_, SelectionKey.OP_WRITE);
                 }
             }
         }
@@ -229,7 +229,7 @@
                     if (buffer.remaining() > 0)
                     {
                         pendingWrites_.add(buffer);
-                        key_.interestOps(key_.interestOps() | SelectionKey.OP_WRITE);
+                        turnOnInterestOps(key_, SelectionKey.OP_WRITE);
                         condition_.await();
                     }
                 }
@@ -245,7 +245,7 @@
                 */
                 if ( bytesTransferred < limit && bytesWritten != total )
                 {                    
-                    key_.interestOps(key_.interestOps() | SelectionKey.OP_WRITE);
+                    turnOnInterestOps(key_, SelectionKey.OP_WRITE);
                     condition_.await();
                 }
             }
@@ -346,17 +346,20 @@
     // called in the selector thread
     public void connect(SelectionKey key)
     {       
-        key.interestOps(key.interestOps() & (~SelectionKey.OP_CONNECT));
+        turnOffInterestOps(key, SelectionKey.OP_CONNECT);
         try
         {
             if (socketChannel_.finishConnect())
             {
-                key.interestOps(key.interestOps() | SelectionKey.OP_READ);
+                turnOnInterestOps(key, SelectionKey.OP_READ);
                 
-                // this will flush the pending                
-                if (!pendingWrites_.isEmpty()) 
+                synchronized(this)
                 {
-                    key_.interestOps(key_.interestOps() | SelectionKey.OP_WRITE);
+                    // this will flush the pending                
+                    if (!pendingWrites_.isEmpty()) 
+                    {
+                        turnOnInterestOps(key_, SelectionKey.OP_WRITE);
+                    }
                 }
                 resumeStreaming();
             } 
@@ -376,7 +379,7 @@
     // called in the selector thread
     public void write(SelectionKey key)
     {   
-        key.interestOps( key.interestOps() & ( ~SelectionKey.OP_WRITE ) );                
+        turnOffInterestOps(key, SelectionKey.OP_WRITE);                
         doPendingWrites();
         /*
          * This is executed only if we are in streaming mode.
@@ -415,7 +418,7 @@
             {    
                 if (!pendingWrites_.isEmpty())
                 {                    
-                    key_.interestOps(key_.interestOps() | SelectionKey.OP_WRITE);
+                    turnOnInterestOps(key_, SelectionKey.OP_WRITE);
                 }
             }
         }
@@ -424,7 +427,7 @@
     // called in the selector thread
     public void read(SelectionKey key)
     {
-        key.interestOps( key.interestOps() & ( ~SelectionKey.OP_READ ) );
+        turnOffInterestOps(key, SelectionKey.OP_READ);
         // publish this event onto to the TCPReadEvent Queue.
         MessagingService.getReadExecutor().execute(readWork_);
     }
@@ -486,7 +489,7 @@
             }
             finally
             {
-                key_.interestOps(key_.interestOps() | SelectionKey.OP_READ);
+                turnOnInterestOps(key_, SelectionKey.OP_READ);
             }
         }
         

Modified: incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/UdpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/UdpConnection.java?rev=787765&r1=787764&r2=787765&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/UdpConnection.java (original)
+++ incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/UdpConnection.java Tue Jun 23 17:55:39 2009
@@ -131,7 +131,7 @@
     
     public void read(SelectionKey key)
     {        
-        key.interestOps( key.interestOps() & (~SelectionKey.OP_READ) );
+        turnOffInterestOps(key, SelectionKey.OP_READ);
         ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
         try
         {
@@ -160,7 +160,7 @@
         }
         finally
         {
-            key.interestOps( key_.interestOps() | SelectionKey.OP_READ );
+            turnOnInterestOps(key_, SelectionKey.OP_READ );
         }
     }
 }

Modified: incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/http/HttpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/http/HttpConnection.java?rev=787765&r1=787764&r2=787765&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/http/HttpConnection.java (original)
+++ incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/http/HttpConnection.java Tue Jun 23 17:55:39 2009
@@ -136,7 +136,7 @@
             httpChannel_ = (SocketChannel)key.channel();
         }
         /* deregister interest for read */
-        key.interestOps( key.interestOps() & ( ~SelectionKey.OP_READ ) );
+        turnOffInterestOps(key, SelectionKey.OP_READ);
         /* Add a task to process the HTTP request */
         MessagingService.getReadExecutor().execute(httpReader_);
     }
@@ -330,7 +330,7 @@
         }
         finally
         {
-            httpKey_.interestOps(httpKey_.interestOps() | SelectionKey.OP_READ);
+            turnOnInterestOps(httpKey_, SelectionKey.OP_READ);
         }
     }