You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2014/01/08 17:23:08 UTC
[1/2] git commit: Fix execution of LOCAL_QUORUM queries with
SimpleStrategy
Updated Branches:
refs/heads/cassandra-2.0 5fa605510 -> f8fd7db67
Fix execution of LOCAL_QUORUM queries with SimpleStrategy
patch by alexliu68; reviewed by slebresne for CASSANDRA-6545
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/32d7cb50
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/32d7cb50
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/32d7cb50
Branch: refs/heads/cassandra-2.0
Commit: 32d7cb5066050ef6123f50a25c6e9b4c9e180ea0
Parents: 2a7c20e
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Jan 8 17:14:58 2014 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jan 8 17:15:20 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ConsistencyLevel.java | 60 +++++++++++++-------
.../locator/AbstractReplicationStrategy.java | 2 +-
3 files changed, 42 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/32d7cb50/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5a85977..cba97d0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@
* fsync compression metadata (CASSANDRA-6531)
* Validate CF existence on execution for prepared statement (CASSANDRA-6535)
* Add ability to throttle batchlog replay (CASSANDRA-6550)
+ * Fix executing LOCAL_QUORUM with SimpleStrategy (CASSANDRA-6545)
1.2.13
http://git-wip-us.apache.org/repos/asf/cassandra/blob/32d7cb50/src/java/org/apache/cassandra/db/ConsistencyLevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
index 4d72767..3737c73 100644
--- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
+++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
@@ -88,9 +88,16 @@ public enum ConsistencyLevel
return codeIdx[code];
}
+ private int quorumFor(Table table)
+ {
+ return (table.getReplicationStrategy().getReplicationFactor() / 2) + 1;
+ }
+
private int localQuorumFor(Table table, String dc)
{
- return (((NetworkTopologyStrategy) table.getReplicationStrategy()).getReplicationFactor(dc) / 2) + 1;
+ return (table.getReplicationStrategy() instanceof NetworkTopologyStrategy)
+ ? (((NetworkTopologyStrategy) table.getReplicationStrategy()).getReplicationFactor(dc) / 2) + 1
+ : quorumFor(table);
}
public int blockFor(Table table)
@@ -107,17 +114,24 @@ public enum ConsistencyLevel
case THREE:
return 3;
case QUORUM:
- return (table.getReplicationStrategy().getReplicationFactor() / 2) + 1;
+ return quorumFor(table);
case ALL:
return table.getReplicationStrategy().getReplicationFactor();
case LOCAL_QUORUM:
return localQuorumFor(table, DatabaseDescriptor.getLocalDataCenter());
case EACH_QUORUM:
- NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) table.getReplicationStrategy();
- int n = 0;
- for (String dc : strategy.getDatacenters())
- n += localQuorumFor(table, dc);
- return n;
+ if (table.getReplicationStrategy() instanceof NetworkTopologyStrategy)
+ {
+ NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) table.getReplicationStrategy();
+ int n = 0;
+ for (String dc : strategy.getDatacenters())
+ n += localQuorumFor(table, dc);
+ return n;
+ }
+ else
+ {
+ return quorumFor(table);
+ }
default:
throw new UnsupportedOperationException("Invalid consistency level: " + toString());
}
@@ -208,16 +222,20 @@ public enum ConsistencyLevel
// local hint is acceptable, and local node is always live
return true;
case LOCAL_ONE:
- return countLocalEndpoints(liveEndpoints) >= 1;
+ return countLocalEndpoints(liveEndpoints) >= 1;
case LOCAL_QUORUM:
return countLocalEndpoints(liveEndpoints) >= blockFor(table);
case EACH_QUORUM:
- for (Map.Entry<String, Integer> entry : countPerDCEndpoints(table, liveEndpoints).entrySet())
+ if (table.getReplicationStrategy() instanceof NetworkTopologyStrategy)
{
- if (entry.getValue() < localQuorumFor(table, entry.getKey()))
- return false;
+ for (Map.Entry<String, Integer> entry : countPerDCEndpoints(table, liveEndpoints).entrySet())
+ {
+ if (entry.getValue() < localQuorumFor(table, entry.getKey()))
+ return false;
+ }
+ return true;
}
- return true;
+ // Fallthough on purpose for SimpleStrategy
default:
return Iterables.size(liveEndpoints) >= blockFor(table);
}
@@ -250,14 +268,18 @@ public enum ConsistencyLevel
}
break;
case EACH_QUORUM:
- for (Map.Entry<String, Integer> entry : countPerDCEndpoints(table, liveEndpoints).entrySet())
+ if (table.getReplicationStrategy() instanceof NetworkTopologyStrategy)
{
- int dcBlockFor = localQuorumFor(table, entry.getKey());
- int dcLive = entry.getValue();
- if (dcLive < dcBlockFor)
- throw new UnavailableException(this, dcBlockFor, dcLive);
+ for (Map.Entry<String, Integer> entry : countPerDCEndpoints(table, liveEndpoints).entrySet())
+ {
+ int dcBlockFor = localQuorumFor(table, entry.getKey());
+ int dcLive = entry.getValue();
+ if (dcLive < dcBlockFor)
+ throw new UnavailableException(this, dcBlockFor, dcLive);
+ }
+ break;
}
- break;
+ // Fallthough on purpose for SimpleStrategy
default:
int live = Iterables.size(liveEndpoints);
if (live < blockFor)
@@ -282,8 +304,6 @@ public enum ConsistencyLevel
public void validateForWrite(String table) throws InvalidRequestException
{
- if(this == EACH_QUORUM)
- requireNetworkTopologyStrategy(table);
}
public void validateCounterForWrite(CFMetaData metadata) throws InvalidRequestException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/32d7cb50/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index a48bec9..e4dd422 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -134,7 +134,7 @@ public abstract class AbstractReplicationStrategy
// block for in this context will be localnodes block.
return new DatacenterWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getTable(), callback, writeType);
}
- else if (consistency_level == ConsistencyLevel.EACH_QUORUM)
+ else if (consistency_level == ConsistencyLevel.EACH_QUORUM && (this instanceof NetworkTopologyStrategy))
{
return new DatacenterSyncWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getTable(), callback, writeType);
}
[2/2] git commit: Merge branch 'cassandra-1.2' into cassandra-2.0
Posted by sl...@apache.org.
Merge branch 'cassandra-1.2' into cassandra-2.0
Conflicts:
src/java/org/apache/cassandra/db/ConsistencyLevel.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f8fd7db6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f8fd7db6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f8fd7db6
Branch: refs/heads/cassandra-2.0
Commit: f8fd7db67e3ff7268bd2bc96bd1373d54664b7dd
Parents: 5fa6055 32d7cb5
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Jan 8 17:22:55 2014 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jan 8 17:22:55 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ConsistencyLevel.java | 59 +++++++++++++-------
.../locator/AbstractReplicationStrategy.java | 2 +-
3 files changed, 41 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fd7db6/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index faf52a4,cba97d0..d0b63a0
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -5,29 -8,10 +5,30 @@@ Merged from 1.2
* fsync compression metadata (CASSANDRA-6531)
* Validate CF existence on execution for prepared statement (CASSANDRA-6535)
* Add ability to throttle batchlog replay (CASSANDRA-6550)
+ * Fix executing LOCAL_QUORUM with SimpleStrategy (CASSANDRA-6545)
-1.2.13
+2.0.4
+ * Allow removing snapshots of no-longer-existing CFs (CASSANDRA-6418)
+ * add StorageService.stopDaemon() (CASSANDRA-4268)
+ * add IRE for invalid CF supplied to get_count (CASSANDRA-5701)
+ * add client encryption support to sstableloader (CASSANDRA-6378)
+ * Fix accept() loop for SSL sockets post-shutdown (CASSANDRA-6468)
+ * Fix size-tiered compaction in LCS L0 (CASSANDRA-6496)
+ * Fix assertion failure in filterColdSSTables (CASSANDRA-6483)
+ * Fix row tombstones in larger-than-memory compactions (CASSANDRA-6008)
+ * Fix cleanup ClassCastException (CASSANDRA-6462)
+ * Reduce gossip memory use by interning VersionedValue strings (CASSANDRA-6410)
+ * Allow specifying datacenters to participate in a repair (CASSANDRA-6218)
+ * Fix divide-by-zero in PCI (CASSANDRA-6403)
+ * Fix setting last compacted key in the wrong level for LCS (CASSANDRA-6284)
+ * Add millisecond precision formats to the timestamp parser (CASSANDRA-6395)
+ * Expose a total memtable size metric for a CF (CASSANDRA-6391)
+ * cqlsh: handle symlinks properly (CASSANDRA-6425)
+ * Fix potential infinite loop when paging query with IN (CASSANDRA-6464)
+ * Fix assertion error in AbstractQueryPager.discardFirst (CASSANDRA-6447)
+ * Fix streaming older SSTable yields unnecessary tombstones (CASSANDRA-6527)
+Merged from 1.2:
* Improved error message on bad properties in DDL queries (CASSANDRA-6453)
* Randomize batchlog candidates selection (CASSANDRA-6481)
* Fix thundering herd on endpoint cache invalidation (CASSANDRA-6345, 6485)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fd7db6/src/java/org/apache/cassandra/db/ConsistencyLevel.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ConsistencyLevel.java
index 0f6aba7,3737c73..6d04314
--- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
+++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
@@@ -89,12 -88,19 +89,19 @@@ public enum ConsistencyLeve
return codeIdx[code];
}
- private int quorumFor(Table table)
++ private int quorumFor(Keyspace keyspace)
+ {
- return (table.getReplicationStrategy().getReplicationFactor() / 2) + 1;
++ return (keyspace.getReplicationStrategy().getReplicationFactor() / 2) + 1;
+ }
+
- private int localQuorumFor(Table table, String dc)
+ private int localQuorumFor(Keyspace keyspace, String dc)
{
- return (((NetworkTopologyStrategy) keyspace.getReplicationStrategy()).getReplicationFactor(dc) / 2) + 1;
- return (table.getReplicationStrategy() instanceof NetworkTopologyStrategy)
- ? (((NetworkTopologyStrategy) table.getReplicationStrategy()).getReplicationFactor(dc) / 2) + 1
- : quorumFor(table);
++ return (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
++ ? (((NetworkTopologyStrategy) keyspace.getReplicationStrategy()).getReplicationFactor(dc) / 2) + 1
++ : quorumFor(keyspace);
}
- public int blockFor(Table table)
+ public int blockFor(Keyspace keyspace)
{
switch (this)
{
@@@ -108,17 -114,24 +115,24 @@@
case THREE:
return 3;
case QUORUM:
- return (keyspace.getReplicationStrategy().getReplicationFactor() / 2) + 1;
- return quorumFor(table);
++ return quorumFor(keyspace);
case ALL:
- return table.getReplicationStrategy().getReplicationFactor();
+ return keyspace.getReplicationStrategy().getReplicationFactor();
case LOCAL_QUORUM:
- return localQuorumFor(table, DatabaseDescriptor.getLocalDataCenter());
+ return localQuorumFor(keyspace, DatabaseDescriptor.getLocalDataCenter());
case EACH_QUORUM:
- NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy();
- int n = 0;
- for (String dc : strategy.getDatacenters())
- n += localQuorumFor(keyspace, dc);
- return n;
- if (table.getReplicationStrategy() instanceof NetworkTopologyStrategy)
++ if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
+ {
- NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) table.getReplicationStrategy();
++ NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy();
+ int n = 0;
+ for (String dc : strategy.getDatacenters())
- n += localQuorumFor(table, dc);
++ n += localQuorumFor(keyspace, dc);
+ return n;
+ }
+ else
+ {
- return quorumFor(table);
++ return quorumFor(keyspace);
+ }
default:
throw new UnsupportedOperationException("Invalid consistency level: " + toString());
}
@@@ -211,16 -224,20 +225,20 @@@
case LOCAL_ONE:
return countLocalEndpoints(liveEndpoints) >= 1;
case LOCAL_QUORUM:
- return countLocalEndpoints(liveEndpoints) >= blockFor(table);
+ return countLocalEndpoints(liveEndpoints) >= blockFor(keyspace);
case EACH_QUORUM:
- for (Map.Entry<String, Integer> entry : countPerDCEndpoints(keyspace, liveEndpoints).entrySet())
- if (table.getReplicationStrategy() instanceof NetworkTopologyStrategy)
++ if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
{
- if (entry.getValue() < localQuorumFor(keyspace, entry.getKey()))
- return false;
- for (Map.Entry<String, Integer> entry : countPerDCEndpoints(table, liveEndpoints).entrySet())
++ for (Map.Entry<String, Integer> entry : countPerDCEndpoints(keyspace, liveEndpoints).entrySet())
+ {
- if (entry.getValue() < localQuorumFor(table, entry.getKey()))
++ if (entry.getValue() < localQuorumFor(keyspace, entry.getKey()))
+ return false;
+ }
+ return true;
}
- return true;
+ // Fallthough on purpose for SimpleStrategy
default:
- return Iterables.size(liveEndpoints) >= blockFor(table);
+ return Iterables.size(liveEndpoints) >= blockFor(keyspace);
}
}
@@@ -251,14 -268,18 +269,18 @@@
}
break;
case EACH_QUORUM:
- for (Map.Entry<String, Integer> entry : countPerDCEndpoints(keyspace, liveEndpoints).entrySet())
- if (table.getReplicationStrategy() instanceof NetworkTopologyStrategy)
++ if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
{
- int dcBlockFor = localQuorumFor(keyspace, entry.getKey());
- int dcLive = entry.getValue();
- if (dcLive < dcBlockFor)
- throw new UnavailableException(this, dcBlockFor, dcLive);
- for (Map.Entry<String, Integer> entry : countPerDCEndpoints(table, liveEndpoints).entrySet())
++ for (Map.Entry<String, Integer> entry : countPerDCEndpoints(keyspace, liveEndpoints).entrySet())
+ {
- int dcBlockFor = localQuorumFor(table, entry.getKey());
++ int dcBlockFor = localQuorumFor(keyspace, entry.getKey());
+ int dcLive = entry.getValue();
+ if (dcLive < dcBlockFor)
+ throw new UnavailableException(this, dcBlockFor, dcLive);
+ }
+ break;
}
- break;
+ // Fallthough on purpose for SimpleStrategy
default:
int live = Iterables.size(liveEndpoints);
if (live < blockFor)
@@@ -281,42 -302,8 +303,39 @@@
}
}
- public void validateForWrite(String table) throws InvalidRequestException
+ public void validateForWrite(String keyspaceName) throws InvalidRequestException
+ {
+ switch (this)
+ {
- case EACH_QUORUM:
- requireNetworkTopologyStrategy(keyspaceName);
- break;
+ case SERIAL:
+ case LOCAL_SERIAL:
+ throw new InvalidRequestException("You must use conditional updates for serializable writes");
+ }
+ }
+
+ // This is the same than validateForWrite really, but we include a slightly different error message for SERIAL/LOCAL_SERIAL
+ public void validateForCasCommit(String keyspaceName) throws InvalidRequestException
+ {
+ switch (this)
+ {
+ case EACH_QUORUM:
+ requireNetworkTopologyStrategy(keyspaceName);
+ break;
+ case SERIAL:
+ case LOCAL_SERIAL:
+ throw new InvalidRequestException(this + " is not supported as conditional update commit consistency. Use ANY if you mean \"make sure it is accepted but I don't care how many replicas commit it for non-SERIAL reads\"");
+ }
+ }
+
+ public void validateForCas() throws InvalidRequestException
{
+ if (!isSerialConsistency())
+ throw new InvalidRequestException("Invalid consistency for conditional update. Must be one of SERIAL or LOCAL_SERIAL");
+ }
+
+ public boolean isSerialConsistency()
+ {
+ return this == SERIAL || this == LOCAL_SERIAL;
}
public void validateCounterForWrite(CFMetaData metadata) throws InvalidRequestException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fd7db6/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index f4a2662,e4dd422..df33813
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@@ -132,20 -132,20 +132,20 @@@ public abstract class AbstractReplicati
if (consistency_level.isDatacenterLocal())
{
// block for in this context will be localnodes block.
- return new DatacenterWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getTable(), callback, writeType);
+ return new DatacenterWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType);
}
- else if (consistency_level == ConsistencyLevel.EACH_QUORUM)
+ else if (consistency_level == ConsistencyLevel.EACH_QUORUM && (this instanceof NetworkTopologyStrategy))
{
- return new DatacenterSyncWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getTable(), callback, writeType);
+ return new DatacenterSyncWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType);
}
- return new WriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getTable(), callback, writeType);
+ return new WriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType);
}
- private Table getTable()
+ private Keyspace getKeyspace()
{
- if (table == null)
- table = Table.open(tableName);
- return table;
+ if (keyspace == null)
+ keyspace = Keyspace.open(keyspaceName);
+ return keyspace;
}
/**