You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/27 06:40:42 UTC
svn commit: r759027 - in
/incubator/cassandra/trunk/src/org/apache/cassandra/net: AsyncResult.java
IAsyncCallback.java IAsyncResult.java IMessagingService.java
MessagingService.java ResponseVerbHandler.java
Author: alakshman
Date: Fri Mar 27 05:40:42 2009
New Revision: 759027
URL: http://svn.apache.org/viewvc?rev=759027&view=rev
Log:
Some methods added to aggregate the results from multigets.
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/net/AsyncResult.java
incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncCallback.java
incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncResult.java
incubator/cassandra/trunk/src/org/apache/cassandra/net/IMessagingService.java
incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingService.java
incubator/cassandra/trunk/src/org/apache/cassandra/net/ResponseVerbHandler.java
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/net/AsyncResult.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/AsyncResult.java?rev=759027&r1=759026&r2=759027&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/AsyncResult.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/AsyncResult.java Fri Mar 27 05:40:42 2009
@@ -105,14 +105,24 @@
return result_;
}
- void result(Object[] result)
+ public List<Object[]> multiget()
+ {
+ throw new UnsupportedOperationException("This operation is not supported in the AsyncResult abstraction.");
+ }
+
+ public List<Object[]> multiget(long timeout, TimeUnit tu) throws TimeoutException
+ {
+ throw new UnsupportedOperationException("This operation is not supported in the AsyncResult abstraction.");
+ }
+
+ public void result(Message response)
{
try
{
lock_.lock();
if ( !done_.get() )
- {
- result_ = result;
+ {
+ result_ = response.getMessageBody();
done_.set(true);
condition_.signal();
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncCallback.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncCallback.java?rev=759027&r1=759026&r2=759027&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncCallback.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncCallback.java Fri Mar 27 05:40:42 2009
@@ -28,4 +28,11 @@
* @param response responses to be returned
*/
public void response(Message msg);
+
+ /**
+ * Attach some application specific context to the
+ * callback.
+ * @param o application specific context
+ */
+ public void attachContext(Object o);
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncResult.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncResult.java?rev=759027&r1=759026&r2=759027&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncResult.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncResult.java Fri Mar 27 05:40:42 2009
@@ -27,8 +27,49 @@
*/
public interface IAsyncResult
-{
- public Object[] get();
+{
+ /**
+ * This is used to check if the task has been completed
+ *
+ * @return true if the task has been completed and false otherwise.
+ */
public boolean isDone();
- public Object[] get(long timeout, TimeUnit tu) throws TimeoutException;
+
+ /**
+ * Returns the result for the task that was submitted.
+ * @return the result wrapped in an Object[]
+ */
+ public Object[] get();
+
+ /**
+ * Same operation as the above get() but allows the calling
+ * thread to specify a timeout.
+ * @param timeout the maximum time to wait
+ * @param tu the time unit of the timeout argument
+ * @return the result wrapped in an Object[]
+ */
+ public Object[] get(long timeout, TimeUnit tu) throws TimeoutException;
+
+ /**
+ * Returns the results for all tasks that were submitted.
+ * @return the list of results, each wrapped in an Object[]
+ */
+ public List<Object[]> multiget();
+
+ /**
+ * Same operation as the above multiget() but allows the calling
+ * thread to specify a timeout.
+ * @param timeout the maximum time to wait
+ * @param tu the time unit of the timeout argument
+ * @return the list of results, each wrapped in an Object[]
+ */
+ public List<Object[]> multiget(long timeout, TimeUnit tu) throws TimeoutException;
+
+ /**
+ * Store the result obtained for the submitted task.
+ * The result is supplied as a response Message rather than an Object[].
+ *
+ * @param result the response message
+ */
+ public void result(Message result);
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/net/IMessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/IMessagingService.java?rev=759027&r1=759026&r2=759027&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/IMessagingService.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/IMessagingService.java Fri Mar 27 05:40:42 2009
@@ -111,10 +111,37 @@
* @param to endpoints to which the message needs to be sent
* @param cb callback interface which is used to pass the responses or
* suggest that a timeout occured to the invoker of the send().
- * suggest that a timeout occured to the invoker of the send().
* @return an reference to message id used to match with the result
*/
public String sendRR(Message[] messages, EndPoint[] to, IAsyncCallback cb);
+
+ /**
+ * Send a message to a given endpoint. The ith element in the <code>messages</code>
+ * array is sent to the ith element in the <code>to</code> array. This method assumes
+ * there is a one-one mapping between the <code>messages</code> array and
+ * the <code>to</code> array. Otherwise an IllegalArgumentException will be thrown.
+ * This method also informs the MessagingService to wait for at least
+ * <code>howManyResults</code> responses to determine success or failure.
+ * @param messages messages to be sent.
+ * @param to endpoints to which the message needs to be sent
+ * @return a reference to the IAsyncResult
+ */
+ public IAsyncResult sendRR(Message[] messages, EndPoint[] to);
+
+ /**
+ * Send a message to a given endpoint. The ith element in the <code>messages</code>
+ * array is sent to the ith element in the <code>to</code> array. This method assumes
+ * there is a one-one mapping between the <code>messages</code> array and
+ * the <code>to</code> array. Otherwise an IllegalArgumentException will be thrown.
+ * The idea is that multiple groups of messages are treated as one logical message
+ * whose results are harnessed via the <i>IAsyncCallback</i>.
+ * @param messages groups of grouped messages.
+ * @param to destination for the groups of messages
+ * @param cb the callback handler to be invoked for the responses
+ * @return the group id which is basically useless - it is only returned for the APIs
+ * to look compatible.
+ */
+ public String sendRR(Message[][] messages, EndPoint[][] to, IAsyncCallback cb);
/**
* Send a message to a given endpoint. This method adheres to the fire and forget
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingService.java?rev=759027&r1=759026&r2=759027&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingService.java Fri Mar 27 05:40:42 2009
@@ -417,7 +417,54 @@
sendOneWay(messages[i], to[i]);
}
return groupId;
- }
+ }
+
+ public IAsyncResult sendRR(Message[] messages, EndPoint[] to)
+ {
+ if ( messages.length != to.length )
+ {
+ throw new IllegalArgumentException("Number of messages and the number of endpoints need to be same.");
+ }
+
+ IAsyncResult iar = new MultiAsyncResult(messages.length);
+ String groupId = GuidGenerator.guid();
+ taskCompletionMap_.put(groupId, iar);
+ for ( int i = 0; i < messages.length; ++i )
+ {
+ messages[i].setMessageId(groupId);
+ sendOneWay(messages[i], to[i]);
+ }
+
+ return iar;
+ }
+
+ public String sendRR(Message[][] messages, EndPoint[][] to, IAsyncCallback cb)
+ {
+ if ( messages.length != to.length )
+ {
+ throw new IllegalArgumentException("Number of messages and the number of endpoints need to be same.");
+ }
+
+ int length = messages.length;
+ String[] gids = new String[length];
+ /* Generate the requisite GUID's */
+ for ( int i = 0; i < length; ++i )
+ {
+ gids[i] = GuidGenerator.guid();
+ }
+ /* attach this context to the callback */
+ cb.attachContext(gids);
+ for ( int i = 0; i < length; ++i )
+ {
+ callbackMap_.put(gids[i], cb);
+ for ( int j = 0; j < messages[i].length; ++j )
+ {
+ messages[i][j].setMessageId(gids[i]);
+ sendOneWay(messages[i][j], to[i][j]);
+ }
+ }
+ return gids[0];
+ }
/*
Use this version for fire and forget style messaging.
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/net/ResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/ResponseVerbHandler.java?rev=759027&r1=759026&r2=759027&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/ResponseVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/ResponseVerbHandler.java Fri Mar 27 05:40:42 2009
@@ -39,11 +39,11 @@
}
else
{
- AsyncResult ar = (AsyncResult)MessagingService.getAsyncResult(messageId);
+ IAsyncResult ar = MessagingService.getAsyncResult(messageId);
if ( ar != null )
{
logger_.info("Processing response on an async result from " + message.getFrom());
- ar.result(message.getMessageBody());
+ ar.result(message);
}
}
}