You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/04/28 23:15:44 UTC
svn commit: r769542 - in /incubator/cassandra/trunk/src/org/apache/cassandra/net: SelectorManager.java TcpConnection.java
Author: jbellis
Date: Tue Apr 28 21:15:44 2009
New Revision: 769542
URL: http://svn.apache.org/viewvc?rev=769542&view=rev
Log:
renaming and final cleanup. patch by jbellis; reviewed by Eric Evans for #97
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectorManager.java
incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnection.java
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectorManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectorManager.java?rev=769542&r1=769541&r2=769542&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectorManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectorManager.java Tue Apr 28 21:15:44 2009
@@ -25,22 +25,18 @@
import org.apache.log4j.Logger;
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
public class SelectorManager extends Thread
{
- private static final Logger logger_ = Logger.getLogger(SelectorManager.class);
+ private static final Logger logger = Logger.getLogger(SelectorManager.class);
// the underlying selector used
- protected Selector selector_;
+ protected Selector selector;
// The static selector manager which is used by all applications
- private static SelectorManager manager_;
+ private static SelectorManager manager;
// The static UDP selector manager which is used by all applications
- private static SelectorManager udpManager_;
+ private static SelectorManager udpManager;
private SelectorManager(String name)
{
@@ -48,7 +44,7 @@
try
{
- selector_ = Selector.open();
+ selector = Selector.open();
}
catch (IOException e)
{
@@ -71,7 +67,7 @@
* @param ops
* The initial interest operations
* @return The SelectionKey which uniquely identifies this channel
- * @exception IOException
+ * @exception IOException if the channel is closed
*/
public SelectionKey register(SelectableChannel channel,
SelectionKeyHandler handler, int ops) throws IOException
@@ -81,9 +77,7 @@
throw new NullPointerException();
}
- SelectionKey key = channel.register(selector_, ops, handler);
- selector_.wakeup();
- return key;
+ return channel.register(selector, ops, handler);
}
/**
@@ -96,7 +90,7 @@
{
try
{
- selector_.select(1000);
+ selector.select(100);
doProcess();
}
catch (IOException e)
@@ -108,48 +102,42 @@
protected void doProcess() throws IOException
{
- SelectionKey[] keys = selector_.selectedKeys().toArray(new SelectionKey[0]);
+ SelectionKey[] keys = selector.selectedKeys().toArray(new SelectionKey[0]);
- for (int i = 0; i < keys.length; i++)
+ for (SelectionKey key : keys)
{
- selector_.selectedKeys().remove(keys[i]);
+ selector.selectedKeys().remove(key);
- synchronized (keys[i])
+ synchronized (key)
{
- SelectionKeyHandler skh = (SelectionKeyHandler) keys[i]
- .attachment();
+ SelectionKeyHandler skh = (SelectionKeyHandler) key.attachment();
if (skh != null)
{
// accept
- if (keys[i].isValid() && keys[i].isAcceptable())
+ if (key.isValid() && key.isAcceptable())
{
- skh.accept(keys[i]);
+ skh.accept(key);
}
// connect
- if (keys[i].isValid() && keys[i].isConnectable())
+ if (key.isValid() && key.isConnectable())
{
- skh.connect(keys[i]);
+ skh.connect(key);
}
// read
- if (keys[i].isValid() && keys[i].isReadable())
+ if (key.isValid() && key.isReadable())
{
- skh.read(keys[i]);
+ skh.read(key);
}
// write
- if (keys[i].isValid() && keys[i].isWritable())
+ if (key.isValid() && key.isWritable())
{
- skh.write(keys[i]);
+ skh.write(key);
}
}
- else
- {
- keys[i].channel().close();
- keys[i].cancel();
- }
}
}
}
@@ -163,23 +151,23 @@
{
synchronized (SelectorManager.class)
{
- if (manager_ == null)
+ if (manager == null)
{
- manager_ = new SelectorManager("TCP Selector Manager");
+ manager = new SelectorManager("TCP Selector Manager");
}
}
- return manager_;
+ return manager;
}
public static SelectorManager getUdpSelectorManager()
{
synchronized (SelectorManager.class)
{
- if (udpManager_ == null)
+ if (udpManager == null)
{
- udpManager_ = new SelectorManager("UDP Selector Manager");
+ udpManager = new SelectorManager("UDP Selector Manager");
}
}
- return udpManager_;
+ return udpManager;
}
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnection.java?rev=769542&r1=769541&r2=769542&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnection.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnection.java Tue Apr 28 21:15:44 2009
@@ -61,7 +61,6 @@
private TcpReader tcpReader_;
private ReadWorkItem readWork_ = new ReadWorkItem();
private List<ByteBuffer> pendingWrites_ = new Vector<ByteBuffer>();
- private AtomicBoolean connected_ = new AtomicBoolean(false);
private EndPoint localEp_;
private EndPoint remoteEp_;
boolean inUse_ = false;
@@ -94,8 +93,7 @@
else
{
key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
- connected_.set(true);
- }
+ }
}
/*
@@ -116,8 +114,7 @@
else
{
key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
- connected_.set(true);
- }
+ }
bStream_ = true;
lock_ = new ReentrantLock();
condition_ = lock_.newCondition();
@@ -144,8 +141,7 @@
socketChannel_ = socketChannel;
socketChannel_.configureBlocking(false);
isIncoming_ = isIncoming;
- connected_.set(true);
- localEp_ = localEp;
+ localEp_ = localEp;
}
EndPoint getLocalEp()
@@ -182,7 +178,7 @@
ByteBuffer buffer = MessagingService.packIt( data , false, false, listening);
synchronized(this)
{
- if (!pendingWrites_.isEmpty() || !connected_.get())
+ if (!pendingWrites_.isEmpty() || !socketChannel_.isConnected())
{
pendingWrites_.add(buffer);
return;
@@ -223,7 +219,7 @@
*/
long waitTime = 2;
int retry = 0;
- while ( !connected_.get() )
+ while (!socketChannel_.isConnected())
{
if ( retry == 3 )
throw new IOException("Unable to connect to " + remoteEp_ + " after " + retry + " attempts.");
@@ -392,7 +388,6 @@
if (socketChannel_.finishConnect())
{
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
- connected_.set(true);
// this will flush the pending
if (!pendingWrites_.isEmpty())