You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by gd...@apache.org on 2010/01/22 17:04:11 UTC

svn commit: r902140 - in /incubator/cassandra/trunk: ./ interface/ interface/gen-java/org/apache/cassandra/service/ src/java/org/apache/cassandra/service/

Author: gdusbabek
Date: Fri Jan 22 16:04:10 2010
New Revision: 902140

URL: http://svn.apache.org/viewvc?rev=902140&view=rev
Log:
Add ConsistencyLevel.ANY for writes. Patch by Gary Dusbabek, reviewed by Jonathan Ellis. CASSANDRA-687

Modified:
    incubator/cassandra/trunk/CHANGES.txt
    incubator/cassandra/trunk/interface/cassandra.thrift
    incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java
    incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/ConsistencyLevel.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: incubator/cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/CHANGES.txt?rev=902140&r1=902139&r2=902140&view=diff
==============================================================================
--- incubator/cassandra/trunk/CHANGES.txt (original)
+++ incubator/cassandra/trunk/CHANGES.txt Fri Jan 22 16:04:10 2010
@@ -13,6 +13,7 @@
  * row caching [must be explicitly enabled per-CF in config] (CASSANDRA-678)
  * present a useful measure of compaction progress in JMX (CASSANDRA-599)
  * add bin/sstablekeys (CASSANDRA-679)
+ * add ConsistencyLevel.ANY (CASSANDRA-687)
 
 
 0.5.1

Modified: incubator/cassandra/trunk/interface/cassandra.thrift
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/interface/cassandra.thrift?rev=902140&r1=902139&r2=902140&view=diff
==============================================================================
--- incubator/cassandra/trunk/interface/cassandra.thrift (original)
+++ incubator/cassandra/trunk/interface/cassandra.thrift Fri Jan 22 16:04:10 2010
@@ -138,12 +138,14 @@
  *
  * Write:
  *      ZERO    Ensure nothing. A write happens asynchronously in background
+ *      ANY     Ensure that the write has been written once somewhere, including possibly being hinted in a non-target node.
  *      ONE     Ensure that the write has been written to at least 1 node's commit log and memory table before responding to the client.
  *      QUORUM  Ensure that the write has been written to <ReplicationFactor> / 2 + 1 nodes before responding to the client.
  *      ALL     Ensure that the write is written to <code>&lt;ReplicationFactor&gt;</code> nodes before responding to the client.
  *
  * Read:
  *      ZERO    Not supported, because it doesn't make sense.
+ *      ANY     Not supported. You probably want ONE instead.
  *      ONE     Will return the record returned by the first node to respond. A consistency check is always done in a
  *              background thread to fix any consistency issues when ConsistencyLevel.ONE is used. This means subsequent
  *              calls will have correct data even if the initial read gets an older value. (This is called 'read repair'.)
@@ -158,6 +160,7 @@
     DCQUORUM = 3,
     DCQUORUMSYNC = 4,
     ALL = 5,
