You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/04/28 23:15:38 UTC
svn commit: r769541 - in
/incubator/cassandra/trunk/src/org/apache/cassandra/net:
MessagingService.java SelectionKeyHandler.java SelectorManager.java
TcpConnection.java http/HttpConnection.java
Author: jbellis
Date: Tue Apr 28 21:15:38 2009
New Revision: 769541
URL: http://svn.apache.org/viewvc?rev=769541&view=rev
Log:
Remove unnecessary and race-prone cancelled/read/write key sets (a main reason to use
select is that select does this for you!).
patch by jbellis; reviewed by Eric Evans for #97
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingService.java
incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectionKeyHandler.java
incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectorManager.java
incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnection.java
incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpConnection.java
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingService.java?rev=769541&r1=769540&r2=769541&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingService.java Tue Apr 28 21:15:38 2009
@@ -547,7 +547,12 @@
/* Stop listening on any socket */
for( SelectionKey skey : listenSockets_.values() )
{
- SelectorManager.getSelectorManager().cancel(skey);
+ skey.cancel();
+ try
+ {
+ skey.channel().close();
+ }
+ catch (IOException e) {}
}
listenSockets_.clear();
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectionKeyHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectionKeyHandler.java?rev=769541&r1=769540&r2=769541&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectionKeyHandler.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectionKeyHandler.java Tue Apr 28 21:15:38 2009
@@ -26,21 +26,6 @@
public class SelectionKeyHandler
{
- public void modifyKey(SelectionKey key)
- {
- throw new UnsupportedOperationException("modifyKey() cannot be called on " + getClass().getName() + "!");
- }
-
- public void modifyKeyForRead(SelectionKey key)
- {
- throw new UnsupportedOperationException("modifyKeyForRead() cannot be called on " + getClass().getName() + "!");
- }
-
- public void modifyKeyForWrite(SelectionKey key)
- {
- throw new UnsupportedOperationException("modifyKeyForWrite() cannot be called on " + getClass().getName() + "!");
- }
-
/**
* Method which is called when the key becomes acceptable.
*
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectorManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectorManager.java?rev=769541&r1=769540&r2=769541&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectorManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectorManager.java Tue Apr 28 21:15:38 2009
@@ -22,9 +22,6 @@
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
import org.apache.log4j.Logger;
@@ -39,12 +36,6 @@
// the underlying selector used
protected Selector selector_;
- protected HashSet<SelectionKey> modifyKeysForRead_;
- protected HashSet<SelectionKey> modifyKeysForWrite_;
-
- // the list of keys waiting to be cancelled
- protected HashSet<SelectionKey> cancelledKeys_;
-
// The static selector manager which is used by all applications
private static SelectorManager manager_;
@@ -53,10 +44,7 @@
private SelectorManager(String name)
{
- super(name);
- this.modifyKeysForRead_ = new HashSet<SelectionKey>();
- this.modifyKeysForWrite_ = new HashSet<SelectionKey>();
- this.cancelledKeys_ = new HashSet<SelectionKey>();
+ super(name);
try
{
@@ -72,28 +60,6 @@
}
/**
- * Method which asks the Selector Manager to add the given key to the
- * cancelled set. If noone calls register on this key during the rest of
- * this select() operation, the key will be cancelled. Otherwise, it will be
- * returned as a result of the register operation.
- *
- * @param key
- * The key to cancel
- */
- public void cancel(SelectionKey key)
- {
- if (key == null)
- {
- throw new NullPointerException();
- }
-
- synchronized ( cancelledKeys_ )
- {
- cancelledKeys_.add(key);
- }
- }
-
- /**
* Registers a new channel with the selector, and attaches the given
* SelectionKeyHandler as the handler for the newly created key. Operations
* which the hanlder is interested in will be called as available.
@@ -115,43 +81,10 @@
throw new NullPointerException();
}
- selector_.wakeup();
SelectionKey key = channel.register(selector_, ops, handler);
- synchronized(cancelledKeys_)
- {
- cancelledKeys_.remove(key);
- }
selector_.wakeup();
return key;
}
-
- public void modifyKeyForRead(SelectionKey key)
- {
- if (key == null)
- {
- throw new NullPointerException();
- }
-
- synchronized(modifyKeysForRead_)
- {
- modifyKeysForRead_.add(key);
- }
- selector_.wakeup();
- }
-
- public void modifyKeyForWrite(SelectionKey key)
- {
- if (key == null)
- {
- throw new NullPointerException();
- }
-
- synchronized( modifyKeysForWrite_ )
- {
- modifyKeysForWrite_.add(key);
- }
- selector_.wakeup();
- }
/**
* This method starts the socket manager listening for events. It is
@@ -159,55 +92,22 @@
*/
public void run()
{
- try
- {
- // loop while waiting for activity
- while (true && !Thread.currentThread().interrupted() )
- {
- try
- {
- doProcess();
- selector_.select(1000);
- synchronized( cancelledKeys_ )
- {
- if (cancelledKeys_.size() > 0)
- {
- SelectionKey[] keys = cancelledKeys_.toArray( new SelectionKey[0]);
-
- for ( SelectionKey key : keys )
- {
- key.cancel();
- key.channel().close();
- }
- cancelledKeys_.clear();
- }
- }
- }
- catch ( IOException e )
- {
- logger_.warn(LogUtil.throwableToString(e));
- }
- }
-
- manager_ = null;
- }
- catch (Throwable t)
+ while (true)
{
- logger_.error("ERROR (SelectorManager.run): " + t);
- logger_.error(LogUtil.throwableToString(t));
- System.exit(-1);
+ try
+ {
+ selector_.select(1000);
+ doProcess();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
}
- }
+ }
protected void doProcess() throws IOException
{
- doInvocationsForRead();
- doInvocationsForWrite();
- doSelections();
- }
-
- protected void doSelections() throws IOException
- {
SelectionKey[] keys = selector_.selectedKeys().toArray(new SelectionKey[0]);
for (int i = 0; i < keys.length; i++)
@@ -253,44 +153,6 @@
}
}
}
-
- private void doInvocationsForRead()
- {
- Iterator<SelectionKey> it;
- synchronized (modifyKeysForRead_)
- {
- it = new ArrayList<SelectionKey>(modifyKeysForRead_).iterator();
- modifyKeysForRead_.clear();
- }
-
- while (it.hasNext())
- {
- SelectionKey key = it.next();
- if (key.isValid() && (key.attachment() != null))
- {
- ((SelectionKeyHandler) key.attachment()).modifyKeyForRead(key);
- }
- }
- }
-
- private void doInvocationsForWrite()
- {
- Iterator<SelectionKey> it;
- synchronized (modifyKeysForWrite_)
- {
- it = new ArrayList<SelectionKey>(modifyKeysForWrite_).iterator();
- modifyKeysForWrite_.clear();
- }
-
- while (it.hasNext())
- {
- SelectionKey key = it.next();
- if (key.isValid() && (key.attachment() != null))
- {
- ((SelectionKeyHandler) key.attachment()).modifyKeyForWrite(key);
- }
- }
- }
/**
* Returns the SelectorManager applications should use.
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnection.java?rev=769541&r1=769540&r2=769541&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnection.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnection.java Tue Apr 28 21:15:38 2009
@@ -194,10 +194,7 @@
if (buffer.remaining() > 0)
{
pendingWrites_.add(buffer);
- if ((key_.interestOps() & SelectionKey.OP_WRITE) == 0)
- {
- SelectorManager.getSelectorManager().modifyKeyForWrite(key_);
- }
+ key_.interestOps(key_.interestOps() | SelectionKey.OP_WRITE);
}
}
}
@@ -254,10 +251,7 @@
*/
if ( bytesTransferred < limit && bytesWritten != total )
{
- if ((key_.interestOps() & SelectionKey.OP_WRITE) == 0)
- {
- SelectorManager.getSelectorManager().modifyKeyForWrite(key_);
- }
+ key_.interestOps(key_.interestOps() | SelectionKey.OP_WRITE);
waitToContinueStreaming();
}
}
@@ -273,10 +267,7 @@
if (buffer.remaining() > 0)
{
pendingWrites_.add(buffer);
- if ((key_.interestOps() & SelectionKey.OP_WRITE) == 0)
- {
- SelectorManager.getSelectorManager().modifyKeyForWrite(key_);
- }
+ key_.interestOps(key_.interestOps() | SelectionKey.OP_WRITE);
waitToContinueStreaming();
}
}
@@ -382,25 +373,32 @@
private void cancel(SelectionKey key)
{
if ( key != null )
- SelectorManager.getSelectorManager().cancel(key);
+ {
+ key.cancel();
+ try
+ {
+ key.channel().close();
+ }
+ catch (IOException e) {}
+ }
}
// called in the selector thread
public void connect(SelectionKey key)
{
- key.interestOps(key.interestOps() & (~SelectionKey.OP_CONNECT));
+ key.interestOps(key.interestOps() & (~SelectionKey.OP_CONNECT));
try
{
if (socketChannel_.finishConnect())
- {
- SelectorManager.getSelectorManager().modifyKeyForRead(key);
- connected_.set(true);
+ {
+ key.interestOps(key.interestOps() | SelectionKey.OP_READ);
+ connected_.set(true);
// this will flush the pending
if (!pendingWrites_.isEmpty())
- {
- SelectorManager.getSelectorManager().modifyKeyForWrite(key_);
- }
+ {
+ key_.interestOps(key_.interestOps() | SelectionKey.OP_WRITE);
+ }
resumeStreaming();
}
else
@@ -457,32 +455,22 @@
{
synchronized(this)
{
- if (!pendingWrites_.isEmpty() && (key_.interestOps() & SelectionKey.OP_WRITE) == 0)
+ if (!pendingWrites_.isEmpty())
{
- SelectorManager.getSelectorManager().modifyKeyForWrite(key_);
- }
+ key_.interestOps(key_.interestOps() | SelectionKey.OP_WRITE);
+ }
}
}
}
// called in the selector thread
public void read(SelectionKey key)
- {
- key.interestOps( key.interestOps() & ( ~SelectionKey.OP_READ ) );
+ {
+ key.interestOps( key.interestOps() & ( ~SelectionKey.OP_READ ) );
// publish this event onto to the TCPReadEvent Queue.
MessagingService.getReadExecutor().execute(readWork_);
}
- public void modifyKeyForRead(SelectionKey key)
- {
- key.interestOps( key_.interestOps() | SelectionKey.OP_READ );
- }
-
- public void modifyKeyForWrite(SelectionKey key)
- {
- key.interestOps( key_.interestOps() | SelectionKey.OP_WRITE );
- }
-
class ReadWorkItem implements Runnable
{
// called from the TCP READ thread pool
@@ -539,8 +527,8 @@
handleException(th);
}
finally
- {
- SelectorManager.getSelectorManager().modifyKeyForRead(key_);
+ {
+ key_.interestOps(key_.interestOps() | SelectionKey.OP_READ);
}
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpConnection.java?rev=769541&r1=769540&r2=769541&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpConnection.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpConnection.java Tue Apr 28 21:15:38 2009
@@ -165,12 +165,6 @@
/* Add a task to process the HTTP request */
MessagingService.getReadExecutor().execute(httpReader_);
}
-
- public void modifyKeyForRead(SelectionKey key)
- {
- key.interestOps( httpKey_.interestOps() | SelectionKey.OP_READ );
- }
-
private void resetParserState()
{
startLineParser_.resetParserState();
@@ -187,7 +181,14 @@
{
logger_.info("Closing HTTP socket ...");
if ( httpKey_ != null )
- SelectorManager.getSelectorManager().cancel(httpKey_);
+ {
+ httpKey_.cancel();
+ try
+ {
+ httpKey_.channel().close();
+ }
+ catch (IOException e) {}
+ }
}
/*
@@ -356,7 +357,7 @@
}
finally
{
- SelectorManager.getSelectorManager().modifyKeyForRead(httpKey_);
+ httpKey_.interestOps(httpKey_.interestOps() | SelectionKey.OP_READ);
}
}