You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/09/13 21:35:24 UTC

git commit: (cql3) Allow defining default consistency levels

Updated Branches:
  refs/heads/trunk 75c4cfa5c -> aa8989d9b


(cql3) Allow defining default consistency levels

patch by slebresne; reviewed by yukim for CASSANDRA-4448


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/aa8989d9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/aa8989d9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/aa8989d9

Branch: refs/heads/trunk
Commit: aa8989d9b98f35a95c988f0b664a0b4ffd232bac
Parents: 75c4cfa
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Sep 13 21:34:34 2012 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Sep 13 21:34:34 2012 +0200

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../org/apache/cassandra/config/CFMetaData.java    |   42 ++++++++++++++-
 src/java/org/apache/cassandra/cql3/CFPropDefs.java |   43 +++++++++++++++
 src/java/org/apache/cassandra/cql3/Cql.g           |    2 +-
 .../cassandra/cql3/statements/BatchStatement.java  |   27 +++++++++-
 .../cql3/statements/ModificationStatement.java     |    9 +++-
 .../cassandra/cql3/statements/SelectStatement.java |   12 +++-
 7 files changed, 127 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa8989d9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 371af7c..50ecbd1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -62,6 +62,7 @@
  * fix counter add/get using CQL2 and CQL3 in stress tool (CASSANDRA-4633)
  * Add sstable count per level to cfstats (CASSANDRA-4537)
  * (cql3) Add ALTER KEYSPACE statement (CASSANDRA-4611)
+ * (cql3) Allow defining default consistency levels (CASSANDRA-4448)
 
 
 1.1.6

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa8989d9/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index cb8951c..adbd853 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -131,6 +131,8 @@ public final class CFMetaData
                                                                      + "value_alias text,"
                                                                      + "column_aliases text,"
                                                                      + "compaction_strategy_options text,"
