You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/08/16 18:28:44 UTC

svn commit: r986017 - in /cassandra/trunk: ./ src/java/org/apache/cassandra/avro/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/locator/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/thrift/

Author: jbellis
Date: Mon Aug 16 16:28:43 2010
New Revision: 986017

URL: http://svn.apache.org/viewvc?rev=986017&view=rev
Log:
merge StorageProxy.mutate, mutateBlocking.  patch by Sylvain Lebresne; reviewed by jbellis for CASSANDRA-1396

Added:
    cassandra/trunk/src/java/org/apache/cassandra/service/IWriteResponseHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/NoConsistencyWriteResponseHandler.java
Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
    cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
    cassandra/trunk/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=986017&r1=986016&r2=986017&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Aug 16 16:28:43 2010
@@ -10,6 +10,7 @@ dev
  * rename RackAwareStrategy to OldNetworkTopologyStrategy, RackUnawareStrategy 
    to SimpleStrategy, DatacenterShardStrategy to NetworkTopologyStrategy,
    AbstractRackAwareSnitch to AbstractNetworkTopologySnitch (CASSANDRA-1392)
+ * merge StorageProxy.mutate, mutateBlocking (CASSANDRA-1396)
 
 
 0.7-beta1

Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=986017&r1=986016&r2=986017&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Mon Aug 16 16:28:43 2010
@@ -421,37 +421,22 @@ public class CassandraServer implements 
 
     private void doInsert(ConsistencyLevel consistency, RowMutation rm) throws UnavailableException, TimedOutException
     {
-        if (consistency != ConsistencyLevel.ZERO)
+        try
         {
-            try
-            {
-                schedule();
-                StorageProxy.mutateBlocking(Arrays.asList(rm), thriftConsistencyLevel(consistency));
-            }
-            catch (TimeoutException e)
-            {
-                throw new TimedOutException();
-            }
-            catch (org.apache.cassandra.thrift.UnavailableException thriftE)
-            {
-                throw new UnavailableException();
-            }
-            finally
-            {
-                release();
-            }
+            schedule();
+            StorageProxy.mutate(Arrays.asList(rm), thriftConsistencyLevel(consistency));
         }
-        else
+        catch (TimeoutException e)
         {
-            try
-            {
-                schedule();
-                StorageProxy.mutate(Arrays.asList(rm));
-            }
-            finally
-            {
-                release();
-            }
+            throw new TimedOutException();
+        }
+        catch (org.apache.cassandra.thrift.UnavailableException thriftE)
+        {
+            throw new UnavailableException();
+        }
+        finally
+        {
+            release();
         }
     }
 
@@ -479,38 +464,23 @@ public class CassandraServer implements 
             rowMutations.add(getRowMutationFromMutations(curKeyspace.get(), pair.key.array(), cfToMutations));
         }
         
-        if (consistencyLevel == ConsistencyLevel.ZERO)
+        try
         {
-            try
-            {
-                schedule();
-                StorageProxy.mutate(rowMutations);
-            }
-            finally
-            {
-                release();
-            }
+            schedule();
+            StorageProxy.mutate(rowMutations, thriftConsistencyLevel(consistencyLevel));
         }
-        else
+        catch (TimeoutException te)
         {
-            try
-            {
-                schedule();
-                StorageProxy.mutateBlocking(rowMutations, thriftConsistencyLevel(consistencyLevel));
-            }
-            catch (TimeoutException te)
-            {
-                throw newTimedOutException();
-            }
-            // FIXME: StorageProxy.mutateBlocking throws Thrift's UnavailableException
-            catch (org.apache.cassandra.thrift.UnavailableException ue)
-            {
-                throw newUnavailableException();
-            }
-            finally
-            {
-                release();
-            }
+            throw newTimedOutException();
+        }
+        // FIXME: StorageProxy.mutate throws Thrift's UnavailableException
+        catch (org.apache.cassandra.thrift.UnavailableException ue)
+        {
+            throw newUnavailableException();
+        }
+        finally
+        {
+            release();
         }
         
         return null;

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=986017&r1=986016&r2=986017&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Mon Aug 16 16:28:43 2010
@@ -40,6 +40,7 @@ import org.apache.cassandra.net.Messagin
 import org.apache.cassandra.service.DigestMismatchException;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.WriteResponseHandler;
