You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/04/28 23:15:44 UTC

svn commit: r769542 - in /incubator/cassandra/trunk/src/org/apache/cassandra/net: SelectorManager.java TcpConnection.java

Author: jbellis
Date: Tue Apr 28 21:15:44 2009
New Revision: 769542

URL: http://svn.apache.org/viewvc?rev=769542&view=rev
Log:
renaming and final cleanup.  patch by jbellis; reviewed by Eric Evans for #97

Modified:
    incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectorManager.java
    incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnection.java

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectorManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectorManager.java?rev=769542&r1=769541&r2=769542&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectorManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectorManager.java Tue Apr 28 21:15:44 2009
@@ -25,22 +25,18 @@
 
 import org.apache.log4j.Logger;
 
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
 public class SelectorManager extends Thread
 {
-    private static final Logger logger_ = Logger.getLogger(SelectorManager.class); 
+    private static final Logger logger = Logger.getLogger(SelectorManager.class); 
 
     // the underlying selector used
-    protected Selector selector_;
+    protected Selector selector;
 
     // The static selector manager which is used by all applications
-    private static SelectorManager manager_;
+    private static SelectorManager manager;
     
     // The static UDP selector manager which is used by all applications
-    private static SelectorManager udpManager_;
+    private static SelectorManager udpManager;
 
     private SelectorManager(String name)
     {
@@ -48,7 +44,7 @@
 
         try
         {
-            selector_ = Selector.open();
+            selector = Selector.open();
         }
         catch (IOException e)
         {
@@ -71,7 +67,7 @@
      * @param ops
      *            The initial interest operations
      * @return The SelectionKey which uniquely identifies this channel
-     * @exception IOException
+     * @exception IOException if the channel is closed
      */
     public SelectionKey register(SelectableChannel channel,
             SelectionKeyHandler handler, int ops) throws IOException
@@ -81,9 +77,7 @@
             throw new NullPointerException();
         }
 
-        SelectionKey key = channel.register(selector_, ops, handler);
-        selector_.wakeup();
-        return key;
+        return channel.register(selector, ops, handler);
     }      
 
     /**
@@ -96,7 +90,7 @@
         {
             try
             {
-                selector_.select(1000);
+                selector.select(100);
                 doProcess();
             }
             catch (IOException e)
@@ -108,48 +102,42 @@
 
     protected void doProcess() throws IOException
     {
-        SelectionKey[] keys = selector_.selectedKeys().toArray(new SelectionKey[0]);
+        SelectionKey[] keys = selector.selectedKeys().toArray(new SelectionKey[0]);
 
-        for (int i = 0; i < keys.length; i++)
+        for (SelectionKey key : keys)
         {
-            selector_.selectedKeys().remove(keys[i]);
+            selector.selectedKeys().remove(key);
 
-            synchronized (keys[i])
+            synchronized (key)
             {
-                SelectionKeyHandler skh = (SelectionKeyHandler) keys[i]
-                        .attachment();
+                SelectionKeyHandler skh = (SelectionKeyHandler) key.attachment();
 
                 if (skh != null)
                 {
                     // accept
-                    if (keys[i].isValid() && keys[i].isAcceptable())
+                    if (key.isValid() && key.isAcceptable())
                     {
-                        skh.accept(keys[i]);
+                        skh.accept(key);
                     }
 
                     // connect
-                    if (keys[i].isValid() && keys[i].isConnectable())
+                    if (key.isValid() && key.isConnectable())
                     {
-                        skh.connect(keys[i]);
+                        skh.connect(key);
                     }
 
                     // read
-                    if (keys[i].isValid() && keys[i].isReadable())
+                    if (key.isValid() && key.isReadable())
                     {
-                        skh.read(keys[i]);
+                        skh.read(key);
                     }
 
                     // write
-                    if (keys[i].isValid() && keys[i].isWritable())
+                    if (key.isValid() && key.isWritable())
                     {
-                        skh.write(keys[i]);
+                        skh.write(key);
                     }
                 }
-                else
-                {
-                    keys[i].channel().close();
-                    keys[i].cancel();
-                }
             }
         }
     }
@@ -163,23 +151,23 @@
     {
         synchronized (SelectorManager.class)
         {
-            if (manager_ == null)
+            if (manager == null)
             {
-                manager_ = new SelectorManager("TCP Selector Manager");
+                manager = new SelectorManager("TCP Selector Manager");
             }            
         }
-        return manager_;
+        return manager;
     }
     
     public static SelectorManager getUdpSelectorManager()
     {
         synchronized (SelectorManager.class)
         {
-            if (udpManager_ == null)
+            if (udpManager == null)
             {
-                udpManager_ = new SelectorManager("UDP Selector Manager");
+                udpManager = new SelectorManager("UDP Selector Manager");
             }            
         }
-        return udpManager_;
+        return udpManager;
     }
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnection.java?rev=769542&r1=769541&r2=769542&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnection.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnection.java Tue Apr 28 21:15:44 2009
@@ -61,7 +61,6 @@
     private TcpReader tcpReader_;    
     private ReadWorkItem readWork_ = new ReadWorkItem(); 
     private List<ByteBuffer> pendingWrites_ = new Vector<ByteBuffer>();  
-    private AtomicBoolean connected_ = new AtomicBoolean(false);    
     private EndPoint localEp_;
     private EndPoint remoteEp_;
     boolean inUse_ = false;
@@ -94,8 +93,7 @@
         else
         {
             key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
-            connected_.set(true);     
-        }         
+        }
     }
     
     /*
@@ -116,8 +114,7 @@
         else
         {
             key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
-            connected_.set(true);     
-        }        
+        }
         bStream_ = true;
         lock_ = new ReentrantLock();
         condition_ = lock_.newCondition();
@@ -144,8 +141,7 @@
         socketChannel_ = socketChannel;
         socketChannel_.configureBlocking(false);                           
         isIncoming_ = isIncoming;
-        connected_.set(true);       
-        localEp_ = localEp;           
+        localEp_ = localEp;
     }
     
     EndPoint getLocalEp()
@@ -182,7 +178,7 @@
             ByteBuffer buffer = MessagingService.packIt( data , false, false, listening);   
             synchronized(this)
             {
-                if (!pendingWrites_.isEmpty() || !connected_.get()) 
+                if (!pendingWrites_.isEmpty() || !socketChannel_.isConnected())
                 {                     
                     pendingWrites_.add(buffer);                
                     return;
@@ -223,7 +219,7 @@
             */
             long waitTime = 2;
             int retry = 0;
-            while ( !connected_.get() )
+            while (!socketChannel_.isConnected())
             {
                 if ( retry == 3 )
                     throw new IOException("Unable to connect to " + remoteEp_ + " after " + retry + " attempts.");
@@ -392,7 +388,6 @@
             if (socketChannel_.finishConnect())
             {
                 key.interestOps(key.interestOps() | SelectionKey.OP_READ);
-                connected_.set(true);
                 
                 // this will flush the pending                
                 if (!pendingWrites_.isEmpty())