+                                                                     + "default_read_consistency text,"
+                                                                     + "default_write_consistency text,"
                                                                      + "PRIMARY KEY (keyspace_name, columnfamily_name)"
                                                                      + ") WITH COMMENT='ColumnFamily definitions' AND gc_grace_seconds=8640");
     public static final CFMetaData SchemaColumnsCf = compile(10, "CREATE TABLE " + SystemTable.SCHEMA_COLUMNS_CF + "("
@@ -245,6 +247,11 @@ public final class CFMetaData
 
     public CompressionParameters compressionParameters;
 
+    // Default consistency levels for CQL3. The default for these values is ONE,
+    // but we keep the internal default as null, as it helps with handling thrift compatibility
+    private ConsistencyLevel readConsistencyLevel;
+    private ConsistencyLevel writeConsistencyLevel;
+
     // Processed infos used by CQL. This can be fully reconstructed from the CFMedata,
     // so it's not saved on disk. It is however costlyish to recreate for each query
     // so we cache it here (and update on each relevant CFMetadata change)
@@ -268,6 +275,8 @@ public final class CFMetaData
     public CFMetaData compressionParameters(CompressionParameters prop) {compressionParameters = prop; return this;}
     public CFMetaData bloomFilterFpChance(Double prop) {bloomFilterFpChance = prop; return this;}
     public CFMetaData caching(Caching prop) {caching = prop; return this;}
+    public CFMetaData defaultReadCL(ConsistencyLevel prop) {readConsistencyLevel = prop; return this;}
+    public CFMetaData defaultWriteCL(ConsistencyLevel prop) {writeConsistencyLevel = prop; return this;}
 
     public CFMetaData(String keyspace, String name, ColumnFamilyType type, AbstractType<?> comp, AbstractType<?> subcc)
     {
@@ -433,7 +442,9 @@ public final class CFMetaData
                       .compactionStrategyOptions(oldCFMD.compactionStrategyOptions)
                       .compressionParameters(oldCFMD.compressionParameters)
                       .bloomFilterFpChance(oldCFMD.bloomFilterFpChance)
-                      .caching(oldCFMD.caching);
+                      .caching(oldCFMD.caching)
+                      .defaultReadCL(oldCFMD.readConsistencyLevel)
+                      .defaultWriteCL(oldCFMD.writeConsistencyLevel);
     }
 
     /**
@@ -519,6 +530,16 @@ public final class CFMetaData
         return valueAlias;
     }
 
+    public ConsistencyLevel getReadConsistencyLevel()
+    {
+        return readConsistencyLevel == null ? ConsistencyLevel.ONE : readConsistencyLevel;
+    }
+
+    public ConsistencyLevel getWriteConsistencyLevel()
+    {
+        return writeConsistencyLevel == null ? ConsistencyLevel.ONE : writeConsistencyLevel;
+    }
+
     public CompressionParameters compressionParameters()
     {
         return compressionParameters;
@@ -581,6 +602,8 @@ public final class CFMetaData
             .append(compressionParameters, rhs.compressionParameters)
             .append(bloomFilterFpChance, rhs.bloomFilterFpChance)
             .append(caching, rhs.caching)
+            .append(readConsistencyLevel, rhs.readConsistencyLevel)
+            .append(writeConsistencyLevel, rhs.writeConsistencyLevel)
             .isEquals();
     }
 
@@ -611,6 +634,8 @@ public final class CFMetaData
             .append(compressionParameters)
             .append(bloomFilterFpChance)
             .append(caching)
+            .append(readConsistencyLevel)
+            .append(writeConsistencyLevel)
             .toHashCode();
     }
 
@@ -799,6 +824,11 @@ public final class CFMetaData
         }
         if (cfm.valueAlias != null)
             valueAlias = cfm.valueAlias;
+        if (cfm.readConsistencyLevel != null)
+            readConsistencyLevel = cfm.readConsistencyLevel;
+        if (cfm.writeConsistencyLevel != null)
+            writeConsistencyLevel = cfm.writeConsistencyLevel;
+
         bloomFilterFpChance = cfm.bloomFilterFpChance;
         caching = cfm.caching;
 
@@ -1258,6 +1288,10 @@ public final class CFMetaData
                                         : Column.create(valueAlias, timestamp, cfName, "value_alias"));
         cf.addColumn(Column.create(json(aliasesAsStrings(columnAliases)), timestamp, cfName, "column_aliases"));
         cf.addColumn(Column.create(json(compactionStrategyOptions), timestamp, cfName, "compaction_strategy_options"));
+        cf.addColumn(readConsistencyLevel == null ? DeletedColumn.create(ldt, timestamp, cfName, "default_read_consistency")
+                                                  : Column.create(readConsistencyLevel.toString(), timestamp, cfName, "default_read_consistency"));
+        cf.addColumn(writeConsistencyLevel == null ? DeletedColumn.create(ldt, timestamp, cfName, "default_write_consistency")
+                                                   : Column.create(writeConsistencyLevel.toString(), timestamp, cfName, "default_write_consistency"));
     }
 
     // Package protected for use by tests
@@ -1302,6 +1336,10 @@ public final class CFMetaData
             if (result.has("value_alias"))
                 cfm.valueAlias(result.getBytes("value_alias"));
             cfm.compactionStrategyOptions(fromJsonMap(result.getString("compaction_strategy_options")));
+            if (result.has("default_read_consistency"))
+                cfm.defaultReadCL(Enum.valueOf(ConsistencyLevel.class, result.getString("default_read_consistency")));
+            if (result.has("default_write_consistency"))
+                cfm.defaultWriteCL(Enum.valueOf(ConsistencyLevel.class, result.getString("default_write_consistency")));
 
             return cfm;
         }
@@ -1465,6 +1503,8 @@ public final class CFMetaData
             .append("compressionOptions", compressionParameters.asThriftOptions())
             .append("bloomFilterFpChance", bloomFilterFpChance)
             .append("caching", caching)
+            .append("readConsistencyLevel", readConsistencyLevel)
+            .append("writeConsistencyLevel", writeConsistencyLevel)
             .toString();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa8989d9/src/java/org/apache/cassandra/cql3/CFPropDefs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CFPropDefs.java b/src/java/org/apache/cassandra/cql3/CFPropDefs.java
index fb5f365..cb78db0 100644
--- a/src/java/org/apache/cassandra/cql3/CFPropDefs.java
+++ b/src/java/org/apache/cassandra/cql3/CFPropDefs.java
@@ -30,6 +30,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
 import org.apache.cassandra.io.compress.CompressionParameters;
 
@@ -46,6 +48,9 @@ public class CFPropDefs extends PropertyDefinitions
     public static final String KW_REPLICATEONWRITE = "replicate_on_write";
     public static final String KW_CACHING = "caching";
     public static final String KW_BF_FP_CHANCE = "bloom_filter_fp_chance";
+    public static final String KW_DEFAULT_R_CONSISTENCY = "default_read_consistency";
+    public static final String KW_DEFAULT_W_CONSISTENCY = "default_write_consistency";
+
     public static final String KW_COMPACTION = "compaction";
     public static final String KW_COMPRESSION = "compression";
 
@@ -65,6 +70,8 @@ public class CFPropDefs extends PropertyDefinitions
         keywords.add(KW_BF_FP_CHANCE);
         keywords.add(KW_COMPACTION);
         keywords.add(KW_COMPRESSION);
+        keywords.add(KW_DEFAULT_W_CONSISTENCY);
+        keywords.add(KW_DEFAULT_R_CONSISTENCY);
 
         obsoleteKeywords.add("compaction_strategy_class");
         obsoleteKeywords.add("compaction_strategy_options");
@@ -136,6 +143,42 @@ public class CFPropDefs extends PropertyDefinitions
 
         if (!getCompressionOptions().isEmpty())
             cfm.compressionParameters(CompressionParameters.create(getCompressionOptions()));
+
+        try
+        {
+            ConsistencyLevel readCL = getConsistencyLevel(KW_DEFAULT_R_CONSISTENCY);
+            if (readCL != null)
+            {
+                readCL.validateForRead(cfm.ksName);
+                cfm.defaultReadCL(readCL);
+            }
+            ConsistencyLevel writeCL = getConsistencyLevel(KW_DEFAULT_W_CONSISTENCY);
+            if (writeCL != null)
+            {
+                writeCL.validateForWrite(cfm.ksName);
+                cfm.defaultWriteCL(writeCL);
+            }
+        }
+        catch (InvalidRequestException e)
+        {
+            throw new ConfigurationException(e.getMessage(), e.getCause());
+        }
+    }
+
+    public ConsistencyLevel getConsistencyLevel(String key) throws ConfigurationException, SyntaxException
+    {
+        String value = getSimple(key);
+        if (value == null)
+            return null;
+
+        try
+        {
+            return Enum.valueOf(ConsistencyLevel.class, value);
+        }
+        catch (IllegalArgumentException e)
+        {
+            throw new ConfigurationException(String.format("Invalid consistency level value: %s", value));
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa8989d9/src/java/org/apache/cassandra/cql3/Cql.g
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g
index 01dbafd..1379b9a 100644
--- a/src/java/org/apache/cassandra/cql3/Cql.g
+++ b/src/java/org/apache/cassandra/cql3/Cql.g
@@ -189,7 +189,7 @@ useStatement returns [UseStatement stmt]
 selectStatement returns [SelectStatement.RawStatement expr]
     @init {
         boolean isCount = false;
-        ConsistencyLevel cLevel = ConsistencyLevel.ONE;
+        ConsistencyLevel cLevel = null;
         int limit = 10000;
         Map<ColumnIdentifier, Boolean> orderings = new LinkedHashMap<ColumnIdentifier, Boolean>();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa8989d9/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index ac78c89..246a97b 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -23,9 +23,10 @@ import java.util.concurrent.TimeoutException;
 
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.CounterMutation;
 import org.apache.cassandra.db.IMutation;
 import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.CounterMutation;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.thrift.RequestType;
@@ -76,11 +77,21 @@ public class BatchStatement extends ModificationStatement
         }
     }
 
+    @Override
+    public ConsistencyLevel getConsistencyLevel()
+    {
+        // We have validated that either the consistency is set, or all statements have the same default CL (see validate())
+        return isSetConsistencyLevel()
+             ? super.getConsistencyLevel()
+             : (statements.isEmpty() ? ConsistencyLevel.ONE : statements.get(0).getConsistencyLevel());
+    }
+
     public void validate(ClientState state) throws InvalidRequestException
     {
         if (getTimeToLive() != 0)
             throw new InvalidRequestException("Global TTL on the BATCH statement is not supported.");
 
+        ConsistencyLevel cLevel = null;
         for (ModificationStatement statement : statements)
         {
             if (statement.isSetConsistencyLevel())
@@ -92,7 +103,19 @@ public class BatchStatement extends ModificationStatement
             if (statement.getTimeToLive() < 0)
                 throw new InvalidRequestException("A TTL must be greater or equal to 0");
 
-            getConsistencyLevel().validateForWrite(statement.keyspace());
+            if (isSetConsistencyLevel())
+            {
+                getConsistencyLevel().validateForWrite(statement.keyspace());
+            }
+            else
+            {
+                // If no consistency is set for the batch, we need all the CFs in the batch to have the same default consistency level,
+                // otherwise the batch is invalid (i.e. the user must explicitly set the CL)
+                ConsistencyLevel stmtCL = statement.getConsistencyLevel();
+                if (cLevel != null && cLevel != stmtCL)
+                    throw new InvalidRequestException("The tables involved in the BATCH have different default write consistency, you must explicitely set the BATCH consitency level with USING CONSISTENCY");
+                cLevel = stmtCL;
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa8989d9/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 291ecd9..b960704 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -23,6 +23,8 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.db.*;
@@ -31,7 +33,6 @@ import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.thrift.RequestType;
 import org.apache.cassandra.thrift.ThriftValidation;
 
@@ -80,7 +81,11 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
 
     public ConsistencyLevel getConsistencyLevel()
     {
-        return (cLevel != null) ? cLevel : defaultConsistency;
+        if (cLevel != null)
+            return cLevel;
+
+        CFMetaData cfm = Schema.instance.getCFMetaData(keyspace(), columnFamily());
+        return cfm == null ? ConsistencyLevel.ONE : cfm.getWriteConsistencyLevel();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa8989d9/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 07c8453..1c2b631 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -180,7 +180,7 @@ public class SelectStatement implements CQLStatement
 
         try
         {
-            return StorageProxy.read(commands, parameters.consistencyLevel);
+            return StorageProxy.read(commands, getConsistencyLevel());
         }
         catch (IOException e)
         {
@@ -205,7 +205,7 @@ public class SelectStatement implements CQLStatement
                                                                     getLimit(),
                                                                     true, // limit by columns, not keys
                                                                     false),
-                                              parameters.consistencyLevel);
+                                              getConsistencyLevel());
         }
         catch (IOException e)
         {
@@ -294,6 +294,11 @@ public class SelectStatement implements CQLStatement
         return sliceRestriction != null && !sliceRestriction.isInclusive(Bound.START) ? parameters.limit + 1 : parameters.limit;
     }
 
+    private ConsistencyLevel getConsistencyLevel()
+    {
+        return parameters.consistencyLevel == null ? cfDef.cfm.getReadConsistencyLevel() : parameters.consistencyLevel;
+    }
+
     private Collection<ByteBuffer> getKeys(final List<ByteBuffer> variables) throws InvalidRequestException
     {
         List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
@@ -917,7 +922,8 @@ public class SelectStatement implements CQLStatement
         public ParsedStatement.Prepared prepare() throws InvalidRequestException
         {
             CFMetaData cfm = ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
-            parameters.consistencyLevel.validateForRead(keyspace());
+            if (parameters.consistencyLevel != null)
+                parameters.consistencyLevel.validateForRead(keyspace());
 
             if (parameters.limit <= 0)
                 throw new InvalidRequestException("LIMIT must be strictly positive");