You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/22 23:43:04 UTC

svn commit: r902300 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/net: MessagingService.java OutboundTcpConnection.java OutboundTcpConnectionPool.java

Author: jbellis
Date: Fri Jan 22 22:43:03 2010
New Revision: 902300

URL: http://svn.apache.org/viewvc?rev=902300&view=rev
Log:
Move connection establishment into the OutboundTcpConnection thread so that write() does not block on connection startup.
patch by jbellis; reviewed by gdusbabek for CASSANDRA-728

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=902300&r1=902299&r2=902300&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Fri Jan 22 22:43:03 2010
@@ -293,10 +293,7 @@
         }
 
         // get pooled connection (really, connection queue)
-        OutboundTcpConnection connection = null;
-        connection = getConnection(to, message);
-        if (connection == null)
-            return;
+        OutboundTcpConnection connection = getConnection(to, message);
 
         // pack message with header in a bytebuffer
         byte[] data;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=902300&r1=902299&r2=902300&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Fri Jan 22 22:43:03 2010
@@ -1,11 +1,9 @@
 package org.apache.cassandra.net;
 
 import java.io.DataOutputStream;
-import java.io.IOError;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.Socket;
-import java.net.SocketException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -13,58 +11,25 @@
 import org.apache.log4j.Logger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.utils.FBUtilities;
 
