You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2014/05/22 14:47:30 UTC
[1/4] git commit: Backport first patch of 6975
Repository: cassandra
Updated Branches:
refs/heads/trunk d4bf6d328 -> 5f643ffcc
Backport first patch of 6975
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/362e5480
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/362e5480
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/362e5480
Branch: refs/heads/trunk
Commit: 362e54803434053fea25f874f64c69bdc1db78da
Parents: 2635632
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed May 14 14:25:29 2014 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu May 22 14:41:53 2014 +0200
----------------------------------------------------------------------
.../org/apache/cassandra/cql3/CQLStatement.java | 2 +-
.../apache/cassandra/cql3/QueryProcessor.java | 2 +-
.../statements/AuthenticationStatement.java | 2 +-
.../cql3/statements/AuthorizationStatement.java | 2 +-
.../cql3/statements/BatchStatement.java | 4 +-
.../cql3/statements/ModificationStatement.java | 4 +-
.../statements/SchemaAlteringStatement.java | 2 +-
.../cql3/statements/SelectStatement.java | 47 ++++++++++----------
.../cql3/statements/TruncateStatement.java | 2 +-
.../cassandra/cql3/statements/UseStatement.java | 2 +-
10 files changed, 34 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362e5480/src/java/org/apache/cassandra/cql3/CQLStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CQLStatement.java b/src/java/org/apache/cassandra/cql3/CQLStatement.java
index 81cd2b2..a1642ef 100644
--- a/src/java/org/apache/cassandra/cql3/CQLStatement.java
+++ b/src/java/org/apache/cassandra/cql3/CQLStatement.java
@@ -57,5 +57,5 @@ public interface CQLStatement
*
* @param state the current query state
*/
- public ResultMessage executeInternal(QueryState state) throws RequestValidationException, RequestExecutionException;
+ public ResultMessage executeInternal(QueryState state, QueryOptions options) throws RequestValidationException, RequestExecutionException;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362e5480/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 15ee59f..30d1bd7 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -205,7 +205,7 @@ public class QueryProcessor implements QueryHandler
state.setKeyspace(Keyspace.SYSTEM_KS);
CQLStatement statement = getStatement(query, state).statement;
statement.validate(state);
- ResultMessage result = statement.executeInternal(qState);
+ ResultMessage result = statement.executeInternal(qState, QueryOptions.DEFAULT);
if (result instanceof ResultMessage.Rows)
return new UntypedResultSet(((ResultMessage.Rows)result).result);
else
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362e5480/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java b/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
index 5fcf085..b47dd92 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
@@ -45,7 +45,7 @@ public abstract class AuthenticationStatement extends ParsedStatement implements
public abstract ResultMessage execute(ClientState state) throws RequestExecutionException, RequestValidationException;
- public ResultMessage executeInternal(QueryState state)
+ public ResultMessage executeInternal(QueryState state, QueryOptions options)
{
// executeInternal is for local query only, thus altering users doesn't make sense and is not supported
throw new UnsupportedOperationException();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362e5480/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java b/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
index db4581e..2c7f2cb 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
@@ -47,7 +47,7 @@ public abstract class AuthorizationStatement extends ParsedStatement implements
public abstract ResultMessage execute(ClientState state) throws RequestValidationException, RequestExecutionException;
- public ResultMessage executeInternal(QueryState state)
+ public ResultMessage executeInternal(QueryState state, QueryOptions options)
{
// executeInternal is for local query only, thus altering permission doesn't make sense and is not supported
throw new UnsupportedOperationException();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362e5480/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 6a1201b..875e41c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -327,11 +327,11 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName, key, cfName, result, columnsWithConditions, true));
}
- public ResultMessage executeInternal(QueryState queryState) throws RequestValidationException, RequestExecutionException
+ public ResultMessage executeInternal(QueryState queryState, QueryOptions options) throws RequestValidationException, RequestExecutionException
{
assert !hasConditions;
- for (IMutation mutation : getMutations(new PreparedBatchVariables(Collections.<ByteBuffer>emptyList()), true, null, queryState.getTimestamp()))
+ for (IMutation mutation : getMutations(new PreparedBatchVariables(options.getValues()), true, null, queryState.getTimestamp()))
mutation.apply();
return null;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362e5480/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 448722e..b1e4561 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -671,12 +671,12 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
return builder.build();
}
- public ResultMessage executeInternal(QueryState queryState) throws RequestValidationException, RequestExecutionException
+ public ResultMessage executeInternal(QueryState queryState, QueryOptions options) throws RequestValidationException, RequestExecutionException
{
if (hasConditions())
throw new UnsupportedOperationException();
- for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp()))
+ for (IMutation mutation : getMutations(options.getValues(), true, null, queryState.getTimestamp()))
mutation.apply();
return null;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362e5480/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
index 337e8dc..94df854 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
@@ -73,7 +73,7 @@ public abstract class SchemaAlteringStatement extends CFStatement implements CQL
return new ResultMessage.SchemaChange(changeType(), keyspace(), tableName);
}
- public ResultMessage executeInternal(QueryState state)
+ public ResultMessage executeInternal(QueryState state, QueryOptions options)
{
// executeInternal is for local query only, thus altering schema is not supported
throw new UnsupportedOperationException();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362e5480/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 6b4309f..92bc99c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -194,18 +194,8 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
cl.validateForRead(keyspace());
int limit = getLimit(variables);
- int limitForQuery = updateLimitForQuery(limit);
long now = System.currentTimeMillis();
- Pageable command;
- if (isKeyRange || usesSecondaryIndexing)
- {
- command = getRangeCommand(variables, limitForQuery, now);
- }
- else
- {
- List<ReadCommand> commands = getSliceCommands(variables, limitForQuery, now);
- command = commands == null ? null : new Pageable.ReadCommands(commands);
- }
+ Pageable command = getPageableCommand(options, limit, now);
int pageSize = options.getPageSize();
// A count query will never be paged for the user, but we always page it internally to avoid OOM.
@@ -237,6 +227,21 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
}
}
+ private Pageable getPageableCommand(QueryOptions options, int limit, long now) throws RequestValidationException
+ {
+ int limitForQuery = updateLimitForQuery(limit);
+ if (isKeyRange || usesSecondaryIndexing)
+ return getRangeCommand(options.getValues(), limitForQuery, now);
+
+ List<ReadCommand> commands = getSliceCommands(options.getValues(), limitForQuery, now);
+ return commands == null ? null : new Pageable.ReadCommands(commands);
+ }
+
+ public Pageable getPageableCommand(QueryOptions options) throws RequestValidationException
+ {
+ return getPageableCommand(options, getLimit(options.getValues()), System.currentTimeMillis());
+ }
+
private ResultMessage.Rows execute(Pageable command, ConsistencyLevel cl, List<ByteBuffer> variables, int limit, long now) throws RequestValidationException, RequestExecutionException
{
List<Row> rows;
@@ -288,23 +293,17 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
return rows;
}
- public ResultMessage.Rows executeInternal(QueryState state) throws RequestExecutionException, RequestValidationException
+ public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException
{
List<ByteBuffer> variables = Collections.emptyList();
int limit = getLimit(variables);
- int limitForQuery = updateLimitForQuery(limit);
long now = System.currentTimeMillis();
- List<Row> rows;
- if (isKeyRange || usesSecondaryIndexing)
- {
- RangeSliceCommand command = getRangeCommand(variables, limitForQuery, now);
- rows = command == null ? Collections.<Row>emptyList() : command.executeLocally();
- }
- else
- {
- List<ReadCommand> commands = getSliceCommands(variables, limitForQuery, now);
- rows = commands == null ? Collections.<Row>emptyList() : readLocally(keyspace(), commands);
- }
+ Pageable command = getPageableCommand(options, limit, now);
+ List<Row> rows = command == null
+ ? Collections.<Row>emptyList()
+ : (command instanceof Pageable.ReadCommands
+ ? readLocally(keyspace(), ((Pageable.ReadCommands)command).commands)
+ : ((RangeSliceCommand)command).executeLocally());
return processResults(rows, variables, limit, now);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362e5480/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
index 30e57d5..ef1c4a4 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
@@ -77,7 +77,7 @@ public class TruncateStatement extends CFStatement implements CQLStatement
return null;
}
- public ResultMessage executeInternal(QueryState state)
+ public ResultMessage executeInternal(QueryState state, QueryOptions options)
{
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362e5480/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
index ee70f9d..efda72d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
@@ -59,7 +59,7 @@ public class UseStatement extends ParsedStatement implements CQLStatement
return new ResultMessage.SetKeyspace(keyspace);
}
- public ResultMessage executeInternal(QueryState state)
+ public ResultMessage executeInternal(QueryState state, QueryOptions options)
{
// Internal queries are exclusively on the system keyspace and 'use' is thus useless
throw new UnsupportedOperationException();
[3/4] git commit: Use prepared statement internally
Posted by sl...@apache.org.
Use prepared statement internally
patch by slebresne; reviewed by mishail for CASSANDRA-6975
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1147ee3a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1147ee3a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1147ee3a
Branch: refs/heads/trunk
Commit: 1147ee3a81e483b26b4b8c5d7cc7e55fcc2baeec
Parents: c3ec8fa
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed May 14 14:25:29 2014 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu May 22 14:46:01 2014 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/cql3/CQLStatement.java | 2 +-
.../org/apache/cassandra/cql3/QueryOptions.java | 5 +
.../apache/cassandra/cql3/QueryProcessor.java | 103 +++++++-
.../apache/cassandra/cql3/UntypedResultSet.java | 58 ++++-
.../statements/AuthenticationStatement.java | 2 +-
.../cql3/statements/AuthorizationStatement.java | 2 +-
.../cql3/statements/BatchStatement.java | 4 +-
.../cql3/statements/ModificationStatement.java | 4 +-
.../statements/SchemaAlteringStatement.java | 2 +-
.../cql3/statements/SelectStatement.java | 48 ++--
.../cql3/statements/TruncateStatement.java | 2 +-
.../cassandra/cql3/statements/UseStatement.java | 2 +-
.../apache/cassandra/db/BatchlogManager.java | 27 +-
.../org/apache/cassandra/db/SystemKeyspace.java | 245 ++++++++-----------
.../ScheduledRangeTransferExecutorService.java | 8 +-
.../cassandra/service/StorageService.java | 43 ++--
.../cassandra/db/BatchlogManagerTest.java | 8 +-
.../apache/cassandra/db/HintedHandOffTest.java | 6 +-
.../unit/org/apache/cassandra/db/ScrubTest.java | 10 +-
.../db/compaction/CompactionsPurgeTest.java | 20 +-
.../io/sstable/CQLSSTableWriterTest.java | 2 +-
.../service/LeaveAndBootstrapTest.java | 4 +-
.../cassandra/service/QueryPagerTest.java | 4 +-
.../apache/cassandra/triggers/TriggersTest.java | 4 +-
25 files changed, 368 insertions(+), 248 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6b08fad..55fc400 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -21,6 +21,7 @@
* New serialization format for UDT values (CASSANDRA-7209, CASSANDRA-7261)
* Fix nodetool netstats (CASSANDRA-7270)
* Fix potential ClassCastException in HintedHandoffManager (CASSANDRA-7284)
+ * Use prepared statements internally (CASSANDRA-6975)
Merged from 2.0:
* Always reallocate buffers in HSHA (CASSANDRA-6285)
* (Hadoop) support authentication in CqlRecordReader (CASSANDRA-7221)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/src/java/org/apache/cassandra/cql3/CQLStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CQLStatement.java b/src/java/org/apache/cassandra/cql3/CQLStatement.java
index 81cd2b2..a1642ef 100644
--- a/src/java/org/apache/cassandra/cql3/CQLStatement.java
+++ b/src/java/org/apache/cassandra/cql3/CQLStatement.java
@@ -57,5 +57,5 @@ public interface CQLStatement
*
* @param state the current query state
*/
- public ResultMessage executeInternal(QueryState state) throws RequestValidationException, RequestExecutionException;
+ public ResultMessage executeInternal(QueryState state, QueryOptions options) throws RequestValidationException, RequestExecutionException;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/src/java/org/apache/cassandra/cql3/QueryOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryOptions.java b/src/java/org/apache/cassandra/cql3/QueryOptions.java
index 9c28762..369dce4 100644
--- a/src/java/org/apache/cassandra/cql3/QueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/QueryOptions.java
@@ -61,6 +61,11 @@ public abstract class QueryOptions
return new DefaultQueryOptions(consistency, values, false, SpecificOptions.DEFAULT, 3);
}
+ public static QueryOptions forInternalCalls(List<ByteBuffer> values)
+ {
+ return new DefaultQueryOptions(ConsistencyLevel.ONE, values, false, SpecificOptions.DEFAULT, 3);
+ }
+
public static QueryOptions fromPreV3Batch(ConsistencyLevel consistency)
{
return new DefaultQueryOptions(consistency, Collections.<ByteBuffer>emptyList(), false, SpecificOptions.DEFAULT, 2);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 40c45af..fca9c42 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.cql3;
import java.nio.ByteBuffer;
import java.util.*;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
import com.google.common.primitives.Ints;
@@ -32,9 +34,12 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.cql3.statements.*;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.composites.*;
+import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.pager.QueryPager;
+import org.apache.cassandra.service.pager.QueryPagers;
import org.apache.cassandra.thrift.ThriftClientState;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.transport.messages.ResultMessage;
@@ -74,6 +79,11 @@ public class QueryProcessor implements QueryHandler
private static final ConcurrentLinkedHashMap<MD5Digest, ParsedStatement.Prepared> preparedStatements;
private static final ConcurrentLinkedHashMap<Integer, CQLStatement> thriftPreparedStatements;
+ // A map for prepared statements used internally (which we don't want to mix with user statements; in particular, we don't
+ // bother with expiration on those).
+ private static final ConcurrentMap<String, ParsedStatement.Prepared> internalStatements = new ConcurrentHashMap<>();
+ private static final QueryState internalQueryState;
+
static
{
preparedStatements = new ConcurrentLinkedHashMap.Builder<MD5Digest, ParsedStatement.Prepared>()
@@ -84,6 +94,17 @@ public class QueryProcessor implements QueryHandler
.maximumWeightedCapacity(MAX_CACHE_PREPARED_MEMORY)
.weigher(thriftMemoryUsageWeigher)
.build();
+
+ ClientState state = ClientState.forInternalCalls();
+ try
+ {
+ state.setKeyspace(Keyspace.SYSTEM_KS);
+ }
+ catch (InvalidRequestException e)
+ {
+ throw new RuntimeException();
+ }
+ internalQueryState = new QueryState(state);
}
private QueryProcessor()
@@ -190,16 +211,40 @@ public class QueryProcessor implements QueryHandler
}
}
- public static UntypedResultSet processInternal(String query)
+ private static QueryOptions makeInternalOptions(ParsedStatement.Prepared prepared, Object[] values)
+ {
+ if (prepared.boundNames.size() != values.length)
+ throw new IllegalArgumentException(String.format("Invalid number of values. Expecting %d but got %d", prepared.boundNames.size(), values.length));
+
+ List<ByteBuffer> boundValues = new ArrayList<ByteBuffer>(values.length);
+ for (int i = 0; i < values.length; i++)
+ {
+ Object value = values[i];
+ AbstractType type = prepared.boundNames.get(i).type;
+ boundValues.add(value instanceof ByteBuffer || value == null ? (ByteBuffer)value : type.decompose(value));
+ }
+ return QueryOptions.forInternalCalls(boundValues);
+ }
+
+ private static ParsedStatement.Prepared prepareInternal(String query) throws RequestValidationException
+ {
+ ParsedStatement.Prepared prepared = internalStatements.get(query);
+ if (prepared != null)
+ return prepared;
+
+ // Note: if 2 threads prepare the same query, we'll live so don't bother synchronizing
+ prepared = parseStatement(query, internalQueryState);
+ prepared.statement.validate(internalQueryState.getClientState());
+ internalStatements.putIfAbsent(query, prepared);
+ return prepared;
+ }
+
+ public static UntypedResultSet executeInternal(String query, Object... values)
{
try
{
- ClientState state = ClientState.forInternalCalls();
- QueryState qState = new QueryState(state);
- state.setKeyspace(Keyspace.SYSTEM_KS);
- CQLStatement statement = getStatement(query, state).statement;
- statement.validate(state);
- ResultMessage result = statement.executeInternal(qState);
+ ParsedStatement.Prepared prepared = prepareInternal(query);
+ ResultMessage result = prepared.statement.executeInternal(internalQueryState, makeInternalOptions(prepared, values));
if (result instanceof ResultMessage.Rows)
return UntypedResultSet.create(((ResultMessage.Rows)result).result);
else
@@ -215,6 +260,50 @@ public class QueryProcessor implements QueryHandler
}
}
+ public static UntypedResultSet executeInternalWithPaging(String query, int pageSize, Object... values)
+ {
+ try
+ {
+ ParsedStatement.Prepared prepared = prepareInternal(query);
+ if (!(prepared.statement instanceof SelectStatement))
+ throw new IllegalArgumentException("Only SELECTs can be paged");
+
+ SelectStatement select = (SelectStatement)prepared.statement;
+ QueryPager pager = QueryPagers.localPager(select.getPageableCommand(makeInternalOptions(prepared, values)));
+ return UntypedResultSet.create(select, pager, pageSize);
+ }
+ catch (RequestValidationException e)
+ {
+ throw new RuntimeException("Error validating query" + e);
+ }
+ }
+
+ /**
+ * Same as executeInternal, but for queries we know are only executed once, so that the
+ * created statement object is not cached.
+ */
+ public static UntypedResultSet executeOnceInternal(String query, Object... values)
+ {
+ try
+ {
+ ParsedStatement.Prepared prepared = parseStatement(query, internalQueryState);
+ prepared.statement.validate(internalQueryState.getClientState());
+ ResultMessage result = prepared.statement.executeInternal(internalQueryState, makeInternalOptions(prepared, values));
+ if (result instanceof ResultMessage.Rows)
+ return UntypedResultSet.create(((ResultMessage.Rows)result).result);
+ else
+ return null;
+ }
+ catch (RequestExecutionException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (RequestValidationException e)
+ {
+ throw new RuntimeException("Error validating query " + query, e);
+ }
+ }
+
public static UntypedResultSet resultify(String query, Row row)
{
return resultify(query, Collections.singletonList(row));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
index 5519f2e..7e0f15a 100644
--- a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
@@ -24,8 +24,10 @@ import java.util.*;
import com.google.common.collect.AbstractIterator;
+import org.apache.cassandra.cql3.statements.SelectStatement;
import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.cql3.ResultSet;
+import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.service.pager.QueryPager;
/** a utility for doing internal cql-based queries */
public abstract class UntypedResultSet implements Iterable<UntypedResultSet.Row>
@@ -40,6 +42,11 @@ public abstract class UntypedResultSet implements Iterable<UntypedResultSet.Row>
return new FromResultList(results);
}
+ public static UntypedResultSet create(SelectStatement select, QueryPager pager, int pageSize)
+ {
+ return new FromPager(select, pager, pageSize);
+ }
+
public boolean isEmpty()
{
return size() == 0;
@@ -122,6 +129,55 @@ public abstract class UntypedResultSet implements Iterable<UntypedResultSet.Row>
}
}
+ private static class FromPager extends UntypedResultSet
+ {
+ private final SelectStatement select;
+ private final QueryPager pager;
+ private final int pageSize;
+ private final List<ColumnSpecification> metadata;
+
+ private FromPager(SelectStatement select, QueryPager pager, int pageSize)
+ {
+ this.select = select;
+ this.pager = pager;
+ this.pageSize = pageSize;
+ this.metadata = select.getResultMetadata().names;
+ }
+
+ public int size()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public Row one()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public Iterator<Row> iterator()
+ {
+ return new AbstractIterator<Row>()
+ {
+ private Iterator<List<ByteBuffer>> currentPage;
+
+ protected Row computeNext()
+ {
+ try {
+ while (currentPage == null || !currentPage.hasNext())
+ {
+ if (pager.isExhausted())
+ return endOfData();
+ currentPage = select.process(pager.fetchPage(pageSize)).rows.iterator();
+ }
+ return new Row(metadata, currentPage.next());
+ } catch (RequestValidationException | RequestExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+ }
+
public static class Row
{
private final Map<String, ByteBuffer> data = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java b/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
index 5fcf085..b47dd92 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
@@ -45,7 +45,7 @@ public abstract class AuthenticationStatement extends ParsedStatement implements
public abstract ResultMessage execute(ClientState state) throws RequestExecutionException, RequestValidationException;
- public ResultMessage executeInternal(QueryState state)
+ public ResultMessage executeInternal(QueryState state, QueryOptions options)
{
// executeInternal is for local query only, thus altering users doesn't make sense and is not supported
throw new UnsupportedOperationException();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java b/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
index db4581e..2c7f2cb 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
@@ -47,7 +47,7 @@ public abstract class AuthorizationStatement extends ParsedStatement implements
public abstract ResultMessage execute(ClientState state) throws RequestValidationException, RequestExecutionException;
- public ResultMessage executeInternal(QueryState state)
+ public ResultMessage executeInternal(QueryState state, QueryOptions options)
{
// executeInternal is for local query only, thus altering permission doesn't make sense and is not supported
throw new UnsupportedOperationException();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 05d37da..5b058f3 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -311,10 +311,10 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName, key, cfName, result, columnsWithConditions, true, options.forStatement(0)));
}
- public ResultMessage executeInternal(QueryState queryState) throws RequestValidationException, RequestExecutionException
+ public ResultMessage executeInternal(QueryState queryState, QueryOptions options) throws RequestValidationException, RequestExecutionException
{
assert !hasConditions;
- for (IMutation mutation : getMutations(BatchQueryOptions.DEFAULT, true, queryState.getTimestamp()))
+ for (IMutation mutation : getMutations(BatchQueryOptions.withoutPerStatementVariables(options), true, queryState.getTimestamp()))
{
// We don't use counters internally.
assert mutation instanceof Mutation;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index e1468fb..ad88eaf 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -621,12 +621,12 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
return builder.build();
}
- public ResultMessage executeInternal(QueryState queryState) throws RequestValidationException, RequestExecutionException
+ public ResultMessage executeInternal(QueryState queryState, QueryOptions options) throws RequestValidationException, RequestExecutionException
{
if (hasConditions())
throw new UnsupportedOperationException();
- for (IMutation mutation : getMutations(QueryOptions.DEFAULT, true, queryState.getTimestamp()))
+ for (IMutation mutation : getMutations(options, true, queryState.getTimestamp()))
{
// We don't use counters internally.
assert mutation instanceof Mutation;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
index 337e8dc..94df854 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
@@ -73,7 +73,7 @@ public abstract class SchemaAlteringStatement extends CFStatement implements CQL
return new ResultMessage.SchemaChange(changeType(), keyspace(), tableName);
}
- public ResultMessage executeInternal(QueryState state)
+ public ResultMessage executeInternal(QueryState state, QueryOptions options)
{
// executeInternal is for local query only, thus altering schema is not supported
throw new UnsupportedOperationException();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index b212147..420f475 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -191,18 +191,8 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
cl.validateForRead(keyspace());
int limit = getLimit(options);
- int limitForQuery = updateLimitForQuery(limit);
long now = System.currentTimeMillis();
- Pageable command;
- if (isKeyRange || usesSecondaryIndexing)
- {
- command = getRangeCommand(options, limitForQuery, now);
- }
- else
- {
- List<ReadCommand> commands = getSliceCommands(options, limitForQuery, now);
- command = commands == null ? null : new Pageable.ReadCommands(commands);
- }
+ Pageable command = getPageableCommand(options, limit, now);
int pageSize = options.getPageSize();
// A count query will never be paged for the user, but we always page it internally to avoid OOM.
@@ -234,6 +224,21 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
}
}
+ private Pageable getPageableCommand(QueryOptions options, int limit, long now) throws RequestValidationException
+ {
+ int limitForQuery = updateLimitForQuery(limit);
+ if (isKeyRange || usesSecondaryIndexing)
+ return getRangeCommand(options, limitForQuery, now);
+
+ List<ReadCommand> commands = getSliceCommands(options, limitForQuery, now);
+ return commands == null ? null : new Pageable.ReadCommands(commands);
+ }
+
+ public Pageable getPageableCommand(QueryOptions options) throws RequestValidationException
+ {
+ return getPageableCommand(options, getLimit(options), System.currentTimeMillis());
+ }
+
private ResultMessage.Rows execute(Pageable command, QueryOptions options, int limit, long now) throws RequestValidationException, RequestExecutionException
{
List<Row> rows;
@@ -285,23 +290,16 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
return rows;
}
- public ResultMessage.Rows executeInternal(QueryState state) throws RequestExecutionException, RequestValidationException
+ public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException
{
- QueryOptions options = QueryOptions.DEFAULT;
int limit = getLimit(options);
- int limitForQuery = updateLimitForQuery(limit);
long now = System.currentTimeMillis();
- List<Row> rows;
- if (isKeyRange || usesSecondaryIndexing)
- {
- RangeSliceCommand command = getRangeCommand(options, limitForQuery, now);
- rows = command == null ? Collections.<Row>emptyList() : command.executeLocally();
- }
- else
- {
- List<ReadCommand> commands = getSliceCommands(options, limitForQuery, now);
- rows = commands == null ? Collections.<Row>emptyList() : readLocally(keyspace(), commands);
- }
+ Pageable command = getPageableCommand(options, limit, now);
+ List<Row> rows = command == null
+ ? Collections.<Row>emptyList()
+ : (command instanceof Pageable.ReadCommands
+ ? readLocally(keyspace(), ((Pageable.ReadCommands)command).commands)
+ : ((RangeSliceCommand)command).executeLocally());
return processResults(rows, options, limit, now);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
index 30e57d5..ef1c4a4 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
@@ -77,7 +77,7 @@ public class TruncateStatement extends CFStatement implements CQLStatement
return null;
}
- public ResultMessage executeInternal(QueryState state)
+ public ResultMessage executeInternal(QueryState state, QueryOptions options)
{
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
index ee70f9d..efda72d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
@@ -59,7 +59,7 @@ public class UseStatement extends ParsedStatement implements CQLStatement
return new ResultMessage.SetKeyspace(keyspace);
}
- public ResultMessage executeInternal(QueryState state)
+ public ResultMessage executeInternal(QueryState state, QueryOptions options)
{
// Internal queries are exclusively on the system keyspace and 'use' is thus useless
throw new UnsupportedOperationException();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 1a441f6..4e7e412 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -58,6 +58,8 @@ import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
+
public class BatchlogManager implements BatchlogManagerMBean
{
private static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager";
@@ -97,7 +99,7 @@ public class BatchlogManager implements BatchlogManagerMBean
public int countAllBatches()
{
- return (int) process("SELECT count(*) FROM %s.%s", Keyspace.SYSTEM_KS, SystemKeyspace.BATCHLOG_CF).one().getLong("count");
+ return (int) executeInternal("SELECT count(*) FROM %s.%s", Keyspace.SYSTEM_KS, SystemKeyspace.BATCHLOG_CF).one().getLong("count");
}
public long getTotalBatchesReplayed()
@@ -166,10 +168,10 @@ public class BatchlogManager implements BatchlogManagerMBean
try
{
- UntypedResultSet page = process("SELECT id, data, written_at, version FROM %s.%s LIMIT %d",
- Keyspace.SYSTEM_KS,
- SystemKeyspace.BATCHLOG_CF,
- PAGE_SIZE);
+ UntypedResultSet page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s LIMIT %d",
+ Keyspace.SYSTEM_KS,
+ SystemKeyspace.BATCHLOG_CF,
+ PAGE_SIZE));
while (!page.isEmpty())
{
@@ -178,11 +180,11 @@ public class BatchlogManager implements BatchlogManagerMBean
if (page.size() < PAGE_SIZE)
break; // we've exhausted the batchlog, next query would be empty.
- page = process("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(%s) LIMIT %d",
- Keyspace.SYSTEM_KS,
- SystemKeyspace.BATCHLOG_CF,
- id,
- PAGE_SIZE);
+ page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(?) LIMIT %d",
+ Keyspace.SYSTEM_KS,
+ SystemKeyspace.BATCHLOG_CF,
+ PAGE_SIZE),
+ id);
}
cleanup();
@@ -450,11 +452,6 @@ public class BatchlogManager implements BatchlogManagerMBean
CompactionManager.instance.submitUserDefined(cfs, descriptors, Integer.MAX_VALUE).get();
}
- private static UntypedResultSet process(String format, Object... args)
- {
- return QueryProcessor.processInternal(String.format(format, args));
- }
-
public static class EndpointFilter
{
private final String localRack;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 2025d5e..9cb6e94 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -61,7 +61,8 @@ import org.apache.cassandra.thrift.cassandraConstants;
import org.apache.cassandra.transport.Server;
import org.apache.cassandra.utils.*;
-import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
+import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
public class SystemKeyspace
{
@@ -123,10 +124,7 @@ public class SystemKeyspace
// delete old, possibly obsolete entries in schema columnfamilies
for (String cfname : Arrays.asList(SystemKeyspace.SCHEMA_KEYSPACES_CF, SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF, SystemKeyspace.SCHEMA_COLUMNS_CF))
- {
- String req = String.format("DELETE FROM system.%s WHERE keyspace_name = '%s'", cfname, ksmd.name);
- processInternal(req);
- }
+ executeOnceInternal(String.format("DELETE FROM system.%s WHERE keyspace_name = ?", cfname), ksmd.name);
// (+1 to timestamp to make sure we don't get shadowed by the tombstones we just added)
ksmd.toSchema(FBUtilities.timestampMicros() + 1).apply();
@@ -135,24 +133,24 @@ public class SystemKeyspace
private static void setupVersion()
{
- String req = "INSERT INTO system.%s (key, release_version, cql_version, thrift_version, native_protocol_version, data_center, rack, partitioner) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s')";
+ String req = "INSERT INTO system.%s (key, release_version, cql_version, thrift_version, native_protocol_version, data_center, rack, partitioner) VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
- processInternal(String.format(req, LOCAL_CF,
- LOCAL_KEY,
- FBUtilities.getReleaseVersionString(),
- QueryProcessor.CQL_VERSION.toString(),
- cassandraConstants.VERSION,
- Server.CURRENT_VERSION,
- snitch.getDatacenter(FBUtilities.getBroadcastAddress()),
- snitch.getRack(FBUtilities.getBroadcastAddress()),
- DatabaseDescriptor.getPartitioner().getClass().getName()));
+ executeOnceInternal(String.format(req, LOCAL_CF),
+ LOCAL_KEY,
+ FBUtilities.getReleaseVersionString(),
+ QueryProcessor.CQL_VERSION.toString(),
+ cassandraConstants.VERSION,
+ String.valueOf(Server.CURRENT_VERSION),
+ snitch.getDatacenter(FBUtilities.getBroadcastAddress()),
+ snitch.getRack(FBUtilities.getBroadcastAddress()),
+ DatabaseDescriptor.getPartitioner().getClass().getName());
}
// TODO: In 3.0, remove this and the index_interval column from system.schema_columnfamilies
/** Migrates index_interval values to min_index_interval and sets index_interval to null */
private static void migrateIndexInterval()
{
- for (UntypedResultSet.Row row : processInternal(String.format("SELECT * FROM system.%s", SCHEMA_COLUMNFAMILIES_CF)))
+ for (UntypedResultSet.Row row : executeOnceInternal(String.format("SELECT * FROM system.%s", SCHEMA_COLUMNFAMILIES_CF)))
{
if (!row.has("index_interval"))
continue;
@@ -160,13 +158,8 @@ public class SystemKeyspace
logger.debug("Migrating index_interval to min_index_interval");
CFMetaData table = CFMetaData.fromSchema(row);
- String query = String.format("SELECT writetime(type) "
- + "FROM system.%s "
- + "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
- SCHEMA_COLUMNFAMILIES_CF,
- table.ksName,
- table.cfName);
- long timestamp = processInternal(query).one().getLong("writetime(type)");
+ String query = String.format("SELECT writetime(type) FROM system.%s WHERE keyspace_name = ? AND columnfamily_name = ?", SCHEMA_COLUMNFAMILIES_CF);
+ long timestamp = executeOnceInternal(query, table.ksName, table.cfName).one().getLong("writetime(type)");
try
{
table.toSchema(timestamp).apply();
@@ -180,7 +173,7 @@ public class SystemKeyspace
private static void migrateCachingOption()
{
- for (UntypedResultSet.Row row : processInternal(String.format("SELECT * FROM system.%s", SCHEMA_COLUMNFAMILIES_CF)))
+ for (UntypedResultSet.Row row : executeOnceInternal(String.format("SELECT * FROM system.%s", SCHEMA_COLUMNFAMILIES_CF)))
{
if (!row.has("caching"))
continue;
@@ -192,13 +185,8 @@ public class SystemKeyspace
CachingOptions caching = CachingOptions.fromString(row.getString("caching"));
CFMetaData table = CFMetaData.fromSchema(row);
logger.info("Migrating caching option {} to {} for {}.{}", row.getString("caching"), caching.toString(), table.ksName, table.cfName);
- String query = String.format("SELECT writetime(type) "
- + "FROM system.%s "
- + "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
- SCHEMA_COLUMNFAMILIES_CF,
- table.ksName,
- table.cfName);
- long timestamp = processInternal(query).one().getLong("writetime(type)");
+ String query = String.format("SELECT writetime(type) FROM system.%s WHERE keyspace_name = ? AND columnfamily_name = ?", SCHEMA_COLUMNFAMILIES_CF);
+ long timestamp = executeOnceInternal(query, table.ksName, table.cfName).one().getLong("writetime(type)");
table.toSchema(timestamp).apply();
}
catch (ConfigurationException e)
@@ -221,7 +209,6 @@ public class SystemKeyspace
return null;
UUID compactionId = UUIDGen.getTimeUUID();
- String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, inputs) VALUES (%s, '%s', '%s', {%s})";
Iterable<Integer> generations = Iterables.transform(toCompact, new Function<SSTableReader, Integer>()
{
public Integer apply(SSTableReader sstable)
@@ -229,7 +216,8 @@ public class SystemKeyspace
return sstable.descriptor.generation;
}
});
- processInternal(String.format(req, COMPACTION_LOG, compactionId, cfs.keyspace.getName(), cfs.name, StringUtils.join(Sets.newHashSet(generations), ',')));
+ String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, inputs) VALUES (?, ?, ?, ?)";
+ executeInternal(String.format(req, COMPACTION_LOG), compactionId, cfs.keyspace.getName(), cfs.name, Sets.newHashSet(generations));
forceBlockingFlush(COMPACTION_LOG);
return compactionId;
}
@@ -243,8 +231,7 @@ public class SystemKeyspace
{
assert taskId != null;
- String req = "DELETE FROM system.%s WHERE id = %s";
- processInternal(String.format(req, COMPACTION_LOG, taskId));
+ executeInternal(String.format("DELETE FROM system.%s WHERE id = ?", COMPACTION_LOG), taskId);
forceBlockingFlush(COMPACTION_LOG);
}
@@ -255,7 +242,7 @@ public class SystemKeyspace
public static Map<Pair<String, String>, Map<Integer, UUID>> getUnfinishedCompactions()
{
String req = "SELECT * FROM system.%s";
- UntypedResultSet resultSet = processInternal(String.format(req, COMPACTION_LOG));
+ UntypedResultSet resultSet = executeInternal(String.format(req, COMPACTION_LOG));
Map<Pair<String, String>, Map<Integer, UUID>> unfinishedCompactions = new HashMap<>();
for (UntypedResultSet.Row row : resultSet)
@@ -294,21 +281,20 @@ public class SystemKeyspace
// don't write anything when the history table itself is compacted, since that would in turn cause new compactions
if (ksname.equals("system") && cfname.equals(COMPACTION_HISTORY_CF))
return;
- String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, compacted_at, bytes_in, bytes_out, rows_merged) "
- + "VALUES (%s, '%s', '%s', %d, %d, %d, {%s})";
- processInternal(String.format(req, COMPACTION_HISTORY_CF, UUIDGen.getTimeUUID().toString(), ksname, cfname, compactedAt, bytesIn, bytesOut, FBUtilities.toString(rowsMerged)));
+ String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, compacted_at, bytes_in, bytes_out, rows_merged) VALUES (?, ?, ?, ?, ?, ?, ?)";
+ executeInternal(String.format(req, COMPACTION_HISTORY_CF), UUIDGen.getTimeUUID(), ksname, cfname, ByteBufferUtil.bytes(compactedAt), bytesIn, bytesOut, rowsMerged);
}
public static TabularData getCompactionHistory() throws OpenDataException
{
- UntypedResultSet queryResultSet = processInternal("SELECT * from system.compaction_history");
+ UntypedResultSet queryResultSet = executeInternal(String.format("SELECT * from system.%s", COMPACTION_HISTORY_CF));
return CompactionHistoryTabularData.from(queryResultSet);
}
public static synchronized void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
{
- String req = "UPDATE system.%s SET truncated_at = truncated_at + %s WHERE key = '%s'";
- processInternal(String.format(req, LOCAL_CF, truncationAsMapEntry(cfs, truncatedAt, position), LOCAL_KEY));
+ String req = "UPDATE system.%s SET truncated_at = truncated_at + ? WHERE key = '%s'";
+ executeInternal(String.format(req, LOCAL_CF, LOCAL_KEY), truncationAsMapEntry(cfs, truncatedAt, position));
truncationRecords = null;
forceBlockingFlush(LOCAL_CF);
}
@@ -318,13 +304,13 @@ public class SystemKeyspace
*/
public static synchronized void removeTruncationRecord(UUID cfId)
{
- String req = "DELETE truncated_at[%s] from system.%s WHERE key = '%s'";
- processInternal(String.format(req, cfId, LOCAL_CF, LOCAL_KEY));
+ String req = "DELETE truncated_at[?] from system.%s WHERE key = '%s'";
+ executeInternal(String.format(req, LOCAL_CF, LOCAL_KEY), cfId);
truncationRecords = null;
forceBlockingFlush(LOCAL_CF);
}
- private static String truncationAsMapEntry(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
+ private static Map<UUID, ByteBuffer> truncationAsMapEntry(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
{
DataOutputBuffer out = new DataOutputBuffer();
try
@@ -336,9 +322,7 @@ public class SystemKeyspace
{
throw new RuntimeException(e);
}
- return String.format("{%s: 0x%s}",
- cfs.metadata.cfId,
- ByteBufferUtil.bytesToHex(ByteBuffer.wrap(out.getData(), 0, out.getLength())));
+ return Collections.<UUID, ByteBuffer>singletonMap(cfs.metadata.cfId, ByteBuffer.wrap(out.getData(), 0, out.getLength()));
}
public static ReplayPosition getTruncatedPosition(UUID cfId)
@@ -362,9 +346,7 @@ public class SystemKeyspace
private static Map<UUID, Pair<ReplayPosition, Long>> readTruncationRecords()
{
- UntypedResultSet rows = processInternal(String.format("SELECT truncated_at FROM system.%s WHERE key = '%s'",
- LOCAL_CF,
- LOCAL_KEY));
+ UntypedResultSet rows = executeInternal(String.format("SELECT truncated_at FROM system.%s WHERE key = '%s'", LOCAL_CF, LOCAL_KEY));
Map<UUID, Pair<ReplayPosition, Long>> records = new HashMap<>();
@@ -402,53 +384,46 @@ public class SystemKeyspace
return;
}
- String req = "INSERT INTO system.%s (peer, tokens) VALUES ('%s', %s)";
- processInternal(String.format(req, PEERS_CF, ep.getHostAddress(), tokensAsSet(tokens)));
+ String req = "INSERT INTO system.%s (peer, tokens) VALUES (?, ?)";
+ executeInternal(String.format(req, PEERS_CF), ep, tokensAsSet(tokens));
}
public static synchronized void updatePreferredIP(InetAddress ep, InetAddress preferred_ip)
{
- String req = "INSERT INTO system.%s (peer, preferred_ip) VALUES ('%s', '%s')";
- processInternal(String.format(req, PEERS_CF, ep.getHostAddress(), preferred_ip.getHostAddress()));
+ String req = "INSERT INTO system.%s (peer, preferred_ip) VALUES (?, ?)";
+ executeInternal(String.format(req, PEERS_CF), ep, preferred_ip);
forceBlockingFlush(PEERS_CF);
}
- public static synchronized void updatePeerInfo(InetAddress ep, String columnName, String value)
+ public static synchronized void updatePeerInfo(InetAddress ep, String columnName, Object value)
{
if (ep.equals(FBUtilities.getBroadcastAddress()))
return;
- String req = "INSERT INTO system.%s (peer, %s) VALUES ('%s', %s)";
- processInternal(String.format(req, PEERS_CF, columnName, ep.getHostAddress(), value));
+ String req = "INSERT INTO system.%s (peer, %s) VALUES (?, ?)";
+ executeInternal(String.format(req, PEERS_CF, columnName), ep, value);
}
public static synchronized void updateHintsDropped(InetAddress ep, UUID timePeriod, int value)
{
// with 30 day TTL
- String req = "UPDATE system.%s USING TTL 2592000 SET hints_dropped[ %s ] = %s WHERE peer = '%s'";
- processInternal(String.format(req, PEER_EVENTS_CF, timePeriod.toString(), value, ep.getHostAddress()));
+ String req = "UPDATE system.%s USING TTL 2592000 SET hints_dropped[ ? ] = ? WHERE peer = ?";
+ executeInternal(String.format(req, PEER_EVENTS_CF), timePeriod, value, ep);
}
public static synchronized void updateSchemaVersion(UUID version)
{
- String req = "INSERT INTO system.%s (key, schema_version) VALUES ('%s', %s)";
- processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, version.toString()));
+ String req = "INSERT INTO system.%s (key, schema_version) VALUES ('%s', ?)";
+ executeInternal(String.format(req, LOCAL_CF, LOCAL_KEY), version);
}
- private static String tokensAsSet(Collection<Token> tokens)
+ private static Set<String> tokensAsSet(Collection<Token> tokens)
{
Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory();
- StringBuilder sb = new StringBuilder();
- sb.append("{");
- Iterator<Token> iter = tokens.iterator();
- while (iter.hasNext())
- {
- sb.append("'").append(factory.toString(iter.next())).append("'");
- if (iter.hasNext())
- sb.append(",");
- }
- sb.append("}");
- return sb.toString();
+ Set<String> s = new HashSet<>(tokens.size());
+ for (Token tk : tokens)
+ s.add(factory.toString(tk));
+ return s;
}
private static Collection<Token> deserializeTokens(Collection<String> tokensStrings)
@@ -465,8 +440,8 @@ public class SystemKeyspace
*/
public static synchronized void removeEndpoint(InetAddress ep)
{
- String req = "DELETE FROM system.%s WHERE peer = '%s'";
- processInternal(String.format(req, PEERS_CF, ep.getHostAddress()));
+ String req = "DELETE FROM system.%s WHERE peer = ?";
+ executeInternal(String.format(req, PEERS_CF), ep);
}
/**
@@ -475,8 +450,8 @@ public class SystemKeyspace
public static synchronized void updateTokens(Collection<Token> tokens)
{
assert !tokens.isEmpty() : "removeEndpoint should be used instead";
- String req = "INSERT INTO system.%s (key, tokens) VALUES ('%s', %s)";
- processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, tokensAsSet(tokens)));
+ String req = "INSERT INTO system.%s (key, tokens) VALUES ('%s', ?)";
+ executeInternal(String.format(req, LOCAL_CF, LOCAL_KEY), tokensAsSet(tokens));
forceBlockingFlush(LOCAL_CF);
}
@@ -509,7 +484,7 @@ public class SystemKeyspace
public static SetMultimap<InetAddress, Token> loadTokens()
{
SetMultimap<InetAddress, Token> tokenMap = HashMultimap.create();
- for (UntypedResultSet.Row row : processInternal("SELECT peer, tokens FROM system." + PEERS_CF))
+ for (UntypedResultSet.Row row : executeInternal("SELECT peer, tokens FROM system." + PEERS_CF))
{
InetAddress peer = row.getInetAddress("peer");
if (row.has("tokens"))
@@ -526,7 +501,7 @@ public class SystemKeyspace
public static Map<InetAddress, UUID> loadHostIds()
{
Map<InetAddress, UUID> hostIdMap = new HashMap<InetAddress, UUID>();
- for (UntypedResultSet.Row row : processInternal("SELECT peer, host_id FROM system." + PEERS_CF))
+ for (UntypedResultSet.Row row : executeInternal("SELECT peer, host_id FROM system." + PEERS_CF))
{
InetAddress peer = row.getInetAddress("peer");
if (row.has("host_id"))
@@ -539,8 +514,8 @@ public class SystemKeyspace
public static InetAddress getPreferredIP(InetAddress ep)
{
- String req = "SELECT preferred_ip FROM system.%s WHERE peer='%s'";
- UntypedResultSet result = processInternal(String.format(req, PEERS_CF, ep.getHostAddress()));
+ String req = "SELECT preferred_ip FROM system.%s WHERE peer=?";
+ UntypedResultSet result = executeInternal(String.format(req, PEERS_CF), ep);
if (!result.isEmpty() && result.one().has("preferred_ip"))
return result.one().getInetAddress("preferred_ip");
return null;
@@ -552,7 +527,7 @@ public class SystemKeyspace
public static Map<InetAddress, Map<String,String>> loadDcRackInfo()
{
Map<InetAddress, Map<String, String>> result = new HashMap<InetAddress, Map<String, String>>();
- for (UntypedResultSet.Row row : processInternal("SELECT peer, data_center, rack from system." + PEERS_CF))
+ for (UntypedResultSet.Row row : executeInternal("SELECT peer, data_center, rack from system." + PEERS_CF))
{
InetAddress peer = row.getInetAddress("peer");
if (row.has("data_center") && row.has("rack"))
@@ -590,7 +565,7 @@ public class SystemKeyspace
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(LOCAL_CF);
String req = "SELECT cluster_name FROM system.%s WHERE key='%s'";
- UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
+ UntypedResultSet result = executeInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
if (result.isEmpty() || !result.one().has("cluster_name"))
{
@@ -599,8 +574,8 @@ public class SystemKeyspace
throw new ConfigurationException("Found system keyspace files, but they couldn't be loaded!");
// no system files. this is a new node.
- req = "INSERT INTO system.%s (key, cluster_name) VALUES ('%s', '%s')";
- processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, DatabaseDescriptor.getClusterName()));
+ req = "INSERT INTO system.%s (key, cluster_name) VALUES ('%s', ?)";
+ executeInternal(String.format(req, LOCAL_CF, LOCAL_KEY), DatabaseDescriptor.getClusterName());
return;
}
@@ -612,7 +587,7 @@ public class SystemKeyspace
public static Collection<Token> getSavedTokens()
{
String req = "SELECT tokens FROM system.%s WHERE key='%s'";
- UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
+ UntypedResultSet result = executeInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
return result.isEmpty() || !result.one().has("tokens")
? Collections.<Token>emptyList()
: deserializeTokens(result.one().<String>getSet("tokens", UTF8Type.instance));
@@ -621,7 +596,7 @@ public class SystemKeyspace
public static int incrementAndGetGeneration()
{
String req = "SELECT gossip_generation FROM system.%s WHERE key='%s'";
- UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
+ UntypedResultSet result = executeInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
int generation;
if (result.isEmpty() || !result.one().has("gossip_generation"))
@@ -648,8 +623,8 @@ public class SystemKeyspace
}
}
- req = "INSERT INTO system.%s (key, gossip_generation) VALUES ('%s', %d)";
- processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, generation));
+ req = "INSERT INTO system.%s (key, gossip_generation) VALUES ('%s', ?)";
+ executeInternal(String.format(req, LOCAL_CF, LOCAL_KEY), generation);
forceBlockingFlush(LOCAL_CF);
return generation;
@@ -658,7 +633,7 @@ public class SystemKeyspace
public static BootstrapState getBootstrapState()
{
String req = "SELECT bootstrapped FROM system.%s WHERE key='%s'";
- UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
+ UntypedResultSet result = executeInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
if (result.isEmpty() || !result.one().has("bootstrapped"))
return BootstrapState.NEEDS_BOOTSTRAP;
@@ -678,8 +653,8 @@ public class SystemKeyspace
public static void setBootstrapState(BootstrapState state)
{
- String req = "INSERT INTO system.%s (key, bootstrapped) VALUES ('%s', '%s')";
- processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, state.name()));
+ String req = "INSERT INTO system.%s (key, bootstrapped) VALUES ('%s', ?)";
+ executeInternal(String.format(req, LOCAL_CF, LOCAL_KEY), state.name());
forceBlockingFlush(LOCAL_CF);
}
@@ -716,7 +691,7 @@ public class SystemKeyspace
UUID hostId = null;
String req = "SELECT host_id FROM system.%s WHERE key='%s'";
- UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
+ UntypedResultSet result = executeInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
// Look up the Host UUID (return it if found)
if (!result.isEmpty() && result.one().has("host_id"))
@@ -735,8 +710,8 @@ public class SystemKeyspace
*/
public static UUID setLocalHostId(UUID hostId)
{
- String req = "INSERT INTO system.%s (key, host_id) VALUES ('%s', %s)";
- processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, hostId));
+ String req = "INSERT INTO system.%s (key, host_id) VALUES ('%s', ?)";
+ executeInternal(String.format(req, LOCAL_CF, LOCAL_KEY), hostId);
return hostId;
}
@@ -890,8 +865,8 @@ public class SystemKeyspace
public static PaxosState loadPaxosState(ByteBuffer key, CFMetaData metadata)
{
- String req = "SELECT * FROM system.%s WHERE row_key = 0x%s AND cf_id = %s";
- UntypedResultSet results = processInternal(String.format(req, PAXOS_CF, ByteBufferUtil.bytesToHex(key), metadata.cfId));
+ String req = "SELECT * FROM system.%s WHERE row_key = ? AND cf_id = ?";
+ UntypedResultSet results = executeInternal(String.format(req, PAXOS_CF), key, metadata.cfId);
if (results.isEmpty())
return new PaxosState(key, metadata);
UntypedResultSet.Row row = results.one();
@@ -911,26 +886,24 @@ public class SystemKeyspace
public static void savePaxosPromise(Commit promise)
{
- String req = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET in_progress_ballot = %s WHERE row_key = 0x%s AND cf_id = %s";
- processInternal(String.format(req,
- PAXOS_CF,
- UUIDGen.microsTimestamp(promise.ballot),
- paxosTtl(promise.update.metadata),
- promise.ballot,
- ByteBufferUtil.bytesToHex(promise.key),
- promise.update.id()));
+ String req = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET in_progress_ballot = ? WHERE row_key = ? AND cf_id = ?";
+ executeInternal(String.format(req, PAXOS_CF),
+ UUIDGen.microsTimestamp(promise.ballot),
+ paxosTtl(promise.update.metadata),
+ promise.ballot,
+ promise.key,
+ promise.update.id());
}
public static void savePaxosProposal(Commit proposal)
{
- processInternal(String.format("UPDATE %s USING TIMESTAMP %d AND TTL %d SET proposal_ballot = %s, proposal = 0x%s WHERE row_key = 0x%s AND cf_id = %s",
- PAXOS_CF,
- UUIDGen.microsTimestamp(proposal.ballot),
- paxosTtl(proposal.update.metadata),
- proposal.ballot,
- ByteBufferUtil.bytesToHex(proposal.update.toBytes()),
- ByteBufferUtil.bytesToHex(proposal.key),
- proposal.update.id()));
+ executeInternal(String.format("UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = ?, proposal = ? WHERE row_key = ? AND cf_id = ?", PAXOS_CF),
+ UUIDGen.microsTimestamp(proposal.ballot),
+ paxosTtl(proposal.update.metadata),
+ proposal.ballot,
+ proposal.update.toBytes(),
+ proposal.key,
+ proposal.update.id());
}
private static int paxosTtl(CFMetaData metadata)
@@ -943,15 +916,14 @@ public class SystemKeyspace
{
// We always erase the last proposal (with the commit timestamp, so as to not erase more recent proposals in case the commit is old)
// even though that's really just an optimization since SP.beginAndRepairPaxos will exclude accepted proposals older than the mrc.
- String cql = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET proposal_ballot = null, proposal = null, most_recent_commit_at = %s, most_recent_commit = 0x%s WHERE row_key = 0x%s AND cf_id = %s";
- processInternal(String.format(cql,
- PAXOS_CF,
- UUIDGen.microsTimestamp(commit.ballot),
- paxosTtl(commit.update.metadata),
- commit.ballot,
- ByteBufferUtil.bytesToHex(commit.update.toBytes()),
- ByteBufferUtil.bytesToHex(commit.key),
- commit.update.id()));
+ String cql = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = null, proposal = null, most_recent_commit_at = ?, most_recent_commit = ? WHERE row_key = ? AND cf_id = ?";
+ executeInternal(String.format(cql, PAXOS_CF),
+ UUIDGen.microsTimestamp(commit.ballot),
+ paxosTtl(commit.update.metadata),
+ commit.ballot,
+ commit.update.toBytes(),
+ commit.key,
+ commit.update.id());
}
/**
@@ -963,12 +935,8 @@ public class SystemKeyspace
*/
public static RestorableMeter getSSTableReadMeter(String keyspace, String table, int generation)
{
- String cql = "SELECT * FROM %s WHERE keyspace_name='%s' and columnfamily_name='%s' and generation=%d";
- UntypedResultSet results = processInternal(String.format(cql,
- SSTABLE_ACTIVITY_CF,
- keyspace,
- table,
- generation));
+ String cql = "SELECT * FROM system.%s WHERE keyspace_name=? and columnfamily_name=? and generation=?";
+ UntypedResultSet results = executeInternal(String.format(cql, SSTABLE_ACTIVITY_CF), keyspace, table, generation);
if (results.isEmpty())
return new RestorableMeter();
@@ -985,14 +953,13 @@ public class SystemKeyspace
public static void persistSSTableReadMeter(String keyspace, String table, int generation, RestorableMeter meter)
{
// Store values with a ten-day TTL (864000 seconds) to handle corner cases where cleanup might not occur
- String cql = "INSERT INTO %s (keyspace_name, columnfamily_name, generation, rate_15m, rate_120m) VALUES ('%s', '%s', %d, %f, %f) USING TTL 864000";
- processInternal(String.format(cql,
- SSTABLE_ACTIVITY_CF,
- keyspace,
- table,
- generation,
- meter.fifteenMinuteRate(),
- meter.twoHourRate()));
+ String cql = "INSERT INTO system.%s (keyspace_name, columnfamily_name, generation, rate_15m, rate_120m) VALUES (?, ?, ?, ?, ?) USING TTL 864000";
+ executeInternal(String.format(cql, SSTABLE_ACTIVITY_CF),
+ keyspace,
+ table,
+ generation,
+ meter.fifteenMinuteRate(),
+ meter.twoHourRate());
}
/**
@@ -1000,7 +967,7 @@ public class SystemKeyspace
*/
public static void clearSSTableReadMeter(String keyspace, String table, int generation)
{
- String cql = "DELETE FROM %s WHERE keyspace_name='%s' AND columnfamily_name='%s' and generation=%d";
- processInternal(String.format(cql, SSTABLE_ACTIVITY_CF, keyspace, table, generation));
+ String cql = "DELETE FROM system.%s WHERE keyspace_name=? AND columnfamily_name=? and generation=?";
+ executeInternal(String.format(cql, SSTABLE_ACTIVITY_CF), keyspace, table, generation);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java b/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java
index 5591ea4..860619a 100644
--- a/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java
+++ b/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java
@@ -17,7 +17,7 @@
*/
package org.apache.cassandra.service;
-import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
import java.nio.ByteBuffer;
import java.util.Collections;
@@ -74,7 +74,7 @@ class RangeTransfer implements Runnable
public void run()
{
- UntypedResultSet res = processInternal("SELECT * FROM system." + SystemKeyspace.RANGE_XFERS_CF);
+ UntypedResultSet res = executeInternal("SELECT * FROM system." + SystemKeyspace.RANGE_XFERS_CF);
if (res.size() < 1)
{
@@ -103,9 +103,7 @@ class RangeTransfer implements Runnable
finally
{
LOG.debug("Removing queued entry for transfer of {}", token);
- processInternal(String.format("DELETE FROM system.%s WHERE token_bytes = 0x%s",
- SystemKeyspace.RANGE_XFERS_CF,
- ByteBufferUtil.bytesToHex(tokenBytes)));
+ executeInternal(String.format("DELETE FROM system.%s WHERE token_bytes = ?", SystemKeyspace.RANGE_XFERS_CF), tokenBytes);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index e29530a..2425baf 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1431,23 +1431,30 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
switch (state)
{
case RELEASE_VERSION:
- SystemKeyspace.updatePeerInfo(endpoint, "release_version", quote(value.value));
+ SystemKeyspace.updatePeerInfo(endpoint, "release_version", value.value);
break;
case DC:
- SystemKeyspace.updatePeerInfo(endpoint, "data_center", quote(value.value));
+ SystemKeyspace.updatePeerInfo(endpoint, "data_center", value.value);
break;
case RACK:
- SystemKeyspace.updatePeerInfo(endpoint, "rack", quote(value.value));
+ SystemKeyspace.updatePeerInfo(endpoint, "rack", value.value);
break;
case RPC_ADDRESS:
- SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", quote(value.value));
+ try
+ {
+ SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(value.value));
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
break;
case SCHEMA:
- SystemKeyspace.updatePeerInfo(endpoint, "schema_version", value.value);
+ SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(value.value));
MigrationManager.instance.scheduleSchemaPull(endpoint, epState);
break;
case HOST_ID:
- SystemKeyspace.updatePeerInfo(endpoint, "host_id", value.value);
+ SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(value.value));
break;
}
}
@@ -1461,32 +1468,34 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
switch (entry.getKey())
{
case RELEASE_VERSION:
- SystemKeyspace.updatePeerInfo(endpoint, "release_version", quote(entry.getValue().value));
+ SystemKeyspace.updatePeerInfo(endpoint, "release_version", entry.getValue().value);
break;
case DC:
- SystemKeyspace.updatePeerInfo(endpoint, "data_center", quote(entry.getValue().value));
+ SystemKeyspace.updatePeerInfo(endpoint, "data_center", entry.getValue().value);
break;
case RACK:
- SystemKeyspace.updatePeerInfo(endpoint, "rack", quote(entry.getValue().value));
+ SystemKeyspace.updatePeerInfo(endpoint, "rack", entry.getValue().value);
break;
case RPC_ADDRESS:
- SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", quote(entry.getValue().value));
+ try
+ {
+ SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(entry.getValue().value));
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
break;
case SCHEMA:
- SystemKeyspace.updatePeerInfo(endpoint, "schema_version", entry.getValue().value);
+ SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(entry.getValue().value));
break;
case HOST_ID:
- SystemKeyspace.updatePeerInfo(endpoint, "host_id", entry.getValue().value);
+ SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(entry.getValue().value));
break;
}
}
}
- private String quote(String value)
- {
- return "'" + value + "'";
- }
-
private byte[] getApplicationStateValue(InetAddress endpoint, ApplicationState appstate)
{
String vvalue = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(appstate).value;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
index 9982be9..e0eae6b 100644
--- a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
@@ -93,7 +93,7 @@ public class BatchlogManagerTest extends SchemaLoader
for (int i = 0; i < 1000; i++)
{
- UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT * FROM \"Keyspace1\".\"Standard1\" WHERE key = intAsBlob(%d)", i));
+ UntypedResultSet result = QueryProcessor.executeInternal(String.format("SELECT * FROM \"Keyspace1\".\"Standard1\" WHERE key = intAsBlob(%d)", i));
if (i < 500)
{
assertEquals(bytes(i), result.one().getBytes("key"));
@@ -107,7 +107,7 @@ public class BatchlogManagerTest extends SchemaLoader
}
// Ensure that no stray mutations got somehow applied.
- UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT count(*) FROM \"Keyspace1\".\"Standard1\""));
+ UntypedResultSet result = QueryProcessor.executeInternal(String.format("SELECT count(*) FROM \"Keyspace1\".\"Standard1\""));
assertEquals(500, result.one().getLong("count"));
}
@@ -157,7 +157,7 @@ public class BatchlogManagerTest extends SchemaLoader
// We should see half of Standard2-targeted mutations written after the replay and all of Standard3 mutations applied.
for (int i = 0; i < 1000; i++)
{
- UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT * FROM \"Keyspace1\".\"Standard2\" WHERE key = intAsBlob(%d)", i));
+ UntypedResultSet result = QueryProcessor.executeInternal(String.format("SELECT * FROM \"Keyspace1\".\"Standard2\" WHERE key = intAsBlob(%d)", i));
if (i >= 500)
{
assertEquals(bytes(i), result.one().getBytes("key"));
@@ -172,7 +172,7 @@ public class BatchlogManagerTest extends SchemaLoader
for (int i = 0; i < 1000; i++)
{
- UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT * FROM \"Keyspace1\".\"Standard3\" WHERE key = intAsBlob(%d)", i));
+ UntypedResultSet result = QueryProcessor.executeInternal(String.format("SELECT * FROM \"Keyspace1\".\"Standard3\" WHERE key = intAsBlob(%d)", i));
assertEquals(bytes(i), result.one().getBytes("key"));
assertEquals(bytes(i), result.one().getBytes("column1"));
assertEquals(bytes(i), result.one().getBytes("value"));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/HintedHandOffTest.java b/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
index 622c816..9bc0724 100644
--- a/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
+++ b/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
@@ -40,7 +40,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import static org.junit.Assert.assertEquals;
-import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
public class HintedHandOffTest extends SchemaLoader
{
@@ -92,7 +92,7 @@ public class HintedHandOffTest extends SchemaLoader
HintedHandOffManager.instance.metrics.incrPastWindow(InetAddress.getLocalHost());
HintedHandOffManager.instance.metrics.log();
- UntypedResultSet rows = processInternal("SELECT hints_dropped FROM system." + SystemKeyspace.PEER_EVENTS_CF);
+ UntypedResultSet rows = executeInternal("SELECT hints_dropped FROM system." + SystemKeyspace.PEER_EVENTS_CF);
Map<UUID, Integer> returned = rows.one().getMap("hints_dropped", UUIDType.instance, Int32Type.instance);
assertEquals(Iterators.getLast(returned.values().iterator()).intValue(), 99);
}
@@ -129,7 +129,7 @@ public class HintedHandOffTest extends SchemaLoader
private int getNoOfHints()
{
String req = "SELECT * FROM system.%s";
- UntypedResultSet resultSet = processInternal(String.format(req, SystemKeyspace.HINTS_CF));
+ UntypedResultSet resultSet = executeInternal(String.format(req, SystemKeyspace.HINTS_CF));
return resultSet.size();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index 220e2a4..571fe0e 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -288,7 +288,7 @@ public class ScrubTest extends SchemaLoader
Keyspace keyspace = Keyspace.open("Keyspace1");
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("test_compact_static_columns");
- QueryProcessor.processInternal("INSERT INTO \"Keyspace1\".test_compact_static_columns (a, b, c, d) VALUES (123, c3db07e8-b602-11e3-bc6b-e0b9a54a6d93, true, 'foobar')");
+ QueryProcessor.executeInternal("INSERT INTO \"Keyspace1\".test_compact_static_columns (a, b, c, d) VALUES (123, c3db07e8-b602-11e3-bc6b-e0b9a54a6d93, true, 'foobar')");
cfs.forceBlockingFlush();
CompactionManager.instance.performScrub(cfs, false);
}
@@ -324,14 +324,14 @@ public class ScrubTest extends SchemaLoader
Keyspace keyspace = Keyspace.open("Keyspace1");
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("test_compact_dynamic_columns");
- QueryProcessor.processInternal("INSERT INTO \"Keyspace1\".test_compact_dynamic_columns (a, b, c) VALUES (0, 'a', 'foo')");
- QueryProcessor.processInternal("INSERT INTO \"Keyspace1\".test_compact_dynamic_columns (a, b, c) VALUES (0, 'b', 'bar')");
- QueryProcessor.processInternal("INSERT INTO \"Keyspace1\".test_compact_dynamic_columns (a, b, c) VALUES (0, 'c', 'boo')");
+ QueryProcessor.executeInternal("INSERT INTO \"Keyspace1\".test_compact_dynamic_columns (a, b, c) VALUES (0, 'a', 'foo')");
+ QueryProcessor.executeInternal("INSERT INTO \"Keyspace1\".test_compact_dynamic_columns (a, b, c) VALUES (0, 'b', 'bar')");
+ QueryProcessor.executeInternal("INSERT INTO \"Keyspace1\".test_compact_dynamic_columns (a, b, c) VALUES (0, 'c', 'boo')");
cfs.forceBlockingFlush();
CompactionManager.instance.performScrub(cfs, true);
// Scrub is silent, but it will remove broken records. So reading everything back to make sure nothing got "scrubbed away"
- UntypedResultSet rs = QueryProcessor.processInternal("SELECT * FROM \"Keyspace1\".test_compact_dynamic_columns");
+ UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM \"Keyspace1\".test_compact_dynamic_columns");
assertEquals(3, rs.size());
Iterator<UntypedResultSet.Row> iter = rs.iterator();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
index 751e7ae..912c7f1 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
@@ -37,7 +37,7 @@ import static org.apache.cassandra.db.KeyspaceTest.assertColumns;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
import static org.apache.cassandra.Util.cellname;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -329,20 +329,20 @@ public class CompactionsPurgeTest extends SchemaLoader
cfs.disableAutoCompaction();
// write a row out to one sstable
- processInternal(String.format("INSERT INTO %s.%s (k, v1, v2) VALUES (%d, '%s', %d)",
+ executeInternal(String.format("INSERT INTO %s.%s (k, v1, v2) VALUES (%d, '%s', %d)",
keyspace, table, 1, "foo", 1));
cfs.forceBlockingFlush();
- UntypedResultSet result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+ UntypedResultSet result = executeInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
assertEquals(1, result.size());
// write a row tombstone out to a second sstable
- processInternal(String.format("DELETE FROM %s.%s WHERE k = %d", keyspace, table, 1));
+ executeInternal(String.format("DELETE FROM %s.%s WHERE k = %d", keyspace, table, 1));
cfs.forceBlockingFlush();
// basic check that the row is considered deleted
assertEquals(2, cfs.getSSTables().size());
- result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+ result = executeInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
assertEquals(0, result.size());
// compact the two sstables with a gcBefore that does *not* allow the row tombstone to be purged
@@ -351,19 +351,19 @@ public class CompactionsPurgeTest extends SchemaLoader
// the data should be gone, but the tombstone should still exist
assertEquals(1, cfs.getSSTables().size());
- result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+ result = executeInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
assertEquals(0, result.size());
// write a row out to one sstable
- processInternal(String.format("INSERT INTO %s.%s (k, v1, v2) VALUES (%d, '%s', %d)",
+ executeInternal(String.format("INSERT INTO %s.%s (k, v1, v2) VALUES (%d, '%s', %d)",
keyspace, table, 1, "foo", 1));
cfs.forceBlockingFlush();
assertEquals(2, cfs.getSSTables().size());
- result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+ result = executeInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
assertEquals(1, result.size());
// write a row tombstone out to a different sstable
- processInternal(String.format("DELETE FROM %s.%s WHERE k = %d", keyspace, table, 1));
+ executeInternal(String.format("DELETE FROM %s.%s WHERE k = %d", keyspace, table, 1));
cfs.forceBlockingFlush();
// compact the two sstables with a gcBefore that *does* allow the row tombstone to be purged
@@ -372,7 +372,7 @@ public class CompactionsPurgeTest extends SchemaLoader
// both the data and the tombstone should be gone this time
assertEquals(0, cfs.getSSTables().size());
- result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+ result = executeInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
assertEquals(0, result.size());
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
index c35a1df..f60e173 100644
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
@@ -93,7 +93,7 @@ public class CQLSSTableWriterTest
loader.stream().get();
- UntypedResultSet rs = QueryProcessor.processInternal("SELECT * FROM cql_keyspace.table1;");
+ UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace.table1;");
assertEquals(4, rs.size());
Iterator<UntypedResultSet.Row> iter = rs.iterator();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
index ff5a394..eef8c86 100644
--- a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
+++ b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
@@ -666,8 +666,8 @@ public class LeaveAndBootstrapTest
Util.createInitialRing(ss, partitioner, endpointTokens, new ArrayList<Token>(), hosts, new ArrayList<UUID>(), 2);
InetAddress toRemove = hosts.get(1);
- SystemKeyspace.updatePeerInfo(toRemove, "data_center", "'dc42'");
- SystemKeyspace.updatePeerInfo(toRemove, "rack", "'rack42'");
+ SystemKeyspace.updatePeerInfo(toRemove, "data_center", "dc42");
+ SystemKeyspace.updatePeerInfo(toRemove, "rack", "rack42");
assertEquals("rack42", SystemKeyspace.loadDcRackInfo().get(toRemove).get("rack"));
// mark the node as removed
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/test/unit/org/apache/cassandra/service/QueryPagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/QueryPagerTest.java b/test/unit/org/apache/cassandra/service/QueryPagerTest.java
index abd030d..e71e97a 100644
--- a/test/unit/org/apache/cassandra/service/QueryPagerTest.java
+++ b/test/unit/org/apache/cassandra/service/QueryPagerTest.java
@@ -38,7 +38,7 @@ import org.apache.cassandra.service.pager.*;
import org.apache.cassandra.utils.ByteBufferUtil;
import static org.junit.Assert.*;
-import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
import static org.apache.cassandra.Util.range;
import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
@@ -341,7 +341,7 @@ public class QueryPagerTest extends SchemaLoader
// Insert rows but with a tombstone as last cell
for (int i = 0; i < 5; i++)
- processInternal(String.format("INSERT INTO %s.%s (k, c, v) VALUES ('k%d', 'c%d', null)", keyspace, table, 0, i));
+ executeInternal(String.format("INSERT INTO %s.%s (k, c, v) VALUES ('k%d', 'c%d', null)", keyspace, table, 0, i));
SliceQueryFilter filter = new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, 100);
QueryPager pager = QueryPagers.localPager(new SliceFromReadCommand(keyspace, bytes("k0"), table, 0, filter));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/test/unit/org/apache/cassandra/triggers/TriggersTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/triggers/TriggersTest.java b/test/unit/org/apache/cassandra/triggers/TriggersTest.java
index ee97d8a..74fde69 100644
--- a/test/unit/org/apache/cassandra/triggers/TriggersTest.java
+++ b/test/unit/org/apache/cassandra/triggers/TriggersTest.java
@@ -289,7 +289,7 @@ public class TriggersTest extends SchemaLoader
private void assertUpdateIsAugmented(int key)
{
- UntypedResultSet rs = QueryProcessor.processInternal(
+ UntypedResultSet rs = QueryProcessor.executeInternal(
String.format("SELECT * FROM %s.%s WHERE k=%s", ksName, cfName, key));
assertTrue(String.format("Expected value (%s) for augmented cell v2 was not found", key), rs.one().has("v2"));
assertEquals(999, rs.one().getInt("v2"));
@@ -297,7 +297,7 @@ public class TriggersTest extends SchemaLoader
private void assertUpdateNotExecuted(String cf, int key)
{
- UntypedResultSet rs = QueryProcessor.processInternal(
+ UntypedResultSet rs = QueryProcessor.executeInternal(
String.format("SELECT * FROM %s.%s WHERE k=%s", ksName, cf, key));
assertTrue(rs.isEmpty());
}
[2/4] git commit: Merge commit
'362e54803434053fea25f874f64c69bdc1db78da' into cassandra-2.1
Posted by sl...@apache.org.
Merge commit '362e54803434053fea25f874f64c69bdc1db78da' into cassandra-2.1
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c3ec8fa1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c3ec8fa1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c3ec8fa1
Branch: refs/heads/trunk
Commit: c3ec8fa11b322d01044976c43bcfe18c58b08ed8
Parents: 6127f85 362e548
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu May 22 14:43:31 2014 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu May 22 14:43:31 2014 +0200
----------------------------------------------------------------------
----------------------------------------------------------------------
[4/4] git commit: Merge branch 'cassandra-2.1' into trunk
Posted by sl...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5f643ffc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5f643ffc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5f643ffc
Branch: refs/heads/trunk
Commit: 5f643ffcc3ebdb9ba4295bb09098790914df7b9b
Parents: d4bf6d3 1147ee3
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu May 22 14:46:33 2014 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu May 22 14:46:33 2014 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/cql3/CQLStatement.java | 2 +-
.../org/apache/cassandra/cql3/QueryOptions.java | 5 +
.../apache/cassandra/cql3/QueryProcessor.java | 103 +++++++-
.../apache/cassandra/cql3/UntypedResultSet.java | 58 ++++-
.../statements/AuthenticationStatement.java | 2 +-
.../cql3/statements/AuthorizationStatement.java | 2 +-
.../cql3/statements/BatchStatement.java | 4 +-
.../cql3/statements/ModificationStatement.java | 4 +-
.../statements/SchemaAlteringStatement.java | 2 +-
.../cql3/statements/SelectStatement.java | 48 ++--
.../cql3/statements/TruncateStatement.java | 2 +-
.../cassandra/cql3/statements/UseStatement.java | 2 +-
.../apache/cassandra/db/BatchlogManager.java | 27 +-
.../org/apache/cassandra/db/SystemKeyspace.java | 245 ++++++++-----------
.../ScheduledRangeTransferExecutorService.java | 8 +-
.../cassandra/service/StorageService.java | 43 ++--
.../cassandra/db/BatchlogManagerTest.java | 8 +-
.../apache/cassandra/db/HintedHandOffTest.java | 6 +-
.../unit/org/apache/cassandra/db/ScrubTest.java | 10 +-
.../db/compaction/CompactionsPurgeTest.java | 20 +-
.../io/sstable/CQLSSTableWriterTest.java | 2 +-
.../service/LeaveAndBootstrapTest.java | 4 +-
.../cassandra/service/QueryPagerTest.java | 4 +-
.../apache/cassandra/triggers/TriggersTest.java | 4 +-
25 files changed, 368 insertions(+), 248 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f643ffc/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f643ffc/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f643ffc/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f643ffc/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f643ffc/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
index 5820312,912c7f1..80608f5
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
@@@ -36,10 -36,8 +36,10 @@@ import static org.junit.Assert.assertEq
import static org.apache.cassandra.db.KeyspaceTest.assertColumns;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
- import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
+ import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
import static org.apache.cassandra.Util.cellname;
import org.apache.cassandra.utils.ByteBufferUtil;