You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/09/13 21:35:24 UTC
git commit: (cql3) Allow defining default consistency levels
Updated Branches:
refs/heads/trunk 75c4cfa5c -> aa8989d9b
(cql3) Allow defining default consistency levels
patch by slebresne; reviewed by yukim for CASSANDRA-4448
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/aa8989d9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/aa8989d9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/aa8989d9
Branch: refs/heads/trunk
Commit: aa8989d9b98f35a95c988f0b664a0b4ffd232bac
Parents: 75c4cfa
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Sep 13 21:34:34 2012 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Sep 13 21:34:34 2012 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/config/CFMetaData.java | 42 ++++++++++++++-
src/java/org/apache/cassandra/cql3/CFPropDefs.java | 43 +++++++++++++++
src/java/org/apache/cassandra/cql3/Cql.g | 2 +-
.../cassandra/cql3/statements/BatchStatement.java | 27 +++++++++-
.../cql3/statements/ModificationStatement.java | 9 +++-
.../cassandra/cql3/statements/SelectStatement.java | 12 +++-
7 files changed, 127 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa8989d9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 371af7c..50ecbd1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -62,6 +62,7 @@
* fix counter add/get using CQL2 and CQL3 in stress tool (CASSANDRA-4633)
* Add sstable count per level to cfstats (CASSANDRA-4537)
* (cql3) Add ALTER KEYSPACE statement (CASSANDRA-4611)
+ * (cql3) Allow defining default consistency levels (CASSANDRA-4448)
1.1.6
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa8989d9/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index cb8951c..adbd853 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -131,6 +131,8 @@ public final class CFMetaData
+ "value_alias text,"
+ "column_aliases text,"
+ "compaction_strategy_options text,"
+ + "default_read_consistency text,"
+ + "default_write_consistency text,"
+ "PRIMARY KEY (keyspace_name, columnfamily_name)"
+ ") WITH COMMENT='ColumnFamily definitions' AND gc_grace_seconds=8640");
public static final CFMetaData SchemaColumnsCf = compile(10, "CREATE TABLE " + SystemTable.SCHEMA_COLUMNS_CF + "("
@@ -245,6 +247,11 @@ public final class CFMetaData
public CompressionParameters compressionParameters;
+ // Default consistency levels for CQL3. The default for those values is ONE,
+ // but we keep the internal default as null because it helps handle thrift compatibility
+ private ConsistencyLevel readConsistencyLevel;
+ private ConsistencyLevel writeConsistencyLevel;
+
// Processed infos used by CQL. This can be fully reconstructed from the CFMedata,
// so it's not saved on disk. It is however costlyish to recreate for each query
// so we cache it here (and update on each relevant CFMetadata change)
@@ -268,6 +275,8 @@ public final class CFMetaData
public CFMetaData compressionParameters(CompressionParameters prop) {compressionParameters = prop; return this;}
public CFMetaData bloomFilterFpChance(Double prop) {bloomFilterFpChance = prop; return this;}
public CFMetaData caching(Caching prop) {caching = prop; return this;}
+ public CFMetaData defaultReadCL(ConsistencyLevel prop) {readConsistencyLevel = prop; return this;}
+ public CFMetaData defaultWriteCL(ConsistencyLevel prop) {writeConsistencyLevel = prop; return this;}
public CFMetaData(String keyspace, String name, ColumnFamilyType type, AbstractType<?> comp, AbstractType<?> subcc)
{
@@ -433,7 +442,9 @@ public final class CFMetaData
.compactionStrategyOptions(oldCFMD.compactionStrategyOptions)
.compressionParameters(oldCFMD.compressionParameters)
.bloomFilterFpChance(oldCFMD.bloomFilterFpChance)
- .caching(oldCFMD.caching);
+ .caching(oldCFMD.caching)
+ .defaultReadCL(oldCFMD.readConsistencyLevel)
+ .defaultWriteCL(oldCFMD.writeConsistencyLevel);
}
/**
@@ -519,6 +530,16 @@ public final class CFMetaData
return valueAlias;
}
+ public ConsistencyLevel getReadConsistencyLevel()
+ {
+ return readConsistencyLevel == null ? ConsistencyLevel.ONE : readConsistencyLevel;
+ }
+
+ public ConsistencyLevel getWriteConsistencyLevel()
+ {
+ return writeConsistencyLevel == null ? ConsistencyLevel.ONE : writeConsistencyLevel;
+ }
+
public CompressionParameters compressionParameters()
{
return compressionParameters;
@@ -581,6 +602,8 @@ public final class CFMetaData
.append(compressionParameters, rhs.compressionParameters)
.append(bloomFilterFpChance, rhs.bloomFilterFpChance)
.append(caching, rhs.caching)
+ .append(readConsistencyLevel, rhs.readConsistencyLevel)
+ .append(writeConsistencyLevel, rhs.writeConsistencyLevel)
.isEquals();
}
@@ -611,6 +634,8 @@ public final class CFMetaData
.append(compressionParameters)
.append(bloomFilterFpChance)
.append(caching)
+ .append(readConsistencyLevel)
+ .append(writeConsistencyLevel)
.toHashCode();
}
@@ -799,6 +824,11 @@ public final class CFMetaData
}
if (cfm.valueAlias != null)
valueAlias = cfm.valueAlias;
+ if (cfm.readConsistencyLevel != null)
+ readConsistencyLevel = cfm.readConsistencyLevel;
+ if (cfm.writeConsistencyLevel != null)
+ writeConsistencyLevel = cfm.writeConsistencyLevel;
+
bloomFilterFpChance = cfm.bloomFilterFpChance;
caching = cfm.caching;
@@ -1258,6 +1288,10 @@ public final class CFMetaData
: Column.create(valueAlias, timestamp, cfName, "value_alias"));
cf.addColumn(Column.create(json(aliasesAsStrings(columnAliases)), timestamp, cfName, "column_aliases"));
cf.addColumn(Column.create(json(compactionStrategyOptions), timestamp, cfName, "compaction_strategy_options"));
+ cf.addColumn(readConsistencyLevel == null ? DeletedColumn.create(ldt, timestamp, cfName, "default_read_consistency")
+ : Column.create(readConsistencyLevel.toString(), timestamp, cfName, "default_read_consistency"));
+ cf.addColumn(writeConsistencyLevel == null ? DeletedColumn.create(ldt, timestamp, cfName, "default_write_consistency")
+ : Column.create(writeConsistencyLevel.toString(), timestamp, cfName, "default_write_consistency"));
}
// Package protected for use by tests
@@ -1302,6 +1336,10 @@ public final class CFMetaData
if (result.has("value_alias"))
cfm.valueAlias(result.getBytes("value_alias"));
cfm.compactionStrategyOptions(fromJsonMap(result.getString("compaction_strategy_options")));
+ if (result.has("default_read_consistency"))
+ cfm.defaultReadCL(Enum.valueOf(ConsistencyLevel.class, result.getString("default_read_consistency")));
+ if (result.has("default_write_consistency"))
+ cfm.defaultWriteCL(Enum.valueOf(ConsistencyLevel.class, result.getString("default_write_consistency")));
return cfm;
}
@@ -1465,6 +1503,8 @@ public final class CFMetaData
.append("compressionOptions", compressionParameters.asThriftOptions())
.append("bloomFilterFpChance", bloomFilterFpChance)
.append("caching", caching)
+ .append("readConsistencyLevel", readConsistencyLevel)
+ .append("writeConsistencyLevel", writeConsistencyLevel)
.toString();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa8989d9/src/java/org/apache/cassandra/cql3/CFPropDefs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CFPropDefs.java b/src/java/org/apache/cassandra/cql3/CFPropDefs.java
index fb5f365..cb78db0 100644
--- a/src/java/org/apache/cassandra/cql3/CFPropDefs.java
+++ b/src/java/org/apache/cassandra/cql3/CFPropDefs.java
@@ -30,6 +30,8 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
import org.apache.cassandra.io.compress.CompressionParameters;
@@ -46,6 +48,9 @@ public class CFPropDefs extends PropertyDefinitions
public static final String KW_REPLICATEONWRITE = "replicate_on_write";
public static final String KW_CACHING = "caching";
public static final String KW_BF_FP_CHANCE = "bloom_filter_fp_chance";
+ public static final String KW_DEFAULT_R_CONSISTENCY = "default_read_consistency";
+ public static final String KW_DEFAULT_W_CONSISTENCY = "default_write_consistency";
+
public static final String KW_COMPACTION = "compaction";
public static final String KW_COMPRESSION = "compression";
@@ -65,6 +70,8 @@ public class CFPropDefs extends PropertyDefinitions
keywords.add(KW_BF_FP_CHANCE);
keywords.add(KW_COMPACTION);
keywords.add(KW_COMPRESSION);
+ keywords.add(KW_DEFAULT_W_CONSISTENCY);
+ keywords.add(KW_DEFAULT_R_CONSISTENCY);
obsoleteKeywords.add("compaction_strategy_class");
obsoleteKeywords.add("compaction_strategy_options");
@@ -136,6 +143,42 @@ public class CFPropDefs extends PropertyDefinitions
if (!getCompressionOptions().isEmpty())
cfm.compressionParameters(CompressionParameters.create(getCompressionOptions()));
+
+ try
+ {
+ ConsistencyLevel readCL = getConsistencyLevel(KW_DEFAULT_R_CONSISTENCY);
+ if (readCL != null)
+ {
+ readCL.validateForRead(cfm.ksName);
+ cfm.defaultReadCL(readCL);
+ }
+ ConsistencyLevel writeCL = getConsistencyLevel(KW_DEFAULT_W_CONSISTENCY);
+ if (writeCL != null)
+ {
+ writeCL.validateForWrite(cfm.ksName);
+ cfm.defaultWriteCL(writeCL);
+ }
+ }
+ catch (InvalidRequestException e)
+ {
+ throw new ConfigurationException(e.getMessage(), e.getCause());
+ }
+ }
+
+ public ConsistencyLevel getConsistencyLevel(String key) throws ConfigurationException, SyntaxException
+ {
+ String value = getSimple(key);
+ if (value == null)
+ return null;
+
+ try
+ {
+ return Enum.valueOf(ConsistencyLevel.class, value);
+ }
+ catch (IllegalArgumentException e)
+ {
+ throw new ConfigurationException(String.format("Invalid consistency level value: %s", value));
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa8989d9/src/java/org/apache/cassandra/cql3/Cql.g
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g
index 01dbafd..1379b9a 100644
--- a/src/java/org/apache/cassandra/cql3/Cql.g
+++ b/src/java/org/apache/cassandra/cql3/Cql.g
@@ -189,7 +189,7 @@ useStatement returns [UseStatement stmt]
selectStatement returns [SelectStatement.RawStatement expr]
@init {
boolean isCount = false;
- ConsistencyLevel cLevel = ConsistencyLevel.ONE;
+ ConsistencyLevel cLevel = null;
int limit = 10000;
Map<ColumnIdentifier, Boolean> orderings = new LinkedHashMap<ColumnIdentifier, Boolean>();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa8989d9/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index ac78c89..246a97b 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -23,9 +23,10 @@ import java.util.concurrent.TimeoutException;
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.CounterMutation;
import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.CounterMutation;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.thrift.RequestType;
@@ -76,11 +77,21 @@ public class BatchStatement extends ModificationStatement
}
}
+ @Override
+ public ConsistencyLevel getConsistencyLevel()
+ {
+ // We have validated that either the consistency is set, or all statements have the same default CL (see validate())
+ return isSetConsistencyLevel()
+ ? super.getConsistencyLevel()
+ : (statements.isEmpty() ? ConsistencyLevel.ONE : statements.get(0).getConsistencyLevel());
+ }
+
public void validate(ClientState state) throws InvalidRequestException
{
if (getTimeToLive() != 0)
throw new InvalidRequestException("Global TTL on the BATCH statement is not supported.");
+ ConsistencyLevel cLevel = null;
for (ModificationStatement statement : statements)
{
if (statement.isSetConsistencyLevel())
@@ -92,7 +103,19 @@ public class BatchStatement extends ModificationStatement
if (statement.getTimeToLive() < 0)
throw new InvalidRequestException("A TTL must be greater or equal to 0");
- getConsistencyLevel().validateForWrite(statement.keyspace());
+ if (isSetConsistencyLevel())
+ {
+ getConsistencyLevel().validateForWrite(statement.keyspace());
+ }
+ else
+ {
+ // If no consistency is set for the batch, we need all the CF in the batch to have the same default consistency level,
+ // otherwise the batch is invalid (i.e. the user must explicitly set the CL)
+ ConsistencyLevel stmtCL = statement.getConsistencyLevel();
+ if (cLevel != null && cLevel != stmtCL)
+ throw new InvalidRequestException("The tables involved in the BATCH have different default write consistency, you must explicitely set the BATCH consitency level with USING CONSISTENCY");
+ cLevel = stmtCL;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa8989d9/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 291ecd9..b960704 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -23,6 +23,8 @@ import java.nio.ByteBuffer;
import java.util.*;
import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.db.*;
@@ -31,7 +33,6 @@ import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.thrift.RequestType;
import org.apache.cassandra.thrift.ThriftValidation;
@@ -80,7 +81,11 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
public ConsistencyLevel getConsistencyLevel()
{
- return (cLevel != null) ? cLevel : defaultConsistency;
+ if (cLevel != null)
+ return cLevel;
+
+ CFMetaData cfm = Schema.instance.getCFMetaData(keyspace(), columnFamily());
+ return cfm == null ? ConsistencyLevel.ONE : cfm.getWriteConsistencyLevel();
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa8989d9/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 07c8453..1c2b631 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -180,7 +180,7 @@ public class SelectStatement implements CQLStatement
try
{
- return StorageProxy.read(commands, parameters.consistencyLevel);
+ return StorageProxy.read(commands, getConsistencyLevel());
}
catch (IOException e)
{
@@ -205,7 +205,7 @@ public class SelectStatement implements CQLStatement
getLimit(),
true, // limit by columns, not keys
false),
- parameters.consistencyLevel);
+ getConsistencyLevel());
}
catch (IOException e)
{
@@ -294,6 +294,11 @@ public class SelectStatement implements CQLStatement
return sliceRestriction != null && !sliceRestriction.isInclusive(Bound.START) ? parameters.limit + 1 : parameters.limit;
}
+ private ConsistencyLevel getConsistencyLevel()
+ {
+ return parameters.consistencyLevel == null ? cfDef.cfm.getReadConsistencyLevel() : parameters.consistencyLevel;
+ }
+
private Collection<ByteBuffer> getKeys(final List<ByteBuffer> variables) throws InvalidRequestException
{
List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
@@ -917,7 +922,8 @@ public class SelectStatement implements CQLStatement
public ParsedStatement.Prepared prepare() throws InvalidRequestException
{
CFMetaData cfm = ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
- parameters.consistencyLevel.validateForRead(keyspace());
+ if (parameters.consistencyLevel != null)
+ parameters.consistencyLevel.validateForRead(keyspace());
if (parameters.limit <= 0)
throw new InvalidRequestException("LIMIT must be strictly positive");