+import org.apache.cassandra.service.IWriteResponseHandler;
 import org.apache.cassandra.thrift.InvalidRequestException;
 import org.apache.cassandra.utils.WrappedRunnable;
 import org.cliffc.high_scale_lib.NonBlockingHashSet;
@@ -122,7 +123,7 @@ public class HintedHandOffManager
             RowMutation rm = new RowMutation(tableName, key);
             rm.add(cf);
             Message message = rm.makeRowMutationMessage();
-            WriteResponseHandler responseHandler = new WriteResponseHandler(endpoint);
+            IWriteResponseHandler responseHandler =  WriteResponseHandler.create(endpoint);
             MessagingService.instance.sendRR(message, new InetAddress[] { endpoint }, responseHandler);
             try
             {

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=986017&r1=986016&r2=986017&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Mon Aug 16 16:28:43 2010
@@ -104,11 +104,11 @@ public abstract class AbstractReplicatio
      */
     public abstract Set<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata) throws IllegalStateException;
 
-    public AbstractWriteResponseHandler getWriteResponseHandler(Collection<InetAddress> writeEndpoints,
-                                                                Multimap<InetAddress, InetAddress> hintedEndpoints,
-                                                                ConsistencyLevel consistencyLevel)
+    public IWriteResponseHandler getWriteResponseHandler(Collection<InetAddress> writeEndpoints,
+                                                         Multimap<InetAddress, InetAddress> hintedEndpoints,
+                                                         ConsistencyLevel consistencyLevel)
     {
-        return new WriteResponseHandler(writeEndpoints, hintedEndpoints, consistencyLevel, table);
+        return WriteResponseHandler.create(writeEndpoints, hintedEndpoints, consistencyLevel, table);
     }
 
     // instance method so test subclasses can override it

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java?rev=986017&r1=986016&r2=986017&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java Mon Aug 16 16:28:43 2010
@@ -152,16 +152,16 @@ public class NetworkTopologyStrategy ext
      * return a DCQRH with a map of all the DC rep factor.
      */
     @Override
-    public AbstractWriteResponseHandler getWriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistency_level)
+    public IWriteResponseHandler getWriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistency_level)
     {
         if (consistency_level == ConsistencyLevel.DCQUORUM)
         {
             // block for in this context will be localnodes block.
-            return new DatacenterWriteResponseHandler(writeEndpoints, hintedEndpoints, consistency_level, table);
+            return DatacenterWriteResponseHandler.create(writeEndpoints, hintedEndpoints, consistency_level, table);
         }
         else if (consistency_level == ConsistencyLevel.DCQUORUMSYNC)
         {
-            return new DatacenterSyncWriteResponseHandler(writeEndpoints, hintedEndpoints, consistency_level, table);
+            return DatacenterSyncWriteResponseHandler.create(writeEndpoints, hintedEndpoints, consistency_level, table);
         }
         return super.getWriteResponseHandler(writeEndpoints, hintedEndpoints, consistency_level);
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java?rev=986017&r1=986016&r2=986017&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java Mon Aug 16 16:28:43 2010
@@ -28,13 +28,13 @@ import java.util.concurrent.TimeoutExcep
 
 import com.google.common.collect.Multimap;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.SimpleCondition;
 
-public abstract class AbstractWriteResponseHandler implements IAsyncCallback
+public abstract class AbstractWriteResponseHandler implements IWriteResponseHandler
 {
     protected final SimpleCondition condition = new SimpleCondition();
     protected final long startTime;
@@ -42,7 +42,7 @@ public abstract class AbstractWriteRespo
     protected final Multimap<InetAddress, InetAddress> hintedEndpoints;
     protected final ConsistencyLevel consistencyLevel;
 
-    public AbstractWriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel)
+    protected AbstractWriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel)
     {
         startTime = System.currentTimeMillis();
         this.consistencyLevel = consistencyLevel;
@@ -69,6 +69,13 @@ public abstract class AbstractWriteRespo
         }
     }
 
