You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/21 00:36:57 UTC
svn commit: r901433 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra/net: ./ io/
Author: jbellis
Date: Wed Jan 20 23:36:56 2010
New Revision: 901433
URL: http://svn.apache.org/viewvc?rev=901433&view=rev
Log:
replace tcp socket reads w/ blocking i/o
patch by jbellis; reviewed by Brandon Williams for CASSANDRA-705
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (with props)
Removed:
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentLengthState.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentState.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentStreamState.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/DoneState.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ISerializer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ProtocolHeaderState.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ProtocolState.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/SerializerAttribute.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StartState.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/TcpReader.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=901433&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Wed Jan 20 23:36:56 2010
@@ -0,0 +1,98 @@
+package org.apache.cassandra.net;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOError;
+import java.io.IOException;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Reads framed messages from a single inbound peer socket using blocking i/o,
+ * on a dedicated thread per connection.
+ * <p>
+ * Each frame is the protocol magic (MessagingService.PROTOCOL_SIZE bytes), 4 header
+ * bytes, a 4-byte size, and then that many bytes of message content. The content
+ * is submitted to the MessagingService deserialization executor.
+ */
+public class IncomingTcpConnection extends Thread
+{
+    private static final Logger logger = Logger.getLogger(IncomingTcpConnection.class);
+
+    private final Socket socket;
+    private final DataInputStream input;
+    private final byte[] protocolBytes = new byte[MessagingService.PROTOCOL_SIZE];
+    private final byte[] headerBytes = new byte[4];
+    private final byte[] sizeBytes = new byte[4];
+    private final ByteBuffer sizeBuffer = ByteBuffer.wrap(sizeBytes).asReadOnlyBuffer();
+
+    /**
+     * @param socket accepted connection to read from; closed by run() when reading stops
+     * @throws IOError if the socket's input stream cannot be obtained
+     */
+    public IncomingTcpConnection(Socket socket)
+    {
+        this.socket = socket;
+        try
+        {
+            input = new DataInputStream(socket.getInputStream());
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
+
+    /**
+     * Reads frames until the stream fails or a frame is malformed, then closes the socket.
+     */
+    @Override
+    public void run()
+    {
+        try
+        {
+            while (true)
+            {
+                input.readFully(protocolBytes);
+                MessagingService.validateProtocol(protocolBytes);
+                // header bytes are consumed to stay aligned with the frame; this class does not use them
+                input.readFully(headerBytes);
+                input.readFully(sizeBytes);
+                int size = sizeBuffer.getInt();
+                sizeBuffer.clear(); // reset the position so the next getInt() reads from offset 0
+                // the size comes off the wire: reject a negative value here rather than letting
+                // NegativeArraySizeException escape run() and kill the thread without a log entry
+                if (size < 0)
+                    throw new IOException("invalid message size: " + size);
+                byte[] contentBytes = new byte[size];
+                input.readFully(contentBytes);
+                MessagingService.getDeserializationExecutor().submit(new MessageDeserializationTask(new ByteArrayInputStream(contentBytes)));
+            }
+        }
+        catch (IOException e)
+        {
+            if (logger.isDebugEnabled())
+                logger.debug("error reading from socket; closing", e);
+        }
+        finally
+        {
+            closeSocket();
+        }
+    }
+
+    /** Closes the socket, which also closes its input stream; a failure here is only logged. */
+    private void closeSocket()
+    {
+        try
+        {
+            socket.close();
+        }
+        catch (IOException e)
+        {
+            if (logger.isDebugEnabled())
+                logger.debug("error closing socket", e);
+        }
+    }
+}
Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java?rev=901433&r1=901432&r2=901433&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java Wed Jan 20 23:36:56 2010
@@ -28,12 +28,13 @@
class MessageDeserializationTask implements Runnable
{
- private static Logger logger_ = Logger.getLogger(MessageDeserializationTask.class);
- private byte[] bytes_ = new byte[0];
+ private static Logger logger_ = Logger.getLogger(MessageDeserializationTask.class);
- MessageDeserializationTask(byte[] bytes)
+ private ByteArrayInputStream bytes;
+
+ MessageDeserializationTask(ByteArrayInputStream bytes)
{
- bytes_ = bytes;
+ this.bytes = bytes;
}
public void run()
@@ -41,7 +42,7 @@
Message message = null;
try
{
- message = Message.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(bytes_)));
+ message = Message.serializer().deserialize(new DataInputStream(bytes));
}
catch (IOException e)
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=901433&r1=901432&r2=901433&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Wed Jan 20 23:36:56 2010
@@ -35,8 +35,8 @@
import java.net.ServerSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.Socket;
import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.security.MessageDigest;
import java.util.*;
@@ -49,8 +49,9 @@
private static int version_ = 1;
//TODO: make this parameter dynamic somehow. Not sure if config is appropriate.
private static SerializerType serializerType_ = SerializerType.BINARY;
-
- private static byte[] protocol_ = new byte[16];
+
+ public static final int PROTOCOL_SIZE = 16;
+ private static byte[] protocol_ = new byte[PROTOCOL_SIZE];
/* Verb Handler for the Response */
public static final String responseVerbHandler_ = "RESPONSE";
@@ -61,9 +62,6 @@
/* Lookup table for registering message handlers based on the verb. */
private static Map<String, IVerbHandler> verbHandlers_;
- /* Thread pool to handle messaging read activities of Socket and default stage */
- private static ExecutorService messageReadExecutor_;
-
/* Thread pool to handle deserialization of messages read from the socket. */
private static ExecutorService messageDeserializerExecutor_;
@@ -99,10 +97,6 @@
callbackMap_ = new Cachetable<String, IAsyncCallback>( 2 * DatabaseDescriptor.getRpcTimeout() );
taskCompletionMap_ = new Cachetable<String, IAsyncResult>( 2 * DatabaseDescriptor.getRpcTimeout() );
- // read executor will have one runnable enqueued per connection with stuff to read on it,
- // so there is no need to make it bounded, and one thread should be plenty.
- messageReadExecutor_ = new JMXEnabledThreadPoolExecutor("MS-CONNECTION-READ-POOL");
-
// read executor puts messages to deserialize on this.
messageDeserializerExecutor_ = new JMXEnabledThreadPoolExecutor(1,
Runtime.getRuntime().availableProcessors(),
@@ -116,6 +110,8 @@
protocol_ = hash("MD5", "FB-MESSAGING".getBytes());
/* register the response verb handler */
registerVerbHandlers(MessagingService.responseVerbHandler_, new ResponseVerbHandler());
+
+ FailureDetector.instance.registerFailureDetectionEventListener(this);
}
public byte[] hash(String type, byte data[])
@@ -147,15 +143,28 @@
public void listen(InetAddress localEp) throws IOException
{
ServerSocketChannel serverChannel = ServerSocketChannel.open();
- ServerSocket ss = serverChannel.socket();
+ final ServerSocket ss = serverChannel.socket();
ss.setReuseAddress(true);
ss.bind(new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort()));
- serverChannel.configureBlocking(false);
-
- SelectionKeyHandler handler = new TcpConnectionHandler(localEp);
- SelectionKey key = SelectorManager.getSelectorManager().register(serverChannel, handler, SelectionKey.OP_ACCEPT);
- FailureDetector.instance.registerFailureDetectionEventListener(this);
+ new Thread(new Runnable()
+ {
+ public void run()
+ {
+ while (true)
+ {
+ try
+ {
+ Socket socket = ss.accept();
+ new IncomingTcpConnection(socket).start();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }, "ACCEPT-" + localEp).start();
}
/**
@@ -396,7 +405,6 @@
{
logger_.info("Shutting down ...");
- messageReadExecutor_.shutdownNow();
messageDeserializerExecutor_.shutdownNow();
streamExecutor_.shutdownNow();
StageManager.shutdownNow();
@@ -440,11 +448,6 @@
return taskCompletionMap_.remove(key);
}
- public static ExecutorService getReadExecutor()
- {
- return messageReadExecutor_;
- }
-
public static ExecutorService getDeserializationExecutor()
{
return messageDeserializerExecutor_;
@@ -454,6 +457,12 @@
{
return isEqual(protocol_, protocol);
}
+
+ public static void validateProtocol(byte[] protocol) throws IOException
+ {
+ if (!isProtocolValid(protocol))
+ throw new IOException("invalid protocol header");
+ }
public static boolean isEqual(byte digestA[], byte digestB[])
{
@@ -494,7 +503,7 @@
/* Finished the protocol header setup */
byte[] header = FBUtilities.toByteArray(n);
- ByteBuffer buffer = ByteBuffer.allocate(16 + header.length + size.length + bytes.length);
+ ByteBuffer buffer = ByteBuffer.allocate(PROTOCOL_SIZE + header.length + size.length + bytes.length);
buffer.put(protocol_);
buffer.put(header);
buffer.put(size);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java?rev=901433&r1=901432&r2=901433&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java Wed Jan 20 23:36:56 2010
@@ -33,9 +33,6 @@
import java.net.InetSocketAddress;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.io.ProtocolState;
-import org.apache.cassandra.net.io.StartState;
-import org.apache.cassandra.net.io.TcpReader;
import org.apache.log4j.Logger;
@@ -47,8 +44,6 @@
private SelectionKey key_;
private TcpConnectionManager pool_;
private boolean isIncoming_ = false;
- private TcpReader tcpReader_;
- private ConnectionReader reader_ = new ConnectionReader();
private Queue<ByteBuffer> pendingWrites_ = new ConcurrentLinkedQueue<ByteBuffer>();
private InetAddress localEp_;
private InetAddress remoteEp_;
@@ -108,31 +103,6 @@
{
this(from, to, null, true);
}
-
- /*
- * This method is invoked by the TcpConnectionHandler to accept incoming TCP connections.
- * Accept the connection and then register interest for reads.
- */
- static void acceptConnection(SocketChannel socketChannel, InetAddress localEp, boolean isIncoming) throws IOException
- {
- TcpConnection tcpConnection = new TcpConnection(socketChannel, localEp, true);
- tcpConnection.registerReadInterest();
- }
-
- private void registerReadInterest() throws IOException
- {
- key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
- }
-
- // used for incoming connections
- TcpConnection(SocketChannel socketChannel, InetAddress localEp, boolean isIncoming) throws IOException
- {
- socketChannel_ = socketChannel;
- socketChannel_.configureBlocking(false);
- isIncoming_ = isIncoming;
- localEp_ = localEp;
- }
-
public InetAddress getEndPoint()
{
@@ -395,79 +365,7 @@
}
}
}
-
- // called in the selector thread
- public void read(SelectionKey key)
- {
- turnOffInterestOps(key, SelectionKey.OP_READ);
- // publish this event onto to the TCPReadEvent Queue.
- MessagingService.getReadExecutor().execute(reader_);
- }
-
- class ConnectionReader implements Runnable
- {
- // called from the TCP READ executor
- public void run()
- {
- if ( tcpReader_ == null )
- {
- tcpReader_ = new TcpReader(TcpConnection.this);
- StartState nextState = tcpReader_.getSocketState(TcpReader.TcpReaderState.PREAMBLE);
- if ( nextState == null )
- {
- nextState = new ProtocolState(tcpReader_);
- tcpReader_.putSocketState(TcpReader.TcpReaderState.PREAMBLE, nextState);
- }
- tcpReader_.morphState(nextState);
- }
-
- try
- {
- byte[] bytes;
- while ( (bytes = tcpReader_.read()).length > 0 )
- {
- ProtocolHeader pH = tcpReader_.getProtocolHeader();
- if ( !pH.isStreamingMode_ )
- {
- /* first message received */
- if (remoteEp_ == null)
- {
- remoteEp_ = socketChannel_.socket().getInetAddress();
- }
-
- /* Deserialize and handle the message */
- MessagingService.getDeserializationExecutor().submit(new MessageDeserializationTask(bytes));
- tcpReader_.resetState();
- }
- else
- {
- closeSocket();
- }
- }
- }
- catch ( IOException ex )
- {
- handleException(ex);
- }
- catch ( Throwable th )
- {
- handleException(th);
- }
- finally
- {
- if (key_.isValid()) //not valid if closeSocket has been called above
- turnOnInterestOps(key_, SelectionKey.OP_READ);
- }
- }
-
- private void handleException(Throwable th)
- {
- logger_.warn("Problem reading from socket connected to : " + socketChannel_, th);
- // This is to fix the weird Linux bug with NIO.
- errorClose();
- }
- }
-
+
public int compareTo(Object o)
{
if (o instanceof TcpConnection)