You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/11/16 20:37:13 UTC

svn commit: r880926 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/net: IMessagingService.java MessagingService.java

Author: jbellis
Date: Mon Nov 16 19:37:12 2009
New Revision: 880926

URL: http://svn.apache.org/viewvc?rev=880926&view=rev
Log:
r/m redundant interface IMessagingService.  patch by jbellis

Removed:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IMessagingService.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=880926&r1=880925&r2=880926&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Mon Nov 16 19:37:12 2009
@@ -41,7 +41,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
 
-public class MessagingService implements IMessagingService
+public class MessagingService
 {
     private static int version_ = 1;
     //TODO: make this parameter dynamic somehow.  Not sure if config is appropriate.
@@ -89,7 +89,7 @@
     
     private static Logger logger_ = Logger.getLogger(MessagingService.class);
     
-    private static IMessagingService messagingService_ = new MessagingService();
+    private static MessagingService messagingService_ = new MessagingService();
 
     private static final int MESSAGE_DESERIALIZE_THREADS = 4;
 
@@ -98,7 +98,7 @@
         return version_;
     }
 
-    public static IMessagingService instance()
+    public static MessagingService instance()
     {   
     	if ( bShutdown_ )
     	{
@@ -184,6 +184,10 @@
         return result;
     }
     
+    /**
+     * Listen on the specified port.
+     * @param localEp InetAddress whose port to listen on.
+     */
     public void listen(InetAddress localEp) throws IOException
     {        
         ServerSocketChannel serverChannel = ServerSocketChannel.open();
@@ -198,6 +202,10 @@
         listenSockets_.put(localEp, key);             
     }
     
+    /**
+     * Listen for UDP messages on the specified port.
+     * @param localEp InetAddress whose port to listen on.
+     */
     public void listenUDP(InetAddress localEp)
     {
         UdpConnection connection = new UdpConnection();
@@ -251,12 +259,22 @@
     	}
     }     
     
+    /**
+     * Register a verb and the corresponding verb handler with the
+     * Messaging Service.
+     * @param type name of the verb.
+     * @param verbHandler handler for the specified verb
+     */
     public void registerVerbHandlers(String type, IVerbHandler verbHandler)
     {
     	checkForReservedVerb(type);
     	verbHandlers_.put(type, verbHandler);
     }
     
+    /**
+     * Deregister all verbhandlers corresponding to localEndPoint.
+     * @param localEndPoint endpoint whose verb handlers should be deregistered.
+     */
     public void deregisterAllVerbHandlers(InetAddress localEndPoint)
     {
         Iterator keys = verbHandlers_.keySet().iterator();
@@ -274,17 +292,34 @@
         }
     }
     
+    /**
+     * Deregister a verbhandler corresponding to the verb from the
+     * Messaging Service.
+     * @param type name of the verb.
+     */
     public void deregisterVerbHandlers(String type)
     {
         verbHandlers_.remove(type);
     }
 
+    /**
+     * This method returns the verb handler associated with the registered
+     * verb. If no handler has been registered then null is returned.
+     * @param type for which the verb handler is sought
+     * @return a reference to IVerbHandler which is the handler for the specified verb
+     */
     public IVerbHandler getVerbHandler(String type)
     {
-        IVerbHandler handler = (IVerbHandler)verbHandlers_.get(type);
-        return handler;
+        return verbHandlers_.get(type);
     }
 
+    /**
+     * Send a message to the given endpoints.
+     * @param message message to be sent.
+     * @param to endpoints to which the message needs to be sent
+     * @param cb callback interface which is used to pass the responses
+     * @return the message id, used to match responses with the request
+     */
     public String sendRR(Message message, InetAddress[] to, IAsyncCallback cb)
     {
         String messageId = message.getMessageId();                        
@@ -296,6 +331,16 @@
         return messageId;
     }
     
+    /**
+     * Send a message to a given endpoint. This method specifies a callback
+     * which is invoked with the actual response.
+     * @param message message to be sent.
+     * @param to endpoint to which the message needs to be sent
+     * @param cb callback interface which is used to pass the responses
+     *           or to indicate to the invoker of the send() that a
+     *           timeout occurred.
+     * @return a reference to message id used to match with the result
+     */
     public String sendRR(Message message, InetAddress to, IAsyncCallback cb)
     {        
         String messageId = message.getMessageId();
@@ -304,6 +349,19 @@
         return messageId;
     }
 
+    /**
+     * Send messages to the given endpoints. The ith element in the <code>messages</code>
+     * array is sent to the ith element in the <code>to</code> array. This method assumes
+     * there is a one-to-one mapping between the <code>messages</code> array and
+     * the <code>to</code> array. Otherwise an IllegalArgumentException will be thrown.
+     * This method also informs the MessagingService to wait for at least
+     * <code>howManyResults</code> responses to determine success or failure.
+     * @param messages messages to be sent.
+     * @param to endpoints to which the message needs to be sent
+     * @param cb callback interface which is used to pass the responses or
+     *           suggest that a timeout occurred to the invoker of the send().
+     * @return a reference to the group id used to match with the result
+     */
     public String sendRR(Message[] messages, InetAddress[] to, IAsyncCallback cb)
     {
         if ( messages.length != to.length )
@@ -320,9 +378,12 @@
         return groupId;
     } 
     
-    /*
-        Use this version for fire and forget style messaging.
-    */
+    /**
+     * Send a message to a given endpoint. This method follows the
+     * fire-and-forget style of messaging.
+     * @param message message to be sent.
+     * @param to endpoint to which the message needs to be sent
+     */
     public void sendOneWay(Message message, InetAddress to)
     {
         // do local deliveries
@@ -374,6 +435,12 @@
         return iar;
     }
     
+    /**
+     * Send a message to a given endpoint over UDP. This method follows the
+     * fire-and-forget style of messaging.
+     * @param message message to be sent.
+     * @param to endpoint to which the message needs to be sent
+     */
     public void sendUdpOneWay(Message message, InetAddress to)
     {
         if (message.getFrom().equals(to)) {
@@ -398,7 +465,15 @@
                 connection.close();
         }
     }
-    
+
+    /**
+     * Stream a file from source to destination without holding its contents in memory.
+     * @param file name of file to stream.
+     * @param startPosition position inside the file
+     * @param total number of bytes to stream
+     * @param from endpoint from which the file is streamed.
+     * @param to endpoint to which we need to stream the file.
+     */
     public void stream(String file, long startPosition, long total, InetAddress from, InetAddress to)
     {
         isStreaming_.set(true);