You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2014/08/25 16:01:56 UTC

[6/7] git commit: Always send Paxos commit to all replicas

Always send Paxos commit to all replicas

patch by kohlisankalp; reviewed by slebresne for CASSANDRA-7479


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/536a08c2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/536a08c2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/536a08c2

Branch: refs/heads/trunk
Commit: 536a08c29773548203845ad562e278e899b35a4d
Parents: 3fa5a9e
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Mon Aug 25 15:55:43 2014 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Mon Aug 25 16:00:35 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/service/StorageProxy.java  | 24 +++++++++++++-------
 2 files changed, 17 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/536a08c2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2329333..a0ff1d1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -72,6 +72,7 @@ Merged from 1.2:
    values (CASSANDRA-7792)
  * Fix ordering of static cells (CASSANDRA-7763)
 Merged from 2.0:
+ * Always send Paxos commit to all replicas (CASSANDRA-7479)
  * Configure system.paxos with LeveledCompactionStrategy (CASSANDRA-7753)
  * Fix ALTER clustering column type from DateType to TimestampType when
    using DESC clustering order (CASSANDRA-7797)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/536a08c2/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 1c0c482..ff6d89c 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -251,10 +251,7 @@ public class StorageProxy implements StorageProxyMBean
             Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot);
             if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true, consistencyForPaxos))
             {
-                if (consistencyForCommit == ConsistencyLevel.ANY)
-                    sendCommit(proposal, liveEndpoints);
-                else
-                    commitPaxos(proposal, consistencyForCommit);
+                commitPaxos(proposal, consistencyForCommit);
                 Tracing.trace("CAS successful");
                 return null;
             }
@@ -416,23 +413,34 @@ public class StorageProxy implements StorageProxyMBean
 
     private static void commitPaxos(Commit proposal, ConsistencyLevel consistencyLevel) throws WriteTimeoutException
     {
+        boolean shouldBlock = consistencyLevel != ConsistencyLevel.ANY;
         Keyspace keyspace = Keyspace.open(proposal.update.metadata().ksName);
 
         Token tk = StorageService.getPartitioner().getToken(proposal.key);
         List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspace.getName(), tk);
         Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspace.getName());
 
-        AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
-        AbstractWriteResponseHandler responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistencyLevel, null, WriteType.SIMPLE);
+        AbstractWriteResponseHandler responseHandler = null;
+        if (shouldBlock)
+        {
+            AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
+            responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistencyLevel, null, WriteType.SIMPLE);
+        }
 
         MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_COMMIT, proposal, Commit.serializer);
         for (InetAddress destination : Iterables.concat(naturalEndpoints, pendingEndpoints))
         {
             if (FailureDetector.instance.isAlive(destination))
-                MessagingService.instance().sendRR(message, destination, responseHandler);
+            {
+                if (shouldBlock)
+                    MessagingService.instance().sendRR(message, destination, responseHandler);
+                else
+                    MessagingService.instance().sendOneWay(message, destination);
+            }
         }
 
-        responseHandler.get();
+        if (shouldBlock)
+            responseHandler.get();
     }
 
     /**