You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/11/16 20:37:13 UTC
svn commit: r880926 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra/net:
IMessagingService.java MessagingService.java
Author: jbellis
Date: Mon Nov 16 19:37:12 2009
New Revision: 880926
URL: http://svn.apache.org/viewvc?rev=880926&view=rev
Log:
r/m redundant interface IMessagingService. patch by jbellis
Removed:
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IMessagingService.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=880926&r1=880925&r2=880926&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Mon Nov 16 19:37:12 2009
@@ -41,7 +41,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
-public class MessagingService implements IMessagingService
+public class MessagingService
{
private static int version_ = 1;
//TODO: make this parameter dynamic somehow. Not sure if config is appropriate.
@@ -89,7 +89,7 @@
private static Logger logger_ = Logger.getLogger(MessagingService.class);
- private static IMessagingService messagingService_ = new MessagingService();
+ private static MessagingService messagingService_ = new MessagingService();
private static final int MESSAGE_DESERIALIZE_THREADS = 4;
@@ -98,7 +98,7 @@
return version_;
}
- public static IMessagingService instance()
+ public static MessagingService instance()
{
if ( bShutdown_ )
{
@@ -184,6 +184,10 @@
return result;
}
+ /**
+ * Listen on the specified port.
+ * @param localEp InetAddress whose port to listen on.
+ */
public void listen(InetAddress localEp) throws IOException
{
ServerSocketChannel serverChannel = ServerSocketChannel.open();
@@ -198,6 +202,10 @@
listenSockets_.put(localEp, key);
}
+ /**
+ * Listen on the specified port.
+ * @param localEp InetAddress whose port to listen on.
+ */
public void listenUDP(InetAddress localEp)
{
UdpConnection connection = new UdpConnection();
@@ -251,12 +259,22 @@
}
}
+ /**
+ * Register a verb and the corresponding verb handler with the
+ * Messaging Service.
+ * @param type name of the verb.
+ * @param verbHandler handler for the specified verb
+ */
public void registerVerbHandlers(String type, IVerbHandler verbHandler)
{
checkForReservedVerb(type);
verbHandlers_.put(type, verbHandler);
}
+ /**
+ * Deregister all verbhandlers corresponding to localEndPoint.
+ * @param localEndPoint
+ */
public void deregisterAllVerbHandlers(InetAddress localEndPoint)
{
Iterator keys = verbHandlers_.keySet().iterator();
@@ -274,17 +292,34 @@
}
}
+ /**
+ * Deregister a verbhandler corresponding to the verb from the
+ * Messaging Service.
+ * @param type name of the verb.
+ */
public void deregisterVerbHandlers(String type)
{
verbHandlers_.remove(type);
}
+ /**
+ * This method returns the verb handler associated with the registered
+ * verb. If no handler has been registered then null is returned.
+ * @param type for which the verb handler is sought
+ * @return a reference to IVerbHandler which is the handler for the specified verb
+ */
public IVerbHandler getVerbHandler(String type)
{
- IVerbHandler handler = (IVerbHandler)verbHandlers_.get(type);
- return handler;
+ return verbHandlers_.get(type);
}
+ /**
+ * Send a message to the given endpoints.
+ * @param message message to be sent.
+ * @param to endpoints to which the message needs to be sent
+ * @param cb callback interface which is used to pass the responses
+ * @return the message id used to match with the result
+ */
public String sendRR(Message message, InetAddress[] to, IAsyncCallback cb)
{
String messageId = message.getMessageId();
@@ -296,6 +331,16 @@
return messageId;
}
+ /**
+ * Send a message to a given endpoint. This method specifies a callback
+ * which is invoked with the actual response.
+ * @param message message to be sent.
+ * @param to endpoint to which the message needs to be sent
+ * @param cb callback interface which is used to pass the responses
+ * or suggest that a timeout occurred to the invoker of the
+ * send().
+ * @return a reference to the message id used to match with the result
+ */
public String sendRR(Message message, InetAddress to, IAsyncCallback cb)
{
String messageId = message.getMessageId();
@@ -304,6 +349,19 @@
return messageId;
}
+ /**
+ * Send a message to a given endpoint. The ith element in the <code>messages</code>
+ * array is sent to the ith element in the <code>to</code> array. This method assumes
+ * there is a one-to-one mapping between the <code>messages</code> array and
+ * the <code>to</code> array. Otherwise an IllegalArgumentException will be thrown.
+ * This method also informs the MessagingService to wait for at least
+ * <code>howManyResults</code> responses to determine success or failure.
+ * @param messages messages to be sent.
+ * @param to endpoints to which the message needs to be sent
+ * @param cb callback interface which is used to pass the responses or
+ * suggest that a timeout occurred to the invoker of the send().
+ * @return a reference to the message id used to match with the result
+ */
public String sendRR(Message[] messages, InetAddress[] to, IAsyncCallback cb)
{
if ( messages.length != to.length )
@@ -320,9 +378,12 @@
return groupId;
}
- /*
- Use this version for fire and forget style messaging.
- */
+ /**
+ * Send a message to a given endpoint. This method adheres to the fire and forget
+ * style messaging.
+ * @param message message to be sent.
+ * @param to endpoint to which the message needs to be sent
+ */
public void sendOneWay(Message message, InetAddress to)
{
// do local deliveries
@@ -374,6 +435,12 @@
return iar;
}
+ /**
+ * Send a message to a given endpoint. This method adheres to the fire and forget
+ * style messaging.
+ * @param message message to be sent.
+ * @param to endpoint to which the message needs to be sent
+ */
public void sendUdpOneWay(Message message, InetAddress to)
{
if (message.getFrom().equals(to)) {
@@ -398,7 +465,15 @@
connection.close();
}
}
-
+ /**
+ * Stream a file from source to destination. This is highly optimized
+ * to not hold any of the contents of the file in memory.
+ * @param file name of file to stream.
+ * @param startPosition position inside the file
+ * @param total number of bytes to stream
+ * @param to endpoint to which we need to stream the file.
+ */
+
public void stream(String file, long startPosition, long total, InetAddress from, InetAddress to)
{
isStreaming_.set(true);