You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/10/16 18:00:44 UTC
[3/3] Move consistency level to the protocol level
http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
----------------------------------------------------------------------
diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
index 1810548..8721462 100644
--- a/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
+++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
@@ -44,6 +44,6 @@ import org.slf4j.LoggerFactory;
public class Constants {
- public static final String VERSION = "19.34.0";
+ public static final String VERSION = "19.35.0";
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java
----------------------------------------------------------------------
diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java
index 0a53222..8c793c2 100644
--- a/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java
+++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java
@@ -461,8 +461,6 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
- // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
- __isset_bit_vector = new BitSet(1);
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index fe44b54..fb5681b 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -259,11 +259,6 @@ public final class CFMetaData
public volatile CompressionParameters compressionParameters;
- // Default consistency levels for CQL3. The default for those values is ONE,
- // but we keep the internal default to null as it help handling thrift compatibility
- private volatile ConsistencyLevel readConsistencyLevel;
- private volatile ConsistencyLevel writeConsistencyLevel;
-
// Processed infos used by CQL. This can be fully reconstructed from the CFMedata,
// so it's not saved on disk. It is however costlyish to recreate for each query
// so we cache it here (and update on each relevant CFMetadata change)
@@ -287,8 +282,6 @@ public final class CFMetaData
public CFMetaData compressionParameters(CompressionParameters prop) {compressionParameters = prop; return this;}
public CFMetaData bloomFilterFpChance(Double prop) {bloomFilterFpChance = prop; return this;}
public CFMetaData caching(Caching prop) {caching = prop; return this;}
- public CFMetaData defaultReadCL(ConsistencyLevel prop) {readConsistencyLevel = prop; return this;}
- public CFMetaData defaultWriteCL(ConsistencyLevel prop) {writeConsistencyLevel = prop; return this;}
public CFMetaData(String keyspace, String name, ColumnFamilyType type, AbstractType<?> comp, AbstractType<?> subcc)
{
@@ -454,9 +447,7 @@ public final class CFMetaData
.compactionStrategyOptions(oldCFMD.compactionStrategyOptions)
.compressionParameters(oldCFMD.compressionParameters)
.bloomFilterFpChance(oldCFMD.bloomFilterFpChance)
- .caching(oldCFMD.caching)
- .defaultReadCL(oldCFMD.readConsistencyLevel)
- .defaultWriteCL(oldCFMD.writeConsistencyLevel);
+ .caching(oldCFMD.caching);
}
/**
@@ -542,16 +533,6 @@ public final class CFMetaData
return valueAlias;
}
- public ConsistencyLevel getReadConsistencyLevel()
- {
- return readConsistencyLevel == null ? ConsistencyLevel.ONE : readConsistencyLevel;
- }
-
- public ConsistencyLevel getWriteConsistencyLevel()
- {
- return writeConsistencyLevel == null ? ConsistencyLevel.ONE : writeConsistencyLevel;
- }
-
public CompressionParameters compressionParameters()
{
return compressionParameters;
@@ -614,8 +595,6 @@ public final class CFMetaData
.append(compressionParameters, rhs.compressionParameters)
.append(bloomFilterFpChance, rhs.bloomFilterFpChance)
.append(caching, rhs.caching)
- .append(readConsistencyLevel, rhs.readConsistencyLevel)
- .append(writeConsistencyLevel, rhs.writeConsistencyLevel)
.isEquals();
}
@@ -646,8 +625,6 @@ public final class CFMetaData
.append(compressionParameters)
.append(bloomFilterFpChance)
.append(caching)
- .append(readConsistencyLevel)
- .append(writeConsistencyLevel)
.toHashCode();
}
@@ -836,10 +813,6 @@ public final class CFMetaData
}
if (cfm.valueAlias != null)
valueAlias = cfm.valueAlias;
- if (cfm.readConsistencyLevel != null)
- readConsistencyLevel = cfm.readConsistencyLevel;
- if (cfm.writeConsistencyLevel != null)
- writeConsistencyLevel = cfm.writeConsistencyLevel;
bloomFilterFpChance = cfm.bloomFilterFpChance;
caching = cfm.caching;
@@ -1331,10 +1304,6 @@ public final class CFMetaData
: Column.create(valueAlias, timestamp, cfName, "value_alias"));
cf.addColumn(Column.create(json(aliasesAsStrings(columnAliases)), timestamp, cfName, "column_aliases"));
cf.addColumn(Column.create(json(compactionStrategyOptions), timestamp, cfName, "compaction_strategy_options"));
- cf.addColumn(readConsistencyLevel == null ? DeletedColumn.create(ldt, timestamp, cfName, "default_read_consistency")
- : Column.create(readConsistencyLevel.toString(), timestamp, cfName, "default_read_consistency"));
- cf.addColumn(writeConsistencyLevel == null ? DeletedColumn.create(ldt, timestamp, cfName, "default_write_consistency")
- : Column.create(writeConsistencyLevel.toString(), timestamp, cfName, "default_write_consistency"));
}
// Package protected for use by tests
@@ -1379,10 +1348,6 @@ public final class CFMetaData
if (result.has("value_alias"))
cfm.valueAlias(result.getBytes("value_alias"));
cfm.compactionStrategyOptions(fromJsonMap(result.getString("compaction_strategy_options")));
- if (result.has("default_read_consistency"))
- cfm.defaultReadCL(Enum.valueOf(ConsistencyLevel.class, result.getString("default_read_consistency")));
- if (result.has("default_write_consistency"))
- cfm.defaultWriteCL(Enum.valueOf(ConsistencyLevel.class, result.getString("default_write_consistency")));
return cfm;
}
@@ -1546,8 +1511,6 @@ public final class CFMetaData
.append("compressionOptions", compressionParameters.asThriftOptions())
.append("bloomFilterFpChance", bloomFilterFpChance)
.append("caching", caching)
- .append("readConsistencyLevel", readConsistencyLevel)
- .append("writeConsistencyLevel", writeConsistencyLevel)
.toString();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/cql3/Attributes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Attributes.java b/src/java/org/apache/cassandra/cql3/Attributes.java
index 00fd30b..6733d0d 100644
--- a/src/java/org/apache/cassandra/cql3/Attributes.java
+++ b/src/java/org/apache/cassandra/cql3/Attributes.java
@@ -25,7 +25,6 @@ import org.apache.cassandra.db.ConsistencyLevel;
*/
public class Attributes
{
- public ConsistencyLevel cLevel;
public Long timestamp;
public int timeToLive;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/cql3/CFPropDefs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CFPropDefs.java b/src/java/org/apache/cassandra/cql3/CFPropDefs.java
index cb78db0..53a5bc6 100644
--- a/src/java/org/apache/cassandra/cql3/CFPropDefs.java
+++ b/src/java/org/apache/cassandra/cql3/CFPropDefs.java
@@ -48,8 +48,6 @@ public class CFPropDefs extends PropertyDefinitions
public static final String KW_REPLICATEONWRITE = "replicate_on_write";
public static final String KW_CACHING = "caching";
public static final String KW_BF_FP_CHANCE = "bloom_filter_fp_chance";
- public static final String KW_DEFAULT_R_CONSISTENCY = "default_read_consistency";
- public static final String KW_DEFAULT_W_CONSISTENCY = "default_write_consistency";
public static final String KW_COMPACTION = "compaction";
public static final String KW_COMPRESSION = "compression";
@@ -70,8 +68,6 @@ public class CFPropDefs extends PropertyDefinitions
keywords.add(KW_BF_FP_CHANCE);
keywords.add(KW_COMPACTION);
keywords.add(KW_COMPRESSION);
- keywords.add(KW_DEFAULT_W_CONSISTENCY);
- keywords.add(KW_DEFAULT_R_CONSISTENCY);
obsoleteKeywords.add("compaction_strategy_class");
obsoleteKeywords.add("compaction_strategy_options");
@@ -143,26 +139,6 @@ public class CFPropDefs extends PropertyDefinitions
if (!getCompressionOptions().isEmpty())
cfm.compressionParameters(CompressionParameters.create(getCompressionOptions()));
-
- try
- {
- ConsistencyLevel readCL = getConsistencyLevel(KW_DEFAULT_R_CONSISTENCY);
- if (readCL != null)
- {
- readCL.validateForRead(cfm.ksName);
- cfm.defaultReadCL(readCL);
- }
- ConsistencyLevel writeCL = getConsistencyLevel(KW_DEFAULT_W_CONSISTENCY);
- if (writeCL != null)
- {
- writeCL.validateForWrite(cfm.ksName);
- cfm.defaultWriteCL(writeCL);
- }
- }
- catch (InvalidRequestException e)
- {
- throw new ConfigurationException(e.getMessage(), e.getCause());
- }
}
public ConsistencyLevel getConsistencyLevel(String key) throws ConfigurationException, SyntaxException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/cql3/CQLStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CQLStatement.java b/src/java/org/apache/cassandra/cql3/CQLStatement.java
index 00fb406..90883e6 100644
--- a/src/java/org/apache/cassandra/cql3/CQLStatement.java
+++ b/src/java/org/apache/cassandra/cql3/CQLStatement.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.cql3;
import java.nio.ByteBuffer;
import java.util.List;
+import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.exceptions.*;
@@ -53,7 +54,7 @@ public interface CQLStatement
* @param variables the values for bounded variables. The implementation
* can assume that each bound term have a corresponding value.
*/
- public ResultMessage execute(ClientState state, List<ByteBuffer> variables) throws RequestValidationException, RequestExecutionException;
+ public ResultMessage execute(ConsistencyLevel cl, ClientState state, List<ByteBuffer> variables) throws RequestValidationException, RequestExecutionException;
/**
* Variante of execute used for internal query against the system tables, and thus only query the local node.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/cql3/Cql.g
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g
index 320e934..76f2509 100644
--- a/src/java/org/apache/cassandra/cql3/Cql.g
+++ b/src/java/org/apache/cassandra/cql3/Cql.g
@@ -42,7 +42,6 @@ options {
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.utils.Pair;
- import org.apache.cassandra.db.ConsistencyLevel;
}
@members {
@@ -189,19 +188,16 @@ useStatement returns [UseStatement stmt]
selectStatement returns [SelectStatement.RawStatement expr]
@init {
boolean isCount = false;
- ConsistencyLevel cLevel = null;
int limit = 10000;
Map<ColumnIdentifier, Boolean> orderings = new LinkedHashMap<ColumnIdentifier, Boolean>();
}
: K_SELECT ( sclause=selectClause | (K_COUNT '(' sclause=selectCountClause ')' { isCount = true; }) )
K_FROM cf=columnFamilyName
- ( K_USING K_CONSISTENCY K_LEVEL { cLevel = ConsistencyLevel.valueOf($K_LEVEL.text.toUpperCase()); } )?
( K_WHERE wclause=whereClause )?
( K_ORDER K_BY orderByClause[orderings] ( ',' orderByClause[orderings] )* )?
( K_LIMIT rows=INTEGER { limit = Integer.parseInt($rows.text); } )?
{
- SelectStatement.Parameters params = new SelectStatement.Parameters(cLevel,
- limit,
+ SelectStatement.Parameters params = new SelectStatement.Parameters(limit,
orderings,
isCount);
$expr = new SelectStatement.RawStatement(cf, params, sclause, wclause);
@@ -240,9 +236,8 @@ orderByClause[Map<ColumnIdentifier, Boolean> orderings]
/**
* INSERT INTO <CF> (<column>, <column>, <column>, ...)
* VALUES (<value>, <value>, <value>, ...)
- * USING CONSISTENCY <level> AND TIMESTAMP <long>;
+ * USING TIMESTAMP <long>;
*
- * Consistency level is set to ONE by default
*/
insertStatement returns [UpdateStatement expr]
@init {
@@ -269,8 +264,7 @@ usingClauseDelete[Attributes attrs]
;
usingClauseDeleteObjective[Attributes attrs]
- : K_CONSISTENCY K_LEVEL { attrs.cLevel = ConsistencyLevel.valueOf($K_LEVEL.text.toUpperCase()); }
- | K_TIMESTAMP ts=INTEGER { attrs.timestamp = Long.valueOf($ts.text); }
+ : K_TIMESTAMP ts=INTEGER { attrs.timestamp = Long.valueOf($ts.text); }
;
usingClauseObjective[Attributes attrs]
@@ -727,7 +721,6 @@ collection_type returns [ParsedType pt]
unreserved_keyword returns [String str]
: k=( K_KEY
- | K_CONSISTENCY
| K_CLUSTERING
| K_LEVEL
| K_COUNT
@@ -755,7 +748,6 @@ K_UPDATE: U P D A T E;
K_WITH: W I T H;
K_LIMIT: L I M I T;
K_USING: U S I N G;
-K_CONSISTENCY: C O N S I S T E N C Y;
K_LEVEL: ( O N E
| Q U O R U M
| A L L
http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 58a8f44..ce9dfec 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -122,20 +122,20 @@ public class QueryProcessor
}
}
- private static ResultMessage processStatement(CQLStatement statement, ClientState clientState, List<ByteBuffer> variables)
+ private static ResultMessage processStatement(CQLStatement statement, ConsistencyLevel cl, ClientState clientState, List<ByteBuffer> variables)
throws RequestExecutionException, RequestValidationException
{
statement.checkAccess(clientState);
statement.validate(clientState);
- ResultMessage result = statement.execute(clientState, variables);
+ ResultMessage result = statement.execute(cl, clientState, variables);
return result == null ? ResultMessage.Void.instance() : result;
}
- public static ResultMessage process(String queryString, ClientState clientState)
+ public static ResultMessage process(String queryString, ConsistencyLevel cl, ClientState clientState)
throws RequestExecutionException, RequestValidationException
{
logger.trace("CQL QUERY: {}", queryString);
- return processStatement(getStatement(queryString, clientState).statement, clientState, Collections.<ByteBuffer>emptyList());
+ return processStatement(getStatement(queryString, clientState).statement, cl, clientState, Collections.<ByteBuffer>emptyList());
}
public static UntypedResultSet processInternal(String query)
@@ -210,7 +210,7 @@ public class QueryProcessor
}
}
- public static ResultMessage processPrepared(CQLStatement statement, ClientState clientState, List<ByteBuffer> variables)
+ public static ResultMessage processPrepared(CQLStatement statement, ConsistencyLevel cl, ClientState clientState, List<ByteBuffer> variables)
throws RequestExecutionException, RequestValidationException
{
// Check to see if there are any bound variables to verify
@@ -228,7 +228,7 @@ public class QueryProcessor
logger.trace("[{}] '{}'", i+1, variables.get(i));
}
- return processStatement(statement, clientState, variables);
+ return processStatement(statement, cl, clientState, variables);
}
private static ParsedStatement.Prepared getStatement(String queryStr, ClientState clientState)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 92c708b..41f4f76 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -22,6 +22,7 @@ import java.util.*;
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.CounterMutation;
import org.apache.cassandra.db.IMutation;
@@ -78,49 +79,28 @@ public class BatchStatement extends ModificationStatement
}
}
- @Override
- public ConsistencyLevel getConsistencyLevel()
- {
- // We have validated that either the consistency is set, or all statements have the same default CL (see validate())
- return isSetConsistencyLevel()
- ? super.getConsistencyLevel()
- : (statements.isEmpty() ? ConsistencyLevel.ONE : statements.get(0).getConsistencyLevel());
- }
-
public void validate(ClientState state) throws InvalidRequestException
{
if (getTimeToLive() != 0)
throw new InvalidRequestException("Global TTL on the BATCH statement is not supported.");
- ConsistencyLevel cLevel = null;
for (ModificationStatement statement : statements)
{
- if (statement.isSetConsistencyLevel())
- throw new InvalidRequestException("Consistency level must be set on the BATCH, not individual statements");
-
if (isSetTimestamp() && statement.isSetTimestamp())
throw new InvalidRequestException("Timestamp must be set either on BATCH or individual statements");
if (statement.getTimeToLive() < 0)
throw new InvalidRequestException("A TTL must be greater or equal to 0");
-
- if (isSetConsistencyLevel())
- {
- getConsistencyLevel().validateForWrite(statement.keyspace());
- }
- else
- {
- // If no consistency is set for the batch, we need all the CF in the batch to have the same default consistency level,
- // otherwise the batch is invalid (i.e. the user must explicitely set the CL)
- ConsistencyLevel stmtCL = statement.getConsistencyLevel();
- if (cLevel != null && cLevel != stmtCL)
- throw new InvalidRequestException("The tables involved in the BATCH have different default write consistency, you must explicitely set the BATCH consitency level with USING CONSISTENCY");
- cLevel = stmtCL;
- }
}
}
- public Collection<? extends IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local)
+ protected void validateConsistency(ConsistencyLevel cl) throws InvalidRequestException
+ {
+ for (ModificationStatement statement : statements)
+ statement.validateConsistency(cl);
+ }
+
+ public Collection<? extends IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl)
throws RequestExecutionException, RequestValidationException
{
Map<Pair<String, ByteBuffer>, IMutation> mutations = new HashMap<Pair<String, ByteBuffer>, IMutation>();
@@ -130,7 +110,7 @@ public class BatchStatement extends ModificationStatement
statement.setTimestamp(getTimestamp(clientState));
// Group mutation together, otherwise they won't get applied atomically
- for (IMutation m : statement.getMutations(clientState, variables, local))
+ for (IMutation m : statement.getMutations(clientState, variables, local, cl))
{
if (m instanceof CounterMutation && type != Type.COUNTER)
throw new InvalidRequestException("Counter mutations are only allowed in COUNTER batches");
@@ -169,6 +149,6 @@ public class BatchStatement extends ModificationStatement
public String toString()
{
- return String.format("BatchStatement(type=%s, statements=%s, consistency=%s)", type, statements, getConsistencyLevel());
+ return String.format("BatchStatement(type=%s, statements=%s)", type, statements);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index 86af858..c7d32b1 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.cql3.operations.ListOperation;
import org.apache.cassandra.cql3.operations.MapOperation;
import org.apache.cassandra.cql3.operations.Operation;
import org.apache.cassandra.cql3.operations.SetOperation;
+import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.DeletionInfo;
import org.apache.cassandra.db.RowMutation;
@@ -60,7 +61,15 @@ public class DeleteStatement extends ModificationStatement
this.toRemove = new ArrayList<Pair<CFDefinition.Name, Term>>(columns.size());
}
- public Collection<RowMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local)
+ protected void validateConsistency(ConsistencyLevel cl) throws InvalidRequestException
+ {
+ if (type == Type.COUNTER)
+ cl.validateCounterForWrite(cfDef.cfm);
+ else
+ cl.validateForWrite(cfDef.cfm.ksName);
+ }
+
+ public Collection<RowMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl)
throws RequestExecutionException, RequestValidationException
{
// keys
@@ -90,7 +99,7 @@ public class DeleteStatement extends ModificationStatement
}
}
- Map<ByteBuffer, ColumnGroupMap> rows = needsReading ? readRows(keys, builder, (CompositeType)cfDef.cfm.comparator, local) : null;
+ Map<ByteBuffer, ColumnGroupMap> rows = needsReading ? readRows(keys, builder, (CompositeType)cfDef.cfm.comparator, local, cl) : null;
Collection<RowMutation> rowMutations = new ArrayList<RowMutation>(keys.size());
UpdateParameters params = new UpdateParameters(variables, getTimestamp(clientState), -1);
@@ -228,10 +237,9 @@ public class DeleteStatement extends ModificationStatement
public String toString()
{
- return String.format("DeleteStatement(name=%s, columns=%s, consistency=%s keys=%s)",
+ return String.format("DeleteStatement(name=%s, columns=%s, keys=%s)",
cfName,
columns,
- getConsistencyLevel(),
whereClause);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 9f335a7..6e92ec2 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -51,19 +51,17 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
protected Type type;
- private final ConsistencyLevel cLevel;
private Long timestamp;
private final int timeToLive;
public ModificationStatement(CFName name, Attributes attrs)
{
- this(name, attrs.cLevel, attrs.timestamp, attrs.timeToLive);
+ this(name, attrs.timestamp, attrs.timeToLive);
}
- public ModificationStatement(CFName name, ConsistencyLevel cLevel, Long timestamp, int timeToLive)
+ public ModificationStatement(CFName name, Long timestamp, int timeToLive)
{
super(name);
- this.cLevel = cLevel;
this.timestamp = timestamp;
this.timeToLive = timeToLive;
}
@@ -80,14 +78,18 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
if (timeToLive > ExpiringColumn.MAX_TTL)
throw new InvalidRequestException(String.format("ttl is too large. requested (%d) maximum (%d)", timeToLive, ExpiringColumn.MAX_TTL));
-
- getConsistencyLevel().validateForWrite(keyspace());
}
- public ResultMessage execute(ClientState state, List<ByteBuffer> variables) throws RequestExecutionException, RequestValidationException
+ protected abstract void validateConsistency(ConsistencyLevel cl) throws InvalidRequestException;
+
+ public ResultMessage execute(ConsistencyLevel cl, ClientState state, List<ByteBuffer> variables) throws RequestExecutionException, RequestValidationException
{
- Collection<? extends IMutation> mutations = getMutations(state, variables, false);
- ConsistencyLevel cl = getConsistencyLevel();
+ if (cl == null)
+ throw new InvalidRequestException("Invalid empty consistency level");
+
+ validateConsistency(cl);
+
+ Collection<? extends IMutation> mutations = getMutations(state, variables, false, cl);
// The type should have been set by now or we have a bug
assert type != null;
@@ -111,33 +113,13 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
return null;
}
-
public ResultMessage executeInternal(ClientState state) throws RequestValidationException, RequestExecutionException
{
- for (IMutation mutation : getMutations(state, Collections.<ByteBuffer>emptyList(), true))
+ for (IMutation mutation : getMutations(state, Collections.<ByteBuffer>emptyList(), true, null))
mutation.apply();
return null;
}
- public ConsistencyLevel getConsistencyLevel()
- {
- if (cLevel != null)
- return cLevel;
-
- CFMetaData cfm = Schema.instance.getCFMetaData(keyspace(), columnFamily());
- return cfm == null ? ConsistencyLevel.ONE : cfm.getWriteConsistencyLevel();
- }
-
- /**
- * True if an explicit consistency level was parsed from the statement.
- *
- * @return true if a consistency was parsed, false otherwise.
- */
- public boolean isSetConsistencyLevel()
- {
- return cLevel != null;
- }
-
public long getTimestamp(ClientState clientState)
{
return timestamp == null ? clientState.getTimestamp() : timestamp;
@@ -158,9 +140,18 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
return timeToLive;
}
- protected Map<ByteBuffer, ColumnGroupMap> readRows(List<ByteBuffer> keys, ColumnNameBuilder builder, CompositeType composite, boolean local)
+ protected Map<ByteBuffer, ColumnGroupMap> readRows(List<ByteBuffer> keys, ColumnNameBuilder builder, CompositeType composite, boolean local, ConsistencyLevel cl)
throws RequestExecutionException, RequestValidationException
{
+ try
+ {
+ cl.validateForRead(keyspace());
+ }
+ catch (InvalidRequestException e)
+ {
+ throw new InvalidRequestException(String.format("Write operation require a read but consistency %s is not supported on reads", cl));
+ }
+
List<ReadCommand> commands = new ArrayList<ReadCommand>(keys.size());
for (ByteBuffer key : keys)
{
@@ -177,7 +168,7 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
{
List<Row> rows = local
? SelectStatement.readLocally(keyspace(), commands)
- : StorageProxy.read(commands, getConsistencyLevel());
+ : StorageProxy.read(commands, cl);
Map<ByteBuffer, ColumnGroupMap> map = new HashMap<ByteBuffer, ColumnGroupMap>();
for (Row row : rows)
@@ -212,7 +203,7 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
* @return list of the mutations
* @throws InvalidRequestException on invalid requests
*/
- protected abstract Collection<? extends IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local)
+ protected abstract Collection<? extends IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl)
throws RequestExecutionException, RequestValidationException;
public abstract ParsedStatement.Prepared prepare(CFDefinition.Name[] boundNames) throws InvalidRequestException;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/cql3/statements/PermissionAlteringStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/PermissionAlteringStatement.java b/src/java/org/apache/cassandra/cql3/statements/PermissionAlteringStatement.java
index d3d1c9f..48c2d03 100644
--- a/src/java/org/apache/cassandra/cql3/statements/PermissionAlteringStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/PermissionAlteringStatement.java
@@ -17,7 +17,12 @@
*/
package org.apache.cassandra.cql3.statements;
+import java.nio.ByteBuffer;
+import java.util.List;
+
import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.transport.messages.ResultMessage;
@@ -40,6 +45,13 @@ public abstract class PermissionAlteringStatement extends ParsedStatement implem
public void validate(ClientState state)
{}
+ public ResultMessage execute(ConsistencyLevel cl, ClientState state, List<ByteBuffer> variables) throws UnauthorizedException, InvalidRequestException
+ {
+ return execute(state, variables);
+ }
+
+ public abstract ResultMessage execute(ClientState state, List<ByteBuffer> variables) throws UnauthorizedException, InvalidRequestException;
+
public ResultMessage executeInternal(ClientState state)
{
// executeInternal is for local query only, thus altering permission doesn't make sense and is not supported
http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
index e9d4450..ef0f0a1 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
@@ -21,18 +21,15 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
-import org.apache.cassandra.cql3.CQLStatement;
-import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.cql3.CFName;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.exceptions.RequestValidationException;
-import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.transport.messages.ResultMessage;
import com.google.common.base.Predicates;
import com.google.common.collect.Maps;
@@ -77,7 +74,7 @@ public abstract class SchemaAlteringStatement extends CFStatement implements CQL
public void validate(ClientState state) throws RequestValidationException
{}
- public ResultMessage execute(ClientState state, List<ByteBuffer> variables) throws RequestValidationException
+ public ResultMessage execute(ConsistencyLevel cl, ClientState state, List<ByteBuffer> variables) throws RequestValidationException
{
announceMigration();
String tableName = cfName == null || columnFamily() == null ? "" : columnFamily();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index ec50ef9..226d004 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -118,13 +118,18 @@ public class SelectStatement implements CQLStatement
// Nothing to do, all validation has been done by RawStatement.prepare()
}
- public ResultMessage.Rows execute(ClientState state, List<ByteBuffer> variables) throws RequestExecutionException, RequestValidationException
+ public ResultMessage.Rows execute(ConsistencyLevel cl, ClientState state, List<ByteBuffer> variables) throws RequestExecutionException, RequestValidationException
{
+ if (cl == null)
+ throw new InvalidRequestException("Invalid empty consistency level");
+
+ cl.validateForRead(keyspace());
+
try
{
List<Row> rows = isKeyRange
- ? StorageProxy.getRangeSlice(getRangeCommand(variables), getConsistencyLevel())
- : StorageProxy.read(getSliceCommands(variables), getConsistencyLevel());
+ ? StorageProxy.getRangeSlice(getRangeCommand(variables), cl)
+ : StorageProxy.read(getSliceCommands(variables), cl);
return processResults(rows, variables);
}
@@ -322,11 +327,6 @@ public class SelectStatement implements CQLStatement
return sliceRestriction != null && !sliceRestriction.isInclusive(Bound.START) ? parameters.limit + 1 : parameters.limit;
}
- private ConsistencyLevel getConsistencyLevel()
- {
- return parameters.consistencyLevel == null ? cfDef.cfm.getReadConsistencyLevel() : parameters.consistencyLevel;
- }
-
private Collection<ByteBuffer> getKeys(final List<ByteBuffer> variables) throws InvalidRequestException
{
List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
@@ -971,8 +971,6 @@ public class SelectStatement implements CQLStatement
public ParsedStatement.Prepared prepare() throws InvalidRequestException
{
CFMetaData cfm = ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
- if (parameters.consistencyLevel != null)
- parameters.consistencyLevel.validateForRead(keyspace());
if (parameters.limit <= 0)
throw new InvalidRequestException("LIMIT must be strictly positive");
@@ -1280,12 +1278,11 @@ public class SelectStatement implements CQLStatement
@Override
public String toString()
{
- return String.format("SelectRawStatement[name=%s, selectClause=%s, whereClause=%s, isCount=%s, cLevel=%s, limit=%s]",
+ return String.format("SelectRawStatement[name=%s, selectClause=%s, whereClause=%s, isCount=%s, limit=%s]",
cfName,
selectClause,
whereClause,
parameters.isCount,
- parameters.consistencyLevel,
parameters.limit);
}
}
@@ -1428,13 +1425,11 @@ public class SelectStatement implements CQLStatement
public static class Parameters
{
private final int limit;
- private final ConsistencyLevel consistencyLevel;
private final Map<ColumnIdentifier, Boolean> orderings;
private final boolean isCount;
- public Parameters(ConsistencyLevel consistency, int limit, Map<ColumnIdentifier, Boolean> orderings, boolean isCount)
+ public Parameters(int limit, Map<ColumnIdentifier, Boolean> orderings, boolean isCount)
{
- this.consistencyLevel = consistency;
this.limit = limit;
this.orderings = orderings;
this.isCount = isCount;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
index e709a06..af4b4e0 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeoutException;
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.service.ClientState;
@@ -52,7 +53,7 @@ public class TruncateStatement extends CFStatement implements CQLStatement
ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
}
- public ResultMessage execute(ClientState state, List<ByteBuffer> variables) throws InvalidRequestException, TruncateException
+ public ResultMessage execute(ConsistencyLevel cl, ClientState state, List<ByteBuffer> variables) throws InvalidRequestException, TruncateException
{
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index c3401b2..2065f32 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -96,9 +96,16 @@ public class UpdateStatement extends ModificationStatement
this.columns = null;
}
+ protected void validateConsistency(ConsistencyLevel cl) throws InvalidRequestException
+ {
+ if (type == Type.COUNTER)
+ cl.validateCounterForWrite(cfDef.cfm);
+ else
+ cl.validateForWrite(cfDef.cfm.ksName);
+ }
/** {@inheritDoc} */
- public Collection<IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local)
+ public Collection<IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl)
throws RequestExecutionException, RequestValidationException
{
List<ByteBuffer> keys = buildKeyNames(cfDef, processedKeys, variables);
@@ -125,13 +132,13 @@ public class UpdateStatement extends ModificationStatement
}
}
- Map<ByteBuffer, ColumnGroupMap> rows = needsReading ? readRows(keys, builder, (CompositeType)cfDef.cfm.comparator, local) : null;
+ Map<ByteBuffer, ColumnGroupMap> rows = needsReading ? readRows(keys, builder, (CompositeType)cfDef.cfm.comparator, local, cl) : null;
Collection<IMutation> mutations = new LinkedList<IMutation>();
UpdateParameters params = new UpdateParameters(variables, getTimestamp(clientState), getTimeToLive());
for (ByteBuffer key: keys)
- mutations.add(mutationForKey(cfDef, key, builder, params, rows == null ? null : rows.get(key)));
+ mutations.add(mutationForKey(cfDef, key, builder, params, rows == null ? null : rows.get(key), cl));
return mutations;
}
@@ -196,7 +203,7 @@ public class UpdateStatement extends ModificationStatement
*
* @throws InvalidRequestException on the wrong request
*/
- private IMutation mutationForKey(CFDefinition cfDef, ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params, ColumnGroupMap group)
+ private IMutation mutationForKey(CFDefinition cfDef, ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params, ColumnGroupMap group, ConsistencyLevel cl)
throws InvalidRequestException
{
validateKey(key);
@@ -252,7 +259,7 @@ public class UpdateStatement extends ModificationStatement
}
}
- return (hasCounterColumn) ? new CounterMutation(rm, getConsistencyLevel()) : rm;
+ return (hasCounterColumn) ? new CounterMutation(rm, cl) : rm;
}
private boolean addToMutation(ColumnFamily cf,
@@ -319,9 +326,6 @@ public class UpdateStatement extends ModificationStatement
// Deal here with the keyspace overwrite thingy to avoid mistake
CFMetaData metadata = validateColumnFamily(keyspace(), columnFamily(), type == Type.COUNTER);
- if (type == Type.COUNTER)
- getConsistencyLevel().validateCounterForWrite(metadata);
-
cfDef = metadata.getCfDef();
if (columns == null)
@@ -440,11 +444,10 @@ public class UpdateStatement extends ModificationStatement
public String toString()
{
- return String.format("UpdateStatement(name=%s, keys=%s, columns=%s, consistency=%s, timestamp=%s, timeToLive=%s)",
+ return String.format("UpdateStatement(name=%s, keys=%s, columns=%s, timestamp=%s, timeToLive=%s)",
cfName,
whereClause,
columns,
- getConsistencyLevel(),
isSetTimestamp() ? getTimestamp(null) : "<now>",
getTimeToLive());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
index 70e95f6..c381978 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
import java.util.List;
import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.service.ClientState;
@@ -48,7 +49,7 @@ public class UseStatement extends ParsedStatement implements CQLStatement
{
}
- public ResultMessage execute(ClientState state, List<ByteBuffer> variables) throws InvalidRequestException
+ public ResultMessage execute(ConsistencyLevel cl, ClientState state, List<ByteBuffer> variables) throws InvalidRequestException
{
state.setKeyspace(keyspace);
return new ResultMessage.SetKeyspace(keyspace);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 6a5ef45..16b953f 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -1651,11 +1651,41 @@ public class CassandraServer implements Cassandra.Iface
logger.debug("execute_cql_query");
}
- ClientState cState = state();
- if (cState.getCQLVersion().major == 2)
- return QueryProcessor.process(queryString, state());
+ return QueryProcessor.process(queryString, state());
+ }
+ catch (RequestExecutionException e)
+ {
+ ThriftConversion.rethrow(e);
+ return null;
+ }
+ catch (RequestValidationException e)
+ {
+ throw ThriftConversion.toThrift(e);
+ }
+ finally
+ {
+ Tracing.instance().stopSession();
+ }
+ }
+
+ public CqlResult execute_cql3_query(ByteBuffer query, Compression compression, ConsistencyLevel cLevel)
+ throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException
+ {
+ try
+ {
+ String queryString = uncompress(query, compression);
+ if (startSessionIfRequested())
+ {
+ Tracing.instance().begin("execute_cql3_query",
+ ImmutableMap.of("query", queryString));
+ }
else
- return org.apache.cassandra.cql3.QueryProcessor.process(queryString, cState).toThriftResult();
+ {
+ logger.debug("execute_cql3_query");
+ }
+
+ ClientState cState = state();
+ return org.apache.cassandra.cql3.QueryProcessor.process(queryString, ThriftConversion.fromThrift(cLevel), cState).toThriftResult();
}
catch (RequestExecutionException e)
{
@@ -1680,13 +1710,27 @@ public class CassandraServer implements Cassandra.Iface
try
{
+ ClientState cState = state();
String queryString = uncompress(query,compression);
+ return QueryProcessor.prepare(queryString, cState);
+ }
+ catch (RequestValidationException e)
+ {
+ throw ThriftConversion.toThrift(e);
+ }
+ }
+ public CqlPreparedResult prepare_cql3_query(ByteBuffer query, Compression compression)
+ throws InvalidRequestException, TException
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("prepare_cql3_query");
+
+ try
+ {
ClientState cState = state();
- if (cState.getCQLVersion().major == 2)
- return QueryProcessor.prepare(queryString, cState);
- else
- return org.apache.cassandra.cql3.QueryProcessor.prepare(queryString, cState, true).toThriftPreparedResult();
+ String queryString = uncompress(query,compression);
+ return org.apache.cassandra.cql3.QueryProcessor.prepare(queryString, cState, true).toThriftPreparedResult();
}
catch (RequestValidationException e)
{
@@ -1710,30 +1754,13 @@ public class CassandraServer implements Cassandra.Iface
try
{
ClientState cState = state();
- if (cState.getCQLVersion().major == 2)
- {
- CQLStatement statement = cState.getPrepared().get(itemId);
+ CQLStatement statement = cState.getPrepared().get(itemId);
- if (statement == null)
- throw new InvalidRequestException(String.format("Prepared query with ID %d not found", itemId));
- logger.trace("Retrieved prepared statement #{} with {} bind markers", itemId, statement.boundTerms);
+ if (statement == null)
+ throw new InvalidRequestException(String.format("Prepared query with ID %d not found", itemId));
+ logger.trace("Retrieved prepared statement #{} with {} bind markers", itemId, statement.boundTerms);
- return QueryProcessor.processPrepared(statement, cState, bindVariables);
- }
- else
- {
- org.apache.cassandra.cql3.CQLStatement statement = org.apache.cassandra.cql3.QueryProcessor.getPrepared(itemId);
-
- if (statement == null)
- throw new InvalidRequestException(String.format("Prepared query with ID %d not found" +
- " (either the query was not prepared on this host (maybe the host has been restarted?)" +
- " or you have prepared more than %d queries and queries %d has been evicted from the internal cache)",
- itemId, org.apache.cassandra.cql3.QueryProcessor.MAX_CACHE_PREPARED, itemId));
- logger.trace("Retrieved prepared statement #{} with {} bind markers", itemId,
- statement.getBoundsTerms());
-
- return org.apache.cassandra.cql3.QueryProcessor.processPrepared(statement, cState, bindVariables).toThriftResult();
- }
+ return QueryProcessor.processPrepared(statement, cState, bindVariables);
}
catch (RequestExecutionException e)
{
@@ -1750,18 +1777,51 @@ public class CassandraServer implements Cassandra.Iface
}
}
- public void set_cql_version(String version) throws InvalidRequestException
+ public CqlResult execute_prepared_cql3_query(int itemId, List<ByteBuffer> bindVariables, ConsistencyLevel cLevel)
+ throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException
{
- logger.debug("set_cql_version: " + version);
+ if (startSessionIfRequested())
+ {
+ // TODO we don't have [typed] access to CQL bind variables here. CASSANDRA-4560 is open to add support.
+ Tracing.instance().begin("execute_prepared_cql3_query", Collections.<String, String>emptyMap());
+ }
+ else
+ {
+ logger.debug("execute_prepared_cql3_query");
+ }
try
{
- state().setCQLVersion(version);
+ ClientState cState = state();
+ org.apache.cassandra.cql3.CQLStatement statement = org.apache.cassandra.cql3.QueryProcessor.getPrepared(itemId);
+
+ if (statement == null)
+ throw new InvalidRequestException(String.format("Prepared query with ID %d not found" +
+ " (either the query was not prepared on this host (maybe the host has been restarted?)" +
+ " or you have prepared more than %d queries and queries %d has been evicted from the internal cache)",
+ itemId, org.apache.cassandra.cql3.QueryProcessor.MAX_CACHE_PREPARED, itemId));
+ logger.trace("Retrieved prepared statement #{} with {} bind markers", itemId, statement.getBoundsTerms());
+
+ return org.apache.cassandra.cql3.QueryProcessor.processPrepared(statement, ThriftConversion.fromThrift(cLevel), cState, bindVariables).toThriftResult();
+ }
+ catch (RequestExecutionException e)
+ {
+ ThriftConversion.rethrow(e);
+ return null;
}
catch (RequestValidationException e)
{
throw ThriftConversion.toThrift(e);
}
+ finally
+ {
+ Tracing.instance().stopSession();
+ }
+ }
+
+ public void set_cql_version(String version) throws InvalidRequestException
+ {
+ // Deprecated, no-op
}
public ByteBuffer trace_next_query() throws TException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java
index fe8863a..45affc6 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -31,6 +31,8 @@ import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.util.CharsetUtil;
+import org.apache.cassandra.db.ConsistencyLevel;
+
/**
* ChannelBuffer utility methods.
* Note that contrarily to ByteBufferUtil, these method do "read" the
@@ -131,6 +133,29 @@ public abstract class CBUtil
}
}
+ public static ChannelBuffer consistencyLevelToCB(ConsistencyLevel consistency)
+ {
+ if (consistency == null)
+ return shortToCB(0);
+ else
+ return stringToCB(consistency.toString());
+ }
+
+ public static ConsistencyLevel readConsistencyLevel(ChannelBuffer cb)
+ {
+ String cl = CBUtil.readString(cb);
+ try
+ {
+ if (cl.isEmpty())
+ return null;
+ return Enum.valueOf(ConsistencyLevel.class, cl.toUpperCase());
+ }
+ catch (IllegalArgumentException e)
+ {
+ throw new ProtocolException("Unknown consistency level: " + cl);
+ }
+ }
+
public static ChannelBuffer longStringToCB(String str)
{
ChannelBuffer bytes = bytes(str);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/transport/Client.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java
index 3b4ace9..46a3297 100644
--- a/src/java/org/apache/cassandra/transport/Client.java
+++ b/src/java/org/apache/cassandra/transport/Client.java
@@ -25,8 +25,9 @@ import java.util.*;
import com.google.common.base.Splitter;
-import org.apache.cassandra.transport.messages.*;
import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.transport.messages.*;
import org.apache.cassandra.utils.Hex;
public class Client extends SimpleClient
@@ -99,7 +100,7 @@ public class Client extends SimpleClient
else if (msgType.equals("QUERY"))
{
String query = line.substring(6);
- return new QueryMessage(query);
+ return new QueryMessage(query, ConsistencyLevel.ONE);
}
else if (msgType.equals("PREPARE"))
{
@@ -127,7 +128,7 @@ public class Client extends SimpleClient
}
values.add(bb);
}
- return new ExecuteMessage(id, values);
+ return new ExecuteMessage(id, values, ConsistencyLevel.ONE);
}
catch (Exception e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java
index 8132e65..1d67317 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -36,6 +36,7 @@ import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.logging.Slf4JLoggerFactory;
+import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.transport.messages.*;
import org.apache.cassandra.service.ClientState;
@@ -112,9 +113,9 @@ public class SimpleClient
execute(msg);
}
- public ResultMessage execute(String query)
+ public ResultMessage execute(String query, ConsistencyLevel consistency)
{
- Message.Response msg = execute(new QueryMessage(query));
+ Message.Response msg = execute(new QueryMessage(query, consistency));
assert msg instanceof ResultMessage;
return (ResultMessage)msg;
}
@@ -126,9 +127,9 @@ public class SimpleClient
return (ResultMessage.Prepared)msg;
}
- public ResultMessage executePrepared(byte[] statementId, List<ByteBuffer> values)
+ public ResultMessage executePrepared(byte[] statementId, List<ByteBuffer> values, ConsistencyLevel consistency)
{
- Message.Response msg = execute(new ExecuteMessage(statementId, values));
+ Message.Response msg = execute(new ExecuteMessage(statementId, values, consistency));
assert msg instanceof ResultMessage;
return (ResultMessage)msg;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
index 73a965f..b42ba62 100644
--- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
@@ -61,7 +61,7 @@ public class ErrorMessage extends Message.Response
break;
case UNAVAILABLE:
{
- ConsistencyLevel cl = Enum.valueOf(ConsistencyLevel.class, CBUtil.readString(body));
+ ConsistencyLevel cl = CBUtil.readConsistencyLevel(body);
int required = body.readInt();
int alive = body.readInt();
te = new UnavailableException(cl, required, alive);
@@ -132,10 +132,8 @@ public class ErrorMessage extends Message.Response
{
case UNAVAILABLE:
UnavailableException ue = (UnavailableException)msg.error;
- ByteBuffer ueCl = ByteBufferUtil.bytes(ue.consistency.toString());
-
- acb = ChannelBuffers.buffer(2 + ueCl.remaining() + 8);
- acb.writeShort((short)ueCl.remaining());
+ ChannelBuffer ueCl = CBUtil.consistencyLevelToCB(ue.consistency);
+ acb = ChannelBuffers.buffer(ueCl.readableBytes() + 8);
acb.writeBytes(ueCl);
acb.writeInt(ue.required);
acb.writeInt(ue.alive);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index 4400d12..842fb22 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -25,6 +25,7 @@ import org.jboss.netty.buffer.ChannelBuffer;
import org.apache.cassandra.cql3.CQLStatement;
import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
import org.apache.cassandra.transport.*;
import org.apache.cassandra.utils.MD5Digest;
@@ -42,7 +43,8 @@ public class ExecuteMessage extends Message.Request
for (int i = 0; i < count; i++)
values.add(CBUtil.readValue(body));
- return new ExecuteMessage(id, values);
+ ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
+ return new ExecuteMessage(id, values, consistency);
}
public ChannelBuffer encode(ExecuteMessage msg)
@@ -53,7 +55,7 @@ public class ExecuteMessage extends Message.Request
// - The values
// - options
int vs = msg.values.size();
- CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(2, 0, vs);
+ CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(3, 0, vs);
builder.add(CBUtil.bytesToCB(msg.statementId.bytes));
builder.add(CBUtil.shortToCB(vs));
@@ -61,23 +63,26 @@ public class ExecuteMessage extends Message.Request
for (ByteBuffer value : msg.values)
builder.addValue(value);
+ builder.add(CBUtil.consistencyLevelToCB(msg.consistency));
return builder.build();
}
};
public final MD5Digest statementId;
public final List<ByteBuffer> values;
+ public final ConsistencyLevel consistency;
- public ExecuteMessage(byte[] statementId, List<ByteBuffer> values)
+ public ExecuteMessage(byte[] statementId, List<ByteBuffer> values, ConsistencyLevel consistency)
{
- this(MD5Digest.wrap(statementId), values);
+ this(MD5Digest.wrap(statementId), values, consistency);
}
- public ExecuteMessage(MD5Digest statementId, List<ByteBuffer> values)
+ public ExecuteMessage(MD5Digest statementId, List<ByteBuffer> values, ConsistencyLevel consistency)
{
super(Message.Type.EXECUTE);
this.statementId = statementId;
this.values = values;
+ this.consistency = consistency;
}
public ChannelBuffer encode()
@@ -95,7 +100,7 @@ public class ExecuteMessage extends Message.Request
if (statement == null)
throw new PreparedQueryNotFoundException(statementId);
- return QueryProcessor.processPrepared(statement, c.clientState(), values);
+ return QueryProcessor.processPrepared(statement, consistency, c.clientState(), values);
}
catch (Exception e)
{
@@ -106,6 +111,6 @@ public class ExecuteMessage extends Message.Request
@Override
public String toString()
{
- return "EXECUTE " + statementId + " with " + values.size() + " values";
+ return "EXECUTE " + statementId + " with " + values.size() + " values at consistency " + consistency;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
index 0880ee0..5223528 100644
--- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
@@ -18,8 +18,10 @@
package org.apache.cassandra.transport.messages;
import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.transport.*;
@@ -33,21 +35,25 @@ public class QueryMessage extends Message.Request
public QueryMessage decode(ChannelBuffer body)
{
String query = CBUtil.readLongString(body);
- return new QueryMessage(query);
+ ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
+ return new QueryMessage(query, consistency);
}
public ChannelBuffer encode(QueryMessage msg)
{
- return CBUtil.longStringToCB(msg.query);
+
+ return ChannelBuffers.wrappedBuffer(CBUtil.longStringToCB(msg.query), CBUtil.consistencyLevelToCB(msg.consistency));
}
};
public final String query;
+ public final ConsistencyLevel consistency;
- public QueryMessage(String query)
+ public QueryMessage(String query, ConsistencyLevel consistency)
{
super(Message.Type.QUERY);
this.query = query;
+ this.consistency = consistency;
}
public ChannelBuffer encode()
@@ -59,7 +65,7 @@ public class QueryMessage extends Message.Request
{
try
{
- return QueryProcessor.process(query, ((ServerConnection)connection).clientState());
+ return QueryProcessor.process(query, consistency, ((ServerConnection)connection).clientState());
}
catch (Exception e)
{