You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/21 00:38:59 UTC

svn commit: r901440 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/net: FileStreamTask.java IncomingTcpConnection.java MessagingService.java

Author: jbellis
Date: Wed Jan 20 23:38:58 2010
New Revision: 901440

URL: http://svn.apache.org/viewvc?rev=901440&view=rev
Log:
simplify header code
patch by jbellis; reviewed by Brandon Williams for CASSANDRA-705

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java?rev=901440&r1=901439&r2=901440&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java Wed Jan 20 23:38:58 2010
@@ -80,7 +80,7 @@
         {
             FileChannel fc = raf.getChannel();
 
-            ByteBuffer buffer = MessagingService.constructStreamHeader(false, true);
+            ByteBuffer buffer = MessagingService.constructStreamHeader(false);
             channel.write(buffer);
             assert buffer.remaining() == 0;
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=901440&r1=901439&r2=901440&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Wed Jan 20 23:38:58 2010
@@ -17,10 +17,6 @@
     private static Logger logger = Logger.getLogger(IncomingTcpConnection.class);
 
     private final DataInputStream input;
-    private final byte[] protocolBytes = new byte[MessagingService.PROTOCOL_SIZE];
-    private final byte[] headerBytes = new byte[4];
-    private final byte[] sizeBytes = new byte[4];
-    private final ByteBuffer sizeBuffer = ByteBuffer.wrap(sizeBytes).asReadOnlyBuffer();
     private Socket socket;
 
     public IncomingTcpConnection(Socket socket)
@@ -43,14 +39,11 @@
         {
             try
             {
-                input.readFully(protocolBytes);
-                MessagingService.validateProtocol(protocolBytes);
-
-                input.readFully(headerBytes);
-                int pH = FBUtilities.byteArrayToInt(headerBytes);
-                int type = MessagingService.getBits(pH, 1, 2);
-                boolean isStream = MessagingService.getBits(pH, 3, 1) == 1;
-                int version = MessagingService.getBits(pH, 15, 8);
+                MessagingService.validateMagic(input.readInt());
+                int header = input.readInt();
+                int type = MessagingService.getBits(header, 1, 2);
+                boolean isStream = MessagingService.getBits(header, 3, 1) == 1;
+                int version = MessagingService.getBits(header, 15, 8);
 
                 if (isStream)
                 {
@@ -58,10 +51,7 @@
                 }
                 else
                 {
-                    input.readFully(sizeBytes);
-                    int size = sizeBuffer.getInt();
-                    sizeBuffer.clear();
-
+                    int size = input.readInt();
                     byte[] contentBytes = new byte[size];
                     input.readFully(contentBytes);
                     MessagingService.getDeserializationExecutor().submit(new MessageDeserializationTask(new ByteArrayInputStream(contentBytes)));

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=901440&r1=901439&r2=901440&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Wed Jan 20 23:38:58 2010
@@ -50,8 +50,8 @@
     //TODO: make this parameter dynamic somehow.  Not sure if config is appropriate.
     private static SerializerType serializerType_ = SerializerType.BINARY;
 
-    public static final int PROTOCOL_SIZE = 16;
-    private static byte[] protocol_ = new byte[PROTOCOL_SIZE];
+    /** we preface every message with this number so the recipient can validate the sender is sane */
+    public static final int PROTOCOL_MAGIC = 0xCA552DFA;
     /* Verb Handler for the Response */
     public static final String responseVerbHandler_ = "RESPONSE";
 
@@ -107,7 +107,6 @@
 
         streamExecutor_ = new JMXEnabledThreadPoolExecutor("MESSAGE-STREAMING-POOL");
                 
-        protocol_ = hash("MD5", "FB-MESSAGING".getBytes());        
         /* register the response verb handler */
         registerVerbHandlers(MessagingService.responseVerbHandler_, new ResponseVerbHandler());
 
@@ -317,7 +316,7 @@
             throw new RuntimeException(e);
         }
         assert data.length > 0;
-        ByteBuffer buffer = packIt(data , false, false);
+        ByteBuffer buffer = packIt(data , false);
 
         // write it
         connection.write(buffer);
@@ -399,14 +398,9 @@
         return messageDeserializerExecutor_;
     }
 
-    public static boolean isProtocolValid(byte[] protocol)
+    public static void validateMagic(int magic) throws IOException
     {
-        return isEqual(protocol_, protocol);
-    }
-
-    public static void validateProtocol(byte[] protocol) throws IOException
-    {
-        if (!isProtocolValid(protocol))
+        if (magic != PROTOCOL_MAGIC)
             throw new IOException("invalid protocol header");
     }
     
@@ -420,10 +414,9 @@
         return x >>> (p + 1) - n & ~(-1 << n);
     }
         
-    public static ByteBuffer packIt(byte[] bytes, boolean compress, boolean stream)
+    public static ByteBuffer packIt(byte[] bytes, boolean compress)
     {
-        byte[] size = FBUtilities.toByteArray(bytes.length);
-        /* 
+        /*
              Setting up the protocol header. This is 4 bytes long
              represented as an integer. The first 2 bits indicate
              the serializer type. The 3rd bit indicates if compression
@@ -433,32 +426,25 @@
              The next 8 bits indicate a version number. Remaining 15 bits 
              are not used currently.            
         */
-        int n = 0;
+        int header = 0;
         // Setting up the serializer bit
-        n |= serializerType_.ordinal();
+        header |= serializerType_.ordinal();
         // set compression bit.
-        if ( compress )
-            n |= 4;
-        
-        // set streaming bit
-        if ( stream )
-            n |= 8;
-        
+        if (compress)
+            header |= 4;
         // Setting up the version bit
-        n |= (version_ << 8);               
-        /* Finished the protocol header setup */
+        header |= (version_ << 8);
 
-        byte[] header = FBUtilities.toByteArray(n);
-        ByteBuffer buffer = ByteBuffer.allocate(PROTOCOL_SIZE + header.length + size.length + bytes.length);
-        buffer.put(protocol_);
-        buffer.put(header);
-        buffer.put(size);
+        ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + 4 + bytes.length);
+        buffer.putInt(PROTOCOL_MAGIC);
+        buffer.putInt(header);
+        buffer.putInt(bytes.length);
         buffer.put(bytes);
         buffer.flip();
         return buffer;
     }
         
-    public static ByteBuffer constructStreamHeader(boolean compress, boolean stream)
+    public static ByteBuffer constructStreamHeader(boolean compress)
     {
         /* 
         Setting up the protocol header. This is 4 bytes long
@@ -470,25 +456,21 @@
         The next 8 bits indicate a version number. Remaining 15 bits 
         are not used currently.            
         */
-        int n = 0;
+        int header = 0;
         // Setting up the serializer bit
-        n |= serializerType_.ordinal();
+        header |= serializerType_.ordinal();
         // set compression bit.
         if ( compress )
-            n |= 4;
-       
+            header |= 4;
         // set streaming bit
-        if ( stream )
-            n |= 8;
-       
-        // Setting up the version bit 
-        n |= (version_ << 8);              
+        header |= 8;
+        // Setting up the version bit
+        header |= (version_ << 8);
         /* Finished the protocol header setup */
 
-        byte[] header = FBUtilities.toByteArray(n);
-        ByteBuffer buffer = ByteBuffer.allocate(16 + header.length);
-        buffer.put(protocol_);
-        buffer.put(header);
+        ByteBuffer buffer = ByteBuffer.allocate(4 + 4);
+        buffer.putInt(PROTOCOL_MAGIC);
+        buffer.putInt(header);
         buffer.flip();
         return buffer;
     }