You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/04/28 23:15:38 UTC

svn commit: r769541 - in /incubator/cassandra/trunk/src/org/apache/cassandra/net: MessagingService.java SelectionKeyHandler.java SelectorManager.java TcpConnection.java http/HttpConnection.java

Author: jbellis
Date: Tue Apr 28 21:15:38 2009
New Revision: 769541

URL: http://svn.apache.org/viewvc?rev=769541&view=rev
Log:
Remove the unnecessary and race-prone cancelled/read/write key sets (a main reason to use
select is that select does this for you!).
patch by jbellis; reviewed by Eric Evans for #97

Modified:
    incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingService.java
    incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectionKeyHandler.java
    incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectorManager.java
    incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnection.java
    incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpConnection.java

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingService.java?rev=769541&r1=769540&r2=769541&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingService.java Tue Apr 28 21:15:38 2009
@@ -547,7 +547,12 @@
             /* Stop listening on any socket */            
             for( SelectionKey skey : listenSockets_.values() )
             {
-                SelectorManager.getSelectorManager().cancel(skey);
+                skey.cancel();
+                try
+                {
+                    skey.channel().close();
+                }
+                catch (IOException e) {}
             }
             listenSockets_.clear();
             

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectionKeyHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectionKeyHandler.java?rev=769541&r1=769540&r2=769541&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectionKeyHandler.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectionKeyHandler.java Tue Apr 28 21:15:38 2009
@@ -26,21 +26,6 @@
 
 public class SelectionKeyHandler 
 {
-    public void modifyKey(SelectionKey key)
-    {
-        throw new UnsupportedOperationException("modifyKey() cannot be called on " + getClass().getName() + "!");
-    }
-    
-    public void modifyKeyForRead(SelectionKey key)
-    {
-        throw new UnsupportedOperationException("modifyKeyForRead() cannot be called on " + getClass().getName() + "!");
-    }
-    
-    public void modifyKeyForWrite(SelectionKey key)
-    {
-        throw new UnsupportedOperationException("modifyKeyForWrite() cannot be called on " + getClass().getName() + "!");
-    }
-    
     /**
      * Method which is called when the key becomes acceptable.
      *

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectorManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectorManager.java?rev=769541&r1=769540&r2=769541&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectorManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectorManager.java Tue Apr 28 21:15:38 2009
@@ -22,9 +22,6 @@
 import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
 
 import org.apache.log4j.Logger;
 
@@ -39,12 +36,6 @@
     // the underlying selector used
     protected Selector selector_;
 
-    protected HashSet<SelectionKey> modifyKeysForRead_;
-    protected HashSet<SelectionKey> modifyKeysForWrite_;
-    
-    // the list of keys waiting to be cancelled
-    protected HashSet<SelectionKey> cancelledKeys_;    
-
     // The static selector manager which is used by all applications
     private static SelectorManager manager_;
     
@@ -53,10 +44,7 @@
 
     private SelectorManager(String name)
     {
-        super(name);                        
-        this.modifyKeysForRead_ = new HashSet<SelectionKey>();
-        this.modifyKeysForWrite_ = new HashSet<SelectionKey>();
-        this.cancelledKeys_ = new HashSet<SelectionKey>();
+        super(name);
 
         try
         {
@@ -72,28 +60,6 @@
     }
 
     /**
-     * Method which asks the Selector Manager to add the given key to the
-     * cancelled set. If noone calls register on this key during the rest of
-     * this select() operation, the key will be cancelled. Otherwise, it will be
-     * returned as a result of the register operation.
-     * 
-     * @param key
-     *            The key to cancel
-     */
-    public void cancel(SelectionKey key)
-    {
-        if (key == null)
-        {
-            throw new NullPointerException();
-        }
-
-        synchronized ( cancelledKeys_ )
-        {
-            cancelledKeys_.add(key);
-        }
-    }
-
-    /**
      * Registers a new channel with the selector, and attaches the given
      * SelectionKeyHandler as the handler for the newly created key. Operations
      * which the hanlder is interested in will be called as available.
@@ -115,43 +81,10 @@
             throw new NullPointerException();
         }
 
-        selector_.wakeup();
         SelectionKey key = channel.register(selector_, ops, handler);
-        synchronized(cancelledKeys_)
-        {
-            cancelledKeys_.remove(key);
-        }
         selector_.wakeup();
         return key;
     }      
-    
-    public void modifyKeyForRead(SelectionKey key)
-    {
-        if (key == null)
-        {
-            throw new NullPointerException();
-        }
-
-        synchronized(modifyKeysForRead_)
-        {
-            modifyKeysForRead_.add(key);
-        }
-        selector_.wakeup();
-    }
-    
-    public void modifyKeyForWrite(SelectionKey key)
-    {
-        if (key == null)
-        {
-            throw new NullPointerException();
-        }
-
-        synchronized( modifyKeysForWrite_ )
-        {
-            modifyKeysForWrite_.add(key);
-        }
-        selector_.wakeup();
-    }
 
     /**
      * This method starts the socket manager listening for events. It is
@@ -159,55 +92,22 @@
      */
     public void run()
     {
-        try
-        {              
-            // loop while waiting for activity
-            while (true && !Thread.currentThread().interrupted() )
-            { 
-                try
-                {
-                    doProcess();                                                                          
-                    selector_.select(1000); 
-                    synchronized( cancelledKeys_ )
-                    {
-                        if (cancelledKeys_.size() > 0)
-                        {
-                            SelectionKey[] keys = cancelledKeys_.toArray( new SelectionKey[0]);                        
-                            
-                            for ( SelectionKey key : keys )
-                            {
-                                key.cancel();
-                                key.channel().close();
-                            }                                                
-                            cancelledKeys_.clear();
-                        }
-                    }
-                }
-                catch ( IOException e )
-                {
-                    logger_.warn(LogUtil.throwableToString(e));
-                }
-            }
-                         
-            manager_ = null;
-        }
-        catch (Throwable t)
+        while (true)
         {
-            logger_.error("ERROR (SelectorManager.run): " + t);
-            logger_.error(LogUtil.throwableToString(t));
-            System.exit(-1);
+            try
+            {
+                selector_.select(1000);
+                doProcess();
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
         }
-    }    
+    }
 
     protected void doProcess() throws IOException
     {
-        doInvocationsForRead();
-        doInvocationsForWrite();
-        doSelections();
-    }
-    
-    protected void doSelections() throws IOException
-    {
         SelectionKey[] keys = selector_.selectedKeys().toArray(new SelectionKey[0]);
 
         for (int i = 0; i < keys.length; i++)
@@ -253,44 +153,6 @@
             }
         }
     }
