You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/11/09 17:57:17 UTC

svn commit: r834143 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/net: MessagingConfig.java MessagingService.java TcpConnection.java TcpConnectionManager.java

Author: jbellis
Date: Mon Nov  9 16:57:08 2009
New Revision: 834143

URL: http://svn.apache.org/viewvc?rev=834143&view=rev
Log:
Clean out unused code from TcpConnectionManager; split connections to a node into "command" and "ack" connections, which will allow us to use backpressure on the command socket. Patch by gdusbabek; reviewed by jbellis for CASSANDRA-488.

Removed:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingConfig.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=834143&r1=834142&r2=834143&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Mon Nov  9 16:57:08 2009
@@ -91,6 +91,8 @@
     
     private static IMessagingService messagingService_ = new MessagingService();
 
+    private static final int MESSAGE_DESERIALIZE_THREADS = 4;
+
     public static int getVersion()
     {
         return version_;
@@ -137,7 +139,7 @@
          * which is the sum of the threads in the pool that adds shit into the table and the 
          * pool that retrives the callback from here.
         */ 
-        int maxSize = MessagingConfig.getMessagingThreadCount();
+        int maxSize = MESSAGE_DESERIALIZE_THREADS;
         callbackMap_ = new Cachetable<String, IAsyncCallback>( 2 * DatabaseDescriptor.getRpcTimeout() );
         taskCompletionMap_ = new Cachetable<String, IAsyncResult>( 2 * DatabaseDescriptor.getRpcTimeout() );        
         
@@ -185,7 +187,7 @@
     public void listen(InetAddress localEp) throws IOException
     {        
         ServerSocketChannel serverChannel = ServerSocketChannel.open();
-        ServerSocket ss = serverChannel.socket();            
+        ServerSocket ss = serverChannel.socket();
         ss.bind(new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort()));
         serverChannel.configureBlocking(false);
         
@@ -224,9 +226,7 @@
                 cp = poolTable_.get(key);
                 if (cp == null )
                 {
-                    cp = new TcpConnectionManager(MessagingConfig.getConnectionPoolInitialSize(), 
-                            MessagingConfig.getConnectionPoolGrowthFactor(), 
-                            MessagingConfig.getConnectionPoolMaxSize(), from, to);
+                    cp = new TcpConnectionManager(from, to);
                     poolTable_.put(key, cp);
                 }
             }
@@ -238,9 +238,9 @@
         return cp;
     }
 
-    public static TcpConnection getConnection(InetAddress from, InetAddress to) throws IOException
+    public static TcpConnection getConnection(InetAddress from, InetAddress to, Message msg) throws IOException
     {
-        return getConnectionPool(from, to).getConnection();
+        return getConnectionPool(from, to).getConnection(msg);
     }
     
     private void checkForReservedVerb(String type)
@@ -324,14 +324,14 @@
         Use this version for fire and forget style messaging.
     */
     public void sendOneWay(Message message, InetAddress to)