-public class OutboundTcpConnection
+public class OutboundTcpConnection extends Thread
 {
-    private static Logger logger = Logger.getLogger(OutboundTcpConnection.class);
+    private static final Logger logger = Logger.getLogger(OutboundTcpConnection.class);
 
-    public BlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<ByteBuffer>();
-    public DataOutputStream output;
-    public Socket socket;
+    private static final ByteBuffer CLOSE_SENTINEL = ByteBuffer.allocate(0);
+    private static final int OPEN_RETRY_DELAY = 100; // ms between retries
+
+    private final OutboundTcpConnectionPool pool;
+    private final InetAddress endpoint;
+    private final BlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<ByteBuffer>();
+    private DataOutputStream output;
+    private Socket socket;
 
     public OutboundTcpConnection(final OutboundTcpConnectionPool pool, final InetAddress remoteEp)
-    throws IOException
     {
-        if (logger.isDebugEnabled())
-            logger.debug("attempting to connect to " + remoteEp);
-
-        socket = new Socket(remoteEp, DatabaseDescriptor.getStoragePort());
-        socket.setTcpNoDelay(true);
-        output = new DataOutputStream(socket.getOutputStream());
-
-        new Thread(new Runnable()
-        {
-            public void run()
-            {
-                while (socket != null)
-                {
-                    ByteBuffer bb;
-                    try
-                    {
-                        bb = queue.take();
-                    }
-                    catch (InterruptedException e)
-                    {
-                        throw new AssertionError(e);
-                    }
-                    try
-                    {
-                        output.write(bb.array(), 0, bb.limit());
-                        if (queue.peek() == null)
-                        {
-                            output.flush();
-                        }
-                    }
-                    catch (IOException e)
-                    {
-                        logger.info("error writing to " + remoteEp);
-                        pool.reset();
-                        break;
-                    }
-                }
-            }
-        }, "WRITE-" + remoteEp).start();
+        super("WRITE-" + remoteEp);
+        this.pool = pool;
+        this.endpoint = remoteEp;
     }
 
     public void write(ByteBuffer buffer)
@@ -81,15 +46,103 @@
 
     public void closeSocket()
     {
+        queue.clear();
+        write(CLOSE_SENTINEL);
+    }
+
+    public void run()
+    {
+        while (true)
+        {
+            ByteBuffer bb = take();
+            if (bb == CLOSE_SENTINEL)
+            {
+                disconnect();
+                continue;
+            }
+            if (socket != null || connect())
+                writeConnected(bb);
+        }
+    }
+
+    private void writeConnected(ByteBuffer bb)
+    {
         try
         {
-            socket.close();
+            output.write(bb.array(), 0, bb.limit());
+            if (queue.peek() == null)
+            {
+                output.flush();
+            }
         }
         catch (IOException e)
         {
-            if (logger.isDebugEnabled())
-                logger.debug("error closing socket", e);
+            logger.info("error writing to " + endpoint);
+            disconnect();
+        }
+    }
+
+    private void disconnect()
+    {
+        if (socket != null)
+        {
+            try
+            {
+                socket.close();
+            }
+            catch (IOException e)
+            {
+                if (logger.isDebugEnabled())
+                    logger.debug("exception closing connection to " + endpoint, e);
+            }
+            output = null;
+            socket = null;
+        }
+    }
+
+    private ByteBuffer take()
+    {
+        ByteBuffer bb;
+        try
+        {
+            bb = queue.take();
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+        return bb;
+    }
+
+    private boolean connect()
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("attempting to connect to " + endpoint);
+        long start = System.currentTimeMillis();
+        while (System.currentTimeMillis() < start + DatabaseDescriptor.getRpcTimeout())
+        {
+            try
+            {
+                socket = new Socket(endpoint, DatabaseDescriptor.getStoragePort());
+                socket.setTcpNoDelay(true);
+                output = new DataOutputStream(socket.getOutputStream());
+                return true;
+            }
+            catch (IOException e)
+            {
+                socket = null;
+                if (logger.isTraceEnabled())
+                    logger.trace("unable to connect to " + endpoint, e);
+                try
+                {
+                    Thread.sleep(OPEN_RETRY_DELAY);
+                }
+                catch (InterruptedException e1)
+                {
+                    throw new AssertionError(e1);
+                }
+            }
         }
-        socket = null;
+        return false;
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java?rev=902300&r1=902299&r2=902300&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java Fri Jan 22 22:43:03 2010
@@ -27,14 +27,9 @@
 
 class OutboundTcpConnectionPool
 {
-    private static Logger logger = Logger.getLogger(OutboundTcpConnectionPool.class);
-
-    private final int OPEN_RETRY_DELAY = 100; // ms between retries
-
     private InetAddress remoteEp_;
     private OutboundTcpConnection cmdCon;
     private OutboundTcpConnection ackCon;
-    private long lastFailedAttempt = Long.MIN_VALUE;
 
     OutboundTcpConnectionPool(InetAddress remoteEp)
     {
@@ -52,18 +47,8 @@
         {
             if (ackCon == null)
             {
-                if (System.currentTimeMillis() < lastFailedAttempt + OPEN_RETRY_DELAY)
-                    return null;
-                try
-                {
-                    ackCon = new OutboundTcpConnection(this, remoteEp_);
-                }
-                catch (IOException e)
-                {
-                    lastFailedAttempt = System.currentTimeMillis();
-                    if (logger.isDebugEnabled())
-                        logger.debug("unable to connect to " + remoteEp_, e);
-                }
+                ackCon = new OutboundTcpConnection(this, remoteEp_);
+                ackCon.start();
             }
             return ackCon;
         }
@@ -71,18 +56,8 @@
         {
             if (cmdCon == null)
             {
-                if (System.currentTimeMillis() < lastFailedAttempt + OPEN_RETRY_DELAY)
-                    return null;
-                try
-                {
-                    cmdCon = new OutboundTcpConnection(this, remoteEp_);
-                }
-                catch (IOException e)
-                {
-                    lastFailedAttempt = System.currentTimeMillis();
-                    if (logger.isDebugEnabled())
-                        logger.debug("unable to connect to " + remoteEp_, e);
-                }
+                cmdCon = new OutboundTcpConnection(this, remoteEp_);
+                cmdCon.start();
             }
             return cmdCon;
         }
@@ -93,7 +68,5 @@
         for (OutboundTcpConnection con : new OutboundTcpConnection[] { cmdCon, ackCon })
             if (con != null)
                 con.closeSocket();
-        cmdCon = null;
-        ackCon = null;
     }
 }