You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/11/17 14:33:54 UTC
svn commit: r881280 -
/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Author: jbellis
Date: Tue Nov 17 13:33:53 2009
New Revision: 881280
URL: http://svn.apache.org/viewvc?rev=881280&view=rev
Log:
make local insert() skip MessagingService and fix HH write
patch by jbellis; reviewed by Jaakko Laine for CASSANDRA-558
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=881280&r1=881279&r2=881280&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Tue Nov 17 13:33:53 2009
@@ -19,6 +19,7 @@
import java.io.IOError;
import java.io.IOException;
+import java.io.IOError;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -71,65 +72,71 @@
}
/**
- * This method is responsible for creating Message to be
- * sent over the wire to N replicas where some of the replicas
- * may be hints.
- */
- private static Map<InetAddress, Message> createWriteMessages(RowMutation rm, Map<InetAddress, InetAddress> endpointMap) throws IOException
- {
- Map<InetAddress, Message> messageMap = new HashMap<InetAddress, Message>();
- Message message = rm.makeRowMutationMessage();
-
- for (Map.Entry<InetAddress, InetAddress> entry : endpointMap.entrySet())
- {
- InetAddress target = entry.getKey();
- InetAddress hintedTarget = entry.getValue();
- if (target.equals(hintedTarget))
- {
- messageMap.put(target, message);
- }
- else
- {
- Message hintedMessage = rm.makeRowMutationMessage();
- hintedMessage.addHeader(RowMutation.HINT, hintedTarget.getAddress());
- if (logger.isDebugEnabled())
- logger.debug("Sending the hint of " + hintedTarget + " to " + target);
- messageMap.put(hintedTarget, hintedMessage);
- }
- }
- return messageMap;
- }
-
- /**
* Use this method to have this RowMutation applied
* across all replicas. This method will take care
* of the possibility of a replica being down and hint
- * the data across to some other replica.
+ * the data across to some other replica.
+ *
+ * This is the ZERO consistency level. We do not wait for replies.
+ *
* @param rm the mutation to be applied across the replicas
*/
- public static void insert(RowMutation rm)
+ public static void insert(final RowMutation rm)
{
- /*
- * Get the N nodes from storage service where the data needs to be
- * replicated
- * Construct a message for write
- * Send them asynchronously to the replicas.
- */
-
long startTime = System.currentTimeMillis();
try
{
List<InetAddress> naturalEndpoints = StorageService.instance().getNaturalEndpoints(rm.key());
- // (This is the ZERO consistency level, so user doesn't care if we don't really have N destinations available.)
Map<InetAddress, InetAddress> endpointMap = StorageService.instance().getHintedEndpointMap(rm.key(), naturalEndpoints);
- Map<InetAddress, Message> messageMap = createWriteMessages(rm, endpointMap);
- for (Map.Entry<InetAddress, Message> entry : messageMap.entrySet())
- {
- Message message = entry.getValue();
- InetAddress endpoint = entry.getKey();
- if (logger.isDebugEnabled())
- logger.debug("insert writing key " + rm.key() + " to " + message.getMessageId() + "@" + endpoint);
- MessagingService.instance().sendOneWay(message, endpoint);
+ Message unhintedMessage = null; // lazy initialize for non-local, unhinted writes
+
+ // 3 cases:
+ // 1. local, unhinted write: run directly on write stage
+ // 2. non-local, unhinted write: send row mutation message
+ // 3. hinted write: add hint header, and send message
+ for (Map.Entry<InetAddress, InetAddress> entry : endpointMap.entrySet())
+ {
+ InetAddress target = entry.getKey();
+ InetAddress hintedTarget = entry.getValue();
+ if (target.equals(hintedTarget))
+ {
+ if (target.equals(FBUtilities.getLocalAddress()))
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("insert writing local key " + rm.key());
+ Runnable runnable = new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ rm.apply();
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+ };
+ StageManager.getStage(StageManager.mutationStage_).execute(runnable);
+ }
+ else
+ {
+ if (unhintedMessage == null)
+ unhintedMessage = rm.makeRowMutationMessage();
+ if (logger.isDebugEnabled())
+ logger.debug("insert writing key " + rm.key() + " to " + unhintedMessage.getMessageId() + "@" + target);
+ MessagingService.instance().sendOneWay(unhintedMessage, target);
+ }
+ }
+ else
+ {
+ Message hintedMessage = rm.makeRowMutationMessage();
+ hintedMessage.addHeader(RowMutation.HINT, target.getAddress());
+ if (logger.isDebugEnabled())
+ logger.debug("insert writing key " + rm.key() + " to " + hintedMessage.getMessageId() + "@" + hintedTarget + " for " + target);
+ MessagingService.instance().sendOneWay(hintedMessage, hintedTarget);
+ }
}
}
catch (IOException e)