+    public void addHintCallback(Message hintedMessage, InetAddress destination)
+    {
+        // (non-destination hints are part of the callback and count towards consistency only under CL.ANY)
+        if (writeEndpoints.contains(destination) || consistencyLevel == ConsistencyLevel.ANY)
+            MessagingService.instance.addCallback(this, hintedMessage.getMessageId());
+    }
+
     /** null message means "response from local write" */
     public abstract void response(Message msg);
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java?rev=986017&r1=986016&r2=986017&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java Mon Aug 16 16:28:43 2010
@@ -56,7 +56,7 @@ public class DatacenterSyncWriteResponse
 	private final NetworkTopologyStrategy strategy;
     private HashMap<String, AtomicInteger> responses = new HashMap<String, AtomicInteger>();
 
-    public DatacenterSyncWriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel, String table)
+    protected DatacenterSyncWriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel, String table)
     {
         // Response is been managed by the map so make it 1 for the superclass.
         super(writeEndpoints, hintedEndpoints, consistencyLevel);
@@ -71,6 +71,18 @@ public class DatacenterSyncWriteResponse
         }
     }
 
+    public static IWriteResponseHandler create(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel, String table)
+    {
+        if (consistencyLevel == ConsistencyLevel.ZERO)
+        {
+            return NoConsistencyWriteResponseHandler.instance;
+        }
+        else
+        {
+            return new DatacenterSyncWriteResponseHandler(writeEndpoints, hintedEndpoints, consistencyLevel, table);
+        }
+    }
+
     public void response(Message message)
     {
         String dataCenter = message == null

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java?rev=986017&r1=986016&r2=986017&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java Mon Aug 16 16:28:43 2010
@@ -50,12 +50,23 @@ public class DatacenterWriteResponseHand
         localdc = snitch.getDatacenter(FBUtilities.getLocalAddress());
     }
 
-    public DatacenterWriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel, String table)
+    protected DatacenterWriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel, String table)
     {
         super(writeEndpoints, hintedEndpoints, consistencyLevel, table);
         assert consistencyLevel == ConsistencyLevel.DCQUORUM;
     }
 
+    public static IWriteResponseHandler create(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel, String table)
+    {
+        if (consistencyLevel == ConsistencyLevel.ZERO)
+        {
+            return NoConsistencyWriteResponseHandler.instance;
+        }
+        else
+        {
+            return new DatacenterWriteResponseHandler(writeEndpoints, hintedEndpoints, consistencyLevel, table);
+        }
+    }
 
     @Override
     protected int determineBlockFor(String table)

