You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 08:57:31 UTC
svn commit: r749218 [23/34] - in /incubator/cassandra: branches/ dist/
nightly/ site/ tags/ trunk/ trunk/lib/ trunk/src/ trunk/src/org/
trunk/src/org/apache/ trunk/src/org/apache/cassandra/
trunk/src/org/apache/cassandra/analytics/ trunk/src/org/apache...
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/Header.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/Header.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/Header.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/Header.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.utils.GuidGenerator;
+
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class Header implements java.io.Serializable
+{
+ static final long serialVersionUID = -3194851946523170022L;
+ private static ICompactSerializer<Header> serializer_;
+ private static AtomicInteger idGen_ = new AtomicInteger(0);
+
+ static
+ {
+ serializer_ = new HeaderSerializer();
+ }
+
+ static ICompactSerializer<Header> serializer()
+ {
+ return serializer_;
+ }
+
+ private EndPoint from_;
+ private String type_;
+ private String verb_;
+ private String messageId_;
+ protected Map<String, byte[]> details_ = new Hashtable<String, byte[]>();
+
+ Header(String id, EndPoint from, String messageType, String verb)
+ {
+ messageId_ = id;
+ from_ = from;
+ type_ = messageType;
+ verb_ = verb;
+ }
+
+ Header(String id, EndPoint from, String messageType, String verb, Map<String, byte[]> details)
+ {
+ this(id, from, messageType, verb);
+ details_ = details;
+ }
+
+ Header(EndPoint from, String messageType, String verb)
+ {
+ messageId_ = Integer.toString(idGen_.incrementAndGet());
+ from_ = from;
+ type_ = messageType;
+ verb_ = verb;
+ }
+
+ EndPoint getFrom()
+ {
+ return from_;
+ }
+
+ String getMessageType()
+ {
+ return type_;
+ }
+
+ String getVerb()
+ {
+ return verb_;
+ }
+
+ String getMessageId()
+ {
+ return messageId_;
+ }
+
+ void setMessageId(String id)
+ {
+ messageId_ = id;
+ }
+
+ void setMessageType(String type)
+ {
+ type_ = type;
+ }
+
+ void setMessageVerb(String verb)
+ {
+ verb_ = verb;
+ }
+
+ byte[] getDetail(Object key)
+ {
+ return details_.get(key);
+ }
+
+ void removeDetail(Object key)
+ {
+ details_.remove(key);
+ }
+
+ void addDetail(String key, byte[] value)
+ {
+ details_.put(key, value);
+ }
+
+ Map<String, byte[]> getDetails()
+ {
+ return details_;
+ }
+}
+
+class HeaderSerializer implements ICompactSerializer<Header>
+{
+ public void serialize(Header t, DataOutputStream dos) throws IOException
+ {
+ dos.writeUTF(t.getMessageId());
+ CompactEndPointSerializationHelper.serialize(t.getFrom(), dos);
+ dos.writeUTF(t.getMessageType());
+ dos.writeUTF( t.getVerb() );
+
+ /* Serialize the message header */
+ int size = t.details_.size();
+ dos.writeInt(size);
+ Set<String> keys = t.details_.keySet();
+
+ for( String key : keys )
+ {
+ dos.writeUTF(key);
+ byte[] value = t.details_.get(key);
+ dos.writeInt(value.length);
+ dos.write(value);
+ }
+ }
+
+ public Header deserialize(DataInputStream dis) throws IOException
+ {
+ String id = dis.readUTF();
+ EndPoint from = CompactEndPointSerializationHelper.deserialize(dis);
+ String type = dis.readUTF();
+ String verb = dis.readUTF();
+
+ /* Deserializing the message header */
+ int size = dis.readInt();
+ Map<String, byte[]> details = new Hashtable<String, byte[]>(size);
+ for ( int i = 0; i < size; ++i )
+ {
+ String key = dis.readUTF();
+ int length = dis.readInt();
+ byte[] bytes = new byte[length];
+ dis.readFully(bytes);
+ details.put(key, bytes);
+ }
+
+ return new Header(id, from, type, verb, details);
+ }
+}
+
+
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/HeaderTypes.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/HeaderTypes.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/HeaderTypes.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/HeaderTypes.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class HeaderTypes
+{
+ public final static String TASK_PROFILE_CHAIN = "TASK_PROFILE_CHAIN";
+ public static String TASK_ID = "TASK_ID";
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncCallback.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncCallback.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncCallback.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncCallback.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IAsyncCallback
+{
+ /**
+ * @param response responses to be returned
+ */
+ public void response(Message msg);
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncResult.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncResult.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncResult.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncResult.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IAsyncResult
+{
+ public Object[] get();
+ public boolean isDone();
+ public Object[] get(long timeout, TimeUnit tu) throws TimeoutException;
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/IMessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/IMessagingService.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/IMessagingService.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/IMessagingService.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+
+import javax.xml.bind.JAXBException;
+
+import org.apache.cassandra.concurrent.IStage;
+
+
+/**
+ * An IMessagingService provides the methods for sending messages to remote
+ * endpoints. IMessagingService enables the sending of request-response style
+ * messages and fire-forget style messages.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IMessagingService
+{
+ /**
+ * Register a verb and the corresponding verb handler with the
+ * Messaging Service.
+ * @param type name of the verb.
+ * @param verbHandler handler for the specified verb
+ */
+ public void registerVerbHandlers(String type, IVerbHandler verbHandler);
+
+ /**
+ * Deregister all verbhandlers corresponding to localEndPoint.
+ * @param localEndPoint
+ */
+ public void deregisterAllVerbHandlers(EndPoint localEndPoint);
+
+ /**
+ * Deregister a verbhandler corresponding to the verb from the
+ * Messaging Service.
+ * @param type name of the verb.
+ */
+ public void deregisterVerbHandlers(String type);
+
+ /**
+ * Listen on the specified port.
+ * @param ep EndPoint whose port to listen on.
+ * @param isHttp specify if the port is an Http port.
+ */
+ public void listen(EndPoint ep, boolean isHttp) throws IOException;
+
+ /**
+ * Listen on the specified port.
+ * @param ep EndPoint whose port to listen on.
+ */
+ public void listenUDP(EndPoint ep);
+
+ /**
+ * Send a message to a given endpoint.
+ * @param message message to be sent.
+ * @param to endpoint to which the message needs to be sent
+ * @return an reference to an IAsyncResult which can be queried for the
+ * response
+ */
+ public IAsyncResult sendRR(Message message, EndPoint to);
+
+ /**
+ * Send a message to the given set of endpoints and informs the MessagingService
+ * to wait for at least <code>howManyResults</code> responses to determine success
+ * of failure.
+ * @param message message to be sent.
+ * @param to endpoints to which the message needs to be sent
+ * @param cb callback interface which is used to pass the responses
+ * @return an reference to message id used to match with the result
+ */
+ public String sendRR(Message message, EndPoint[] to, IAsyncCallback cb);
+
+ /**
+ * Send a message to a given endpoint. This method specifies a callback
+ * which is invoked with the actual response.
+ * @param message message to be sent.
+ * @param to endpoint to which the message needs to be sent
+ * @param cb callback interface which is used to pass the responses or
+ * suggest that a timeout occured to the invoker of the send().
+ * suggest that a timeout occured to the invoker of the send().
+ * @return an reference to message id used to match with the result
+ */
+ public String sendRR(Message message, EndPoint to, IAsyncCallback cb);
+
+ /**
+ * Send a message to a given endpoint. The ith element in the <code>messages</code>
+ * array is sent to the ith element in the <code>to</code> array.This method assumes
+ * there is a one-one mapping between the <code>messages</code> array and
+ * the <code>to</code> array. Otherwise an IllegalArgumentException will be thrown.
+ * This method also informs the MessagingService to wait for at least
+ * <code>howManyResults</code> responses to determine success of failure.
+ * @param messages messages to be sent.
+ * @param to endpoints to which the message needs to be sent
+ * @param cb callback interface which is used to pass the responses or
+ * suggest that a timeout occured to the invoker of the send().
+ * suggest that a timeout occured to the invoker of the send().
+ * @return an reference to message id used to match with the result
+ */
+ public String sendRR(Message[] messages, EndPoint[] to, IAsyncCallback cb);
+
+ /**
+ * Send a message to a given endpoint. This method adheres to the fire and forget
+ * style messaging.
+ * @param message messages to be sent.
+ * @param to endpoint to which the message needs to be sent
+ */
+ public void sendOneWay(Message message, EndPoint to);
+
+ /**
+ * Send a message to a given endpoint. This method adheres to the fire and forget
+ * style messaging.
+ * @param message messages to be sent.
+ * @param to endpoint to which the message needs to be sent
+ */
+ public void sendUdpOneWay(Message message, EndPoint to);
+
+ /**
+ * Stream a file from source to destination. This is highly optimized
+ * to not hold any of the contents of the file in memory.
+ * @param file name of file to stream.
+ * param start position inside the file
+ * param total number of bytes to stream
+ * param to endpoint to which we need to stream the file.
+ */
+ public void stream(String file, long startPosition, long total, EndPoint from, EndPoint to);
+
+ /**
+ * This method returns the verb handler associated with the registered
+ * verb. If no handler has been registered then null is returned.
+ * @param verb for which the verb handler is sought
+ * @return a reference to IVerbHandler which is the handler for the specified verb
+ */
+ public IVerbHandler getVerbHandler(String verb);
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/IVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/IVerbHandler.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/IVerbHandler.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/IVerbHandler.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+/**
+ * IVerbHandler provides the method that all verb handlers need to implement.
+ * The concrete implementation of this interface would provide the functionality
+ * for a given verb.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IVerbHandler
+{
+ /**
+ * This method delivers a message to the implementing class (if the implementing
+ * class was registered by a call to MessagingService.registerVerbHandlers).
+ * Note that the caller should not be holding any locks when calling this method
+ * because the implementation may be synchronized.
+ *
+ * @param message - incoming message that needs handling.
+ */
+ public void doVerb(Message message);
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/Message.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/Message.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/Message.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.lang.reflect.Array;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.*;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.utils.*;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class Message implements java.io.Serializable
+{
+ static final long serialVersionUID = 6329198792470413221L;
+ private static ICompactSerializer<Message> serializer_;
+
+ static
+ {
+ serializer_ = new MessageSerializer();
+ }
+
+ public static ICompactSerializer<Message> serializer()
+ {
+ return serializer_;
+ }
+
+ Header header_;
+ private Object[] body_ = new Object[0];
+
+ /* Ctor for JAXB. DO NOT DELETE */
+ private Message()
+ {
+ }
+
+ protected Message(String id, EndPoint from, String messageType, String verb, Object[] body)
+ {
+ header_ = new Header(id, from, messageType, verb);
+ body_ = body;
+ }
+
+ protected Message(Header header, Object[] body)
+ {
+ header_ = header;
+ body_ = body;
+ }
+
+ public Message(EndPoint from, String messageType, String verb, Object[] body)
+ {
+ header_ = new Header(from, messageType, verb);
+ body_ = body;
+ }
+
+ public byte[] getHeader(Object key)
+ {
+ return header_.getDetail(key);
+ }
+
+ public void removeHeader(Object key)
+ {
+ header_.removeDetail(key);
+ }
+
+ public void setMessageType(String type)
+ {
+ header_.setMessageType(type);
+ }
+
+ public void setMessageVerb(String verb)
+ {
+ header_.setMessageVerb(verb);
+ }
+
+ public void addHeader(String key, byte[] value)
+ {
+ header_.addDetail(key, value);
+ }
+
+ public Map<String, byte[]> getHeaders()
+ {
+ return header_.getDetails();
+ }
+
+ public Object[] getMessageBody()
+ {
+ return body_;
+ }
+
+ public void setMessageBody(Object[] body)
+ {
+ body_ = body;
+ }
+
+ public EndPoint getFrom()
+ {
+ return header_.getFrom();
+ }
+
+ public String getMessageType()
+ {
+ return header_.getMessageType();
+ }
+
+ public String getVerb()
+ {
+ return header_.getVerb();
+ }
+
+ public String getMessageId()
+ {
+ return header_.getMessageId();
+ }
+
+ public Class[] getTypes()
+ {
+ List<Class> types = new ArrayList<Class>();
+
+ for ( int i = 0; i < body_.length; ++i )
+ {
+ if ( body_[i].getClass().isArray() )
+ {
+ int size = Array.getLength(body_[i]);
+ if ( size > 0 )
+ {
+ types.add( Array.get( body_[i], 0).getClass() );
+ }
+ }
+ else
+ {
+ types.add(body_[i].getClass());
+ }
+ }
+
+ return types.toArray( new Class[0] );
+ }
+
+ void setMessageId(String id)
+ {
+ header_.setMessageId(id);
+ }
+
+ public Message getReply(EndPoint from, Object[] args)
+ {
+ Message response = new Message(getMessageId(),
+ from,
+ MessagingService.responseStage_,
+ MessagingService.responseVerbHandler_,
+ args);
+ return response;
+ }
+
+ public String toString()
+ {
+ StringBuffer sbuf = new StringBuffer("");
+ String separator = System.getProperty("line.separator");
+ sbuf.append("ID:" + getMessageId());
+ sbuf.append(separator);
+ sbuf.append("FROM:" + getFrom());
+ sbuf.append(separator);
+ sbuf.append("TYPE:" + getMessageType());
+ sbuf.append(separator);
+ sbuf.append("VERB:" + getVerb());
+ sbuf.append(separator);
+ sbuf.append("BODY TYPE:" + getBodyTypes());
+ sbuf.append(separator);
+ return sbuf.toString();
+ }
+
+ private String getBodyTypes()
+ {
+ StringBuffer sbuf = new StringBuffer("");
+ Class[] types = getTypes();
+ for ( int i = 0; i < types.length; ++i )
+ {
+ sbuf.append(types[i].getName());
+ sbuf.append(" ");
+ }
+ return sbuf.toString();
+ }
+}
+
+class MessageSerializer implements ICompactSerializer<Message>
+{
+ public void serialize(Message t, DataOutputStream dos) throws IOException
+ {
+ Header.serializer().serialize( t.header_, dos);
+ byte[] bytes = (byte[])t.getMessageBody()[0];
+ dos.writeInt(bytes.length);
+ dos.write(bytes);
+ }
+
+ public Message deserialize(DataInputStream dis) throws IOException
+ {
+ Header header = Header.serializer().deserialize(dis);
+ int size = dis.readInt();
+ byte[] bytes = new byte[size];
+ dis.readFully(bytes);
+ // return new Message(header.getMessageId(), header.getFrom(), header.getMessageType(), header.getVerb(), new Object[]{bytes});
+ return new Message(header, new Object[]{bytes});
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/MessageDeliveryTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/MessageDeliveryTask.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/MessageDeliveryTask.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/MessageDeliveryTask.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import org.apache.cassandra.continuations.Suspendable;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class MessageDeliveryTask implements Runnable
+{
+ private Message message_;
+ private static Logger logger_ = Logger.getLogger(MessageDeliveryTask.class);
+
+ public MessageDeliveryTask(Message message)
+ {
+ message_ = message;
+ }
+
+ public void run()
+ {
+ try
+ {
+ String verb = message_.getVerb();
+ IVerbHandler verbHandler = MessagingService.getMessagingInstance().getVerbHandler(verb);
+ if ( verbHandler != null )
+ {
+ verbHandler.doVerb(message_);
+ }
+ }
+ catch (Throwable th)
+ {
+ logger_.warn( LogUtil.throwableToString(th) );
+ }
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/MessageDeserializationTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/MessageDeserializationTask.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/MessageDeserializationTask.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/MessageDeserializationTask.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.net.io.FastSerializer;
+import org.apache.cassandra.net.io.ISerializer;
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class MessageDeserializationTask implements Runnable
+{
+ private static Logger logger_ = Logger.getLogger(MessageDeserializationTask.class);
+ private static ISerializer serializer_ = new FastSerializer();
+ private int serializerType_;
+ private byte[] bytes_ = new byte[0];
+
+ MessageDeserializationTask(int serializerType, byte[] bytes)
+ {
+ serializerType_ = serializerType;
+ bytes_ = bytes;
+ }
+
+ public void run()
+ {
+ /* For DEBUG only. Printing queue length */
+ DebuggableThreadPoolExecutor es = (DebuggableThreadPoolExecutor)MessagingService.getDeserilizationExecutor();
+ logger_.debug( "Message Deserialization Task: " + (es.getTaskCount() - es.getCompletedTaskCount()) );
+ /* END DEBUG */
+ try
+ {
+ Message message = (Message)serializer_.deserialize(bytes_);
+
+ if ( message != null )
+ {
+ message = SinkManager.processServerMessageSink(message);
+ MessagingService.receive(message);
+ }
+ }
+ catch ( IOException ex )
+ {
+ logger_.warn(LogUtil.throwableToString(ex));
+ }
+ }
+
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/MessageSerializationTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/MessageSerializationTask.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/MessageSerializationTask.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/MessageSerializationTask.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+import java.net.SocketException;
+
+import org.apache.cassandra.concurrent.Context;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ThreadLocalContext;
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class MessageSerializationTask implements Runnable
+{
+ private static Logger logger_ = Logger.getLogger(MessageSerializationTask.class);
+ private Message message_;
+ private EndPoint to_;
+
+ public MessageSerializationTask(Message message, EndPoint to)
+ {
+ message_ = message;
+ to_ = to;
+ }
+
+ public Message getMessage()
+ {
+ return message_;
+ }
+
+ public void run()
+ {
+ /* For DEBUG only. Printing queue length */
+ DebuggableThreadPoolExecutor es = (DebuggableThreadPoolExecutor)MessagingService.getWriteExecutor();
+ logger_.debug( "Message Serialization Task: " + (es.getTaskCount() - es.getCompletedTaskCount()) );
+ /* END DEBUG */
+
+ /* Adding the message to be serialized in the TLS. For accessing in the afterExecute() */
+ Context ctx = new Context();
+ ctx.put(this.getClass().getName(), message_);
+ ThreadLocalContext.put(ctx);
+
+ TcpConnection connection = null;
+ try
+ {
+ Message message = SinkManager.processClientMessageSink(message_);
+ if(null == message)
+ return;
+ connection = MessagingService.getConnection(message_.getFrom(), to_);
+ connection.write(message);
+ }
+ catch ( SocketException se )
+ {
+ // Shutting down the entire pool. May be too conservative an approach.
+ MessagingService.getConnectionPool(message_.getFrom(), to_).shutdown();
+ logger_.warn(LogUtil.throwableToString(se));
+ }
+ catch ( IOException e )
+ {
+ logConnectAndIOException(e, connection);
+ }
+ catch (Throwable th)
+ {
+ logger_.warn(LogUtil.throwableToString(th));
+ }
+ finally
+ {
+ if ( connection != null )
+ {
+ connection.close();
+ }
+ }
+ }
+
+ private void logConnectAndIOException(IOException ex, TcpConnection connection)
+ {
+ if ( connection != null )
+ {
+ connection.errorClose();
+ }
+ logger_.warn(LogUtil.throwableToString(ex));
+ }
+}
+
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingConfig.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingConfig.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingConfig.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingConfig.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class MessagingConfig
+{
+ // The expected time for one message round trip. It does not reflect message processing
+ // time at the receiver.
+ private static int expectedRoundTripTime_ = 400;
+ private static int numberOfPorts_ = 2;
+ private static int threadCount_ = 4;
+
+ public static int getMessagingThreadCount()
+ {
+ return threadCount_;
+ }
+
+ public static void setMessagingThreadCount(int threadCount)
+ {
+ threadCount_ = threadCount;
+ }
+
+ public static void setExpectedRoundTripTime(int roundTripTimeMillis) {
+ if(roundTripTimeMillis > 0 )
+ expectedRoundTripTime_ = roundTripTimeMillis;
+ }
+
+ public static int getExpectedRoundTripTime()
+ {
+ return expectedRoundTripTime_;
+ }
+
+ public static int getConnectionPoolInitialSize()
+ {
+ return ConnectionPoolConfiguration.initialSize_;
+ }
+
+ public static int getConnectionPoolGrowthFactor()
+ {
+ return ConnectionPoolConfiguration.growthFactor_;
+ }
+
+ public static int getConnectionPoolMaxSize()
+ {
+ return ConnectionPoolConfiguration.maxSize_;
+ }
+
+ public static int getConnectionPoolWaitTimeout()
+ {
+ return ConnectionPoolConfiguration.waitTimeout_;
+ }
+
+ public static int getConnectionPoolMonitorInterval()
+ {
+ return ConnectionPoolConfiguration.monitorInterval_;
+ }
+
+ public static void setNumberOfPorts(int n)
+ {
+ numberOfPorts_ = n;
+ }
+
+ public static int getNumberOfPorts()
+ {
+ return numberOfPorts_;
+ }
+}
+
+class ConnectionPoolConfiguration
+{
+ public static int initialSize_ = 1;
+ public static int growthFactor_ = 1;
+ public static int maxSize_ = 1;
+ public static int waitTimeout_ = 10;
+ public static int monitorInterval_ = 300;
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingService.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingService.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingService.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,743 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.*;
+import java.lang.management.ManagementFactory;
+import java.net.*;
+import java.security.MessageDigest;
+import java.util.*;
+import java.nio.ByteBuffer;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+import java.nio.channels.*;
+import org.apache.cassandra.concurrent.*;
+import org.apache.cassandra.net.io.*;
+import org.apache.cassandra.utils.*;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.xml.bind.*;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.IStage;
+import org.apache.cassandra.concurrent.MultiThreadedStage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.http.HttpConnectionHandler;
+import org.apache.cassandra.net.io.SerializerType;
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.utils.Cachetable;
+import org.apache.cassandra.utils.GuidGenerator;
+import org.apache.cassandra.utils.HashingSchemes;
+import org.apache.cassandra.utils.ICachetable;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class MessagingService implements IMessagingService, MessagingServiceMBean
+{
+ private static boolean debugOn_ = false;
+
+ private static int version_ = 1;
+ //TODO: make this parameter dynamic somehow. Not sure if config is appropriate.
+ private static SerializerType serializerType_ = SerializerType.BINARY;
+
+ private static byte[] protocol_ = new byte[16];
+ /* Verb Handler for the Response */
+ public static final String responseVerbHandler_ = "RESPONSE";
+ /* Stage for responses. */
+ public static final String responseStage_ = "RESPONSE-STAGE";
+ private enum ReservedVerbs_ {RESPONSE};
+
+ private static Map<String, String> reservedVerbs_ = new Hashtable<String, String>();
+ /* Indicate if we are currently streaming data to another node or receiving streaming data */
+ private static AtomicBoolean isStreaming_ = new AtomicBoolean(false);
+
+ /* This records all the results mapped by message Id */
+ private static ICachetable<String, IAsyncCallback> callbackMap_;
+ private static ICachetable<String, IAsyncResult> taskCompletionMap_;
+
+ /* Manages the table of endpoints it is listening on */
+ private static Set<EndPoint> endPoints_;
+
+ /* List of sockets we are listening on */
+ private static Map<EndPoint, SelectionKey> listenSockets_ = new HashMap<EndPoint, SelectionKey>();
+
+ /* Lookup table for registering message handlers based on the verb. */
+ private static Map<String, IVerbHandler> verbHandlers_;
+
+ private static Map<String, MulticastSocket> mCastMembership_ = new HashMap<String, MulticastSocket>();
+
+ /* Thread pool to handle messaging read activities of Socket and default stage */
+ private static ExecutorService messageDeserializationExecutor_;
+
+ /* Thread pool to handle messaging write activities */
+ private static ExecutorService messageSerializerExecutor_;
+
+ /* Thread pool to handle deserialization of messages read from the socket. */
+ private static ExecutorService messageDeserializerExecutor_;
+
+ /* Thread pool to handle messaging write activities */
+ private static ExecutorService streamExecutor_;
+
+ private final static ReentrantLock lock_ = new ReentrantLock();
+ private static Map<String, TcpConnectionManager> poolTable_ = new Hashtable<String, TcpConnectionManager>();
+
+ private static boolean bShutdown_ = false;
+
+ private static Logger logger_ = Logger.getLogger(MessagingService.class);
+
+ private static IMessagingService messagingService_ = new MessagingService();
+
+ public static boolean isDebugOn()
+ {
+ return debugOn_;
+ }
+
+ public static void debugOn(boolean on)
+ {
+ debugOn_ = on;
+ }
+
+ public static SerializerType getSerializerType()
+ {
+ return serializerType_;
+ }
+
+ public synchronized static void serializerType(String type)
+ {
+ if ( type.equalsIgnoreCase("binary") )
+ {
+ serializerType_ = SerializerType.BINARY;
+ }
+ else if ( type.equalsIgnoreCase("java") )
+ {
+ serializerType_ = SerializerType.JAVA;
+ }
+ else if ( type.equalsIgnoreCase("xml") )
+ {
+ serializerType_ = SerializerType.XML;
+ }
+ }
+
+ public static int getVersion()
+ {
+ return version_;
+ }
+
+ public static void setVersion(int version)
+ {
+ version_ = version;
+ }
+
+ public static IMessagingService getMessagingInstance()
+ {
+ if ( bShutdown_ )
+ {
+ lock_.lock();
+ try
+ {
+ if ( bShutdown_ )
+ {
+ messagingService_ = new MessagingService();
+ bShutdown_ = false;
+ }
+ }
+ finally
+ {
+ lock_.unlock();
+ }
+ }
+ return messagingService_;
+ }
+
+ public Object clone() throws CloneNotSupportedException
+ {
+ //Prevents the singleton from being cloned
+ throw new CloneNotSupportedException();
+ }
+
+ protected MessagingService()
+ {
+ for ( ReservedVerbs_ verbs : ReservedVerbs_.values() )
+ {
+ reservedVerbs_.put(verbs.toString(), verbs.toString());
+ }
+ verbHandlers_ = new HashMap<String, IVerbHandler>();
+ endPoints_ = new HashSet<EndPoint>();
+ /*
+ * Leave callbacks in the cachetable long enough that any related messages will arrive
+ * before the callback is evicted from the table. The concurrency level is set at 128
+ * which is the sum of the threads in the pool that adds shit into the table and the
+ * pool that retrives the callback from here.
+ */
+ int maxSize = MessagingConfig.getMessagingThreadCount();
+ callbackMap_ = new Cachetable<String, IAsyncCallback>( 2 * DatabaseDescriptor.getRpcTimeout() );
+ taskCompletionMap_ = new Cachetable<String, IAsyncResult>( 2 * DatabaseDescriptor.getRpcTimeout() );
+
+ messageDeserializationExecutor_ = new DebuggableThreadPoolExecutor( maxSize,
+ maxSize,
+ Integer.MAX_VALUE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new ThreadFactoryImpl("MESSAGING-SERVICE-POOL")
+ );
+
+ messageSerializerExecutor_ = new DebuggableThreadPoolExecutor( maxSize,
+ maxSize,
+ Integer.MAX_VALUE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new ThreadFactoryImpl("MESSAGE-SERIALIZER-POOL")
+ );
+
+ messageDeserializerExecutor_ = new DebuggableThreadPoolExecutor( maxSize,
+ maxSize,
+ Integer.MAX_VALUE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new ThreadFactoryImpl("MESSAGE-DESERIALIZER-POOL")
+ );
+
+ streamExecutor_ = new DebuggableThreadPoolExecutor( 1,
+ 1,
+ Integer.MAX_VALUE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new ThreadFactoryImpl("MESSAGE-STREAMING-POOL")
+ );
+
+ protocol_ = hash(HashingSchemes.MD5, "FB-MESSAGING".getBytes());
+ /* register the response verb handler */
+ registerVerbHandlers(MessagingService.responseVerbHandler_, new ResponseVerbHandler());
+ /* register stage for response */
+ StageManager.registerStage(MessagingService.responseStage_, new MultiThreadedStage("RESPONSE-STAGE", maxSize) );
+ }
+
+ public byte[] hash(String type, byte data[])
+ {
+ byte result[] = null;
+ try
+ {
+ MessageDigest messageDigest = MessageDigest.getInstance(type);
+ result = messageDigest.digest(data);
+ }
+ catch(Exception e)
+ {
+ LogUtil.getLogger(MessagingService.class.getName()).debug(LogUtil.throwableToString(e));
+ }
+ return result;
+ }
+
+ public long getMessagingSerializerTaskCount()
+ {
+ DebuggableThreadPoolExecutor dstp = (DebuggableThreadPoolExecutor)messageSerializerExecutor_;
+ return dstp.getTaskCount() - dstp.getCompletedTaskCount();
+ }
+
+ public long getMessagingReceiverTaskCount()
+ {
+ DebuggableThreadPoolExecutor dstp = (DebuggableThreadPoolExecutor)messageDeserializationExecutor_;
+ return dstp.getTaskCount() - dstp.getCompletedTaskCount();
+ }
+
+ public void listen(EndPoint localEp, boolean isHttp) throws IOException
+ {
+ ServerSocketChannel serverChannel = ServerSocketChannel.open();
+ ServerSocket ss = serverChannel.socket();
+ ss.bind(localEp.getInetAddress());
+ serverChannel.configureBlocking(false);
+
+ SelectionKeyHandler handler = null;
+ if ( isHttp )
+ {
+ handler = new HttpConnectionHandler();
+ }
+ else
+ {
+ handler = new TcpConnectionHandler(localEp);
+ }
+
+ SelectionKey key = SelectorManager.getSelectorManager().register(serverChannel, handler, SelectionKey.OP_ACCEPT);
+ endPoints_.add(localEp);
+ listenSockets_.put(localEp, key);
+ }
+
+ public void listenUDP(EndPoint localEp)
+ {
+ UdpConnection connection = new UdpConnection();
+ logger_.debug("Starting to listen on " + localEp);
+ try
+ {
+ connection.init(localEp.getPort());
+ endPoints_.add(localEp);
+ }
+ catch ( IOException e )
+ {
+ logger_.warn(LogUtil.throwableToString(e));
+ }
+ }
+
+ public static TcpConnectionManager getConnectionPool(EndPoint from, EndPoint to)
+ {
+ String key = from + ":" + to;
+ TcpConnectionManager cp = poolTable_.get(key);
+ if( cp == null )
+ {
+ lock_.lock();
+ try
+ {
+ cp = poolTable_.get(key);
+ if (cp == null )
+ {
+ cp = new TcpConnectionManager(MessagingConfig.getConnectionPoolInitialSize(),
+ MessagingConfig.getConnectionPoolGrowthFactor(),
+ MessagingConfig.getConnectionPoolMaxSize(), from, to);
+ poolTable_.put(key, cp);
+ }
+ }
+ finally
+ {
+ lock_.unlock();
+ }
+ }
+ return cp;
+ }
+
+ public static ConnectionStatistics[] getPoolStatistics()
+ {
+ Set<ConnectionStatistics> stats = new HashSet<ConnectionStatistics>();
+ Iterator<TcpConnectionManager> it = poolTable_.values().iterator();
+ while ( it.hasNext() )
+ {
+ TcpConnectionManager cp = it.next();
+ ConnectionStatistics cs = new ConnectionStatistics(cp.getLocalEndPoint(), cp.getRemoteEndPoint(), cp.getPoolSize(), cp.getConnectionsInUse());
+ stats.add( cs );
+ }
+ return stats.toArray(new ConnectionStatistics[0]);
+ }
+
+ public static TcpConnection getConnection(EndPoint from, EndPoint to) throws IOException
+ {
+ return getConnectionPool(from, to).getConnection();
+ }
+
+ private void checkForReservedVerb(String type)
+ {
+ if ( reservedVerbs_.get(type) != null && verbHandlers_.get(type) != null )
+ {
+ throw new IllegalArgumentException( type + " is a reserved verb handler. Scram!");
+ }
+ }
+
+ public void registerVerbHandlers(String type, IVerbHandler verbHandler)
+ {
+ checkForReservedVerb(type);
+ verbHandlers_.put(type, verbHandler);
+ }
+
+ public void deregisterAllVerbHandlers(EndPoint localEndPoint)
+ {
+ Iterator keys = verbHandlers_.keySet().iterator();
+ String key = null;
+
+ /*
+ * endpoint specific verbhandlers can be distinguished because
+ * their key's contain the name of the endpoint.
+ */
+ while(keys.hasNext())
+ {
+ key = (String)keys.next();
+ if(-1 != key.indexOf(localEndPoint.toString()))
+ keys.remove();
+ }
+ }
+
+ public void deregisterVerbHandlers(String type)
+ {
+ verbHandlers_.remove(type);
+ }
+
+ public IVerbHandler getVerbHandler(String type)
+ {
+ IVerbHandler handler = (IVerbHandler)verbHandlers_.get(type);
+ return handler;
+ }
+
+ public String sendRR(Message message, EndPoint[] to, IAsyncCallback cb)
+ {
+ String messageId = message.getMessageId();
+ callbackMap_.put(messageId, cb);
+ for ( int i = 0; i < to.length; ++i )
+ {
+ sendOneWay(message, to[i]);
+ }
+ return messageId;
+ }
+
+ public String sendRR(Message message, EndPoint to, IAsyncCallback cb)
+ {
+ String messageId = message.getMessageId();
+ callbackMap_.put(messageId, cb);
+ sendOneWay(message, to);
+ return messageId;
+ }
+
+ public String sendRR(Message[] messages, EndPoint[] to, IAsyncCallback cb)
+ {
+ if ( messages.length != to.length )
+ {
+ throw new IllegalArgumentException("Number of messages and the number of endpoints need to be same.");
+ }
+ String groupId = GuidGenerator.guid();
+ callbackMap_.put(groupId, cb);
+ for ( int i = 0; i < messages.length; ++i )
+ {
+ messages[i].setMessageId(groupId);
+ sendOneWay(messages[i], to[i]);
+ }
+ return groupId;
+ }
+
+ /*
+ Use this version for fire and forget style messaging.
+ */
+ public void sendOneWay(Message message, EndPoint to)
+ {
+ // do local deliveries
+ if ( message.getFrom().equals(to) )
+ {
+ MessagingService.receive(message);
+ return;
+ }
+
+ Runnable tcpWriteEvent = new MessageSerializationTask(message, to);
+ messageSerializerExecutor_.execute(tcpWriteEvent);
+ }
+
+ public IAsyncResult sendRR(Message message, EndPoint to)
+ {
+ IAsyncResult iar = new AsyncResult();
+ taskCompletionMap_.put(message.getMessageId(), iar);
+ sendOneWay(message, to);
+ return iar;
+ }
+
+ public void sendUdpOneWay(Message message, EndPoint to)
+ {
+ EndPoint from = message.getFrom();
+ if (message.getFrom().equals(to)) {
+ MessagingService.receive(message);
+ return;
+ }
+
+ UdpConnection connection = null;
+ try
+ {
+ connection = new UdpConnection();
+ connection.init();
+ connection.write(message, to);
+ }
+ catch ( IOException e )
+ {
+ logger_.warn(LogUtil.throwableToString(e));
+ }
+ finally
+ {
+ if ( connection != null )
+ connection.close();
+ }
+ }
+
+ public void stream(String file, long startPosition, long total, EndPoint from, EndPoint to)
+ {
+ isStreaming_.set(true);
+ /* Streaming asynchronously on streamExector_ threads. */
+ Runnable streamingTask = new FileStreamTask(file, startPosition, total, from, to);
+ streamExecutor_.execute(streamingTask);
+ }
+
+ /*
+ * Does the application determine if we are currently streaming data.
+ * This would imply either streaming to a receiver, receiving streamed
+ * data or both.
+ */
+ public static boolean isStreaming()
+ {
+ return isStreaming_.get();
+ }
+
+ public static void setStreamingMode(boolean bVal)
+ {
+ isStreaming_.set(bVal);
+ }
+
+ public static void shutdown()
+ {
+ logger_.info("Shutting down ...");
+ synchronized ( MessagingService.class )
+ {
+ /* Stop listening on any socket */
+ for( SelectionKey skey : listenSockets_.values() )
+ {
+ SelectorManager.getSelectorManager().cancel(skey);
+ }
+ listenSockets_.clear();
+
+ /* Shutdown the threads in the EventQueue's */
+ messageDeserializationExecutor_.shutdownNow();
+ messageSerializerExecutor_.shutdownNow();
+ messageDeserializerExecutor_.shutdownNow();
+ streamExecutor_.shutdownNow();
+
+ /* shut down the cachetables */
+ taskCompletionMap_.shutdown();
+ callbackMap_.shutdown();
+
+ /* Interrupt the selector manager thread */
+ SelectorManager.getSelectorManager().interrupt();
+
+ poolTable_.clear();
+ verbHandlers_.clear();
+ bShutdown_ = true;
+ }
+ logger_.debug("Shutdown invocation complete.");
+ }
+
+ public static void receive(Message message)
+ {
+ enqueueRunnable(message.getMessageType(), new MessageDeliveryTask(message));
+ }
+
+ public static boolean isLocalEndPoint(EndPoint ep)
+ {
+ return ( endPoints_.contains(ep) );
+ }
+
+ private static void enqueueRunnable(String stageName, Runnable runnable){
+
+ IStage stage = StageManager.getStage(stageName);
+
+ if ( stage != null )
+ {
+ logger_.info("Running on stage " + stage.getName());
+ stage.execute(runnable);
+ }
+ else
+ {
+ logger_.info("Running on default stage - beware");
+ messageSerializerExecutor_.execute(runnable);
+ }
+ }
+
+ public static IAsyncCallback getRegisteredCallback(String key)
+ {
+ return callbackMap_.get(key);
+ }
+
+ public static void removeRegisteredCallback(String key)
+ {
+ callbackMap_.remove(key);
+ }
+
+ public static IAsyncResult getAsyncResult(String key)
+ {
+ return taskCompletionMap_.remove(key);
+ }
+
+ public static void removeAsyncResult(String key)
+ {
+ taskCompletionMap_.remove(key);
+ }
+
+ public static byte[] getProtocol()
+ {
+ return protocol_;
+ }
+
+ public static ExecutorService getReadExecutor()
+ {
+ return messageDeserializationExecutor_;
+ }
+
+ public static ExecutorService getWriteExecutor()
+ {
+ return messageSerializerExecutor_;
+ }
+
+ public static ExecutorService getDeserilizationExecutor()
+ {
+ return messageDeserializerExecutor_;
+ }
+
+ public static boolean isProtocolValid(byte[] protocol)
+ {
+ return isEqual(protocol_, protocol);
+ }
+
+ public static boolean isEqual(byte digestA[], byte digestB[])
+ {
+ return MessageDigest.isEqual(digestA, digestB);
+ }
+
+ public static byte[] toByteArray(int i)
+ {
+ byte bytes[] = new byte[4];
+ bytes[0] = (byte)(i >>> 24 & 0xff);
+ bytes[1] = (byte)(i >>> 16 & 0xff);
+ bytes[2] = (byte)(i >>> 8 & 0xff);
+ bytes[3] = (byte)(i & 0xff);
+ return bytes;
+ }
+
+ public static byte[] toByteArray(short s)
+ {
+ byte bytes[] = new byte[2];
+ bytes[0] = (byte)(s >>> 8 & 0xff);
+ bytes[1] = (byte)(s & 0xff);
+ return bytes;
+ }
+
+ public static short byteArrayToShort(byte bytes[])
+ {
+ return byteArrayToShort(bytes, 0);
+ }
+
+ public static short byteArrayToShort(byte bytes[], int offset)
+ {
+ if(bytes.length - offset < 2)
+ throw new IllegalArgumentException("A short must be 2 bytes in size.");
+ short n = 0;
+ for(int i = 0; i < 2; i++)
+ {
+ n <<= 8;
+ n |= bytes[offset + i] & 0xff;
+ }
+
+ return n;
+ }
+
+ public static int getBits(int x, int p, int n)
+ {
+ return x >>> (p + 1) - n & ~(-1 << n);
+ }
+
+ public static int byteArrayToInt(byte bytes[])
+ {
+ return byteArrayToInt(bytes, 0);
+ }
+
+ public static int byteArrayToInt(byte bytes[], int offset)
+ {
+ if(bytes.length - offset < 4)
+ throw new IllegalArgumentException("An integer must be 4 bytes in size.");
+ int n = 0;
+ for(int i = 0; i < 4; i++)
+ {
+ n <<= 8;
+ n |= bytes[offset + i] & 0xff;
+ }
+
+ return n;
+ }
+
+ public static ByteBuffer packIt(byte[] bytes, boolean compress, boolean stream, boolean listening)
+ {
+ byte[] size = toByteArray(bytes.length);
+ /*
+ Setting up the protocol header. This is 4 bytes long
+ represented as an integer. The first 2 bits indicate
+ the serializer type. The 3rd bit indicates if compression
+ is turned on or off. It is turned off by default. The 4th
+ bit indicates if we are in streaming mode. It is turned off
+ by default. The 5th bit is used to indicate that the sender
+ is not listening on any well defined port. This implies the
+ receiver needs to cache the connection using the port on the
+ socket. The following 3 bits are reserved for future use.
+ The next 8 bits indicate a version number. Remaining 15 bits
+ are not used currently.
+ */
+ int n = 0;
+ // Setting up the serializer bit
+ n |= serializerType_.ordinal();
+ // set compression bit.
+ if ( compress )
+ n |= 4;
+
+ // set streaming bit
+ if ( stream )
+ n |= 8;
+
+ // set listening 5th bit
+ if ( listening )
+ n |= 16;
+
+ // Setting up the version bit
+ n |= (version_ << 8);
+ /* Finished the protocol header setup */
+
+ byte[] header = toByteArray(n);
+ ByteBuffer buffer = ByteBuffer.allocate(16 + header.length + size.length + bytes.length);
+ buffer.put(protocol_);
+ buffer.put(header);
+ buffer.put(size);
+ buffer.put(bytes);
+ buffer.flip();
+ return buffer;
+ }
+
+ public static ByteBuffer constructStreamHeader(boolean compress, boolean stream)
+ {
+ /*
+ Setting up the protocol header. This is 4 bytes long
+ represented as an integer. The first 2 bits indicate
+ the serializer type. The 3rd bit indicates if compression
+ is turned on or off. It is turned off by default. The 4th
+ bit indicates if we are in streaming mode. It is turned off
+ by default. The following 4 bits are reserved for future use.
+ The next 8 bits indicate a version number. Remaining 15 bits
+ are not used currently.
+ */
+ int n = 0;
+ // Setting up the serializer bit
+ n |= serializerType_.ordinal();
+ // set compression bit.
+ if ( compress )
+ n |= 4;
+
+ // set streaming bit
+ if ( stream )
+ n |= 8;
+
+ // Setting up the version bit
+ n |= (version_ << 8);
+ /* Finished the protocol header setup */
+
+ byte[] header = toByteArray(n);
+ ByteBuffer buffer = ByteBuffer.allocate(16 + header.length);
+ buffer.put(protocol_);
+ buffer.put(header);
+ buffer.flip();
+ return buffer;
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingServiceMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingServiceMBean.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingServiceMBean.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingServiceMBean.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface MessagingServiceMBean
+{
+ public long getMessagingSerializerTaskCount();
+ public long getMessagingReceiverTaskCount();
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/ProtocolHeader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/ProtocolHeader.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/ProtocolHeader.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/ProtocolHeader.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public final class ProtocolHeader
+{
+ public static final String SERIALIZER = "SERIALIZER";
+ public static final String COMPRESSION = "COMPRESSION";
+ public static final String VERSION = "VERSION";
+
+ public int serializerType_;
+ public boolean isCompressed_;
+ public boolean isStreamingMode_;
+ public boolean isListening_;
+ public int version_;
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/ResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/ResponseVerbHandler.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/ResponseVerbHandler.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/ResponseVerbHandler.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class ResponseVerbHandler implements IVerbHandler
+{
+ private static final Logger logger_ = Logger.getLogger( ResponseVerbHandler.class );
+
+ public void doVerb(Message message)
+ {
+ String messageId = message.getMessageId();
+ IAsyncCallback cb = MessagingService.getRegisteredCallback(messageId);
+ if ( cb != null )
+ {
+ logger_.info("Processing response on a callback from " + message.getFrom());
+ cb.response(message);
+ }
+ else
+ {
+ AsyncResult ar = (AsyncResult)MessagingService.getAsyncResult(messageId);
+ if ( ar != null )
+ {
+ logger_.info("Processing response on an async result from " + message.getFrom());
+ ar.result(message.getMessageBody());
+ }
+ }
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectionKeyHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectionKeyHandler.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectionKeyHandler.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectionKeyHandler.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.nio.channels.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class SelectionKeyHandler
+{
+ public void modifyKey(SelectionKey key)
+ {
+ throw new UnsupportedOperationException("modifyKey() cannot be called on " + getClass().getName() + "!");
+ }
+
+ public void modifyKeyForRead(SelectionKey key)
+ {
+ throw new UnsupportedOperationException("modifyKeyForRead() cannot be called on " + getClass().getName() + "!");
+ }
+
+ public void modifyKeyForWrite(SelectionKey key)
+ {
+ throw new UnsupportedOperationException("modifyKeyForWrite() cannot be called on " + getClass().getName() + "!");
+ }
+
+ /**
+ * Method which is called when the key becomes acceptable.
+ *
+ * @param key The key which is acceptable.
+ */
+ public void accept(SelectionKey key)
+ {
+ throw new UnsupportedOperationException("accept() cannot be called on " + getClass().getName() + "!");
+ }
+
+ /**
+ * Method which is called when the key becomes connectable.
+ *
+ * @param key The key which is connectable.
+ */
+ public void connect(SelectionKey key)
+ {
+ throw new UnsupportedOperationException("connect() cannot be called on " + getClass().getName() + "!");
+ }
+
+ /**
+ * Method which is called when the key becomes readable.
+ *
+ * @param key The key which is readable.
+ */
+ public void read(SelectionKey key)
+ {
+ throw new UnsupportedOperationException("read() cannot be called on " + getClass().getName() + "!");
+ }
+
+ /**
+ * Method which is called when the key becomes writable.
+ *
+ * @param key The key which is writable.
+ */
+ public void write(SelectionKey key)
+ {
+ throw new UnsupportedOperationException("write() cannot be called on " + getClass().getName() + "!");
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectorManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectorManager.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectorManager.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectorManager.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,366 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.TreeSet;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.utils.*;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class SelectorManager extends Thread
+{
+ private static final Logger logger_ = Logger.getLogger(SelectorManager.class);
+ // the underlying selector used
+ /**
+ * DESCRIBE THE FIELD
+ */
+ protected Selector selector_;
+ protected HashSet<SelectionKey> modifyKeysForRead_;
+ protected HashSet<SelectionKey> modifyKeysForWrite_;
+
+ // the list of keys waiting to be cancelled
+ protected HashSet<SelectionKey> cancelledKeys_;
+
+ // The static selector manager which is used by all applications
+ private static SelectorManager manager_;
+
+ // The static UDP selector manager which is used by all applications
+ private static SelectorManager udpManager_;
+
+ /**
+ * Constructor, which is private since there is only one selector per JVM.
+ *
+ * @param profile
+ * DESCRIBE THE PARAMETER
+ */
+ protected SelectorManager(String name)
+ {
+ super(name);
+ this.modifyKeysForRead_ = new HashSet<SelectionKey>();
+ this.modifyKeysForWrite_ = new HashSet<SelectionKey>();
+ this.cancelledKeys_ = new HashSet<SelectionKey>();
+
+ // attempt to create selector
+ try
+ {
+ selector_ = Selector.open();
+ }
+ catch (IOException e)
+ {
+ logger_.error("SEVERE ERROR (SelectorManager): Error creating selector "
+ + e);
+ }
+
+ setDaemon(false);
+ start();
+ }
+
+ /**
+ * Method which asks the Selector Manager to add the given key to the
+ * cancelled set. If noone calls register on this key during the rest of
+ * this select() operation, the key will be cancelled. Otherwise, it will be
+ * returned as a result of the register operation.
+ *
+ * @param key
+ * The key to cancel
+ */
+ public void cancel(SelectionKey key)
+ {
+ if (key == null)
+ {
+ throw new NullPointerException();
+ }
+
+ synchronized ( cancelledKeys_ )
+ {
+ cancelledKeys_.add(key);
+ }
+ }
+
+ /**
+ * Registers a new channel with the selector, and attaches the given
+ * SelectionKeyHandler as the handler for the newly created key. Operations
+ * which the hanlder is interested in will be called as available.
+ *
+ * @param channel
+ * The channel to regster with the selector
+ * @param handler
+ * The handler to use for the callbacks
+ * @param ops
+ * The initial interest operations
+ * @return The SelectionKey which uniquely identifies this channel
+ * @exception IOException
+ * DESCRIBE THE EXCEPTION
+ */
+ public SelectionKey register(SelectableChannel channel,
+ SelectionKeyHandler handler, int ops) throws IOException
+ {
+ if ((channel == null) || (handler == null))
+ {
+ throw new NullPointerException();
+ }
+
+ selector_.wakeup();
+ SelectionKey key = channel.register(selector_, ops, handler);
+ synchronized(cancelledKeys_)
+ {
+ cancelledKeys_.remove(key);
+ }
+ selector_.wakeup();
+ return key;
+ }
+
+ public void modifyKeyForRead(SelectionKey key)
+ {
+ if (key == null)
+ {
+ throw new NullPointerException();
+ }
+
+ synchronized(modifyKeysForRead_)
+ {
+ modifyKeysForRead_.add(key);
+ }
+ selector_.wakeup();
+ }
+
+ public void modifyKeyForWrite(SelectionKey key)
+ {
+ if (key == null)
+ {
+ throw new NullPointerException();
+ }
+
+ synchronized( modifyKeysForWrite_ )
+ {
+ modifyKeysForWrite_.add(key);
+ }
+ selector_.wakeup();
+ }
+
+ /**
+ * This method starts the socket manager listening for events. It is
+ * designed to be started when this thread's start() method is invoked.
+ */
+ public void run()
+ {
+ try
+ {
+ // loop while waiting for activity
+ while (true && !Thread.currentThread().interrupted() )
+ {
+ try
+ {
+ doProcess();
+ selector_.select(1000);
+ synchronized( cancelledKeys_ )
+ {
+ if (cancelledKeys_.size() > 0)
+ {
+ SelectionKey[] keys = cancelledKeys_.toArray( new SelectionKey[0]);
+
+ for ( SelectionKey key : keys )
+ {
+ key.cancel();
+ key.channel().close();
+ }
+ cancelledKeys_.clear();
+ }
+ }
+ }
+ catch ( IOException e )
+ {
+ logger_.warn(LogUtil.throwableToString(e));
+ }
+ }
+
+ manager_ = null;
+ }
+ catch (Throwable t)
+ {
+ logger_.error("ERROR (SelectorManager.run): " + t);
+ logger_.error(LogUtil.throwableToString(t));
+ System.exit(-1);
+ }
+ }
+
+ protected void doProcess() throws IOException
+ {
+ doInvocationsForRead();
+ doInvocationsForWrite();
+ doSelections();
+ }
+
+ /**
+ * DESCRIBE THE METHOD
+ *
+ * @exception IOException
+ * DESCRIBE THE EXCEPTION
+ */
+ protected void doSelections() throws IOException
+ {
+ SelectionKey[] keys = selectedKeys();
+
+ for (int i = 0; i < keys.length; i++)
+ {
+ selector_.selectedKeys().remove(keys[i]);
+
+ synchronized (keys[i])
+ {
+ SelectionKeyHandler skh = (SelectionKeyHandler) keys[i]
+ .attachment();
+
+ if (skh != null)
+ {
+ // accept
+ if (keys[i].isValid() && keys[i].isAcceptable())
+ {
+ skh.accept(keys[i]);
+ }
+
+ // connect
+ if (keys[i].isValid() && keys[i].isConnectable())
+ {
+ skh.connect(keys[i]);
+ }
+
+ // read
+ if (keys[i].isValid() && keys[i].isReadable())
+ {
+ skh.read(keys[i]);
+ }
+
+ // write
+ if (keys[i].isValid() && keys[i].isWritable())
+ {
+ skh.write(keys[i]);
+ }
+ }
+ else
+ {
+ keys[i].channel().close();
+ keys[i].cancel();
+ }
+ }
+ }
+ }
+
+ private void doInvocationsForRead()
+ {
+ Iterator<SelectionKey> it;
+ synchronized (modifyKeysForRead_)
+ {
+ it = new ArrayList<SelectionKey>(modifyKeysForRead_).iterator();
+ modifyKeysForRead_.clear();
+ }
+
+ while (it.hasNext())
+ {
+ SelectionKey key = it.next();
+ if (key.isValid() && (key.attachment() != null))
+ {
+ ((SelectionKeyHandler) key.attachment()).modifyKeyForRead(key);
+ }
+ }
+ }
+
+ private void doInvocationsForWrite()
+ {
+ Iterator<SelectionKey> it;
+ synchronized (modifyKeysForWrite_)
+ {
+ it = new ArrayList<SelectionKey>(modifyKeysForWrite_).iterator();
+ modifyKeysForWrite_.clear();
+ }
+
+ while (it.hasNext())
+ {
+ SelectionKey key = it.next();
+ if (key.isValid() && (key.attachment() != null))
+ {
+ ((SelectionKeyHandler) key.attachment()).modifyKeyForWrite(key);
+ }
+ }
+ }
+
+ /**
+ * Selects all of the currenlty selected keys on the selector and returns
+ * the result as an array of keys.
+ *
+ * @return The array of keys
+ * @exception IOException
+ * DESCRIBE THE EXCEPTION
+ */
+ protected SelectionKey[] selectedKeys() throws IOException
+ {
+ return (SelectionKey[]) selector_.selectedKeys().toArray(
+ new SelectionKey[0]);
+ }
+
+ /**
+ * Returns the SelectorManager applications should use.
+ *
+ * @return The SelectorManager which applications should use
+ */
+ public static SelectorManager getSelectorManager()
+ {
+ synchronized (SelectorManager.class)
+ {
+ if (manager_ == null)
+ {
+ manager_ = new SelectorManager("TCP Selector Manager");
+ }
+ }
+ return manager_;
+ }
+
+ public static SelectorManager getUdpSelectorManager()
+ {
+ synchronized (SelectorManager.class)
+ {
+ if (udpManager_ == null)
+ {
+ udpManager_ = new SelectorManager("UDP Selector Manager");
+ }
+ }
+ return udpManager_;
+ }
+
+ /**
+ * Returns whether or not this thread of execution is the selector thread
+ *
+ * @return Whether or not this is the selector thread
+ */
+ public static boolean isSelectorThread()
+ {
+ return (Thread.currentThread() == manager_);
+ }
+}