You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/11 22:19:44 UTC
svn commit: r898068 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/net: MessagingService.java TcpConnection.java
Author: jbellis
Date: Mon Jan 11 21:19:43 2010
New Revision: 898068
URL: http://svn.apache.org/viewvc?rev=898068&view=rev
Log:
move message serialization into MessagingService. patch by jbellis
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=898068&r1=898067&r2=898068&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Mon Jan 11 21:19:43 2010
@@ -22,6 +22,8 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.IFailureDetectionEventListener;
+import org.apache.cassandra.net.io.FastSerializer;
+import org.apache.cassandra.net.io.ISerializer;
import org.apache.cassandra.net.io.SerializerType;
import org.apache.cassandra.net.sink.SinkManager;
import org.apache.cassandra.utils.*;
@@ -80,6 +82,8 @@
private static Logger logger_ = Logger.getLogger(MessagingService.class);
+ private static FastSerializer serializer_ = new FastSerializer();
+
private static volatile MessagingService messagingService_ = new MessagingService();
public static final int MESSAGE_DESERIALIZE_THREADS = 4;
@@ -365,16 +369,29 @@
return;
}
+ Message processedMessage = SinkManager.processClientMessageSink(message);
+ if (processedMessage == null)
+ {
+ return;
+ }
+
+ byte[] data;
+ try
+ {
+ data = serializer_.serialize(message);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ assert data.length > 0;
+ ByteBuffer buffer = MessagingService.packIt(data , false, false);
+
TcpConnection connection = null;
try
{
- Message processedMessage = SinkManager.processClientMessageSink(message);
- if (processedMessage == null)
- {
- return;
- }
connection = MessagingService.getConnection(processedMessage.getFrom(), to, message);
- connection.write(message);
+ connection.write(buffer);
}
catch (IOException e)
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java?rev=898068&r1=898067&r2=898068&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java Mon Jan 11 21:19:43 2010
@@ -45,12 +45,11 @@
{
// logging and profiling.
private static Logger logger_ = Logger.getLogger(TcpConnection.class);
- private static ISerializer serializer_ = new FastSerializer();
private SocketChannel socketChannel_;
private SelectionKey key_;
private TcpConnectionManager pool_;
- private boolean isIncoming_ = false;
- private TcpReader tcpReader_;
+ private boolean isIncoming_ = false;
+ private TcpReader tcpReader_;
private ReadWorkItem readWork_ = new ReadWorkItem();
private Queue<ByteBuffer> pendingWrites_ = new ConcurrentLinkedQueue<ByteBuffer>();
private InetAddress localEp_;
@@ -147,28 +146,20 @@
return socketChannel_;
}
- public void write(Message message) throws IOException
+ public synchronized void write(ByteBuffer buffer) throws IOException
{
- byte[] data = serializer_.serialize(message);
- if ( data.length > 0 )
- {
- ByteBuffer buffer = MessagingService.packIt(data , false, false);
- synchronized(this)
- {
- if (!pendingWrites_.isEmpty() || !socketChannel_.isConnected())
- {
- pendingWrites_.add(buffer);
- return;
- }
-
- socketChannel_.write(buffer);
-
- if (buffer.remaining() > 0)
- {
- pendingWrites_.add(buffer);
- turnOnInterestOps(key_, SelectionKey.OP_WRITE);
- }
- }
+ if (!pendingWrites_.isEmpty() || !socketChannel_.isConnected())
+ {
+ pendingWrites_.add(buffer);
+ return;
+ }
+
+ socketChannel_.write(buffer);
+
+ if (buffer.remaining() > 0)
+ {
+ pendingWrites_.add(buffer);
+ turnOnInterestOps(key_, SelectionKey.OP_WRITE);
}
}