You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/21 00:38:59 UTC
svn commit: r901440 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra/net:
FileStreamTask.java IncomingTcpConnection.java MessagingService.java
Author: jbellis
Date: Wed Jan 20 23:38:58 2010
New Revision: 901440
URL: http://svn.apache.org/viewvc?rev=901440&view=rev
Log:
simplify header code
patch by jbellis; reviewed by Brandon Williams for CASSANDRA-705
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java?rev=901440&r1=901439&r2=901440&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java Wed Jan 20 23:38:58 2010
@@ -80,7 +80,7 @@
{
FileChannel fc = raf.getChannel();
- ByteBuffer buffer = MessagingService.constructStreamHeader(false, true);
+ ByteBuffer buffer = MessagingService.constructStreamHeader(false);
channel.write(buffer);
assert buffer.remaining() == 0;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=901440&r1=901439&r2=901440&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Wed Jan 20 23:38:58 2010
@@ -17,10 +17,6 @@
private static Logger logger = Logger.getLogger(IncomingTcpConnection.class);
private final DataInputStream input;
- private final byte[] protocolBytes = new byte[MessagingService.PROTOCOL_SIZE];
- private final byte[] headerBytes = new byte[4];
- private final byte[] sizeBytes = new byte[4];
- private final ByteBuffer sizeBuffer = ByteBuffer.wrap(sizeBytes).asReadOnlyBuffer();
private Socket socket;
public IncomingTcpConnection(Socket socket)
@@ -43,14 +39,11 @@
{
try
{
- input.readFully(protocolBytes);
- MessagingService.validateProtocol(protocolBytes);
-
- input.readFully(headerBytes);
- int pH = FBUtilities.byteArrayToInt(headerBytes);
- int type = MessagingService.getBits(pH, 1, 2);
- boolean isStream = MessagingService.getBits(pH, 3, 1) == 1;
- int version = MessagingService.getBits(pH, 15, 8);
+ MessagingService.validateMagic(input.readInt());
+ int header = input.readInt();
+ int type = MessagingService.getBits(header, 1, 2);
+ boolean isStream = MessagingService.getBits(header, 3, 1) == 1;
+ int version = MessagingService.getBits(header, 15, 8);
if (isStream)
{
@@ -58,10 +51,7 @@
}
else
{
- input.readFully(sizeBytes);
- int size = sizeBuffer.getInt();
- sizeBuffer.clear();
-
+ int size = input.readInt();
byte[] contentBytes = new byte[size];
input.readFully(contentBytes);
MessagingService.getDeserializationExecutor().submit(new MessageDeserializationTask(new ByteArrayInputStream(contentBytes)));
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=901440&r1=901439&r2=901440&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Wed Jan 20 23:38:58 2010
@@ -50,8 +50,8 @@
//TODO: make this parameter dynamic somehow. Not sure if config is appropriate.
private static SerializerType serializerType_ = SerializerType.BINARY;
- public static final int PROTOCOL_SIZE = 16;
- private static byte[] protocol_ = new byte[PROTOCOL_SIZE];
+ /** we preface every message with this number so the recipient can validate the sender is sane */
+ public static final int PROTOCOL_MAGIC = 0xCA552DFA;
/* Verb Handler for the Response */
public static final String responseVerbHandler_ = "RESPONSE";
@@ -107,7 +107,6 @@
streamExecutor_ = new JMXEnabledThreadPoolExecutor("MESSAGE-STREAMING-POOL");
- protocol_ = hash("MD5", "FB-MESSAGING".getBytes());
/* register the response verb handler */
registerVerbHandlers(MessagingService.responseVerbHandler_, new ResponseVerbHandler());
@@ -317,7 +316,7 @@
throw new RuntimeException(e);
}
assert data.length > 0;
- ByteBuffer buffer = packIt(data , false, false);
+ ByteBuffer buffer = packIt(data , false);
// write it
connection.write(buffer);
@@ -399,14 +398,9 @@
return messageDeserializerExecutor_;
}
- public static boolean isProtocolValid(byte[] protocol)
+ public static void validateMagic(int magic) throws IOException
{
- return isEqual(protocol_, protocol);
- }
-
- public static void validateProtocol(byte[] protocol) throws IOException
- {
- if (!isProtocolValid(protocol))
+ if (magic != PROTOCOL_MAGIC)
throw new IOException("invalid protocol header");
}
@@ -420,10 +414,9 @@
return x >>> (p + 1) - n & ~(-1 << n);
}
- public static ByteBuffer packIt(byte[] bytes, boolean compress, boolean stream)
+ public static ByteBuffer packIt(byte[] bytes, boolean compress)
{
- byte[] size = FBUtilities.toByteArray(bytes.length);
- /*
+ /*
Setting up the protocol header. This is 4 bytes long
represented as an integer. The first 2 bits indicate
the serializer type. The 3rd bit indicates if compression
@@ -433,32 +426,25 @@
The next 8 bits indicate a version number. Remaining 15 bits
are not used currently.
*/
- int n = 0;
+ int header = 0;
// Setting up the serializer bit
- n |= serializerType_.ordinal();
+ header |= serializerType_.ordinal();
// set compression bit.
- if ( compress )
- n |= 4;
-
- // set streaming bit
- if ( stream )
- n |= 8;
-
+ if (compress)
+ header |= 4;
// Setting up the version bit
- n |= (version_ << 8);
- /* Finished the protocol header setup */
+ header |= (version_ << 8);
- byte[] header = FBUtilities.toByteArray(n);
- ByteBuffer buffer = ByteBuffer.allocate(PROTOCOL_SIZE + header.length + size.length + bytes.length);
- buffer.put(protocol_);
- buffer.put(header);
- buffer.put(size);
+ ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + 4 + bytes.length);
+ buffer.putInt(PROTOCOL_MAGIC);
+ buffer.putInt(header);
+ buffer.putInt(bytes.length);
buffer.put(bytes);
buffer.flip();
return buffer;
}
- public static ByteBuffer constructStreamHeader(boolean compress, boolean stream)
+ public static ByteBuffer constructStreamHeader(boolean compress)
{
/*
Setting up the protocol header. This is 4 bytes long
@@ -470,25 +456,21 @@
The next 8 bits indicate a version number. Remaining 15 bits
are not used currently.
*/
- int n = 0;
+ int header = 0;
// Setting up the serializer bit
- n |= serializerType_.ordinal();
+ header |= serializerType_.ordinal();
// set compression bit.
if ( compress )
- n |= 4;
-
+ header |= 4;
// set streaming bit
- if ( stream )
- n |= 8;
-
- // Setting up the version bit
- n |= (version_ << 8);
+ header |= 8;
+ // Setting up the version bit
+ header |= (version_ << 8);
/* Finished the protocol header setup */
- byte[] header = FBUtilities.toByteArray(n);
- ByteBuffer buffer = ByteBuffer.allocate(16 + header.length);
- buffer.put(protocol_);
- buffer.put(header);
+ ByteBuffer buffer = ByteBuffer.allocate(4 + 4);
+ buffer.putInt(PROTOCOL_MAGIC);
+ buffer.putInt(header);
buffer.flip();
return buffer;
}