You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/22 23:43:04 UTC
svn commit: r902300 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/net: MessagingService.java OutboundTcpConnection.java OutboundTcpConnectionPool.java
Author: jbellis
Date: Fri Jan 22 22:43:03 2010
New Revision: 902300
URL: http://svn.apache.org/viewvc?rev=902300&view=rev
Log:
move connecting into OutboundTcpConnection so write does not block for connection startup
patch by jbellis; reviewed by gdusbabek for CASSANDRA-728
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=902300&r1=902299&r2=902300&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Fri Jan 22 22:43:03 2010
@@ -293,10 +293,7 @@
}
// get pooled connection (really, connection queue)
- OutboundTcpConnection connection = null;
- connection = getConnection(to, message);
- if (connection == null)
- return;
+ OutboundTcpConnection connection = getConnection(to, message);
// pack message with header in a bytebuffer
byte[] data;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=902300&r1=902299&r2=902300&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Fri Jan 22 22:43:03 2010
@@ -1,11 +1,9 @@
package org.apache.cassandra.net;
import java.io.DataOutputStream;
-import java.io.IOError;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
-import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -13,58 +11,25 @@
import org.apache.log4j.Logger;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.utils.FBUtilities;
-public class OutboundTcpConnection
+public class OutboundTcpConnection extends Thread
{
- private static Logger logger = Logger.getLogger(OutboundTcpConnection.class);
+ private static final Logger logger = Logger.getLogger(OutboundTcpConnection.class);
- public BlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<ByteBuffer>();
- public DataOutputStream output;
- public Socket socket;
+ private static final ByteBuffer CLOSE_SENTINEL = ByteBuffer.allocate(0);
+ private static final int OPEN_RETRY_DELAY = 100; // ms between retries
+
+ private final OutboundTcpConnectionPool pool;
+ private final InetAddress endpoint;
+ private final BlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<ByteBuffer>();
+ private DataOutputStream output;
+ private Socket socket;
public OutboundTcpConnection(final OutboundTcpConnectionPool pool, final InetAddress remoteEp)
- throws IOException
{
- if (logger.isDebugEnabled())
- logger.debug("attempting to connect to " + remoteEp);
-
- socket = new Socket(remoteEp, DatabaseDescriptor.getStoragePort());
- socket.setTcpNoDelay(true);
- output = new DataOutputStream(socket.getOutputStream());
-
- new Thread(new Runnable()
- {
- public void run()
- {
- while (socket != null)
- {
- ByteBuffer bb;
- try
- {
- bb = queue.take();
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
- try
- {
- output.write(bb.array(), 0, bb.limit());
- if (queue.peek() == null)
- {
- output.flush();
- }
- }
- catch (IOException e)
- {
- logger.info("error writing to " + remoteEp);
- pool.reset();
- break;
- }
- }
- }
- }, "WRITE-" + remoteEp).start();
+ super("WRITE-" + remoteEp);
+ this.pool = pool;
+ this.endpoint = remoteEp;
}
public void write(ByteBuffer buffer)
@@ -81,15 +46,103 @@
public void closeSocket()
{
+ queue.clear();
+ write(CLOSE_SENTINEL);
+ }
+
+ public void run()
+ {
+ while (true)
+ {
+ ByteBuffer bb = take();
+ if (bb == CLOSE_SENTINEL)
+ {
+ disconnect();
+ continue;
+ }
+ if (socket != null || connect())
+ writeConnected(bb);
+ }
+ }
+
+ private void writeConnected(ByteBuffer bb)
+ {
try
{
- socket.close();
+ output.write(bb.array(), 0, bb.limit());
+ if (queue.peek() == null)
+ {
+ output.flush();
+ }
}
catch (IOException e)
{
- if (logger.isDebugEnabled())
- logger.debug("error closing socket", e);
+ logger.info("error writing to " + endpoint);
+ disconnect();
+ }
+ }
+
+ private void disconnect()
+ {
+ if (socket != null)
+ {
+ try
+ {
+ socket.close();
+ }
+ catch (IOException e)
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("exception closing connection to " + endpoint, e);
+ }
+ output = null;
+ socket = null;
+ }
+ }
+
+ private ByteBuffer take()
+ {
+ ByteBuffer bb;
+ try
+ {
+ bb = queue.take();
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ return bb;
+ }
+
+ private boolean connect()
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("attempting to connect to " + endpoint);
+ long start = System.currentTimeMillis();
+ while (System.currentTimeMillis() < start + DatabaseDescriptor.getRpcTimeout())
+ {
+ try
+ {
+ socket = new Socket(endpoint, DatabaseDescriptor.getStoragePort());
+ socket.setTcpNoDelay(true);
+ output = new DataOutputStream(socket.getOutputStream());
+ return true;
+ }
+ catch (IOException e)
+ {
+ socket = null;
+ if (logger.isTraceEnabled())
+ logger.trace("unable to connect to " + endpoint, e);
+ try
+ {
+ Thread.sleep(OPEN_RETRY_DELAY);
+ }
+ catch (InterruptedException e1)
+ {
+ throw new AssertionError(e1);
+ }
+ }
}
- socket = null;
+ return false;
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java?rev=902300&r1=902299&r2=902300&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java Fri Jan 22 22:43:03 2010
@@ -27,14 +27,9 @@
class OutboundTcpConnectionPool
{
- private static Logger logger = Logger.getLogger(OutboundTcpConnectionPool.class);
-
- private final int OPEN_RETRY_DELAY = 100; // ms between retries
-
private InetAddress remoteEp_;
private OutboundTcpConnection cmdCon;
private OutboundTcpConnection ackCon;
- private long lastFailedAttempt = Long.MIN_VALUE;
OutboundTcpConnectionPool(InetAddress remoteEp)
{
@@ -52,18 +47,8 @@
{
if (ackCon == null)
{
- if (System.currentTimeMillis() < lastFailedAttempt + OPEN_RETRY_DELAY)
- return null;
- try
- {
- ackCon = new OutboundTcpConnection(this, remoteEp_);
- }
- catch (IOException e)
- {
- lastFailedAttempt = System.currentTimeMillis();
- if (logger.isDebugEnabled())
- logger.debug("unable to connect to " + remoteEp_, e);
- }
+ ackCon = new OutboundTcpConnection(this, remoteEp_);
+ ackCon.start();
}
return ackCon;
}
@@ -71,18 +56,8 @@
{
if (cmdCon == null)
{
- if (System.currentTimeMillis() < lastFailedAttempt + OPEN_RETRY_DELAY)
- return null;
- try
- {
- cmdCon = new OutboundTcpConnection(this, remoteEp_);
- }
- catch (IOException e)
- {
- lastFailedAttempt = System.currentTimeMillis();
- if (logger.isDebugEnabled())
- logger.debug("unable to connect to " + remoteEp_, e);
- }
+ cmdCon = new OutboundTcpConnection(this, remoteEp_);
+ cmdCon.start();
}
return cmdCon;
}
@@ -93,7 +68,5 @@
for (OutboundTcpConnection con : new OutboundTcpConnection[] { cmdCon, ackCon })
if (con != null)
con.closeSocket();
- cmdCon = null;
- ackCon = null;
}
}