You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/21 00:37:17 UTC
svn commit: r901435 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra/net:
FileStreamTask.java MessagingService.java OutboundTcpConnection.java
OutboundTcpConnectionPool.java TcpConnection.java TcpConnectionManager.java
Author: jbellis
Date: Wed Jan 20 23:37:17 2010
New Revision: 901435
URL: http://svn.apache.org/viewvc?rev=901435&view=rev
Log:
replace tcp writes w/ blocking i/o
patch by jbellis; reviewed by Brandon Williams for CASSANDRA-705
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (with props)
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java (contents, props changed)
- copied, changed from r901433, incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java
Removed:
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java?rev=901435&r1=901434&r2=901435&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java Wed Jan 20 23:37:17 2010
@@ -44,6 +44,8 @@
public void run()
{
+ /*
+ TODO
TcpConnection connection = null;
try
{
@@ -61,6 +63,6 @@
}
throw new RuntimeException(e);
}
+ */
}
-
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=901435&r1=901434&r2=901435&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Wed Jan 20 23:37:17 2010
@@ -68,7 +68,7 @@
/* Thread pool to handle messaging write activities */
private static ExecutorService streamExecutor_;
- private static NonBlockingHashMap<String, TcpConnectionManager> connectionManagers_ = new NonBlockingHashMap<String, TcpConnectionManager>();
+ private static NonBlockingHashMap<String, OutboundTcpConnectionPool> connectionManagers_ = new NonBlockingHashMap<String, OutboundTcpConnectionPool>();
private static Logger logger_ = Logger.getLogger(MessagingService.class);
@@ -186,19 +186,19 @@
}
}
- public static TcpConnectionManager getConnectionPool(InetAddress from, InetAddress to)
+ public static OutboundTcpConnectionPool getConnectionPool(InetAddress from, InetAddress to)
{
String key = from + ":" + to;
- TcpConnectionManager cp = connectionManagers_.get(key);
+ OutboundTcpConnectionPool cp = connectionManagers_.get(key);
if (cp == null)
{
- connectionManagers_.putIfAbsent(key, new TcpConnectionManager(from, to));
+ connectionManagers_.putIfAbsent(key, new OutboundTcpConnectionPool(from, to));
cp = connectionManagers_.get(key);
}
return cp;
}
- public static TcpConnection getConnection(InetAddress from, InetAddress to, Message msg) throws IOException
+ public static OutboundTcpConnection getConnection(InetAddress from, InetAddress to, Message msg)
{
return getConnectionPool(from, to).getConnection(msg);
}
@@ -331,20 +331,9 @@
assert data.length > 0;
ByteBuffer buffer = packIt(data , false, false);
- TcpConnection connection = null;
- try
- {
- connection = MessagingService.getConnection(processedMessage.getFrom(), to, message);
- connection.write(buffer);
- }
- catch (IOException e)
- {
- if (connection != null)
- {
- connection.errorClose();
- }
- logger_.error("unexpected error writing " + message, e);
- }
+ OutboundTcpConnection connection = null;
+ connection = getConnection(processedMessage.getFrom(), to, message);
+ connection.write(buffer);
}
public IAsyncResult sendRR(Message message, InetAddress to)
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=901435&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Wed Jan 20 23:37:17 2010
@@ -0,0 +1,97 @@
+package org.apache.cassandra.net;
+
+import java.io.DataOutputStream;
+import java.io.IOError;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class OutboundTcpConnection
+{
+ private static Logger logger = Logger.getLogger(OutboundTcpConnection.class);
+
+ public BlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<ByteBuffer>();
+ public DataOutputStream output;
+ public Socket socket;
+
+ // TODO localEp is ignored, get rid of it
+ public OutboundTcpConnection(final OutboundTcpConnectionPool pool, InetAddress localEp, final InetAddress remoteEp)
+ {
+ try
+ {
+ socket = new Socket(remoteEp, DatabaseDescriptor.getStoragePort());
+ socket.setTcpNoDelay(true);
+ output = new DataOutputStream(socket.getOutputStream());
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ new Thread(new Runnable()
+ {
+ public void run()
+ {
+ while (socket != null)
+ {
+ ByteBuffer bb;
+ try
+ {
+ bb = queue.take();
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ try
+ {
+ output.write(bb.array(), 0, bb.limit());
+ if (queue.peek() == null)
+ {
+ output.flush();
+ }
+ }
+ catch (IOException e)
+ {
+ logger.info("error writing to " + remoteEp);
+ pool.reset();
+ break;
+ }
+ }
+ }
+ }, "WRITE-" + remoteEp).start();
+ }
+
+ public void write(ByteBuffer buffer)
+ {
+ try
+ {
+ queue.put(buffer);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+
+ public void closeSocket()
+ {
+ try
+ {
+ socket.close();
+ }
+ catch (IOException e)
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("error closing socket", e);
+ }
+ socket = null;
+ }
+}
Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java (from r901433, incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java)
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java&r1=901433&r2=901435&rev=901435&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java Wed Jan 20 23:37:17 2010
@@ -23,29 +23,29 @@
import org.apache.cassandra.concurrent.StageManager;
-class TcpConnectionManager
+class OutboundTcpConnectionPool
{
private InetAddress localEp_;
private InetAddress remoteEp_;
- private TcpConnection cmdCon;
- private TcpConnection ackCon;
+ private OutboundTcpConnection cmdCon;
+ private OutboundTcpConnection ackCon;
- TcpConnectionManager(InetAddress localEp, InetAddress remoteEp)
+ // TODO localEp is ignored, get rid of it
+ OutboundTcpConnectionPool(InetAddress localEp, InetAddress remoteEp)
{
localEp_ = localEp;
remoteEp_ = remoteEp;
}
- private TcpConnection newCon() throws IOException
+ private OutboundTcpConnection newCon()
{
- TcpConnection con = new TcpConnection(this, localEp_, remoteEp_);
- return con;
+ return new OutboundTcpConnection(this, localEp_, remoteEp_);
}
/**
* returns the appropriate connection based on message type.
*/
- synchronized TcpConnection getConnection(Message msg) throws IOException
+ synchronized OutboundTcpConnection getConnection(Message msg)
{
if (StageManager.RESPONSE_STAGE.equals(msg.getMessageType()))
{
@@ -63,7 +63,7 @@
synchronized void reset()
{
- for (TcpConnection con : new TcpConnection[] { cmdCon, ackCon })
+ for (OutboundTcpConnection con : new OutboundTcpConnection[] { cmdCon, ackCon })
if (con != null)
con.closeSocket();
cmdCon = null;
Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
------------------------------------------------------------------------------
svn:eol-style = native