Added: cassandra/trunk/src/java/org/apache/cassandra/service/IWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/IWriteResponseHandler.java?rev=986017&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/IWriteResponseHandler.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/IWriteResponseHandler.java Mon Aug 16 16:28:43 2010
@@ -0,0 +1,35 @@
+package org.apache.cassandra.service;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.net.InetAddress;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.thrift.UnavailableException;
+
+public interface IWriteResponseHandler extends IAsyncCallback
+{
+    public void get() throws TimeoutException;
+    public void addHintCallback(Message hintedMessage, InetAddress destination);
+    public void assureSufficientLiveNodes() throws UnavailableException;
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/service/NoConsistencyWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/NoConsistencyWriteResponseHandler.java?rev=986017&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/NoConsistencyWriteResponseHandler.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/NoConsistencyWriteResponseHandler.java Mon Aug 16 16:28:43 2010
@@ -0,0 +1,20 @@
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.thrift.UnavailableException;
+
+class NoConsistencyWriteResponseHandler implements IWriteResponseHandler
+{
+    static final IWriteResponseHandler instance = new NoConsistencyWriteResponseHandler();
+
+    public void get() throws TimeoutException {}
+
+    public void addHintCallback(Message hintedMessage, InetAddress destination) {}
+
+    public void response(Message msg) {}
+
+    public void assureSufficientLiveNodes() throws UnavailableException {}
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=986017&r1=986016&r2=986017&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Mon Aug 16 16:28:43 2010
@@ -84,100 +84,13 @@ public class StorageProxy implements Sto
      * of the possibility of a replica being down and hint
      * the data across to some other replica.
      *
-     * This is the ZERO consistency level. We do not wait for replies.
-     *
      * @param mutations the mutations to be applied across the replicas
+     * @param consistency_level the consistency level for the operation
     */
-    public static void mutate(List<RowMutation> mutations)
+    public static void mutate(List<RowMutation> mutations, ConsistencyLevel consistency_level) throws UnavailableException, TimeoutException
     {
         long startTime = System.nanoTime();
-        try
-        {
-            StorageService ss = StorageService.instance;
-            for (final RowMutation rm: mutations)
-            {
-                try
-                {
-                    String table = rm.getTable();
-                    AbstractReplicationStrategy rs = ss.getReplicationStrategy(table);
-
-                    List<InetAddress> naturalEndpoints = ss.getNaturalEndpoints(table, rm.key());
-                    Multimap<InetAddress,InetAddress> hintedEndpoints = rs.getHintedEndpoints(naturalEndpoints);
-                    Message unhintedMessage = null; // lazy initialize for non-local, unhinted writes
-
-                    // 3 cases:
-                    // 1. local, unhinted write: run directly on write stage
-                    // 2. non-local, unhinted write: send row mutation message
-                    // 3. hinted write: add hint header, and send message
-                    for (Map.Entry<InetAddress, Collection<InetAddress>> entry : hintedEndpoints.asMap().entrySet())
-                    {
-                        InetAddress destination = entry.getKey();
-                        Collection<InetAddress> targets = entry.getValue();
-                        if (targets.size() == 1 && targets.iterator().next().equals(destination))
-                        {
-                            // unhinted writes
-                            if (destination.equals(FBUtilities.getLocalAddress()))
-                            {
-                                if (logger.isDebugEnabled())
-                                    logger.debug("insert writing local " + rm.toString(true));
-                                Runnable runnable = new WrappedRunnable()
-                                {
-                                    public void runMayThrow() throws IOException
-                                    {
-                                        rm.apply();
-                                    }
-                                };
-                                StageManager.getStage(StageManager.MUTATION_STAGE).execute(runnable);
-                            }
-                            else
-                            {
-                                if (unhintedMessage == null)
-                                    unhintedMessage = rm.makeRowMutationMessage();
-                                if (logger.isDebugEnabled())
-                                    logger.debug("insert writing key " + FBUtilities.bytesToHex(rm.key()) + " to " + unhintedMessage.getMessageId() + "@" + destination);
-                                MessagingService.instance.sendOneWay(unhintedMessage, destination);
-                            }
-                        }
-                        else
-                        {
-                            // hinted
-                            Message hintedMessage = rm.makeRowMutationMessage();
-                            for (InetAddress target : targets)
-                            {
-                                if (!target.equals(destination))
-                                {
-                                    addHintHeader(hintedMessage, target);
-                                    if (logger.isDebugEnabled())
-                                        logger.debug("insert writing key " + FBUtilities.bytesToHex(rm.key()) + " to " + hintedMessage.getMessageId() + "@" + destination + " for " + target);
-                                }
-                            }
-                            MessagingService.instance.sendOneWay(hintedMessage, destination);
-                        }
-                    }
-                }
-                catch (IOException e)
-                {
-                    throw new RuntimeException("error inserting key " + FBUtilities.bytesToHex(rm.key()), e);
-                }
-            }
-        }
-        finally
-        {
-            writeStats.addNano(System.nanoTime() - startTime);
-        }
-    }
-
-    private static void addHintHeader(Message message, InetAddress target)
-    {
-        byte[] oldHint = message.getHeader(RowMutation.HINT);
-        byte[] hint = oldHint == null ? target.getAddress() : ArrayUtils.addAll(oldHint, target.getAddress());
-        message.setHeader(RowMutation.HINT, hint);
-    }
-
-    public static void mutateBlocking(List<RowMutation> mutations, ConsistencyLevel consistency_level) throws UnavailableException, TimeoutException
-    {
-        long startTime = System.nanoTime();
-        ArrayList<AbstractWriteResponseHandler> responseHandlers = new ArrayList<AbstractWriteResponseHandler>();
+        ArrayList<IWriteResponseHandler> responseHandlers = new ArrayList<IWriteResponseHandler>();
 
         RowMutation mostRecentRowMutation = null;
         StorageService ss = StorageService.instance;
@@ -195,7 +108,7 @@ public class StorageProxy implements Sto
                 Multimap<InetAddress, InetAddress> hintedEndpoints = rs.getHintedEndpoints(writeEndpoints);
                 
                 // send out the writes, as in mutate() above, but this time with a callback that tracks responses
-                final AbstractWriteResponseHandler responseHandler = rs.getWriteResponseHandler(writeEndpoints, hintedEndpoints, consistency_level);
+                final IWriteResponseHandler responseHandler = rs.getWriteResponseHandler(writeEndpoints, hintedEndpoints, consistency_level);
                 responseHandler.assureSufficientLiveNodes();
 
                 responseHandlers.add(responseHandler);
@@ -238,15 +151,13 @@ public class StorageProxy implements Sto
                                     logger.debug("insert writing key " + FBUtilities.bytesToHex(rm.key()) + " to " + hintedMessage.getMessageId() + "@" + destination + " for " + target);
                             }
                         }
-                        // (non-destination hints are part of the callback and count towards consistency only under CL.ANY)
-                        if (writeEndpoints.contains(destination) || consistency_level == ConsistencyLevel.ANY)
-                            MessagingService.instance.addCallback(responseHandler, hintedMessage.getMessageId());
+                        responseHandler.addHintCallback(hintedMessage, destination);
                         MessagingService.instance.sendOneWay(hintedMessage, destination);
                     }
                 }
             }
             // wait for writes.  throws timeoutexception if necessary
