You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/27 06:40:42 UTC

svn commit: r759027 - in /incubator/cassandra/trunk/src/org/apache/cassandra/net: AsyncResult.java IAsyncCallback.java IAsyncResult.java IMessagingService.java MessagingService.java ResponseVerbHandler.java

Author: alakshman
Date: Fri Mar 27 05:40:42 2009
New Revision: 759027

URL: http://svn.apache.org/viewvc?rev=759027&view=rev
Log:
Some methods added to aggregate the results from multigets.

Modified:
    incubator/cassandra/trunk/src/org/apache/cassandra/net/AsyncResult.java
    incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncCallback.java
    incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncResult.java
    incubator/cassandra/trunk/src/org/apache/cassandra/net/IMessagingService.java
    incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingService.java
    incubator/cassandra/trunk/src/org/apache/cassandra/net/ResponseVerbHandler.java

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/net/AsyncResult.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/AsyncResult.java?rev=759027&r1=759026&r2=759027&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/AsyncResult.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/AsyncResult.java Fri Mar 27 05:40:42 2009
@@ -105,14 +105,24 @@
         return result_;
     }
     
-    void result(Object[] result)
+    public List<Object[]> multiget()
+    {
+        throw new UnsupportedOperationException("This operation is not supported in the AsyncResult abstraction.");
+    }
+    
+    public List<Object[]> multiget(long timeout, TimeUnit tu) throws TimeoutException
+    {
+        throw new UnsupportedOperationException("This operation is not supported in the AsyncResult abstraction.");
+    }
+    
+    public void result(Message response)
     {        
         try
         {
             lock_.lock();
             if ( !done_.get() )
-            {
-                result_ = result;
+            {                
+                result_ = response.getMessageBody();
                 done_.set(true);
                 condition_.signal();
             }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncCallback.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncCallback.java?rev=759027&r1=759026&r2=759027&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncCallback.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncCallback.java Fri Mar 27 05:40:42 2009
@@ -28,4 +28,11 @@
 	 * @param response responses to be returned
 	 */
 	public void response(Message msg);
+    
+    /**
+     * Attach some application specific context to the
+     * callback.
+     * @param o application specific context
+     */
+    public void attachContext(Object o);
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncResult.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncResult.java?rev=759027&r1=759026&r2=759027&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncResult.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncResult.java Fri Mar 27 05:40:42 2009
@@ -27,8 +27,49 @@
  */
 
 public interface IAsyncResult
-{
-    public Object[] get();
+{    
+    /**
+     * This is used to check if the task has been completed
+     * 
+     * @return true if the task has been completed and false otherwise.
+     */
     public boolean isDone();
-    public Object[] get(long timeout, TimeUnit tu) throws TimeoutException; 
+    
+    /**
+     * Returns the result for the task that was submitted.
+     * @return the result wrapped in an Object[]
+    */
+    public Object[] get();    
+    
+    /**
+     * Same operation as the above get() but allows the calling
+     * thread to specify a timeout.
+     * @param timeout the maximum time to wait
+     * @param tu the time unit of the timeout argument
+     * @return the result wrapped in an Object[]
+    */
+    public Object[] get(long timeout, TimeUnit tu) throws TimeoutException;
+    
+    /**
+     * Returns the results for all tasks that were submitted.
+     * @return the list of results, each wrapped in an Object[]
+    */
+    public List<Object[]> multiget();
+    
+    /**
+     * Same operation as the above multiget() but allows the calling
+     * thread to specify a timeout.
+     * @param timeout the maximum time to wait
+     * @param tu the time unit of the timeout argument
+     * @return the list of results, each wrapped in an Object[]
+    */
+    public List<Object[]> multiget(long timeout, TimeUnit tu) throws TimeoutException;
+    
+    /**
+     * Store the result obtained for the submitted task.
+     * The result is taken from the given response message.
+     * 
+     * @param result the response message
+     */
+    public void result(Message result);
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/net/IMessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/IMessagingService.java?rev=759027&r1=759026&r2=759027&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/IMessagingService.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/IMessagingService.java Fri Mar 27 05:40:42 2009
@@ -111,10 +111,37 @@
      * @param to endpoints to which the message needs to be sent
      * @param cb callback interface which is used to pass the responses or
      *           suggest that a timeout occured to the invoker of the send().
-     *           suggest that a timeout occured to the invoker of the send().
      * @return an reference to message id used to match with the result
      */
     public String sendRR(Message[] messages, EndPoint[] to, IAsyncCallback cb);
+    
+    /**
+     * Send a message to a given endpoint. The ith element in the <code>messages</code>
+     * array is sent to the ith element in the <code>to</code> array. This method assumes
+     * there is a one-to-one mapping between the <code>messages</code> array and
+     * the <code>to</code> array. Otherwise an IllegalArgumentException will be thrown.
+     * All messages are sent under a single message id, and their responses are
+     * delivered to the returned <code>IAsyncResult</code>.
+     * @param messages messages to be sent.
+     * @param to endpoints to which the message needs to be sent
+     * @return a reference to the IAsyncResult
+     */
+    public IAsyncResult sendRR(Message[] messages, EndPoint[] to);
+    
+    /**
+     * Send a message to a given endpoint. The ith element in the <code>messages</code>
+     * array is sent to the ith element in the <code>to</code> array. This method assumes
+     * there is a one-to-one mapping between the <code>messages</code> array and
+     * the <code>to</code> array. Otherwise an IllegalArgumentException will be thrown.
+     * The idea is that multiple groups of messages are treated as one logical message
+     * whose results are harnessed via the <i>IAsyncCallback</i>.
+     * @param messages groups of grouped messages.
+     * @param to destination for the groups of messages
+     * @param cb the callback handler to be invoked for the responses
+     * @return the group id which is basically useless - it is only returned so that
+     *         the APIs look compatible.
+     */
+    public String sendRR(Message[][] messages, EndPoint[][] to, IAsyncCallback cb);
 
     /**
      * Send a message to a given endpoint. This method adheres to the fire and forget

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingService.java?rev=759027&r1=759026&r2=759027&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingService.java Fri Mar 27 05:40:42 2009
@@ -417,7 +417,54 @@
             sendOneWay(messages[i], to[i]);
         }
         return groupId;
-    }    
+    } 
+    
+    public IAsyncResult sendRR(Message[] messages, EndPoint[] to)
+    {
+        if ( messages.length != to.length )
+        {
+            throw new IllegalArgumentException("Number of messages and the number of endpoints need to be same.");
+        }
+        
+        IAsyncResult iar = new MultiAsyncResult(messages.length);
+        String groupId = GuidGenerator.guid();
+        taskCompletionMap_.put(groupId, iar);
+        for ( int i = 0; i < messages.length; ++i )
+        {
+            messages[i].setMessageId(groupId);
+            sendOneWay(messages[i], to[i]);
+        }
+        
+        return iar;
+    }
+    
+    public String sendRR(Message[][] messages, EndPoint[][] to, IAsyncCallback cb)
+    {
+        if ( messages.length != to.length )
+        {
+            throw new IllegalArgumentException("Number of messages and the number of endpoints need to be same.");
+        }
+        
+        int length = messages.length;
+        String[] gids = new String[length];
+        /* Generate the requisite GUID's */
+        for ( int i = 0; i < length; ++i )
+        {
+            gids[i] = GuidGenerator.guid();
+        }
+        /* attach this context to the callback */
+        cb.attachContext(gids);
+        for ( int i = 0; i < length; ++i )
+        {
+            callbackMap_.put(gids[i], cb);
+            for ( int j = 0; j < messages[i].length; ++j )
+            {
+                messages[i][j].setMessageId(gids[i]);
+                sendOneWay(messages[i][j], to[i][j]);
+            }            
+        }      
+        return gids[0];
+    }
 
     /*
         Use this version for fire and forget style messaging.

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/net/ResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/ResponseVerbHandler.java?rev=759027&r1=759026&r2=759027&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/ResponseVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/ResponseVerbHandler.java Fri Mar 27 05:40:42 2009
@@ -39,11 +39,11 @@
         }
         else
         {            
-            AsyncResult ar = (AsyncResult)MessagingService.getAsyncResult(messageId);
+            IAsyncResult ar = MessagingService.getAsyncResult(messageId);
             if ( ar != null )
             {
                 logger_.info("Processing response on an async result from " + message.getFrom());
-                ar.result(message.getMessageBody());
+                ar.result(message);
             }
         }
     }