You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/21 00:36:57 UTC

svn commit: r901433 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/net: ./ io/

Author: jbellis
Date: Wed Jan 20 23:36:56 2010
New Revision: 901433

URL: http://svn.apache.org/viewvc?rev=901433&view=rev
Log:
replace tcp socket reads w/ blocking i/o
patch by jbellis; reviewed by Brandon Williams for CASSANDRA-705

Added:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java   (with props)
Removed:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentLengthState.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentState.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentStreamState.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/DoneState.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ISerializer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ProtocolHeaderState.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ProtocolState.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/SerializerAttribute.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StartState.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/TcpReader.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=901433&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Wed Jan 20 23:36:56 2010
@@ -0,0 +1,102 @@
+package org.apache.cassandra.net;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOError;
+import java.io.IOException;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Thread that reads messages from a single accepted socket using blocking I/O.
+ *
+ * Each message on the wire is read as: MessagingService.PROTOCOL_SIZE protocol bytes, a 4-byte
+ * header, a 4-byte content length, and then that many content bytes. The content is handed to the
+ * deserialization executor; this thread only performs the socket reads.
+ */
+public class IncomingTcpConnection extends Thread
+{
+    private static final Logger logger = Logger.getLogger(IncomingTcpConnection.class);
+
+    private final Socket socket;
+    private final DataInputStream input;
+    private final byte[] protocolBytes = new byte[MessagingService.PROTOCOL_SIZE];
+    private final byte[] headerBytes = new byte[4];
+    private final byte[] sizeBytes = new byte[4];
+    // read-only view over sizeBytes, used to decode the length after each readFully(sizeBytes)
+    private final ByteBuffer sizeBuffer = ByteBuffer.wrap(sizeBytes).asReadOnlyBuffer();
+
+    /**
+     * @param socket an accepted connection; it is closed by this thread when run() exits
+     * @throws IOError if the socket's input stream cannot be obtained
+     */
+    public IncomingTcpConnection(Socket socket)
+    {
+        this.socket = socket;
+        try
+        {
+            input = new DataInputStream(socket.getInputStream());
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
+
+    /**
+     * Reads messages until a read fails (including EOF, an invalid protocol header, or an
+     * invalid length), then closes the socket.
+     */
+    @Override
+    public void run()
+    {
+        try
+        {
+            while (true)
+            {
+                try
+                {
+                    input.readFully(protocolBytes);
+                    MessagingService.validateProtocol(protocolBytes);
+                    input.readFully(headerBytes);
+                    input.readFully(sizeBytes);
+                    int size = sizeBuffer.getInt();
+                    sizeBuffer.clear(); // reset position so the next length is decoded from offset 0
+                    // a negative length would otherwise escape as an unchecked NegativeArraySizeException
+                    if (size < 0)
+                        throw new IOException("invalid message size " + size);
+                    byte[] contentBytes = new byte[size];
+                    input.readFully(contentBytes);
+                    MessagingService.getDeserializationExecutor().submit(new MessageDeserializationTask(new ByteArrayInputStream(contentBytes)));
+                }
+                catch (IOException e)
+                {
+                    if (logger.isDebugEnabled())
+                        logger.debug("error reading from socket; closing", e);
+                    break;
+                }
+            }
+        }
+        finally
+        {
+            // also reached when an unchecked exception ends the loop, so the socket is not leaked
+            closeSocket();
+        }
+    }
+
+    /** Closes the underlying socket; a failure to close is only logged at debug level. */
+    private void closeSocket()
+    {
+        try
+        {
+            socket.close();
+        }
+        catch (IOException e)
+        {
+            if (logger.isDebugEnabled())
+                logger.debug("error closing socket", e);
+        }
+    }
+}

Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java?rev=901433&r1=901432&r2=901433&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java Wed Jan 20 23:36:56 2010
@@ -28,12 +28,13 @@
 
 class MessageDeserializationTask implements Runnable
 {
-    private static Logger logger_ = Logger.getLogger(MessageDeserializationTask.class); 
-    private byte[] bytes_ = new byte[0];
+    private static Logger logger_ = Logger.getLogger(MessageDeserializationTask.class);
     
-    MessageDeserializationTask(byte[] bytes)
+    private ByteArrayInputStream bytes;
+    
+    MessageDeserializationTask(ByteArrayInputStream bytes)
     {
-        bytes_ = bytes;        
+        this.bytes = bytes;
     }
     
     public void run()
@@ -41,7 +42,7 @@
         Message message = null;
         try
         {
-            message = Message.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(bytes_)));
+            message = Message.serializer().deserialize(new DataInputStream(bytes));
         }
         catch (IOException e)
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=901433&r1=901432&r2=901433&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Wed Jan 20 23:36:56 2010
@@ -35,8 +35,8 @@
 import java.net.ServerSocket;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.Socket;
 import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
 import java.nio.channels.ServerSocketChannel;
 import java.security.MessageDigest;
 import java.util.*;
@@ -49,8 +49,9 @@
     private static int version_ = 1;
     //TODO: make this parameter dynamic somehow.  Not sure if config is appropriate.
     private static SerializerType serializerType_ = SerializerType.BINARY;
-    
-    private static byte[] protocol_ = new byte[16];
+
+    public static final int PROTOCOL_SIZE = 16;
+    private static byte[] protocol_ = new byte[PROTOCOL_SIZE];
     /* Verb Handler for the Response */
     public static final String responseVerbHandler_ = "RESPONSE";
 
@@ -61,9 +62,6 @@
     /* Lookup table for registering message handlers based on the verb. */
     private static Map<String, IVerbHandler> verbHandlers_;
 
-    /* Thread pool to handle messaging read activities of Socket and default stage */
-    private static ExecutorService messageReadExecutor_;
-    
     /* Thread pool to handle deserialization of messages read from the socket. */
     private static ExecutorService messageDeserializerExecutor_;
     
@@ -99,10 +97,6 @@
         callbackMap_ = new Cachetable<String, IAsyncCallback>( 2 * DatabaseDescriptor.getRpcTimeout() );
         taskCompletionMap_ = new Cachetable<String, IAsyncResult>( 2 * DatabaseDescriptor.getRpcTimeout() );        
 
-        // read executor will have one runnable enqueued per connection with stuff to read on it,
-        // so there is no need to make it bounded, and one thread should be plenty.
-        messageReadExecutor_ = new JMXEnabledThreadPoolExecutor("MS-CONNECTION-READ-POOL");
-
         // read executor puts messages to deserialize on this.
         messageDeserializerExecutor_ = new JMXEnabledThreadPoolExecutor(1,
                                                                         Runtime.getRuntime().availableProcessors(),
@@ -116,6 +110,8 @@
         protocol_ = hash("MD5", "FB-MESSAGING".getBytes());        
         /* register the response verb handler */
         registerVerbHandlers(MessagingService.responseVerbHandler_, new ResponseVerbHandler());
+
+        FailureDetector.instance.registerFailureDetectionEventListener(this);
     }
     
     public byte[] hash(String type, byte data[])
@@ -147,15 +143,28 @@
     public void listen(InetAddress localEp) throws IOException
     {        
         ServerSocketChannel serverChannel = ServerSocketChannel.open();
-        ServerSocket ss = serverChannel.socket();
+        final ServerSocket ss = serverChannel.socket();
         ss.setReuseAddress(true);
         ss.bind(new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort()));
-        serverChannel.configureBlocking(false);
-        
-        SelectionKeyHandler handler = new TcpConnectionHandler(localEp);
 
-        SelectionKey key = SelectorManager.getSelectorManager().register(serverChannel, handler, SelectionKey.OP_ACCEPT);          
-        FailureDetector.instance.registerFailureDetectionEventListener(this);
+        new Thread(new Runnable()
+        {
+            public void run()
+            {
+                while (true)
+                {
+                    try
+                    {
+                        Socket socket = ss.accept();
+                        new IncomingTcpConnection(socket).start();
+                    }
+                    catch (IOException e)
+                    {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+        }, "ACCEPT-" + localEp).start();
     }
     
     /**
@@ -396,7 +405,6 @@
     {
         logger_.info("Shutting down ...");
 
-        messageReadExecutor_.shutdownNow();
         messageDeserializerExecutor_.shutdownNow();
         streamExecutor_.shutdownNow();
         StageManager.shutdownNow();
@@ -440,11 +448,6 @@
         return taskCompletionMap_.remove(key);
     }
 
-    public static ExecutorService getReadExecutor()
-    {
-        return messageReadExecutor_;
-    }
-
     public static ExecutorService getDeserializationExecutor()
     {
         return messageDeserializerExecutor_;
@@ -454,6 +457,12 @@
     {
         return isEqual(protocol_, protocol);
     }
+
+    public static void validateProtocol(byte[] protocol) throws IOException
+    {
+        if (!isProtocolValid(protocol))
+            throw new IOException("invalid protocol header");
+    }
     
     public static boolean isEqual(byte digestA[], byte digestB[])
     {
@@ -494,7 +503,7 @@
         /* Finished the protocol header setup */
 
         byte[] header = FBUtilities.toByteArray(n);
-        ByteBuffer buffer = ByteBuffer.allocate(16 + header.length + size.length + bytes.length);
+        ByteBuffer buffer = ByteBuffer.allocate(PROTOCOL_SIZE + header.length + size.length + bytes.length);
         buffer.put(protocol_);
         buffer.put(header);
         buffer.put(size);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java?rev=901433&r1=901432&r2=901433&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java Wed Jan 20 23:36:56 2010
@@ -33,9 +33,6 @@
 import java.net.InetSocketAddress;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.io.ProtocolState;
-import org.apache.cassandra.net.io.StartState;
-import org.apache.cassandra.net.io.TcpReader;
 
 import org.apache.log4j.Logger;
 
@@ -47,8 +44,6 @@
     private SelectionKey key_;
     private TcpConnectionManager pool_;
     private boolean isIncoming_ = false;
-    private TcpReader tcpReader_;
-    private ConnectionReader reader_ = new ConnectionReader();
     private Queue<ByteBuffer> pendingWrites_ = new ConcurrentLinkedQueue<ByteBuffer>();
     private InetAddress localEp_;
     private InetAddress remoteEp_;
@@ -108,31 +103,6 @@
     {
         this(from, to, null, true);
     }
-    
-    /*
-     * This method is invoked by the TcpConnectionHandler to accept incoming TCP connections.
-     * Accept the connection and then register interest for reads.
-    */
-    static void acceptConnection(SocketChannel socketChannel, InetAddress localEp, boolean isIncoming) throws IOException
-    {
-        TcpConnection tcpConnection = new TcpConnection(socketChannel, localEp, true);
-        tcpConnection.registerReadInterest();
-    }
-    
-    private void registerReadInterest() throws IOException
-    {
-        key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
-    }
-    
-    // used for incoming connections
-    TcpConnection(SocketChannel socketChannel, InetAddress localEp, boolean isIncoming) throws IOException
-    {       
-        socketChannel_ = socketChannel;
-        socketChannel_.configureBlocking(false);                           
-        isIncoming_ = isIncoming;
-        localEp_ = localEp;
-    }
-
 
     public InetAddress getEndPoint()
     {
@@ -395,79 +365,7 @@
             }
         }
     }
-    
-    // called in the selector thread
-    public void read(SelectionKey key)
-    {
-        turnOffInterestOps(key, SelectionKey.OP_READ);
-        // publish this event onto to the TCPReadEvent Queue.
-        MessagingService.getReadExecutor().execute(reader_);
-    }
-    
-    class ConnectionReader implements Runnable
-    {                 
-        // called from the TCP READ executor
-        public void run()
-        {                         
-            if ( tcpReader_ == null )
-            {
-                tcpReader_ = new TcpReader(TcpConnection.this);    
-                StartState nextState = tcpReader_.getSocketState(TcpReader.TcpReaderState.PREAMBLE);
-                if ( nextState == null )
-                {
-                    nextState = new ProtocolState(tcpReader_);
-                    tcpReader_.putSocketState(TcpReader.TcpReaderState.PREAMBLE, nextState);
-                }
-                tcpReader_.morphState(nextState);
-            }
-            
-            try
-            {           
-                byte[] bytes;
-                while ( (bytes = tcpReader_.read()).length > 0 )
-                {                       
-                    ProtocolHeader pH = tcpReader_.getProtocolHeader();                    
-                    if ( !pH.isStreamingMode_ )
-                    {
-                        /* first message received */
-                        if (remoteEp_ == null)
-                        {
-                            remoteEp_ = socketChannel_.socket().getInetAddress();
-                        }
-                        
-                        /* Deserialize and handle the message */
-                        MessagingService.getDeserializationExecutor().submit(new MessageDeserializationTask(bytes));                                                  
-                        tcpReader_.resetState();
-                    }
-                    else
-                    {
-                        closeSocket();
-                    }                    
-                }
-            }
-            catch ( IOException ex )
-            {                   
-                handleException(ex);
-            }
-            catch ( Throwable th )
-            {
-                handleException(th);
-            }
-            finally
-            {
-                if (key_.isValid()) //not valid if closeSocket has been called above
-                    turnOnInterestOps(key_, SelectionKey.OP_READ);
-            }
-        }
-        
-        private void handleException(Throwable th)
-        {
-            logger_.warn("Problem reading from socket connected to : " + socketChannel_, th);
-            // This is to fix the weird Linux bug with NIO.
-            errorClose();
-        }
-    }
-    
+
     public int compareTo(Object o)
     {
         if (o instanceof TcpConnection)