-    
-    private void doInvocationsForRead()
-    {
-        Iterator<SelectionKey> it;
-        synchronized (modifyKeysForRead_)
-        {
-            it = new ArrayList<SelectionKey>(modifyKeysForRead_).iterator();
-            modifyKeysForRead_.clear();
-        }
-
-        while (it.hasNext())
-        {
-            SelectionKey key = it.next();
-            if (key.isValid() && (key.attachment() != null))
-            {
-                ((SelectionKeyHandler) key.attachment()).modifyKeyForRead(key);
-            }
-        }
-    }
-    
-    private void doInvocationsForWrite()
-    {
-        Iterator<SelectionKey> it;
-        synchronized (modifyKeysForWrite_)
-        {
-            it = new ArrayList<SelectionKey>(modifyKeysForWrite_).iterator();
-            modifyKeysForWrite_.clear();
-        }
-
-        while (it.hasNext())
-        {
-            SelectionKey key = it.next();
-            if (key.isValid() && (key.attachment() != null))
-            {
-                ((SelectionKeyHandler) key.attachment()).modifyKeyForWrite(key);
-            }
-        }
-    }
 
     /**
      * Returns the SelectorManager applications should use.

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnection.java?rev=769541&r1=769540&r2=769541&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnection.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnection.java Tue Apr 28 21:15:38 2009
@@ -194,10 +194,7 @@
                 if (buffer.remaining() > 0) 
                 {                   
                     pendingWrites_.add(buffer);
-                    if ((key_.interestOps() & SelectionKey.OP_WRITE) == 0)
-                    {                                    
-                        SelectorManager.getSelectorManager().modifyKeyForWrite(key_);                     
-                    }
+                    key_.interestOps(key_.interestOps() | SelectionKey.OP_WRITE);
                 }
             }
         }
@@ -254,10 +251,7 @@
                 */
                 if ( bytesTransferred < limit && bytesWritten != total )
                 {                    
-                    if ((key_.interestOps() & SelectionKey.OP_WRITE) == 0)
-                    {                    
-                        SelectorManager.getSelectorManager().modifyKeyForWrite(key_);                     
-                    }
+                    key_.interestOps(key_.interestOps() | SelectionKey.OP_WRITE);
                     waitToContinueStreaming();
                 }
             }