-    {        
-        // do local deliveries        
+    {
+        // do local deliveries
         if ( message.getFrom().equals(to) )
-        {            
+        {
             MessagingService.receive(message);
             return;
         }
-        
+
         TcpConnection connection = null;
         try
         {
@@ -340,7 +340,7 @@
             {
                 return;
             }
-            connection = MessagingService.getConnection(processedMessage.getFrom(), to);
+            connection = MessagingService.getConnection(processedMessage.getFrom(), to, message);
             connection.write(message);
         }
         catch (SocketException se)

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java?rev=834143&r1=834142&r2=834143&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java Mon Nov  9 16:57:08 2009
@@ -67,14 +67,12 @@
     private boolean bStream_ = false;
     private Lock lock_;
     private Condition condition_;
-    
-    // used from getConnection - outgoing
-    TcpConnection(TcpConnectionManager pool, InetAddress from, InetAddress to) throws IOException
-    {          
-        socketChannel_ = SocketChannel.open();            
-        socketChannel_.configureBlocking(false);        
-        pool_ = pool;
-        
+
+    private TcpConnection(InetAddress from, InetAddress to, TcpConnectionManager pool, boolean streaming) throws IOException
+    {
+        socketChannel_ = SocketChannel.open();
+        socketChannel_.configureBlocking(false);
+
         localEp_ = from;
         remoteEp_ = to;
 
@@ -86,6 +84,24 @@
         {
             key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
         }
+
+        if ((pool != null && streaming) || (pool == null && !streaming))
+            throw new RuntimeException("Invalid configuration. You must either specify a pool or streaming, not both or neither.");
+
+        if (pool != null)
+            pool_ = pool;
+        if (streaming)
+        {
+            bStream_ = true;
+            lock_ = new ReentrantLock();
+            condition_ = lock_.newCondition();
+        }
+    }
+    
+    // used from getConnection - outgoing
+    TcpConnection(TcpConnectionManager pool, InetAddress from, InetAddress to) throws IOException
+    {          
+        this(from, to, pool, false);
     }
     
     /*
@@ -93,23 +109,7 @@
     */
     TcpConnection(InetAddress from, InetAddress to) throws IOException
     {
-        socketChannel_ = SocketChannel.open();               
-        socketChannel_.configureBlocking(false);       
-        
-        localEp_ = from;
-        remoteEp_ = to;
-        
-        if (!socketChannel_.connect(new InetSocketAddress(remoteEp_, DatabaseDescriptor.getStoragePort())))
-        {
-            key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_CONNECT);
-        }
-        else
-        {
-            key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
-        }
-        bStream_ = true;
-        lock_ = new ReentrantLock();
-        condition_ = lock_.newCondition();
+        this(from, to, null, true);
     }
     
     /*
@@ -135,27 +135,13 @@
         isIncoming_ = isIncoming;
         localEp_ = localEp;
     }
-    
-    InetAddress getLocalEp()
-    {
-        return localEp_;
-    }
-    
-    public void setLocalEp(InetAddress localEp)
-    {
-        localEp_ = localEp;
-    }
+
 
     public InetAddress getEndPoint()
     {
         return remoteEp_;
     }
-    
-    public boolean isIncoming()
-    {
-        return isIncoming_;
-    }    
-    
+
     public SocketChannel getSocketChannel()
     {
         return socketChannel_;
@@ -274,8 +260,6 @@
     public void close()
     {
         inUse_ = false;
-        if ( pool_.contains(this) )
-            pool_.decUsed();               
     }
 
     public boolean isConnected()
@@ -305,10 +289,6 @@
     void closeSocket()
     {
         logger_.warn("Closing down connection " + socketChannel_ + " with " + pendingWrites_.size() + " writes remaining.");            
-        if ( pool_ != null )
-        {
-            pool_.removeConnection(this);
-        }
         cancel(key_);
         pendingWrites_.clear();
     }
@@ -318,11 +298,8 @@
         logger_.warn("Closing down connection " + socketChannel_);
         pendingWrites_.clear();
         cancel(key_);
-        pendingWrites_.clear();        
-        if ( pool_ != null )
-        {
-            pool_.removeConnection(this);            
-        }
+        pendingWrites_.clear();
+        pool_.destroy(this);
     }
     
     private void cancel(SelectionKey key)
@@ -456,9 +433,6 @@
                         if (remoteEp_ == null)
                         {
                             remoteEp_ = socketChannel_.socket().getInetAddress();
-                            // put connection into pool if possible
-                            pool_ = MessagingService.getConnectionPool(localEp_, remoteEp_);                            
-                            pool_.addToPool(TcpConnection.this);                            
                         }
                         
                         /* Deserialize and handle the message */
@@ -497,11 +471,6 @@
         }
     }
     
-    public int pending()
-    {
-        return pendingWrites_.size();
-    }
-    
     public int compareTo(Object o)
     {
         if (o instanceof TcpConnection) 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java?rev=834143&r1=834142&r2=834143&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java Mon Nov  9 16:57:08 2009
@@ -27,205 +27,61 @@
 
 class TcpConnectionManager
 {
-    private Lock lock_ = new ReentrantLock();
-    private List<TcpConnection> allConnections_;
     private InetAddress localEp_;
     private InetAddress remoteEp_;
-    private int maxSize_;
+    private TcpConnection cmdCon;
+    private TcpConnection ackCon;
 
-    private int inUse_;
-
-    // TODO! this whole thing is a giant no-op, since "contains" only relies on TcpConnection.equals, which
-    // is true for any (local, remote) pairs.  So there is only ever at most one TcpConnection per Manager!
-    TcpConnectionManager(int initialSize, int growthFactor, int maxSize, InetAddress localEp, InetAddress remoteEp)
+    TcpConnectionManager(InetAddress localEp, InetAddress remoteEp)
     {
-        maxSize_ = maxSize;
         localEp_ = localEp;
         remoteEp_ = remoteEp;
-        allConnections_ = new ArrayList<TcpConnection>();
     }
 
-    /**
-     * returns the least loaded connection to remoteEp, creating a new connection if necessary
-     */
-    TcpConnection getConnection() throws IOException
+    private TcpConnection newCon() throws IOException
     {
-        lock_.lock();
-        try
-        {
-            if (allConnections_.isEmpty())
-            {
-                TcpConnection conn = new TcpConnection(this, localEp_, remoteEp_);
-                addToPool(conn);
-                conn.inUse_ = true;
-                incUsed();
-                return conn;
-            }
-
-            TcpConnection least = getLeastLoaded();
-
-            if ((least != null && least.pending() == 0) || allConnections_.size() == maxSize_)
-            {
-                least.inUse_ = true;
-                incUsed();
-                return least;
-            }
-
-            TcpConnection connection = new TcpConnection(this, localEp_, remoteEp_);
-            if (!contains(connection))
-            {
-                addToPool(connection);
-                connection.inUse_ = true;
-                incUsed();
-                return connection;
-            }
-            else
-            {
-                connection.closeSocket();
-                return getLeastLoaded();
-            }
-        }
-        finally
-        {
-            lock_.unlock();
-        }
+        TcpConnection con = new TcpConnection(this, localEp_, remoteEp_);
+        con.inUse_ = true;
+        return con;
     }
 
-    protected TcpConnection getLeastLoaded()
-    {
-        TcpConnection connection = null;
-        lock_.lock();
-        try
-        {
-            Collections.sort(allConnections_);
-            connection = (allConnections_.size() > 0) ? allConnections_.get(0) : null;
-        }
-        finally
-        {
-            lock_.unlock();
-        }
-        return connection;
-    }
-
-    void removeConnection(TcpConnection connection)
-    {
-        lock_.lock();
-        try
-        {
-            allConnections_.remove(connection);
-        }
-        finally
-        {
-            lock_.unlock();
-        }
-    }
-
-    void incUsed()
-    {
-        inUse_++;
-    }
-
-    void decUsed()
-    {
-        inUse_--;
-    }
-
-    int getConnectionsInUse()
-    {
-        return inUse_;
-    }
-
-    void addToPool(TcpConnection connection)
-    {
-        lock_.lock();
-        try
-        {
-            if (contains(connection))
-                return;
-
-            if (allConnections_.size() < maxSize_)
-            {
-                allConnections_.add(connection);
-            }
-            else
-            {
-                connection.closeSocket();
-            }
-        }
-        finally
-        {
-            lock_.unlock();
-        }
-    }
-
-    void shutdown()
-    {
-        lock_.lock();
-        try
-        {
-            while (allConnections_.size() > 0)
-            {
-                TcpConnection connection = allConnections_.remove(0);
-                connection.closeSocket();
-            }
-        }
-        finally
-        {
-            lock_.unlock();
-        }
-    }
-
-    int getPoolSize()
+    /**
+     * returns the appropriate connection based on message type.
+     */
+    synchronized TcpConnection getConnection(Message msg) throws IOException
     {
-        lock_.lock();
-        try
+        if (MessagingService.responseStage_.equals(msg.getMessageType()))
         {
-            return allConnections_.size();
+            if (ackCon == null)
+                ackCon = newCon();
+            return ackCon;
         }
-        finally
+        else
         {
-            lock_.unlock();
+            if (cmdCon == null)
+                cmdCon = newCon();
+            return cmdCon;
         }
     }
 
-    InetAddress getLocalEndPoint()
-    {
-        return localEp_;
-    }
-
-    InetAddress getRemoteEndPoint()
+    synchronized void shutdown()
     {
-        return remoteEp_;
-    }
-
-    int getPendingWrites()
-    {
-        int total = 0;
-        lock_.lock();
-        try
-        {
-            for (TcpConnection connection : allConnections_)
-            {
-                total += connection.pending();
-            }
-        }
-        finally
-        {
-            lock_.unlock();
-        }
-        return total;
+        for (TcpConnection con : new TcpConnection[] { cmdCon, ackCon })
+            if (con != null)
+                con.closeSocket();
     }
 
-    boolean contains(TcpConnection connection)
+    synchronized void destroy(TcpConnection con)
     {
-        lock_.lock();
-        try
+        assert con != null;
+        if (cmdCon == con)
         {
-            return allConnections_.contains(connection);
+            cmdCon = null;
         }
-        finally
+        else
         {
-            lock_.unlock();
+            assert ackCon == con;
+            ackCon = null;
         }
     }
 }