-            for (AbstractWriteResponseHandler responseHandler : responseHandlers)
+            for (IWriteResponseHandler responseHandler : responseHandlers)
             {
                 responseHandler.get();
             }
@@ -265,7 +176,14 @@ public class StorageProxy implements Sto
 
     }
 
-    private static void insertLocalMessage(final RowMutation rm, final AbstractWriteResponseHandler responseHandler)
+    private static void addHintHeader(Message message, InetAddress target)
+    {
+        byte[] oldHint = message.getHeader(RowMutation.HINT);
+        byte[] hint = oldHint == null ? target.getAddress() : ArrayUtils.addAll(oldHint, target.getAddress());
+        message.setHeader(RowMutation.HINT, hint);
+    }
+
+    private static void insertLocalMessage(final RowMutation rm, final IWriteResponseHandler responseHandler)
     {
         if (logger.isDebugEnabled())
             logger.debug("insert writing local " + rm.toString(true));

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java?rev=986017&r1=986016&r2=986017&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java Mon Aug 16 16:28:43 2010
@@ -39,15 +39,15 @@ public class WriteResponseHandler extend
 {
     protected static final Logger logger = LoggerFactory.getLogger(WriteResponseHandler.class);
 
-    protected AtomicInteger responses;
+    protected final AtomicInteger responses;
 
-    public WriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel, String table)
+    protected WriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel, String table)
     {
         super(writeEndpoints, hintedEndpoints, consistencyLevel);
         responses = new AtomicInteger(determineBlockFor(table));
     }
 
-    public WriteResponseHandler(InetAddress endpoint)
+    protected WriteResponseHandler(InetAddress endpoint)
     {
         super(Arrays.asList(endpoint),
               ImmutableMultimap.<InetAddress, InetAddress>builder().put(endpoint, endpoint).build(),
@@ -55,6 +55,23 @@ public class WriteResponseHandler extend
         responses = new AtomicInteger(1);
     }
 
+    public static IWriteResponseHandler create(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel, String table)
+    {
+        if (consistencyLevel == ConsistencyLevel.ZERO)
+        {
+            return NoConsistencyWriteResponseHandler.instance;
+        }
+        else
+        {
+            return new WriteResponseHandler(writeEndpoints, hintedEndpoints, consistencyLevel, table);
+        }
+    }
+
+    public static IWriteResponseHandler create(InetAddress endpoint)
+    {
+        return new WriteResponseHandler(endpoint);
+    }
+
     public void response(Message m)
     {
         if (responses.decrementAndGet() == 0)

Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=986017&r1=986016&r2=986017&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Mon Aug 16 16:28:43 2010
@@ -463,20 +463,13 @@ public class CassandraServer implements 
         {
             schedule();
 
-            if (consistency_level == ConsistencyLevel.ZERO)
+            try
             {
-                StorageProxy.mutate(mutations);
+              StorageProxy.mutate(mutations, consistency_level);
             }
-            else
+            catch (TimeoutException e)
             {
-                try
-                {
-                    StorageProxy.mutateBlocking(mutations, consistency_level);
-                }
-                catch (TimeoutException e)
-                {
-                    throw new TimedOutException();
-                }
+              throw new TimedOutException();
             }
         }
         finally