+    ANY = 6,
 }
 
 /**
@@ -275,7 +278,7 @@
   ColumnOrSuperColumn get(1:required string keyspace,
                           2:required string key,
                           3:required ColumnPath column_path,
-                          4:required ConsistencyLevel consistency_level=1)
+                          4:required ConsistencyLevel consistency_level=ONE)
                       throws (1:InvalidRequestException ire, 2:NotFoundException nfe, 3:UnavailableException ue, 4:TimedOutException te),
 
   /**
@@ -286,7 +289,7 @@
                                       2:required string key, 
                                       3:required ColumnParent column_parent, 
                                       4:required SlicePredicate predicate, 
-                                      5:required ConsistencyLevel consistency_level=1)
+                                      5:required ConsistencyLevel consistency_level=ONE)
                             throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
 
   /**
@@ -297,7 +300,7 @@
   map<string,ColumnOrSuperColumn> multiget(1:required string keyspace, 
                                            2:required list<string> keys, 
                                            3:required ColumnPath column_path, 
-                                           4:required ConsistencyLevel consistency_level=1)
+                                           4:required ConsistencyLevel consistency_level=ONE)
                                   throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
 
   /**
@@ -307,7 +310,7 @@
                                                        2:required list<string> keys, 
                                                        3:required ColumnParent column_parent, 
                                                        4:required SlicePredicate predicate, 
-                                                       5:required ConsistencyLevel consistency_level=1)
+                                                       5:required ConsistencyLevel consistency_level=ONE)
                                         throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
 
   /**
@@ -316,7 +319,7 @@
   i32 get_count(1:required string keyspace, 
                 2:required string key, 
                 3:required ColumnParent column_parent, 
-                4:required ConsistencyLevel consistency_level=1)
+                4:required ConsistencyLevel consistency_level=ONE)
       throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
 
   /**
@@ -328,7 +331,7 @@
                                  4:required string start_key="", 
                                  5:required string finish_key="", 
                                  6:required i32 row_count=100, 
-                                 7:required ConsistencyLevel consistency_level=1)
+                                 7:required ConsistencyLevel consistency_level=ONE)
                  throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
 
   # modification methods
@@ -343,7 +346,7 @@
               3:required ColumnPath column_path, 
               4:required binary value, 
               5:required i64 timestamp, 
-              6:required ConsistencyLevel consistency_level=0)
+              6:required ConsistencyLevel consistency_level=ZERO)
        throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
 
   /**
@@ -355,7 +358,7 @@
   void batch_insert(1:required string keyspace, 
                     2:required string key, 
                     3:required map<string, list<ColumnOrSuperColumn>> cfmap,
-                    4:required ConsistencyLevel consistency_level=0)
+                    4:required ConsistencyLevel consistency_level=ZERO)
        throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
 
   /**
@@ -367,12 +370,12 @@
               2:required string key,
               3:required ColumnPath column_path,
               4:required i64 timestamp,
-              5:ConsistencyLevel consistency_level=0)
+              5:ConsistencyLevel consistency_level=ZERO)
        throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
 
   void batch_mutate(1:required string keyspace,
                     2:required map<string, map<string, list<Mutation>>> mutation_map,
-                    3:required ConsistencyLevel consistency_level=0)
+                    3:required ConsistencyLevel consistency_level=ZERO)
        throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
        
   // Meta-APIs -- APIs to get information about the node or cluster,

Modified: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java?rev=902140&r1=902139&r2=902140&view=diff
==============================================================================
--- incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java (original)
+++ incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java Fri Jan 22 16:04:10 2010
@@ -116,6 +116,7 @@
      * Insert Columns or SuperColumns across different Column Families for the same row key. batch_mutation is a
      * map<string, list<ColumnOrSuperColumn>> -- a map which pairs column family names with the relevant ColumnOrSuperColumn
      * objects to insert.
+     * @deprecated; use batch_mutate instead
      * 
      * @param keyspace
      * @param key

Modified: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/ConsistencyLevel.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/ConsistencyLevel.java?rev=902140&r1=902139&r2=902140&view=diff
==============================================================================
--- incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/ConsistencyLevel.java (original)
+++ incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/ConsistencyLevel.java Fri Jan 22 16:04:10 2010
@@ -21,12 +21,14 @@
  * 
  * Write:
  *      ZERO    Ensure nothing. A write happens asynchronously in background
+ *      ANY     Ensure that the write has been written once somewhere, including possibly being hinted in a non-target node.
  *      ONE     Ensure that the write has been written to at least 1 node's commit log and memory table before responding to the client.
  *      QUORUM  Ensure that the write has been written to <ReplicationFactor> / 2 + 1 nodes before responding to the client.
  *      ALL     Ensure that the write is written to <code>&lt;ReplicationFactor&gt;</code> nodes before responding to the client.
  * 
  * Read:
  *      ZERO    Not supported, because it doesn't make sense.
+ *      ANY     Not supported. You probably want ONE instead.
  *      ONE     Will return the record returned by the first node to respond. A consistency check is always done in a
  *              background thread to fix any consistency issues when ConsistencyLevel.ONE is used. This means subsequent
  *              calls will have correct data even if the initial read gets an older value. (This is called 'read repair'.)
@@ -40,7 +42,8 @@
   QUORUM(2),
   DCQUORUM(3),
   DCQUORUMSYNC(4),
-  ALL(5);
+  ALL(5),
+  ANY(6);
 
   private static final Map<Integer, ConsistencyLevel> BY_VALUE = new HashMap<Integer,ConsistencyLevel>() {{
     for(ConsistencyLevel val : ConsistencyLevel.values()) {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java?rev=902140&r1=902139&r2=902140&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java Fri Jan 22 16:04:10 2010
@@ -85,6 +85,10 @@
         {
             throw new InvalidRequestException("Consistency level all is not yet supported on read operations");
         }
+        if (consistency_level == ConsistencyLevel.ANY)
+        {
+            throw new InvalidRequestException("Consistency level any may not be applied to read operations");
+        }
 
         List<Row> rows;
         try

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=902140&r1=902139&r2=902140&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Fri Jan 22 16:04:10 2010
@@ -176,9 +176,9 @@
                 List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(rm.key());
                 Map<InetAddress, InetAddress> endpointMap = StorageService.instance.getHintedEndpointMap(rm.key(), naturalEndpoints);
                 int blockFor = determineBlockFor(naturalEndpoints.size(), endpointMap.size(), consistency_level);
-    
+                
                 // avoid starting a write we know can't achieve the required consistency
-                assureSufficientLiveNodes(endpointMap, blockFor);
+                assureSufficientLiveNodes(endpointMap, blockFor, consistency_level);
                 
                 // send out the writes, as in insert() above, but this time with a callback that tracks responses
                 final WriteResponseHandler responseHandler = StorageService.instance.getWriteResponseHandler(blockFor, consistency_level);
@@ -211,9 +211,11 @@
                     }
                     else
                     {
-                        // (hints aren't part of the callback since they don't count towards consistency until they are on the final destination node)
                         Message hintedMessage = rm.makeRowMutationMessage();
                         hintedMessage.addHeader(RowMutation.HINT, naturalTarget.getAddress());
+                        // (hints are part of the callback and count towards consistency only under CL.ANY)
+                        if (consistency_level == ConsistencyLevel.ANY)
+                            MessagingService.instance.addCallback(responseHandler, hintedMessage.getMessageId());
                         if (logger.isDebugEnabled())
                             logger.debug("insert writing key " + rm.key() + " to " + hintedMessage.getMessageId() + "@" + maybeHintedTarget + " for " + naturalTarget);
                         MessagingService.instance.sendOneWay(hintedMessage, maybeHintedTarget);
@@ -240,20 +242,30 @@
 
     }
 
-    private static void assureSufficientLiveNodes(Map<InetAddress, InetAddress> endpointMap, int blockFor)
+    private static void assureSufficientLiveNodes(Map<InetAddress, InetAddress> endpointMap, int blockFor, ConsistencyLevel consistencyLevel)
             throws UnavailableException
     {
-        int liveNodes = 0;
-        for (Map.Entry<InetAddress, InetAddress> entry : endpointMap.entrySet())
+        if (consistencyLevel == ConsistencyLevel.ANY)
         {
-            if (entry.getKey().equals(entry.getValue()))
-            {
-                liveNodes++;
-            }
+            // ensure there are blockFor distinct living nodes (hints are ok).
+            if (new HashSet(endpointMap.values()).size() < blockFor)
+                throw new UnavailableException();
         }
-        if (liveNodes < blockFor)
+        else
         {
-            throw new UnavailableException();
+            // only count live + unhinted nodes.
+            int liveNodes = 0;
+            for (Map.Entry<InetAddress, InetAddress> entry : endpointMap.entrySet())
+            {
+                if (entry.getKey().equals(entry.getValue()))
+                {
+                    liveNodes++;
+                }
+            }
+            if (liveNodes < blockFor)
+            {
+                throw new UnavailableException();
+            }
         }
     }
 
@@ -296,6 +308,10 @@
         {
             blockFor = naturalTargets + bootstrapTargets;
         }
+        else if (consistency_level == ConsistencyLevel.ANY)
+        {
+            blockFor = 1;
+        }
         else
         {
             throw new UnsupportedOperationException("invalid consistency level " + consistency_level);