You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/11 22:19:44 UTC

svn commit: r898068 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/net: MessagingService.java TcpConnection.java

Author: jbellis
Date: Mon Jan 11 21:19:43 2010
New Revision: 898068

URL: http://svn.apache.org/viewvc?rev=898068&view=rev
Log:
move message serialization into MessagingService.  patch by jbellis

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=898068&r1=898067&r2=898068&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Mon Jan 11 21:19:43 2010
@@ -22,6 +22,8 @@
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.IFailureDetectionEventListener;
+import org.apache.cassandra.net.io.FastSerializer;
+import org.apache.cassandra.net.io.ISerializer;
 import org.apache.cassandra.net.io.SerializerType;
 import org.apache.cassandra.net.sink.SinkManager;
 import org.apache.cassandra.utils.*;
@@ -80,6 +82,8 @@
     
     private static Logger logger_ = Logger.getLogger(MessagingService.class);
     
+    private static FastSerializer serializer_ = new FastSerializer();
+
     private static volatile MessagingService messagingService_ = new MessagingService();
 
     public static final int MESSAGE_DESERIALIZE_THREADS = 4;
@@ -365,16 +369,29 @@
             return;
         }
 
+        Message processedMessage = SinkManager.processClientMessageSink(message);
+        if (processedMessage == null)
+        {
+            return;
+        }
+
+        byte[] data;
+        try
+        {
+            data = serializer_.serialize(message);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+        assert data.length > 0;
+        ByteBuffer buffer = MessagingService.packIt(data , false, false);
+
         TcpConnection connection = null;
         try
         {
-            Message processedMessage = SinkManager.processClientMessageSink(message);
-            if (processedMessage == null)
-            {
-                return;
-            }
             connection = MessagingService.getConnection(processedMessage.getFrom(), to, message);
-            connection.write(message);
+            connection.write(buffer);
         }
         catch (IOException e)
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java?rev=898068&r1=898067&r2=898068&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java Mon Jan 11 21:19:43 2010
@@ -45,12 +45,11 @@
 {
     // logging and profiling.
     private static Logger logger_ = Logger.getLogger(TcpConnection.class);  
-    private static ISerializer serializer_ = new FastSerializer();
     private SocketChannel socketChannel_;
     private SelectionKey key_;
     private TcpConnectionManager pool_;
-    private boolean isIncoming_ = false;       
-    private TcpReader tcpReader_;    
+    private boolean isIncoming_ = false;
+    private TcpReader tcpReader_;
     private ReadWorkItem readWork_ = new ReadWorkItem(); 
     private Queue<ByteBuffer> pendingWrites_ = new ConcurrentLinkedQueue<ByteBuffer>();
     private InetAddress localEp_;
@@ -147,28 +146,20 @@
         return socketChannel_;
     }    
     
-    public void write(Message message) throws IOException
+    public synchronized void write(ByteBuffer buffer) throws IOException
     {           
-        byte[] data = serializer_.serialize(message);        
-        if ( data.length > 0 )
-        {    
-            ByteBuffer buffer = MessagingService.packIt(data , false, false);
-            synchronized(this)
-            {
-                if (!pendingWrites_.isEmpty() || !socketChannel_.isConnected())
-                {                     
-                    pendingWrites_.add(buffer);                
-                    return;
-                }
-                
-                socketChannel_.write(buffer);                
-                
-                if (buffer.remaining() > 0) 
-                {                   
-                    pendingWrites_.add(buffer);
-                    turnOnInterestOps(key_, SelectionKey.OP_WRITE);
-                }
-            }
+        if (!pendingWrites_.isEmpty() || !socketChannel_.isConnected())
+        {
+            pendingWrites_.add(buffer);
+            return;
+        }
+
+        socketChannel_.write(buffer);
+
+        if (buffer.remaining() > 0)
+        {
+            pendingWrites_.add(buffer);
+            turnOnInterestOps(key_, SelectionKey.OP_WRITE);
         }
     }