You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by gd...@apache.org on 2010/01/22 17:04:11 UTC
svn commit: r902140 - in /incubator/cassandra/trunk: ./ interface/
interface/gen-java/org/apache/cassandra/service/
src/java/org/apache/cassandra/service/
Author: gdusbabek
Date: Fri Jan 22 16:04:10 2010
New Revision: 902140
URL: http://svn.apache.org/viewvc?rev=902140&view=rev
Log:
Add ConsistencyLevel.ANY for writes. Patch by Gary Dusbabek, reviewed by Jonathan Ellis. CASSANDRA-687
Modified:
incubator/cassandra/trunk/CHANGES.txt
incubator/cassandra/trunk/interface/cassandra.thrift
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/ConsistencyLevel.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Modified: incubator/cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/CHANGES.txt?rev=902140&r1=902139&r2=902140&view=diff
==============================================================================
--- incubator/cassandra/trunk/CHANGES.txt (original)
+++ incubator/cassandra/trunk/CHANGES.txt Fri Jan 22 16:04:10 2010
@@ -13,6 +13,7 @@
* row caching [must be explicitly enabled per-CF in config] (CASSANDRA-678)
* present a useful measure of compaction progress in JMX (CASSANDRA-599)
* add bin/sstablekeys (CASSANDRA-679)
+ * add ConsistencyLevel.ANY (CASSANDRA-687)
0.5.1
Modified: incubator/cassandra/trunk/interface/cassandra.thrift
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/interface/cassandra.thrift?rev=902140&r1=902139&r2=902140&view=diff
==============================================================================
--- incubator/cassandra/trunk/interface/cassandra.thrift (original)
+++ incubator/cassandra/trunk/interface/cassandra.thrift Fri Jan 22 16:04:10 2010
@@ -138,12 +138,14 @@
*
* Write:
* ZERO Ensure nothing. A write happens asynchronously in background
+ * ANY Ensure that the write has been written once somewhere, including possibly being hinted in a non-target node.
* ONE Ensure that the write has been written to at least 1 node's commit log and memory table before responding to the client.
* QUORUM Ensure that the write has been written to <ReplicationFactor> / 2 + 1 nodes before responding to the client.
* ALL Ensure that the write is written to <code><ReplicationFactor></code> nodes before responding to the client.
*
* Read:
* ZERO Not supported, because it doesn't make sense.
+ * ANY Not supported. You probably want ONE instead.
* ONE Will return the record returned by the first node to respond. A consistency check is always done in a
* background thread to fix any consistency issues when ConsistencyLevel.ONE is used. This means subsequent
* calls will have correct data even if the initial read gets an older value. (This is called 'read repair'.)
@@ -158,6 +160,7 @@
DCQUORUM = 3,
DCQUORUMSYNC = 4,
ALL = 5,
+ ANY = 6,
}
/**
@@ -275,7 +278,7 @@
ColumnOrSuperColumn get(1:required string keyspace,
2:required string key,
3:required ColumnPath column_path,
- 4:required ConsistencyLevel consistency_level=1)
+ 4:required ConsistencyLevel consistency_level=ONE)
throws (1:InvalidRequestException ire, 2:NotFoundException nfe, 3:UnavailableException ue, 4:TimedOutException te),
/**
@@ -286,7 +289,7 @@
2:required string key,
3:required ColumnParent column_parent,
4:required SlicePredicate predicate,
- 5:required ConsistencyLevel consistency_level=1)
+ 5:required ConsistencyLevel consistency_level=ONE)
throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
/**
@@ -297,7 +300,7 @@
map<string,ColumnOrSuperColumn> multiget(1:required string keyspace,
2:required list<string> keys,
3:required ColumnPath column_path,
- 4:required ConsistencyLevel consistency_level=1)
+ 4:required ConsistencyLevel consistency_level=ONE)
throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
/**
@@ -307,7 +310,7 @@
2:required list<string> keys,
3:required ColumnParent column_parent,
4:required SlicePredicate predicate,
- 5:required ConsistencyLevel consistency_level=1)
+ 5:required ConsistencyLevel consistency_level=ONE)
throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
/**
@@ -316,7 +319,7 @@
i32 get_count(1:required string keyspace,
2:required string key,
3:required ColumnParent column_parent,
- 4:required ConsistencyLevel consistency_level=1)
+ 4:required ConsistencyLevel consistency_level=ONE)
throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
/**
@@ -328,7 +331,7 @@
4:required string start_key="",
5:required string finish_key="",
6:required i32 row_count=100,
- 7:required ConsistencyLevel consistency_level=1)
+ 7:required ConsistencyLevel consistency_level=ONE)
throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
# modification methods
@@ -343,7 +346,7 @@
3:required ColumnPath column_path,
4:required binary value,
5:required i64 timestamp,
- 6:required ConsistencyLevel consistency_level=0)
+ 6:required ConsistencyLevel consistency_level=ZERO)
throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
/**
@@ -355,7 +358,7 @@
void batch_insert(1:required string keyspace,
2:required string key,
3:required map<string, list<ColumnOrSuperColumn>> cfmap,
- 4:required ConsistencyLevel consistency_level=0)
+ 4:required ConsistencyLevel consistency_level=ZERO)
throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
/**
@@ -367,12 +370,12 @@
2:required string key,
3:required ColumnPath column_path,
4:required i64 timestamp,
- 5:ConsistencyLevel consistency_level=0)
+ 5:ConsistencyLevel consistency_level=ZERO)
throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
void batch_mutate(1:required string keyspace,
2:required map<string, map<string, list<Mutation>>> mutation_map,
- 3:required ConsistencyLevel consistency_level=0)
+ 3:required ConsistencyLevel consistency_level=ZERO)
throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
// Meta-APIs -- APIs to get information about the node or cluster,
Modified: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java?rev=902140&r1=902139&r2=902140&view=diff
==============================================================================
--- incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java (original)
+++ incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java Fri Jan 22 16:04:10 2010
@@ -116,6 +116,7 @@
* Insert Columns or SuperColumns across different Column Families for the same row key. batch_mutation is a
* map<string, list<ColumnOrSuperColumn>> -- a map which pairs column family names with the relevant ColumnOrSuperColumn
* objects to insert.
+ * @deprecated; use batch_mutate instead
*
* @param keyspace
* @param key
Modified: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/ConsistencyLevel.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/ConsistencyLevel.java?rev=902140&r1=902139&r2=902140&view=diff
==============================================================================
--- incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/ConsistencyLevel.java (original)
+++ incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/ConsistencyLevel.java Fri Jan 22 16:04:10 2010
@@ -21,12 +21,14 @@
*
* Write:
* ZERO Ensure nothing. A write happens asynchronously in background
+ * ANY Ensure that the write has been written once somewhere, including possibly being hinted in a non-target node.
* ONE Ensure that the write has been written to at least 1 node's commit log and memory table before responding to the client.
* QUORUM Ensure that the write has been written to <ReplicationFactor> / 2 + 1 nodes before responding to the client.
* ALL Ensure that the write is written to <code><ReplicationFactor></code> nodes before responding to the client.
*
* Read:
* ZERO Not supported, because it doesn't make sense.
+ * ANY Not supported. You probably want ONE instead.
* ONE Will return the record returned by the first node to respond. A consistency check is always done in a
* background thread to fix any consistency issues when ConsistencyLevel.ONE is used. This means subsequent
* calls will have correct data even if the initial read gets an older value. (This is called 'read repair'.)
@@ -40,7 +42,8 @@
QUORUM(2),
DCQUORUM(3),
DCQUORUMSYNC(4),
- ALL(5);
+ ALL(5),
+ ANY(6);
private static final Map<Integer, ConsistencyLevel> BY_VALUE = new HashMap<Integer,ConsistencyLevel>() {{
for(ConsistencyLevel val : ConsistencyLevel.values()) {
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java?rev=902140&r1=902139&r2=902140&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java Fri Jan 22 16:04:10 2010
@@ -85,6 +85,10 @@
{
throw new InvalidRequestException("Consistency level all is not yet supported on read operations");
}
+ if (consistency_level == ConsistencyLevel.ANY)
+ {
+ throw new InvalidRequestException("Consistency level any may not be applied to read operations");
+ }
List<Row> rows;
try
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=902140&r1=902139&r2=902140&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Fri Jan 22 16:04:10 2010
@@ -176,9 +176,9 @@
List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(rm.key());
Map<InetAddress, InetAddress> endpointMap = StorageService.instance.getHintedEndpointMap(rm.key(), naturalEndpoints);
int blockFor = determineBlockFor(naturalEndpoints.size(), endpointMap.size(), consistency_level);
-
+
// avoid starting a write we know can't achieve the required consistency
- assureSufficientLiveNodes(endpointMap, blockFor);
+ assureSufficientLiveNodes(endpointMap, blockFor, consistency_level);
// send out the writes, as in insert() above, but this time with a callback that tracks responses
final WriteResponseHandler responseHandler = StorageService.instance.getWriteResponseHandler(blockFor, consistency_level);
@@ -211,9 +211,11 @@
}
else
{
- // (hints aren't part of the callback since they don't count towards consistency until they are on the final destination node)
Message hintedMessage = rm.makeRowMutationMessage();
hintedMessage.addHeader(RowMutation.HINT, naturalTarget.getAddress());
+ // (hints are part of the callback and count towards consistency only under CL.ANY)
+ if (consistency_level == ConsistencyLevel.ANY)
+ MessagingService.instance.addCallback(responseHandler, hintedMessage.getMessageId());
if (logger.isDebugEnabled())
logger.debug("insert writing key " + rm.key() + " to " + hintedMessage.getMessageId() + "@" + maybeHintedTarget + " for " + naturalTarget);
MessagingService.instance.sendOneWay(hintedMessage, maybeHintedTarget);
@@ -240,20 +242,30 @@
}
- private static void assureSufficientLiveNodes(Map<InetAddress, InetAddress> endpointMap, int blockFor)
+ private static void assureSufficientLiveNodes(Map<InetAddress, InetAddress> endpointMap, int blockFor, ConsistencyLevel consistencyLevel)
throws UnavailableException
{
- int liveNodes = 0;
- for (Map.Entry<InetAddress, InetAddress> entry : endpointMap.entrySet())
+ if (consistencyLevel == ConsistencyLevel.ANY)
{
- if (entry.getKey().equals(entry.getValue()))
- {
- liveNodes++;
- }
+ // ensure there are blockFor distinct living nodes (hints are ok).
+ if (new HashSet(endpointMap.values()).size() < blockFor)
+ throw new UnavailableException();
}
- if (liveNodes < blockFor)
+ else
{
- throw new UnavailableException();
+ // only count live + unhinted nodes.
+ int liveNodes = 0;
+ for (Map.Entry<InetAddress, InetAddress> entry : endpointMap.entrySet())
+ {
+ if (entry.getKey().equals(entry.getValue()))
+ {
+ liveNodes++;
+ }
+ }
+ if (liveNodes < blockFor)
+ {
+ throw new UnavailableException();
+ }
}
}
@@ -296,6 +308,10 @@
{
blockFor = naturalTargets + bootstrapTargets;
}
+ else if (consistency_level == ConsistencyLevel.ANY)
+ {
+ blockFor = 1;
+ }
else
{
throw new UnsupportedOperationException("invalid consistency level " + consistency_level);