You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/30 17:30:27 UTC
svn commit: r799331 [20/29] - in /incubator/cassandra/trunk: ./
src/java/org/apache/cassandra/concurrent/
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/ src/...
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java Thu Jul 30 15:30:21 2009
@@ -1,182 +1,182 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net;
-
-import java.lang.reflect.Array;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.*;
-
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.log4j.Logger;
-import org.apache.cassandra.utils.*;
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class Message implements java.io.Serializable
-{
- static final long serialVersionUID = 6329198792470413221L;
- private static ICompactSerializer<Message> serializer_;
-
- static
- {
- serializer_ = new MessageSerializer();
- }
-
- public static ICompactSerializer<Message> serializer()
- {
- return serializer_;
- }
-
- Header header_;
- private byte[] body_;
-
- protected Message(String id, EndPoint from, String messageType, String verb, byte[] body)
- {
- this(new Header(id, from, messageType, verb), body);
- }
-
- protected Message(Header header, byte[] body)
- {
- header_ = header;
- body_ = body;
- }
-
- public Message(EndPoint from, String messageType, String verb, byte[] body)
- {
- this(new Header(from, messageType, verb), body);
- }
-
- public byte[] getHeader(Object key)
- {
- return header_.getDetail(key);
- }
-
- public void removeHeader(Object key)
- {
- header_.removeDetail(key);
- }
-
- public void setMessageType(String type)
- {
- header_.setMessageType(type);
- }
-
- public void setMessageVerb(String verb)
- {
- header_.setMessageVerb(verb);
- }
-
- public void addHeader(String key, byte[] value)
- {
- header_.addDetail(key, value);
- }
-
- public Map<String, byte[]> getHeaders()
- {
- return header_.getDetails();
- }
-
- public byte[] getMessageBody()
- {
- return body_;
- }
-
- public void setMessageBody(byte[] body)
- {
- body_ = body;
- }
-
- public EndPoint getFrom()
- {
- return header_.getFrom();
- }
-
- public String getMessageType()
- {
- return header_.getMessageType();
- }
-
- public String getVerb()
- {
- return header_.getVerb();
- }
-
- public String getMessageId()
- {
- return header_.getMessageId();
- }
-
- void setMessageId(String id)
- {
- header_.setMessageId(id);
- }
-
- public Message getReply(EndPoint from, byte[] args)
- {
- Message response = new Message(getMessageId(),
- from,
- MessagingService.responseStage_,
- MessagingService.responseVerbHandler_,
- args);
- return response;
- }
-
- public String toString()
- {
- StringBuilder sbuf = new StringBuilder("");
- String separator = System.getProperty("line.separator");
- sbuf.append("ID:" + getMessageId())
- .append(separator)
- .append("FROM:" + getFrom())
- .append(separator)
- .append("TYPE:" + getMessageType())
- .append(separator)
- .append("VERB:" + getVerb())
- .append(separator);
- return sbuf.toString();
- }
-}
-
-class MessageSerializer implements ICompactSerializer<Message>
-{
- public void serialize(Message t, DataOutputStream dos) throws IOException
- {
- Header.serializer().serialize( t.header_, dos);
- byte[] bytes = t.getMessageBody();
- dos.writeInt(bytes.length);
- dos.write(bytes);
- }
-
- public Message deserialize(DataInputStream dis) throws IOException
- {
- Header header = Header.serializer().deserialize(dis);
- int size = dis.readInt();
- byte[] bytes = new byte[size];
- dis.readFully(bytes);
- // return new Message(header.getMessageId(), header.getFrom(), header.getMessageType(), header.getVerb(), new Object[]{bytes});
- return new Message(header, bytes);
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.lang.reflect.Array;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.*;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.utils.*;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class Message implements java.io.Serializable
+{
+ static final long serialVersionUID = 6329198792470413221L;
+ private static ICompactSerializer<Message> serializer_;
+
+ static
+ {
+ serializer_ = new MessageSerializer();
+ }
+
+ public static ICompactSerializer<Message> serializer()
+ {
+ return serializer_;
+ }
+
+ Header header_;
+ private byte[] body_;
+
+ protected Message(String id, EndPoint from, String messageType, String verb, byte[] body)
+ {
+ this(new Header(id, from, messageType, verb), body);
+ }
+
+ protected Message(Header header, byte[] body)
+ {
+ header_ = header;
+ body_ = body;
+ }
+
+ public Message(EndPoint from, String messageType, String verb, byte[] body)
+ {
+ this(new Header(from, messageType, verb), body);
+ }
+
+ public byte[] getHeader(Object key)
+ {
+ return header_.getDetail(key);
+ }
+
+ public void removeHeader(Object key)
+ {
+ header_.removeDetail(key);
+ }
+
+ public void setMessageType(String type)
+ {
+ header_.setMessageType(type);
+ }
+
+ public void setMessageVerb(String verb)
+ {
+ header_.setMessageVerb(verb);
+ }
+
+ public void addHeader(String key, byte[] value)
+ {
+ header_.addDetail(key, value);
+ }
+
+ public Map<String, byte[]> getHeaders()
+ {
+ return header_.getDetails();
+ }
+
+ public byte[] getMessageBody()
+ {
+ return body_;
+ }
+
+ public void setMessageBody(byte[] body)
+ {
+ body_ = body;
+ }
+
+ public EndPoint getFrom()
+ {
+ return header_.getFrom();
+ }
+
+ public String getMessageType()
+ {
+ return header_.getMessageType();
+ }
+
+ public String getVerb()
+ {
+ return header_.getVerb();
+ }
+
+ public String getMessageId()
+ {
+ return header_.getMessageId();
+ }
+
+ void setMessageId(String id)
+ {
+ header_.setMessageId(id);
+ }
+
+ public Message getReply(EndPoint from, byte[] args)
+ {
+ Message response = new Message(getMessageId(),
+ from,
+ MessagingService.responseStage_,
+ MessagingService.responseVerbHandler_,
+ args);
+ return response;
+ }
+
+ public String toString()
+ {
+ StringBuilder sbuf = new StringBuilder("");
+ String separator = System.getProperty("line.separator");
+ sbuf.append("ID:" + getMessageId())
+ .append(separator)
+ .append("FROM:" + getFrom())
+ .append(separator)
+ .append("TYPE:" + getMessageType())
+ .append(separator)
+ .append("VERB:" + getVerb())
+ .append(separator);
+ return sbuf.toString();
+ }
+}
+
+class MessageSerializer implements ICompactSerializer<Message>
+{
+ public void serialize(Message t, DataOutputStream dos) throws IOException
+ {
+ Header.serializer().serialize( t.header_, dos);
+ byte[] bytes = t.getMessageBody();
+ dos.writeInt(bytes.length);
+ dos.write(bytes);
+ }
+
+ public Message deserialize(DataInputStream dis) throws IOException
+ {
+ Header header = Header.serializer().deserialize(dis);
+ int size = dis.readInt();
+ byte[] bytes = new byte[size];
+ dis.readFully(bytes);
+ // return new Message(header.getMessageId(), header.getFrom(), header.getMessageType(), header.getVerb(), new Object[]{bytes});
+ return new Message(header, bytes);
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java Thu Jul 30 15:30:21 2009
@@ -1,47 +1,47 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net;
-
-import org.apache.log4j.Logger;
-
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class MessageDeliveryTask implements Runnable
-{
- private Message message_;
- private static Logger logger_ = Logger.getLogger(MessageDeliveryTask.class);
-
- public MessageDeliveryTask(Message message)
- {
- message_ = message;
- }
-
- public void run()
- {
- String verb = message_.getVerb();
- IVerbHandler verbHandler = MessagingService.getMessagingInstance().getVerbHandler(verb);
- if ( verbHandler != null )
- {
- verbHandler.doVerb(message_);
- }
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import org.apache.log4j.Logger;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class MessageDeliveryTask implements Runnable
+{
+ private Message message_;
+ private static Logger logger_ = Logger.getLogger(MessageDeliveryTask.class);
+
+ public MessageDeliveryTask(Message message)
+ {
+ message_ = message;
+ }
+
+ public void run()
+ {
+ String verb = message_.getVerb();
+ IVerbHandler verbHandler = MessagingService.getMessagingInstance().getVerbHandler(verb);
+ if ( verbHandler != null )
+ {
+ verbHandler.doVerb(message_);
+ }
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java Thu Jul 30 15:30:21 2009
@@ -1,68 +1,68 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net;
-
-import java.io.IOException;
-
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.net.io.FastSerializer;
-import org.apache.cassandra.net.io.ISerializer;
-import org.apache.cassandra.net.sink.SinkManager;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-class MessageDeserializationTask implements Runnable
-{
- private static Logger logger_ = Logger.getLogger(MessageDeserializationTask.class);
- private static ISerializer serializer_ = new FastSerializer();
- private int serializerType_;
- private byte[] bytes_ = new byte[0];
-
- MessageDeserializationTask(int serializerType, byte[] bytes)
- {
- serializerType_ = serializerType;
- bytes_ = bytes;
- }
-
- public void run()
- {
- Message message = null;
- try
- {
- message = serializer_.deserialize(bytes_);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
-
- if ( message != null )
- {
- message = SinkManager.processServerMessageSink(message);
- MessagingService.receive(message);
- }
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.net.io.FastSerializer;
+import org.apache.cassandra.net.io.ISerializer;
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class MessageDeserializationTask implements Runnable
+{
+ private static Logger logger_ = Logger.getLogger(MessageDeserializationTask.class);
+ private static ISerializer serializer_ = new FastSerializer();
+ private int serializerType_;
+ private byte[] bytes_ = new byte[0];
+
+ MessageDeserializationTask(int serializerType, byte[] bytes)
+ {
+ serializerType_ = serializerType;
+ bytes_ = bytes;
+ }
+
+ public void run()
+ {
+ Message message = null;
+ try
+ {
+ message = serializer_.deserialize(bytes_);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ if ( message != null )
+ {
+ message = SinkManager.processServerMessageSink(message);
+ MessagingService.receive(message);
+ }
+ }
+
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageSerializationTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageSerializationTask.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageSerializationTask.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageSerializationTask.java Thu Jul 30 15:30:21 2009
@@ -1,101 +1,101 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net;
-
-import java.io.IOException;
-import java.net.SocketException;
-
-import org.apache.cassandra.concurrent.Context;
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
-import org.apache.cassandra.concurrent.ThreadLocalContext;
-import org.apache.cassandra.net.sink.SinkManager;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
-
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-class MessageSerializationTask implements Runnable
-{
- private static Logger logger_ = Logger.getLogger(MessageSerializationTask.class);
- private Message message_;
- private EndPoint to_;
-
- public MessageSerializationTask(Message message, EndPoint to)
- {
- message_ = message;
- to_ = to;
- }
-
- public Message getMessage()
- {
- return message_;
- }
-
- public void run()
- {
- /* Adding the message to be serialized in the TLS. For accessing in the afterExecute() */
- Context ctx = new Context();
- ctx.put(this.getClass().getName(), message_);
- ThreadLocalContext.put(ctx);
-
- TcpConnection connection = null;
- try
- {
- Message message = SinkManager.processClientMessageSink(message_);
- if(null == message)
- return;
- connection = MessagingService.getConnection(message_.getFrom(), to_);
- connection.write(message);
- }
- catch ( SocketException se )
- {
- // Shutting down the entire pool. May be too conservative an approach.
- MessagingService.getConnectionPool(message_.getFrom(), to_).shutdown();
- logger_.warn(LogUtil.throwableToString(se));
- }
- catch ( IOException e )
- {
- logConnectAndIOException(e, connection);
- }
- catch (Throwable th)
- {
- logger_.warn(LogUtil.throwableToString(th));
- }
- finally
- {
- if ( connection != null )
- {
- connection.close();
- }
- }
- }
-
- private void logConnectAndIOException(IOException ex, TcpConnection connection)
- {
- if ( connection != null )
- {
- connection.errorClose();
- }
- logger_.warn(LogUtil.throwableToString(ex));
- }
-}
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+import java.net.SocketException;
+
+import org.apache.cassandra.concurrent.Context;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ThreadLocalContext;
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class MessageSerializationTask implements Runnable
+{
+ private static Logger logger_ = Logger.getLogger(MessageSerializationTask.class);
+ private Message message_;
+ private EndPoint to_;
+
+ public MessageSerializationTask(Message message, EndPoint to)
+ {
+ message_ = message;
+ to_ = to;
+ }
+
+ public Message getMessage()
+ {
+ return message_;
+ }
+
+ public void run()
+ {
+ /* Adding the message to be serialized in the TLS. For accessing in the afterExecute() */
+ Context ctx = new Context();
+ ctx.put(this.getClass().getName(), message_);
+ ThreadLocalContext.put(ctx);
+
+ TcpConnection connection = null;
+ try
+ {
+ Message message = SinkManager.processClientMessageSink(message_);
+ if(null == message)
+ return;
+ connection = MessagingService.getConnection(message_.getFrom(), to_);
+ connection.write(message);
+ }
+ catch ( SocketException se )
+ {
+ // Shutting down the entire pool. May be too conservative an approach.
+ MessagingService.getConnectionPool(message_.getFrom(), to_).shutdown();
+ logger_.warn(LogUtil.throwableToString(se));
+ }
+ catch ( IOException e )
+ {
+ logConnectAndIOException(e, connection);
+ }
+ catch (Throwable th)
+ {
+ logger_.warn(LogUtil.throwableToString(th));
+ }
+ finally
+ {
+ if ( connection != null )
+ {
+ connection.close();
+ }
+ }
+ }
+
+ private void logConnectAndIOException(IOException ex, TcpConnection connection)
+ {
+ if ( connection != null )
+ {
+ connection.errorClose();
+ }
+ logger_.warn(LogUtil.throwableToString(ex));
+ }
+}
+
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingConfig.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingConfig.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingConfig.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingConfig.java Thu Jul 30 15:30:21 2009
@@ -1,96 +1,96 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class MessagingConfig
-{
- // The expected time for one message round trip. It does not reflect message processing
- // time at the receiver.
- private static int expectedRoundTripTime_ = 400;
- private static int numberOfPorts_ = 2;
- private static int threadCount_ = 4;
-
- public static int getMessagingThreadCount()
- {
- return threadCount_;
- }
-
- public static void setMessagingThreadCount(int threadCount)
- {
- threadCount_ = threadCount;
- }
-
- public static void setExpectedRoundTripTime(int roundTripTimeMillis) {
- if(roundTripTimeMillis > 0 )
- expectedRoundTripTime_ = roundTripTimeMillis;
- }
-
- public static int getExpectedRoundTripTime()
- {
- return expectedRoundTripTime_;
- }
-
- public static int getConnectionPoolInitialSize()
- {
- return ConnectionPoolConfiguration.initialSize_;
- }
-
- public static int getConnectionPoolGrowthFactor()
- {
- return ConnectionPoolConfiguration.growthFactor_;
- }
-
- public static int getConnectionPoolMaxSize()
- {
- return ConnectionPoolConfiguration.maxSize_;
- }
-
- public static int getConnectionPoolWaitTimeout()
- {
- return ConnectionPoolConfiguration.waitTimeout_;
- }
-
- public static int getConnectionPoolMonitorInterval()
- {
- return ConnectionPoolConfiguration.monitorInterval_;
- }
-
- public static void setNumberOfPorts(int n)
- {
- numberOfPorts_ = n;
- }
-
- public static int getNumberOfPorts()
- {
- return numberOfPorts_;
- }
-}
-
-class ConnectionPoolConfiguration
-{
- public static int initialSize_ = 1;
- public static int growthFactor_ = 1;
- public static int maxSize_ = 1;
- public static int waitTimeout_ = 10;
- public static int monitorInterval_ = 300;
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class MessagingConfig
+{
+ // The expected time for one message round trip. It does not reflect message processing
+ // time at the receiver.
+ private static int expectedRoundTripTime_ = 400;
+ private static int numberOfPorts_ = 2;
+ private static int threadCount_ = 4;
+
+ public static int getMessagingThreadCount()
+ {
+ return threadCount_;
+ }
+
+ public static void setMessagingThreadCount(int threadCount)
+ {
+ threadCount_ = threadCount;
+ }
+
+ public static void setExpectedRoundTripTime(int roundTripTimeMillis) {
+ if(roundTripTimeMillis > 0 )
+ expectedRoundTripTime_ = roundTripTimeMillis;
+ }
+
+ public static int getExpectedRoundTripTime()
+ {
+ return expectedRoundTripTime_;
+ }
+
+ public static int getConnectionPoolInitialSize()
+ {
+ return ConnectionPoolConfiguration.initialSize_;
+ }
+
+ public static int getConnectionPoolGrowthFactor()
+ {
+ return ConnectionPoolConfiguration.growthFactor_;
+ }
+
+ public static int getConnectionPoolMaxSize()
+ {
+ return ConnectionPoolConfiguration.maxSize_;
+ }
+
+ public static int getConnectionPoolWaitTimeout()
+ {
+ return ConnectionPoolConfiguration.waitTimeout_;
+ }
+
+ public static int getConnectionPoolMonitorInterval()
+ {
+ return ConnectionPoolConfiguration.monitorInterval_;
+ }
+
+ public static void setNumberOfPorts(int n)
+ {
+ numberOfPorts_ = n;
+ }
+
+ public static int getNumberOfPorts()
+ {
+ return numberOfPorts_;
+ }
+}
+
+class ConnectionPoolConfiguration
+{
+ public static int initialSize_ = 1;
+ public static int growthFactor_ = 1;
+ public static int maxSize_ = 1;
+ public static int waitTimeout_ = 10;
+ public static int monitorInterval_ = 300;
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Thu Jul 30 15:30:21 2009
@@ -1,759 +1,759 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net;
-
-import org.apache.cassandra.concurrent.*;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.io.SerializerType;
-import org.apache.cassandra.utils.*;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-import java.net.MulticastSocket;
-import java.net.ServerSocket;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.ServerSocketChannel;
-import java.security.MessageDigest;
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class MessagingService implements IMessagingService
-{
- private static boolean debugOn_ = false;
-
- private static int version_ = 1;
- //TODO: make this parameter dynamic somehow. Not sure if config is appropriate.
- private static SerializerType serializerType_ = SerializerType.BINARY;
-
- private static byte[] protocol_ = new byte[16];
- /* Verb Handler for the Response */
- public static final String responseVerbHandler_ = "RESPONSE";
- /* Stage for responses. */
- public static final String responseStage_ = "RESPONSE-STAGE";
- private enum ReservedVerbs_ {RESPONSE};
-
- private static Map<String, String> reservedVerbs_ = new Hashtable<String, String>();
- /* Indicate if we are currently streaming data to another node or receiving streaming data */
- private static AtomicBoolean isStreaming_ = new AtomicBoolean(false);
-
- /* This records all the results mapped by message Id */
- private static ICachetable<String, IAsyncCallback> callbackMap_;
- private static ICachetable<String, IAsyncResult> taskCompletionMap_;
-
- /* Manages the table of endpoints it is listening on */
- private static Set<EndPoint> endPoints_;
-
- /* List of sockets we are listening on */
- private static Map<EndPoint, SelectionKey> listenSockets_ = new HashMap<EndPoint, SelectionKey>();
-
- /* Lookup table for registering message handlers based on the verb. */
- private static Map<String, IVerbHandler> verbHandlers_;
-
- private static Map<String, MulticastSocket> mCastMembership_ = new HashMap<String, MulticastSocket>();
-
- /* Thread pool to handle messaging read activities of Socket and default stage */
- private static ExecutorService messageDeserializationExecutor_;
-
- /* Thread pool to handle messaging write activities */
- private static ExecutorService messageSerializerExecutor_;
-
- /* Thread pool to handle deserialization of messages read from the socket. */
- private static ExecutorService messageDeserializerExecutor_;
-
- /* Thread pool to handle messaging write activities */
- private static ExecutorService streamExecutor_;
-
- private final static ReentrantLock lock_ = new ReentrantLock();
- private static Map<String, TcpConnectionManager> poolTable_ = new Hashtable<String, TcpConnectionManager>();
-
- private static boolean bShutdown_ = false;
-
- private static Logger logger_ = Logger.getLogger(MessagingService.class);
-
- private static IMessagingService messagingService_ = new MessagingService();
-
- public static boolean isDebugOn()
- {
- return debugOn_;
- }
-
- public static void debugOn(boolean on)
- {
- debugOn_ = on;
- }
-
- public static SerializerType getSerializerType()
- {
- return serializerType_;
- }
-
- public synchronized static void serializerType(String type)
- {
- if ( type.equalsIgnoreCase("binary") )
- {
- serializerType_ = SerializerType.BINARY;
- }
- else if ( type.equalsIgnoreCase("java") )
- {
- serializerType_ = SerializerType.JAVA;
- }
- else if ( type.equalsIgnoreCase("xml") )
- {
- serializerType_ = SerializerType.XML;
- }
- }
-
- public static int getVersion()
- {
- return version_;
- }
-
- public static void setVersion(int version)
- {
- version_ = version;
- }
-
- public static IMessagingService getMessagingInstance()
- {
- if ( bShutdown_ )
- {
- lock_.lock();
- try
- {
- if ( bShutdown_ )
- {
- messagingService_ = new MessagingService();
- bShutdown_ = false;
- }
- }
- finally
- {
- lock_.unlock();
- }
- }
- return messagingService_;
- }
-
- public Object clone() throws CloneNotSupportedException
- {
- //Prevents the singleton from being cloned
- throw new CloneNotSupportedException();
- }
-
- protected MessagingService()
- {
- for ( ReservedVerbs_ verbs : ReservedVerbs_.values() )
- {
- reservedVerbs_.put(verbs.toString(), verbs.toString());
- }
- verbHandlers_ = new HashMap<String, IVerbHandler>();
- endPoints_ = new HashSet<EndPoint>();
- /*
- * Leave callbacks in the cachetable long enough that any related messages will arrive
- * before the callback is evicted from the table. The concurrency level is set at 128
- * which is the sum of the threads in the pool that adds shit into the table and the
- * pool that retrives the callback from here.
- */
- int maxSize = MessagingConfig.getMessagingThreadCount();
- callbackMap_ = new Cachetable<String, IAsyncCallback>( 2 * DatabaseDescriptor.getRpcTimeout() );
- taskCompletionMap_ = new Cachetable<String, IAsyncResult>( 2 * DatabaseDescriptor.getRpcTimeout() );
-
- messageDeserializationExecutor_ = new DebuggableThreadPoolExecutor( maxSize,
- maxSize,
- Integer.MAX_VALUE,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(),
- new ThreadFactoryImpl("MESSAGING-SERVICE-POOL")
- );
-
- messageSerializerExecutor_ = new DebuggableThreadPoolExecutor( maxSize,
- maxSize,
- Integer.MAX_VALUE,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(),
- new ThreadFactoryImpl("MESSAGE-SERIALIZER-POOL")
- );
-
- messageDeserializerExecutor_ = new DebuggableThreadPoolExecutor( maxSize,
- maxSize,
- Integer.MAX_VALUE,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(),
- new ThreadFactoryImpl("MESSAGE-DESERIALIZER-POOL")
- );
-
- streamExecutor_ = new DebuggableThreadPoolExecutor("MESSAGE-STREAMING-POOL");
-
- protocol_ = hash(HashingSchemes.MD5, "FB-MESSAGING".getBytes());
- /* register the response verb handler */
- registerVerbHandlers(MessagingService.responseVerbHandler_, new ResponseVerbHandler());
- /* register stage for response */
- StageManager.registerStage(MessagingService.responseStage_, new MultiThreadedStage("RESPONSE-STAGE", maxSize) );
- }
-
- public byte[] hash(String type, byte data[])
- {
- byte result[] = null;
- try
- {
- MessageDigest messageDigest = MessageDigest.getInstance(type);
- result = messageDigest.digest(data);
- }
- catch(Exception e)
- {
- if (logger_.isDebugEnabled())
- logger_.debug(LogUtil.throwableToString(e));
- }
- return result;
- }
-
- public void listen(EndPoint localEp) throws IOException
- {
- ServerSocketChannel serverChannel = ServerSocketChannel.open();
- ServerSocket ss = serverChannel.socket();
- ss.bind(localEp.getInetAddress());
- serverChannel.configureBlocking(false);
-
- SelectionKeyHandler handler = new TcpConnectionHandler(localEp);
-
- SelectionKey key = SelectorManager.getSelectorManager().register(serverChannel, handler, SelectionKey.OP_ACCEPT);
- endPoints_.add(localEp);
- listenSockets_.put(localEp, key);
- }
-
- public void listenUDP(EndPoint localEp)
- {
- UdpConnection connection = new UdpConnection();
- if (logger_.isDebugEnabled())
- logger_.debug("Starting to listen on " + localEp);
- try
- {
- connection.init(localEp.getPort());
- endPoints_.add(localEp);
- }
- catch ( IOException e )
- {
- logger_.warn(LogUtil.throwableToString(e));
- }
- }
-
- public static TcpConnectionManager getConnectionPool(EndPoint from, EndPoint to)
- {
- String key = from + ":" + to;
- TcpConnectionManager cp = poolTable_.get(key);
- if( cp == null )
- {
- lock_.lock();
- try
- {
- cp = poolTable_.get(key);
- if (cp == null )
- {
- cp = new TcpConnectionManager(MessagingConfig.getConnectionPoolInitialSize(),
- MessagingConfig.getConnectionPoolGrowthFactor(),
- MessagingConfig.getConnectionPoolMaxSize(), from, to);
- poolTable_.put(key, cp);
- }
- }
- finally
- {
- lock_.unlock();
- }
- }
- return cp;
- }
-
- public static ConnectionStatistics[] getPoolStatistics()
- {
- Set<ConnectionStatistics> stats = new HashSet<ConnectionStatistics>();
- Iterator<TcpConnectionManager> it = poolTable_.values().iterator();
- while ( it.hasNext() )
- {
- TcpConnectionManager cp = it.next();
- ConnectionStatistics cs = new ConnectionStatistics(cp.getLocalEndPoint(), cp.getRemoteEndPoint(), cp.getPoolSize(), cp.getConnectionsInUse());
- stats.add( cs );
- }
- return stats.toArray(new ConnectionStatistics[0]);
- }
-
- public static TcpConnection getConnection(EndPoint from, EndPoint to) throws IOException
- {
- return getConnectionPool(from, to).getConnection();
- }
-
- private void checkForReservedVerb(String type)
- {
- if ( reservedVerbs_.get(type) != null && verbHandlers_.get(type) != null )
- {
- throw new IllegalArgumentException( type + " is a reserved verb handler. Scram!");
- }
- }
-
- public void registerVerbHandlers(String type, IVerbHandler verbHandler)
- {
- checkForReservedVerb(type);
- verbHandlers_.put(type, verbHandler);
- }
-
- public void deregisterAllVerbHandlers(EndPoint localEndPoint)
- {
- Iterator keys = verbHandlers_.keySet().iterator();
- String key = null;
-
- /*
- * endpoint specific verbhandlers can be distinguished because
- * their key's contain the name of the endpoint.
- */
- while(keys.hasNext())
- {
- key = (String)keys.next();
- if (key.contains(localEndPoint.toString()))
- keys.remove();
- }
- }
-
- public void deregisterVerbHandlers(String type)
- {
- verbHandlers_.remove(type);
- }
-
- public IVerbHandler getVerbHandler(String type)
- {
- IVerbHandler handler = (IVerbHandler)verbHandlers_.get(type);
- return handler;
- }
-
- public String sendRR(Message message, EndPoint[] to, IAsyncCallback cb)
- {
- String messageId = message.getMessageId();
- callbackMap_.put(messageId, cb);
- for ( int i = 0; i < to.length; ++i )
- {
- sendOneWay(message, to[i]);
- }
- return messageId;
- }
-
- public String sendRR(Message message, EndPoint to, IAsyncCallback cb)
- {
- String messageId = message.getMessageId();
- callbackMap_.put(messageId, cb);
- sendOneWay(message, to);
- return messageId;
- }
-
- public String sendRR(Message[] messages, EndPoint[] to, IAsyncCallback cb)
- {
- if ( messages.length != to.length )
- {
- throw new IllegalArgumentException("Number of messages and the number of endpoints need to be same.");
- }
- String groupId = GuidGenerator.guid();
- callbackMap_.put(groupId, cb);
- for ( int i = 0; i < messages.length; ++i )
- {
- messages[i].setMessageId(groupId);
- sendOneWay(messages[i], to[i]);
- }
- return groupId;
- }
-
- public IAsyncResult sendRR(Message[] messages, EndPoint[] to)
- {
- if ( messages.length != to.length )
- {
- throw new IllegalArgumentException("Number of messages and the number of endpoints need to be same.");
- }
-
- IAsyncResult iar = new MultiAsyncResult(messages.length);
- String groupId = GuidGenerator.guid();
- taskCompletionMap_.put(groupId, iar);
- for ( int i = 0; i < messages.length; ++i )
- {
- messages[i].setMessageId(groupId);
- sendOneWay(messages[i], to[i]);
- }
-
- return iar;
- }
-
- public String sendRR(Message[][] messages, EndPoint[][] to, IAsyncCallback cb)
- {
- if ( messages.length != to.length )
- {
- throw new IllegalArgumentException("Number of messages and the number of endpoints need to be same.");
- }
-
- int length = messages.length;
- String[] gids = new String[length];
- /* Generate the requisite GUID's */
- for ( int i = 0; i < length; ++i )
- {
- gids[i] = GuidGenerator.guid();
- }
- /* attach this context to the callback */
- cb.attachContext(gids);
- for ( int i = 0; i < length; ++i )
- {
- callbackMap_.put(gids[i], cb);
- for ( int j = 0; j < messages[i].length; ++j )
- {
- messages[i][j].setMessageId(gids[i]);
- sendOneWay(messages[i][j], to[i][j]);
- }
- }
- return gids[0];
- }
-
- /*
- Use this version for fire and forget style messaging.
- */
- public void sendOneWay(Message message, EndPoint to)
- {
- // do local deliveries
- if ( message.getFrom().equals(to) )
- {
- MessagingService.receive(message);
- return;
- }
-
- Runnable tcpWriteEvent = new MessageSerializationTask(message, to);
- messageSerializerExecutor_.execute(tcpWriteEvent);
- }
-
- public IAsyncResult sendRR(Message message, EndPoint to)
- {
- IAsyncResult iar = new AsyncResult();
- taskCompletionMap_.put(message.getMessageId(), iar);
- sendOneWay(message, to);
- return iar;
- }
-
- public void sendUdpOneWay(Message message, EndPoint to)
- {
- EndPoint from = message.getFrom();
- if (message.getFrom().equals(to)) {
- MessagingService.receive(message);
- return;
- }
-
- UdpConnection connection = null;
- try
- {
- connection = new UdpConnection();
- connection.init();
- connection.write(message, to);
- }
- catch ( IOException e )
- {
- logger_.warn(LogUtil.throwableToString(e));
- }
- finally
- {
- if ( connection != null )
- connection.close();
- }
- }
-
- public void stream(String file, long startPosition, long total, EndPoint from, EndPoint to)
- {
- isStreaming_.set(true);
- /* Streaming asynchronously on streamExector_ threads. */
- Runnable streamingTask = new FileStreamTask(file, startPosition, total, from, to);
- streamExecutor_.execute(streamingTask);
- }
-
- /*
- * Does the application determine if we are currently streaming data.
- * This would imply either streaming to a receiver, receiving streamed
- * data or both.
- */
- public static boolean isStreaming()
- {
- return isStreaming_.get();
- }
-
- public static void setStreamingMode(boolean bVal)
- {
- isStreaming_.set(bVal);
- }
-
- public static void shutdown()
- {
- logger_.info("Shutting down ...");
- synchronized ( MessagingService.class )
- {
- /* Stop listening on any socket */
- for( SelectionKey skey : listenSockets_.values() )
- {
- skey.cancel();
- try
- {
- skey.channel().close();
- }
- catch (IOException e) {}
- }
- listenSockets_.clear();
-
- /* Shutdown the threads in the EventQueue's */
- messageDeserializationExecutor_.shutdownNow();
- messageSerializerExecutor_.shutdownNow();
- messageDeserializerExecutor_.shutdownNow();
- streamExecutor_.shutdownNow();
-
- /* shut down the cachetables */
- taskCompletionMap_.shutdown();
- callbackMap_.shutdown();
-
- /* Interrupt the selector manager thread */
- SelectorManager.getSelectorManager().interrupt();
-
- poolTable_.clear();
- verbHandlers_.clear();
- bShutdown_ = true;
- }
- if (logger_.isDebugEnabled())
- logger_.debug("Shutdown invocation complete.");
- }
-
- public static void receive(Message message)
- {
- enqueueRunnable(message.getMessageType(), new MessageDeliveryTask(message));
- }
-
- public static boolean isLocalEndPoint(EndPoint ep)
- {
- return ( endPoints_.contains(ep) );
- }
-
- private static void enqueueRunnable(String stageName, Runnable runnable){
-
- IStage stage = StageManager.getStage(stageName);
-
- if ( stage != null )
- {
- stage.execute(runnable);
- }
- else
- {
- logger_.info("Running on default stage - beware");
- messageSerializerExecutor_.execute(runnable);
- }
- }
-
- public static IAsyncCallback getRegisteredCallback(String key)
- {
- return callbackMap_.get(key);
- }
-
- public static void removeRegisteredCallback(String key)
- {
- callbackMap_.remove(key);
- }
-
- public static IAsyncResult getAsyncResult(String key)
- {
- return taskCompletionMap_.remove(key);
- }
-
- public static void removeAsyncResult(String key)
- {
- taskCompletionMap_.remove(key);
- }
-
- public static byte[] getProtocol()
- {
- return protocol_;
- }
-
- public static ExecutorService getReadExecutor()
- {
- return messageDeserializationExecutor_;
- }
-
- public static ExecutorService getWriteExecutor()
- {
- return messageSerializerExecutor_;
- }
-
- public static ExecutorService getDeserilizationExecutor()
- {
- return messageDeserializerExecutor_;
- }
-
- public static boolean isProtocolValid(byte[] protocol)
- {
- return isEqual(protocol_, protocol);
- }
-
- public static boolean isEqual(byte digestA[], byte digestB[])
- {
- return MessageDigest.isEqual(digestA, digestB);
- }
-
- public static byte[] toByteArray(int i)
- {
- byte bytes[] = new byte[4];
- bytes[0] = (byte)(i >>> 24 & 0xff);
- bytes[1] = (byte)(i >>> 16 & 0xff);
- bytes[2] = (byte)(i >>> 8 & 0xff);
- bytes[3] = (byte)(i & 0xff);
- return bytes;
- }
-
- public static byte[] toByteArray(short s)
- {
- byte bytes[] = new byte[2];
- bytes[0] = (byte)(s >>> 8 & 0xff);
- bytes[1] = (byte)(s & 0xff);
- return bytes;
- }
-
- public static short byteArrayToShort(byte bytes[])
- {
- return byteArrayToShort(bytes, 0);
- }
-
- public static short byteArrayToShort(byte bytes[], int offset)
- {
- if(bytes.length - offset < 2)
- throw new IllegalArgumentException("A short must be 2 bytes in size.");
- short n = 0;
- for(int i = 0; i < 2; i++)
- {
- n <<= 8;
- n |= bytes[offset + i] & 0xff;
- }
-
- return n;
- }
-
- public static int getBits(int x, int p, int n)
- {
- return x >>> (p + 1) - n & ~(-1 << n);
- }
-
- public static int byteArrayToInt(byte bytes[])
- {
- return byteArrayToInt(bytes, 0);
- }
-
- public static int byteArrayToInt(byte bytes[], int offset)
- {
- if(bytes.length - offset < 4)
- throw new IllegalArgumentException("An integer must be 4 bytes in size.");
- int n = 0;
- for(int i = 0; i < 4; i++)
- {
- n <<= 8;
- n |= bytes[offset + i] & 0xff;
- }
-
- return n;
- }
-
- public static ByteBuffer packIt(byte[] bytes, boolean compress, boolean stream, boolean listening)
- {
- byte[] size = toByteArray(bytes.length);
- /*
- Setting up the protocol header. This is 4 bytes long
- represented as an integer. The first 2 bits indicate
- the serializer type. The 3rd bit indicates if compression
- is turned on or off. It is turned off by default. The 4th
- bit indicates if we are in streaming mode. It is turned off
- by default. The 5th bit is used to indicate that the sender
- is not listening on any well defined port. This implies the
- receiver needs to cache the connection using the port on the
- socket. The following 3 bits are reserved for future use.
- The next 8 bits indicate a version number. Remaining 15 bits
- are not used currently.
- */
- int n = 0;
- // Setting up the serializer bit
- n |= serializerType_.ordinal();
- // set compression bit.
- if ( compress )
- n |= 4;
-
- // set streaming bit
- if ( stream )
- n |= 8;
-
- // set listening 5th bit
- if ( listening )
- n |= 16;
-
- // Setting up the version bit
- n |= (version_ << 8);
- /* Finished the protocol header setup */
-
- byte[] header = toByteArray(n);
- ByteBuffer buffer = ByteBuffer.allocate(16 + header.length + size.length + bytes.length);
- buffer.put(protocol_);
- buffer.put(header);
- buffer.put(size);
- buffer.put(bytes);
- buffer.flip();
- return buffer;
- }
-
- public static ByteBuffer constructStreamHeader(boolean compress, boolean stream)
- {
- /*
- Setting up the protocol header. This is 4 bytes long
- represented as an integer. The first 2 bits indicate
- the serializer type. The 3rd bit indicates if compression
- is turned on or off. It is turned off by default. The 4th
- bit indicates if we are in streaming mode. It is turned off
- by default. The following 4 bits are reserved for future use.
- The next 8 bits indicate a version number. Remaining 15 bits
- are not used currently.
- */
- int n = 0;
- // Setting up the serializer bit
- n |= serializerType_.ordinal();
- // set compression bit.
- if ( compress )
- n |= 4;
-
- // set streaming bit
- if ( stream )
- n |= 8;
-
- // Setting up the version bit
- n |= (version_ << 8);
- /* Finished the protocol header setup */
-
- byte[] header = toByteArray(n);
- ByteBuffer buffer = ByteBuffer.allocate(16 + header.length);
- buffer.put(protocol_);
- buffer.put(header);
- buffer.flip();
- return buffer;
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import org.apache.cassandra.concurrent.*;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.io.SerializerType;
+import org.apache.cassandra.utils.*;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.net.MulticastSocket;
+import java.net.ServerSocket;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
+import java.security.MessageDigest;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class MessagingService implements IMessagingService
+{
+ private static boolean debugOn_ = false;
+
+ private static int version_ = 1;
+ //TODO: make this parameter dynamic somehow. Not sure if config is appropriate.
+ private static SerializerType serializerType_ = SerializerType.BINARY;
+
+ private static byte[] protocol_ = new byte[16];
+ /* Verb Handler for the Response */
+ public static final String responseVerbHandler_ = "RESPONSE";
+ /* Stage for responses. */
+ public static final String responseStage_ = "RESPONSE-STAGE";
+ private enum ReservedVerbs_ {RESPONSE};
+
+ private static Map<String, String> reservedVerbs_ = new Hashtable<String, String>();
+ /* Indicate if we are currently streaming data to another node or receiving streaming data */
+ private static AtomicBoolean isStreaming_ = new AtomicBoolean(false);
+
+ /* This records all the results mapped by message Id */
+ private static ICachetable<String, IAsyncCallback> callbackMap_;
+ private static ICachetable<String, IAsyncResult> taskCompletionMap_;
+
+ /* Manages the table of endpoints it is listening on */
+ private static Set<EndPoint> endPoints_;
+
+ /* List of sockets we are listening on */
+ private static Map<EndPoint, SelectionKey> listenSockets_ = new HashMap<EndPoint, SelectionKey>();
+
+ /* Lookup table for registering message handlers based on the verb. */
+ private static Map<String, IVerbHandler> verbHandlers_;
+
+ private static Map<String, MulticastSocket> mCastMembership_ = new HashMap<String, MulticastSocket>();
+
+ /* Thread pool to handle messaging read activities of Socket and default stage */
+ private static ExecutorService messageDeserializationExecutor_;
+
+ /* Thread pool to handle messaging write activities */
+ private static ExecutorService messageSerializerExecutor_;
+
+ /* Thread pool to handle deserialization of messages read from the socket. */
+ private static ExecutorService messageDeserializerExecutor_;
+
+ /* Thread pool to handle messaging write activities */
+ private static ExecutorService streamExecutor_;
+
+ private final static ReentrantLock lock_ = new ReentrantLock();
+ private static Map<String, TcpConnectionManager> poolTable_ = new Hashtable<String, TcpConnectionManager>();
+
+ private static boolean bShutdown_ = false;
+
+ private static Logger logger_ = Logger.getLogger(MessagingService.class);
+
+ private static IMessagingService messagingService_ = new MessagingService();
+
+ public static boolean isDebugOn()
+ {
+ return debugOn_;
+ }
+
+ public static void debugOn(boolean on)
+ {
+ debugOn_ = on;
+ }
+
+ public static SerializerType getSerializerType()
+ {
+ return serializerType_;
+ }
+
+ public synchronized static void serializerType(String type)
+ {
+ if ( type.equalsIgnoreCase("binary") )
+ {
+ serializerType_ = SerializerType.BINARY;
+ }
+ else if ( type.equalsIgnoreCase("java") )
+ {
+ serializerType_ = SerializerType.JAVA;
+ }
+ else if ( type.equalsIgnoreCase("xml") )
+ {
+ serializerType_ = SerializerType.XML;
+ }
+ }
+
+ public static int getVersion()
+ {
+ return version_;
+ }
+
+ public static void setVersion(int version)
+ {
+ version_ = version;
+ }
+
+ public static IMessagingService getMessagingInstance()
+ {
+ if ( bShutdown_ )
+ {
+ lock_.lock();
+ try
+ {
+ if ( bShutdown_ )
+ {
+ messagingService_ = new MessagingService();
+ bShutdown_ = false;
+ }
+ }
+ finally
+ {
+ lock_.unlock();
+ }
+ }
+ return messagingService_;
+ }
+
+ public Object clone() throws CloneNotSupportedException
+ {
+ //Prevents the singleton from being cloned
+ throw new CloneNotSupportedException();
+ }
+
+ protected MessagingService()
+ {
+ for ( ReservedVerbs_ verbs : ReservedVerbs_.values() )
+ {
+ reservedVerbs_.put(verbs.toString(), verbs.toString());
+ }
+ verbHandlers_ = new HashMap<String, IVerbHandler>();
+ endPoints_ = new HashSet<EndPoint>();
+ /*
+ * Leave callbacks in the cachetable long enough that any related messages will arrive
+ * before the callback is evicted from the table. The concurrency level is set at 128
+ * which is the sum of the threads in the pool that adds shit into the table and the
+ * pool that retrives the callback from here.
+ */
+ int maxSize = MessagingConfig.getMessagingThreadCount();
+ callbackMap_ = new Cachetable<String, IAsyncCallback>( 2 * DatabaseDescriptor.getRpcTimeout() );
+ taskCompletionMap_ = new Cachetable<String, IAsyncResult>( 2 * DatabaseDescriptor.getRpcTimeout() );
+
+ messageDeserializationExecutor_ = new DebuggableThreadPoolExecutor( maxSize,
+ maxSize,
+ Integer.MAX_VALUE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new ThreadFactoryImpl("MESSAGING-SERVICE-POOL")
+ );
+
+ messageSerializerExecutor_ = new DebuggableThreadPoolExecutor( maxSize,
+ maxSize,
+ Integer.MAX_VALUE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new ThreadFactoryImpl("MESSAGE-SERIALIZER-POOL")
+ );
+
+ messageDeserializerExecutor_ = new DebuggableThreadPoolExecutor( maxSize,
+ maxSize,
+ Integer.MAX_VALUE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new ThreadFactoryImpl("MESSAGE-DESERIALIZER-POOL")
+ );
+
+ streamExecutor_ = new DebuggableThreadPoolExecutor("MESSAGE-STREAMING-POOL");
+
+ protocol_ = hash(HashingSchemes.MD5, "FB-MESSAGING".getBytes());
+ /* register the response verb handler */
+ registerVerbHandlers(MessagingService.responseVerbHandler_, new ResponseVerbHandler());
+ /* register stage for response */
+ StageManager.registerStage(MessagingService.responseStage_, new MultiThreadedStage("RESPONSE-STAGE", maxSize) );
+ }
+
+ public byte[] hash(String type, byte data[])
+ {
+ byte result[] = null;
+ try
+ {
+ MessageDigest messageDigest = MessageDigest.getInstance(type);
+ result = messageDigest.digest(data);
+ }
+ catch(Exception e)
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug(LogUtil.throwableToString(e));
+ }
+ return result;
+ }
+
+ public void listen(EndPoint localEp) throws IOException
+ {
+ ServerSocketChannel serverChannel = ServerSocketChannel.open();
+ ServerSocket ss = serverChannel.socket();
+ ss.bind(localEp.getInetAddress());
+ serverChannel.configureBlocking(false);
+
+ SelectionKeyHandler handler = new TcpConnectionHandler(localEp);
+
+ SelectionKey key = SelectorManager.getSelectorManager().register(serverChannel, handler, SelectionKey.OP_ACCEPT);
+ endPoints_.add(localEp);
+ listenSockets_.put(localEp, key);
+ }
+
+ public void listenUDP(EndPoint localEp)
+ {
+ UdpConnection connection = new UdpConnection();
+ if (logger_.isDebugEnabled())
+ logger_.debug("Starting to listen on " + localEp);
+ try
+ {
+ connection.init(localEp.getPort());
+ endPoints_.add(localEp);
+ }
+ catch ( IOException e )
+ {
+ logger_.warn(LogUtil.throwableToString(e));
+ }
+ }
+
+ public static TcpConnectionManager getConnectionPool(EndPoint from, EndPoint to)
+ {
+ String key = from + ":" + to;
+ TcpConnectionManager cp = poolTable_.get(key);
+ if( cp == null )
+ {
+ lock_.lock();
+ try
+ {
+ cp = poolTable_.get(key);
+ if (cp == null )
+ {
+ cp = new TcpConnectionManager(MessagingConfig.getConnectionPoolInitialSize(),
+ MessagingConfig.getConnectionPoolGrowthFactor(),
+ MessagingConfig.getConnectionPoolMaxSize(), from, to);
+ poolTable_.put(key, cp);
+ }
+ }
+ finally
+ {
+ lock_.unlock();
+ }
+ }
+ return cp;
+ }
+
+ public static ConnectionStatistics[] getPoolStatistics()
+ {
+ Set<ConnectionStatistics> stats = new HashSet<ConnectionStatistics>();
+ Iterator<TcpConnectionManager> it = poolTable_.values().iterator();
+ while ( it.hasNext() )
+ {
+ TcpConnectionManager cp = it.next();
+ ConnectionStatistics cs = new ConnectionStatistics(cp.getLocalEndPoint(), cp.getRemoteEndPoint(), cp.getPoolSize(), cp.getConnectionsInUse());
+ stats.add( cs );
+ }
+ return stats.toArray(new ConnectionStatistics[0]);
+ }
+
+ public static TcpConnection getConnection(EndPoint from, EndPoint to) throws IOException
+ {
+ return getConnectionPool(from, to).getConnection();
+ }
+
+ private void checkForReservedVerb(String type)
+ {
+ if ( reservedVerbs_.get(type) != null && verbHandlers_.get(type) != null )
+ {
+ throw new IllegalArgumentException( type + " is a reserved verb handler. Scram!");
+ }
+ }
+
+ public void registerVerbHandlers(String type, IVerbHandler verbHandler)
+ {
+ checkForReservedVerb(type);
+ verbHandlers_.put(type, verbHandler);
+ }
+
+ public void deregisterAllVerbHandlers(EndPoint localEndPoint)
+ {
+ Iterator keys = verbHandlers_.keySet().iterator();
+ String key = null;
+
+ /*
+ * endpoint specific verbhandlers can be distinguished because
+ * their key's contain the name of the endpoint.
+ */
+ while(keys.hasNext())
+ {
+ key = (String)keys.next();
+ if (key.contains(localEndPoint.toString()))
+ keys.remove();
+ }
+ }
+
+ public void deregisterVerbHandlers(String type)
+ {
+ verbHandlers_.remove(type);
+ }
+
+ public IVerbHandler getVerbHandler(String type)
+ {
+ IVerbHandler handler = (IVerbHandler)verbHandlers_.get(type);
+ return handler;
+ }
+
+ public String sendRR(Message message, EndPoint[] to, IAsyncCallback cb)
+ {
+ String messageId = message.getMessageId();
+ callbackMap_.put(messageId, cb);
+ for ( int i = 0; i < to.length; ++i )
+ {
+ sendOneWay(message, to[i]);
+ }
+ return messageId;
+ }
+
+ public String sendRR(Message message, EndPoint to, IAsyncCallback cb)
+ {
+ String messageId = message.getMessageId();
+ callbackMap_.put(messageId, cb);
+ sendOneWay(message, to);
+ return messageId;
+ }
+
+ public String sendRR(Message[] messages, EndPoint[] to, IAsyncCallback cb)
+ {
+ if ( messages.length != to.length )
+ {
+ throw new IllegalArgumentException("Number of messages and the number of endpoints need to be same.");
+ }
+ String groupId = GuidGenerator.guid();
+ callbackMap_.put(groupId, cb);
+ for ( int i = 0; i < messages.length; ++i )
+ {
+ messages[i].setMessageId(groupId);
+ sendOneWay(messages[i], to[i]);
+ }
+ return groupId;
+ }
+
+ public IAsyncResult sendRR(Message[] messages, EndPoint[] to)
+ {
+ if ( messages.length != to.length )
+ {
+ throw new IllegalArgumentException("Number of messages and the number of endpoints need to be same.");
+ }
+
+ IAsyncResult iar = new MultiAsyncResult(messages.length);
+ String groupId = GuidGenerator.guid();
+ taskCompletionMap_.put(groupId, iar);
+ for ( int i = 0; i < messages.length; ++i )
+ {
+ messages[i].setMessageId(groupId);
+ sendOneWay(messages[i], to[i]);
+ }
+
+ return iar;
+ }
+
+ public String sendRR(Message[][] messages, EndPoint[][] to, IAsyncCallback cb)
+ {
+ if ( messages.length != to.length )
+ {
+ throw new IllegalArgumentException("Number of messages and the number of endpoints need to be same.");
+ }
+
+ int length = messages.length;
+ String[] gids = new String[length];
+ /* Generate the requisite GUID's */
+ for ( int i = 0; i < length; ++i )
+ {
+ gids[i] = GuidGenerator.guid();
+ }
+ /* attach this context to the callback */
+ cb.attachContext(gids);
+ for ( int i = 0; i < length; ++i )
+ {
+ callbackMap_.put(gids[i], cb);
+ for ( int j = 0; j < messages[i].length; ++j )
+ {
+ messages[i][j].setMessageId(gids[i]);
+ sendOneWay(messages[i][j], to[i][j]);
+ }
+ }
+ return gids[0];
+ }
+
+ /*
+ Use this version for fire and forget style messaging.
+ */
+ public void sendOneWay(Message message, EndPoint to)
+ {
+ // do local deliveries
+ if ( message.getFrom().equals(to) )
+ {
+ MessagingService.receive(message);
+ return;
+ }
+
+ Runnable tcpWriteEvent = new MessageSerializationTask(message, to);
+ messageSerializerExecutor_.execute(tcpWriteEvent);
+ }
+
+ public IAsyncResult sendRR(Message message, EndPoint to)
+ {
+ IAsyncResult iar = new AsyncResult();
+ taskCompletionMap_.put(message.getMessageId(), iar);
+ sendOneWay(message, to);
+ return iar;
+ }
+
+ public void sendUdpOneWay(Message message, EndPoint to)
+ {
+ EndPoint from = message.getFrom();
+ if (message.getFrom().equals(to)) {
+ MessagingService.receive(message);
+ return;
+ }
+
+ UdpConnection connection = null;
+ try
+ {
+ connection = new UdpConnection();
+ connection.init();
+ connection.write(message, to);
+ }
+ catch ( IOException e )
+ {
+ logger_.warn(LogUtil.throwableToString(e));
+ }
+ finally
+ {
+ if ( connection != null )
+ connection.close();
+ }
+ }
+
+ public void stream(String file, long startPosition, long total, EndPoint from, EndPoint to)
+ {
+ isStreaming_.set(true);
+ /* Streaming asynchronously on streamExector_ threads. */
+ Runnable streamingTask = new FileStreamTask(file, startPosition, total, from, to);
+ streamExecutor_.execute(streamingTask);
+ }
+
+ /*
+ * Does the application determine if we are currently streaming data.
+ * This would imply either streaming to a receiver, receiving streamed
+ * data or both.
+ */
+ public static boolean isStreaming()
+ {
+ return isStreaming_.get();
+ }
+
+ public static void setStreamingMode(boolean bVal)
+ {
+ isStreaming_.set(bVal);
+ }
+
+ public static void shutdown()
+ {
+ logger_.info("Shutting down ...");
+ synchronized ( MessagingService.class )
+ {
+ /* Stop listening on any socket */
+ for( SelectionKey skey : listenSockets_.values() )
+ {
+ skey.cancel();
+ try
+ {
+ skey.channel().close();
+ }
+ catch (IOException e) {}
+ }
+ listenSockets_.clear();
+
+ /* Shutdown the threads in the EventQueue's */
+ messageDeserializationExecutor_.shutdownNow();
+ messageSerializerExecutor_.shutdownNow();
+ messageDeserializerExecutor_.shutdownNow();
+ streamExecutor_.shutdownNow();
+
+ /* shut down the cachetables */
+ taskCompletionMap_.shutdown();
+ callbackMap_.shutdown();
+
+ /* Interrupt the selector manager thread */
+ SelectorManager.getSelectorManager().interrupt();
+
+ poolTable_.clear();
+ verbHandlers_.clear();
+ bShutdown_ = true;
+ }
+ if (logger_.isDebugEnabled())
+ logger_.debug("Shutdown invocation complete.");
+ }
+
+ public static void receive(Message message)
+ {
+ enqueueRunnable(message.getMessageType(), new MessageDeliveryTask(message));
+ }
+
+ public static boolean isLocalEndPoint(EndPoint ep)
+ {
+ return ( endPoints_.contains(ep) );
+ }
+
+ private static void enqueueRunnable(String stageName, Runnable runnable){
+
+ IStage stage = StageManager.getStage(stageName);
+
+ if ( stage != null )
+ {
+ stage.execute(runnable);
+ }
+ else
+ {
+ logger_.info("Running on default stage - beware");
+ messageSerializerExecutor_.execute(runnable);
+ }
+ }
+
+ public static IAsyncCallback getRegisteredCallback(String key)
+ {
+ return callbackMap_.get(key);
+ }
+
+ public static void removeRegisteredCallback(String key)
+ {
+ callbackMap_.remove(key);
+ }
+
+ public static IAsyncResult getAsyncResult(String key)
+ {
+ return taskCompletionMap_.remove(key);
+ }
+
+ public static void removeAsyncResult(String key)
+ {
+ taskCompletionMap_.remove(key);
+ }
+
+ public static byte[] getProtocol()
+ {
+ return protocol_;
+ }
+
+ public static ExecutorService getReadExecutor()
+ {
+ return messageDeserializationExecutor_;
+ }
+
+ public static ExecutorService getWriteExecutor()
+ {
+ return messageSerializerExecutor_;
+ }
+
+ public static ExecutorService getDeserilizationExecutor()
+ {
+ return messageDeserializerExecutor_;
+ }
+
+ public static boolean isProtocolValid(byte[] protocol)
+ {
+ return isEqual(protocol_, protocol);
+ }
+
+ public static boolean isEqual(byte digestA[], byte digestB[])
+ {
+ return MessageDigest.isEqual(digestA, digestB);
+ }
+
+ public static byte[] toByteArray(int i)
+ {
+ byte bytes[] = new byte[4];
+ bytes[0] = (byte)(i >>> 24 & 0xff);
+ bytes[1] = (byte)(i >>> 16 & 0xff);
+ bytes[2] = (byte)(i >>> 8 & 0xff);
+ bytes[3] = (byte)(i & 0xff);
+ return bytes;
+ }
+
+ public static byte[] toByteArray(short s)
+ {
+ byte bytes[] = new byte[2];
+ bytes[0] = (byte)(s >>> 8 & 0xff);
+ bytes[1] = (byte)(s & 0xff);
+ return bytes;
+ }
+
+ public static short byteArrayToShort(byte bytes[])
+ {
+ return byteArrayToShort(bytes, 0);
+ }
+
+ public static short byteArrayToShort(byte bytes[], int offset)
+ {
+ if(bytes.length - offset < 2)
+ throw new IllegalArgumentException("A short must be 2 bytes in size.");
+ short n = 0;
+ for(int i = 0; i < 2; i++)
+ {
+ n <<= 8;
+ n |= bytes[offset + i] & 0xff;
+ }
+
+ return n;
+ }
+
+ public static int getBits(int x, int p, int n)
+ {
+ return x >>> (p + 1) - n & ~(-1 << n);
+ }
+
+ public static int byteArrayToInt(byte bytes[])
+ {
+ return byteArrayToInt(bytes, 0);
+ }
+
+ public static int byteArrayToInt(byte bytes[], int offset)
+ {
+ if(bytes.length - offset < 4)
+ throw new IllegalArgumentException("An integer must be 4 bytes in size.");
+ int n = 0;
+ for(int i = 0; i < 4; i++)
+ {
+ n <<= 8;
+ n |= bytes[offset + i] & 0xff;
+ }
+
+ return n;
+ }
+
+ public static ByteBuffer packIt(byte[] bytes, boolean compress, boolean stream, boolean listening)
+ {
+ byte[] size = toByteArray(bytes.length);
+ /*
+ Setting up the protocol header. This is 4 bytes long
+ represented as an integer. The first 2 bits indicate
+ the serializer type. The 3rd bit indicates if compression
+ is turned on or off. It is turned off by default. The 4th
+ bit indicates if we are in streaming mode. It is turned off
+ by default. The 5th bit is used to indicate that the sender
+ is not listening on any well defined port. This implies the
+ receiver needs to cache the connection using the port on the
+ socket. The following 3 bits are reserved for future use.
+ The next 8 bits indicate a version number. Remaining 15 bits
+ are not used currently.
+ */
+ int n = 0;
+ // Setting up the serializer bit
+ n |= serializerType_.ordinal();
+ // set compression bit.
+ if ( compress )
+ n |= 4;
+
+ // set streaming bit
+ if ( stream )
+ n |= 8;
+
+ // set listening 5th bit
+ if ( listening )
+ n |= 16;
+
+ // Setting up the version bit
+ n |= (version_ << 8);
+ /* Finished the protocol header setup */
+
+ byte[] header = toByteArray(n);
+ ByteBuffer buffer = ByteBuffer.allocate(16 + header.length + size.length + bytes.length);
+ buffer.put(protocol_);
+ buffer.put(header);
+ buffer.put(size);
+ buffer.put(bytes);
+ buffer.flip();
+ return buffer;
+ }
+
+ public static ByteBuffer constructStreamHeader(boolean compress, boolean stream)
+ {
+ /*
+ Setting up the protocol header. This is 4 bytes long
+ represented as an integer. The first 2 bits indicate
+ the serializer type. The 3rd bit indicates if compression
+ is turned on or off. It is turned off by default. The 4th
+ bit indicates if we are in streaming mode. It is turned off
+ by default. The following 4 bits are reserved for future use.
+ The next 8 bits indicate a version number. Remaining 15 bits
+ are not used currently.
+ */
+ int n = 0;
+ // Setting up the serializer bit
+ n |= serializerType_.ordinal();
+ // set compression bit.
+ if ( compress )
+ n |= 4;
+
+ // set streaming bit
+ if ( stream )
+ n |= 8;
+
+ // Setting up the version bit
+ n |= (version_ << 8);
+ /* Finished the protocol header setup */
+
+ byte[] header = toByteArray(n);
+ ByteBuffer buffer = ByteBuffer.allocate(16 + header.length);
+ buffer.put(protocol_);
+ buffer.put(header);
+ buffer.flip();
+ return buffer;
+ }
+}