You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/11/09 17:57:17 UTC
svn commit: r834143 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra/net:
MessagingConfig.java MessagingService.java TcpConnection.java
TcpConnectionManager.java
Author: jbellis
Date: Mon Nov 9 16:57:08 2009
New Revision: 834143
URL: http://svn.apache.org/viewvc?rev=834143&view=rev
Log:
Clean out unused code from TcpConnectionManager; split the connections to a node into separate "command" and "ack" connections, which will allow us to apply backpressure on the command socket. Patch by gdusbabek; reviewed by jbellis for CASSANDRA-488.
Removed:
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingConfig.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=834143&r1=834142&r2=834143&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Mon Nov 9 16:57:08 2009
@@ -91,6 +91,8 @@
private static IMessagingService messagingService_ = new MessagingService();
+ private static final int MESSAGE_DESERIALIZE_THREADS = 4;
+
public static int getVersion()
{
return version_;
@@ -137,7 +139,7 @@
* which is the sum of the threads in the pool that adds shit into the table and the
* pool that retrives the callback from here.
*/
- int maxSize = MessagingConfig.getMessagingThreadCount();
+ int maxSize = MESSAGE_DESERIALIZE_THREADS;
callbackMap_ = new Cachetable<String, IAsyncCallback>( 2 * DatabaseDescriptor.getRpcTimeout() );
taskCompletionMap_ = new Cachetable<String, IAsyncResult>( 2 * DatabaseDescriptor.getRpcTimeout() );
@@ -185,7 +187,7 @@
public void listen(InetAddress localEp) throws IOException
{
ServerSocketChannel serverChannel = ServerSocketChannel.open();
- ServerSocket ss = serverChannel.socket();
+ ServerSocket ss = serverChannel.socket();
ss.bind(new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort()));
serverChannel.configureBlocking(false);
@@ -224,9 +226,7 @@
cp = poolTable_.get(key);
if (cp == null )
{
- cp = new TcpConnectionManager(MessagingConfig.getConnectionPoolInitialSize(),
- MessagingConfig.getConnectionPoolGrowthFactor(),
- MessagingConfig.getConnectionPoolMaxSize(), from, to);
+ cp = new TcpConnectionManager(from, to);
poolTable_.put(key, cp);
}
}
@@ -238,9 +238,9 @@
return cp;
}
- public static TcpConnection getConnection(InetAddress from, InetAddress to) throws IOException
+ public static TcpConnection getConnection(InetAddress from, InetAddress to, Message msg) throws IOException
{
- return getConnectionPool(from, to).getConnection();
+ return getConnectionPool(from, to).getConnection(msg);
}
private void checkForReservedVerb(String type)
@@ -324,14 +324,14 @@
Use this version for fire and forget style messaging.
*/
public void sendOneWay(Message message, InetAddress to)
- {
- // do local deliveries
+ {
+ // do local deliveries
if ( message.getFrom().equals(to) )
- {
+ {
MessagingService.receive(message);
return;
}
-
+
TcpConnection connection = null;
try
{
@@ -340,7 +340,7 @@
{
return;
}
- connection = MessagingService.getConnection(processedMessage.getFrom(), to);
+ connection = MessagingService.getConnection(processedMessage.getFrom(), to, message);
connection.write(message);
}
catch (SocketException se)
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java?rev=834143&r1=834142&r2=834143&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java Mon Nov 9 16:57:08 2009
@@ -67,14 +67,12 @@
private boolean bStream_ = false;
private Lock lock_;
private Condition condition_;
-
- // used from getConnection - outgoing
- TcpConnection(TcpConnectionManager pool, InetAddress from, InetAddress to) throws IOException
- {
- socketChannel_ = SocketChannel.open();
- socketChannel_.configureBlocking(false);
- pool_ = pool;
-
+
+ private TcpConnection(InetAddress from, InetAddress to, TcpConnectionManager pool, boolean streaming) throws IOException
+ {
+ socketChannel_ = SocketChannel.open();
+ socketChannel_.configureBlocking(false);
+
localEp_ = from;
remoteEp_ = to;
@@ -86,6 +84,24 @@
{
key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
}
+
+ if ((pool != null && streaming) || (pool == null && !streaming))
+ throw new RuntimeException("Invalid configuration. You must either specify a pool or streaming, not both or neither.");
+
+ if (pool != null)
+ pool_ = pool;
+ if (streaming)
+ {
+ bStream_ = true;
+ lock_ = new ReentrantLock();
+ condition_ = lock_.newCondition();
+ }
+ }
+
+ // used from getConnection - outgoing
+ TcpConnection(TcpConnectionManager pool, InetAddress from, InetAddress to) throws IOException
+ {
+ this(from, to, pool, false);
}
/*
@@ -93,23 +109,7 @@
*/
TcpConnection(InetAddress from, InetAddress to) throws IOException
{
- socketChannel_ = SocketChannel.open();
- socketChannel_.configureBlocking(false);
-
- localEp_ = from;
- remoteEp_ = to;
-
- if (!socketChannel_.connect(new InetSocketAddress(remoteEp_, DatabaseDescriptor.getStoragePort())))
- {
- key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_CONNECT);
- }
- else
- {
- key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
- }
- bStream_ = true;
- lock_ = new ReentrantLock();
- condition_ = lock_.newCondition();
+ this(from, to, null, true);
}
/*
@@ -135,27 +135,13 @@
isIncoming_ = isIncoming;
localEp_ = localEp;
}
-
- InetAddress getLocalEp()
- {
- return localEp_;
- }
-
- public void setLocalEp(InetAddress localEp)
- {
- localEp_ = localEp;
- }
+
public InetAddress getEndPoint()
{
return remoteEp_;
}
-
- public boolean isIncoming()
- {
- return isIncoming_;
- }
-
+
public SocketChannel getSocketChannel()
{
return socketChannel_;
@@ -274,8 +260,6 @@
public void close()
{
inUse_ = false;
- if ( pool_.contains(this) )
- pool_.decUsed();
}
public boolean isConnected()
@@ -305,10 +289,6 @@
void closeSocket()
{
logger_.warn("Closing down connection " + socketChannel_ + " with " + pendingWrites_.size() + " writes remaining.");
- if ( pool_ != null )
- {
- pool_.removeConnection(this);
- }
cancel(key_);
pendingWrites_.clear();
}
@@ -318,11 +298,8 @@
logger_.warn("Closing down connection " + socketChannel_);
pendingWrites_.clear();
cancel(key_);
- pendingWrites_.clear();
- if ( pool_ != null )
- {
- pool_.removeConnection(this);
- }
+ pendingWrites_.clear();
+ pool_.destroy(this);
}
private void cancel(SelectionKey key)
@@ -456,9 +433,6 @@
if (remoteEp_ == null)
{
remoteEp_ = socketChannel_.socket().getInetAddress();
- // put connection into pool if possible
- pool_ = MessagingService.getConnectionPool(localEp_, remoteEp_);
- pool_.addToPool(TcpConnection.this);
}
/* Deserialize and handle the message */
@@ -497,11 +471,6 @@
}
}
- public int pending()
- {
- return pendingWrites_.size();
- }
-
public int compareTo(Object o)
{
if (o instanceof TcpConnection)
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java?rev=834143&r1=834142&r2=834143&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java Mon Nov 9 16:57:08 2009
@@ -27,205 +27,61 @@
class TcpConnectionManager
{
- private Lock lock_ = new ReentrantLock();
- private List<TcpConnection> allConnections_;
private InetAddress localEp_;
private InetAddress remoteEp_;
- private int maxSize_;
+ private TcpConnection cmdCon;
+ private TcpConnection ackCon;
- private int inUse_;
-
- // TODO! this whole thing is a giant no-op, since "contains" only relies on TcpConnection.equals, which
- // is true for any (local, remote) pairs. So there is only ever at most one TcpConnection per Manager!
- TcpConnectionManager(int initialSize, int growthFactor, int maxSize, InetAddress localEp, InetAddress remoteEp)
+ TcpConnectionManager(InetAddress localEp, InetAddress remoteEp)
{
- maxSize_ = maxSize;
localEp_ = localEp;
remoteEp_ = remoteEp;
- allConnections_ = new ArrayList<TcpConnection>();
}
- /**
- * returns the least loaded connection to remoteEp, creating a new connection if necessary
- */
- TcpConnection getConnection() throws IOException
+ private TcpConnection newCon() throws IOException
{
- lock_.lock();
- try
- {
- if (allConnections_.isEmpty())
- {
- TcpConnection conn = new TcpConnection(this, localEp_, remoteEp_);
- addToPool(conn);
- conn.inUse_ = true;
- incUsed();
- return conn;
- }
-
- TcpConnection least = getLeastLoaded();
-
- if ((least != null && least.pending() == 0) || allConnections_.size() == maxSize_)
- {
- least.inUse_ = true;
- incUsed();
- return least;
- }
-
- TcpConnection connection = new TcpConnection(this, localEp_, remoteEp_);
- if (!contains(connection))
- {
- addToPool(connection);
- connection.inUse_ = true;
- incUsed();
- return connection;
- }
- else
- {
- connection.closeSocket();
- return getLeastLoaded();
- }
- }
- finally
- {
- lock_.unlock();
- }
+ TcpConnection con = new TcpConnection(this, localEp_, remoteEp_);
+ con.inUse_ = true;
+ return con;
}
- protected TcpConnection getLeastLoaded()
- {
- TcpConnection connection = null;
- lock_.lock();
- try
- {
- Collections.sort(allConnections_);
- connection = (allConnections_.size() > 0) ? allConnections_.get(0) : null;
- }
- finally
- {
- lock_.unlock();
- }
- return connection;
- }
-
- void removeConnection(TcpConnection connection)
- {
- lock_.lock();
- try
- {
- allConnections_.remove(connection);
- }
- finally
- {
- lock_.unlock();
- }
- }
-
- void incUsed()
- {
- inUse_++;
- }
-
- void decUsed()
- {
- inUse_--;
- }
-
- int getConnectionsInUse()
- {
- return inUse_;
- }
-
- void addToPool(TcpConnection connection)
- {
- lock_.lock();
- try
- {
- if (contains(connection))
- return;
-
- if (allConnections_.size() < maxSize_)
- {
- allConnections_.add(connection);
- }
- else
- {
- connection.closeSocket();
- }
- }
- finally
- {
- lock_.unlock();
- }
- }
-
- void shutdown()
- {
- lock_.lock();
- try
- {
- while (allConnections_.size() > 0)
- {
- TcpConnection connection = allConnections_.remove(0);
- connection.closeSocket();
- }
- }
- finally
- {
- lock_.unlock();
- }
- }
-
- int getPoolSize()
+ /**
+ * returns the appropriate connection based on message type.
+ */
+ synchronized TcpConnection getConnection(Message msg) throws IOException
{
- lock_.lock();
- try
+ if (MessagingService.responseStage_.equals(msg.getMessageType()))
{
- return allConnections_.size();
+ if (ackCon == null)
+ ackCon = newCon();
+ return ackCon;
}
- finally
+ else
{
- lock_.unlock();
+ if (cmdCon == null)
+ cmdCon = newCon();
+ return cmdCon;
}
}
- InetAddress getLocalEndPoint()
- {
- return localEp_;
- }
-
- InetAddress getRemoteEndPoint()
+ synchronized void shutdown()
{
- return remoteEp_;
- }
-
- int getPendingWrites()
- {
- int total = 0;
- lock_.lock();
- try
- {
- for (TcpConnection connection : allConnections_)
- {
- total += connection.pending();
- }
- }
- finally
- {
- lock_.unlock();
- }
- return total;
+ for (TcpConnection con : new TcpConnection[] { cmdCon, ackCon })
+ if (con != null)
+ con.closeSocket();
}
- boolean contains(TcpConnection connection)
+ synchronized void destroy(TcpConnection con)
{
- lock_.lock();
- try
+ assert con != null;
+ if (cmdCon == con)
{
- return allConnections_.contains(connection);
+ cmdCon = null;
}
- finally
+ else
{
- lock_.unlock();
+ assert ackCon == con;
+ ackCon = null;
}
}
}