@@ -273,10 +267,7 @@
         if (buffer.remaining() > 0) 
         {            
             pendingWrites_.add(buffer);
-            if ((key_.interestOps() & SelectionKey.OP_WRITE) == 0)
-            {                    
-                SelectorManager.getSelectorManager().modifyKeyForWrite(key_);                     
-            }
+            key_.interestOps(key_.interestOps() | SelectionKey.OP_WRITE);
             waitToContinueStreaming();
         }     
     }
@@ -382,25 +373,32 @@
     private void cancel(SelectionKey key)
     {
         if ( key != null )
-            SelectorManager.getSelectorManager().cancel(key);
+        {
+            key.cancel();
+            try
+            {
+                key.channel().close();
+            }
+            catch (IOException e) {}
+        }
     }
     
     // called in the selector thread
     public void connect(SelectionKey key)
     {       
-        key.interestOps(key.interestOps() & (~SelectionKey.OP_CONNECT));        
+        key.interestOps(key.interestOps() & (~SelectionKey.OP_CONNECT));
         try
         {
             if (socketChannel_.finishConnect())
-            {                                
-                SelectorManager.getSelectorManager().modifyKeyForRead(key);
-                connected_.set(true);                
+            {
+                key.interestOps(key.interestOps() | SelectionKey.OP_READ);
+                connected_.set(true);
                 
                 // this will flush the pending                
                 if (!pendingWrites_.isEmpty()) 
-                {                    
-                    SelectorManager.getSelectorManager().modifyKeyForWrite(key_);  
-                } 
+                {
+                    key_.interestOps(key_.interestOps() | SelectionKey.OP_WRITE);
+                }
                 resumeStreaming();
             } 
             else 
@@ -457,32 +455,22 @@
         {    
             synchronized(this)
             {
-                if (!pendingWrites_.isEmpty() && (key_.interestOps() & SelectionKey.OP_WRITE) == 0)
+                if (!pendingWrites_.isEmpty())
                 {                    
-                    SelectorManager.getSelectorManager().modifyKeyForWrite(key_); 
-                }  
+                    key_.interestOps(key_.interestOps() | SelectionKey.OP_WRITE);
+                }
             }
         }
     }
     
     // called in the selector thread
     public void read(SelectionKey key)
-    {          
-        key.interestOps( key.interestOps() & ( ~SelectionKey.OP_READ ) );         
+    {
+        key.interestOps( key.interestOps() & ( ~SelectionKey.OP_READ ) );
         // publish this event onto to the TCPReadEvent Queue.
         MessagingService.getReadExecutor().execute(readWork_);
     }
     
-    public void modifyKeyForRead(SelectionKey key)
-    {
-        key.interestOps( key_.interestOps() | SelectionKey.OP_READ );
-    }
-    
-    public void modifyKeyForWrite(SelectionKey key)
-    {        
-        key.interestOps( key_.interestOps() | SelectionKey.OP_WRITE );
-    }
-    
     class ReadWorkItem implements Runnable
     {                 
         // called from the TCP READ thread pool
@@ -539,8 +527,8 @@
                 handleException(th);
             }
             finally
-            {                                     
-                SelectorManager.getSelectorManager().modifyKeyForRead(key_);                
+            {
+                key_.interestOps(key_.interestOps() | SelectionKey.OP_READ);
             }
         }
         

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpConnection.java?rev=769541&r1=769540&r2=769541&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpConnection.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpConnection.java Tue Apr 28 21:15:38 2009
@@ -165,12 +165,6 @@
         /* Add a task to process the HTTP request */
         MessagingService.getReadExecutor().execute(httpReader_);
     }
-
-    public void modifyKeyForRead(SelectionKey key)
-    {
-        key.interestOps( httpKey_.interestOps() | SelectionKey.OP_READ );
-    }
-
     private void resetParserState()
     {
         startLineParser_.resetParserState();
@@ -187,7 +181,14 @@
     {        
         logger_.info("Closing HTTP socket ...");
         if ( httpKey_ != null )
-            SelectorManager.getSelectorManager().cancel(httpKey_);
+        {
+            httpKey_.cancel();
+            try
+            {
+                httpKey_.channel().close();
+            }
+            catch (IOException e) {}
+        }
     }
 
     /*
@@ -356,7 +357,7 @@
         }
         finally
         {
-            SelectorManager.getSelectorManager().modifyKeyForRead(httpKey_);
+            httpKey_.interestOps(httpKey_.interestOps() | SelectionKey.OP_READ);
         }
     }