You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/21 00:37:17 UTC

svn commit: r901435 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/net: FileStreamTask.java MessagingService.java OutboundTcpConnection.java OutboundTcpConnectionPool.java TcpConnection.java TcpConnectionManager.java

Author: jbellis
Date: Wed Jan 20 23:37:17 2010
New Revision: 901435

URL: http://svn.apache.org/viewvc?rev=901435&view=rev
Log:
replace tcp writes w/ blocking i/o
patch by jbellis; reviewed by Brandon Williams for CASSANDRA-705

Added:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java   (with props)
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java   (contents, props changed)
      - copied, changed from r901433, incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java
Removed:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java?rev=901435&r1=901434&r2=901435&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java Wed Jan 20 23:37:17 2010
@@ -44,6 +44,8 @@
     
     public void run()
     {
+        /*
+        TODO
         TcpConnection connection = null;
         try
         {                        
@@ -61,6 +63,6 @@
             }
             throw new RuntimeException(e);
         }
+         */
     }
-
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=901435&r1=901434&r2=901435&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Wed Jan 20 23:37:17 2010
@@ -68,7 +68,7 @@
     /* Thread pool to handle messaging write activities */
     private static ExecutorService streamExecutor_;
     
-    private static NonBlockingHashMap<String, TcpConnectionManager> connectionManagers_ = new NonBlockingHashMap<String, TcpConnectionManager>();
+    private static NonBlockingHashMap<String, OutboundTcpConnectionPool> connectionManagers_ = new NonBlockingHashMap<String, OutboundTcpConnectionPool>();
     
     private static Logger logger_ = Logger.getLogger(MessagingService.class);
     
@@ -186,19 +186,19 @@
         }
     }
     
-    public static TcpConnectionManager getConnectionPool(InetAddress from, InetAddress to)
+    public static OutboundTcpConnectionPool getConnectionPool(InetAddress from, InetAddress to)
     {
         String key = from + ":" + to;
-        TcpConnectionManager cp = connectionManagers_.get(key);
+        OutboundTcpConnectionPool cp = connectionManagers_.get(key);
         if (cp == null)
         {
-            connectionManagers_.putIfAbsent(key, new TcpConnectionManager(from, to));
+            connectionManagers_.putIfAbsent(key, new OutboundTcpConnectionPool(from, to));
             cp = connectionManagers_.get(key);
         }
         return cp;
     }
 
-    public static TcpConnection getConnection(InetAddress from, InetAddress to, Message msg) throws IOException
+    public static OutboundTcpConnection getConnection(InetAddress from, InetAddress to, Message msg)
     {
         return getConnectionPool(from, to).getConnection(msg);
     }
@@ -331,20 +331,9 @@
         assert data.length > 0;
         ByteBuffer buffer = packIt(data , false, false);
 
-        TcpConnection connection = null;
-        try
-        {
-            connection = MessagingService.getConnection(processedMessage.getFrom(), to, message);
-            connection.write(buffer);
-        }
-        catch (IOException e)
-        {
-            if (connection != null)
-            {
-                connection.errorClose();
-            }
-            logger_.error("unexpected error writing " + message, e);
-        }
+        OutboundTcpConnection connection = null;
+        connection = getConnection(processedMessage.getFrom(), to, message);
+        connection.write(buffer);
     }
     
     public IAsyncResult sendRR(Message message, InetAddress to)

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=901435&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Wed Jan 20 23:37:17 2010
@@ -0,0 +1,97 @@
+package org.apache.cassandra.net;
+
+import java.io.DataOutputStream;
+import java.io.IOError;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class OutboundTcpConnection
+{
+    private static Logger logger = Logger.getLogger(OutboundTcpConnection.class);
+
+    public BlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<ByteBuffer>();
+    public DataOutputStream output;
+    public Socket socket;
+
+    // TODO localEp is ignored, get rid of it
+    public OutboundTcpConnection(final OutboundTcpConnectionPool pool, InetAddress localEp, final InetAddress remoteEp)
+    {
+        try
+        {
+            socket = new Socket(remoteEp, DatabaseDescriptor.getStoragePort());
+            socket.setTcpNoDelay(true);
+            output = new DataOutputStream(socket.getOutputStream());
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+        new Thread(new Runnable()
+        {
+            public void run()
+            {
+                while (socket != null)
+                {
+                    ByteBuffer bb;
+                    try
+                    {
+                        bb = queue.take();
+                    }
+                    catch (InterruptedException e)
+                    {
+                        throw new AssertionError(e);
+                    }
+                    try
+                    {
+                        output.write(bb.array(), 0, bb.limit());
+                        if (queue.peek() == null)
+                        {
+                            output.flush();
+                        }
+                    }
+                    catch (IOException e)
+                    {
+                        logger.info("error writing to " + remoteEp);
+                        pool.reset();
+                        break;
+                    }
+                }
+            }
+        }, "WRITE-" + remoteEp).start();
+    }
+
+    public void write(ByteBuffer buffer)
+    {
+        try
+        {
+            queue.put(buffer);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+
+    public void closeSocket()
+    {
+        try
+        {
+            socket.close();
+        }
+        catch (IOException e)
+        {
+            if (logger.isDebugEnabled())
+                logger.debug("error closing socket", e);
+        }
+        socket = null;
+    }
+}

Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java (from r901433, incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java)
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java&r1=901433&r2=901435&rev=901435&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java Wed Jan 20 23:37:17 2010
@@ -23,29 +23,29 @@
 
 import org.apache.cassandra.concurrent.StageManager;
 
-class TcpConnectionManager
+class OutboundTcpConnectionPool
 {
     private InetAddress localEp_;
     private InetAddress remoteEp_;
-    private TcpConnection cmdCon;
-    private TcpConnection ackCon;
+    private OutboundTcpConnection cmdCon;
+    private OutboundTcpConnection ackCon;
 
-    TcpConnectionManager(InetAddress localEp, InetAddress remoteEp)
+    // TODO localEp is ignored, get rid of it
+    OutboundTcpConnectionPool(InetAddress localEp, InetAddress remoteEp)
     {
         localEp_ = localEp;
         remoteEp_ = remoteEp;
     }
 
-    private TcpConnection newCon() throws IOException
+    private OutboundTcpConnection newCon()
     {
-        TcpConnection con = new TcpConnection(this, localEp_, remoteEp_);
-        return con;
+        return new OutboundTcpConnection(this, localEp_, remoteEp_);
     }
 
     /**
      * returns the appropriate connection based on message type.
      */
-    synchronized TcpConnection getConnection(Message msg) throws IOException
+    synchronized OutboundTcpConnection getConnection(Message msg)
     {
         if (StageManager.RESPONSE_STAGE.equals(msg.getMessageType()))
         {
@@ -63,7 +63,7 @@
 
     synchronized void reset()
     {
-        for (TcpConnection con : new TcpConnection[] { cmdCon, ackCon })
+        for (OutboundTcpConnection con : new OutboundTcpConnection[] { cmdCon, ackCon })
             if (con != null)
                 con.closeSocket();
         cmdCon = null;

Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
------------------------------------------------------------------------------
    svn:eol-style = native