You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ee...@apache.org on 2010/10/31 15:37:27 UTC
svn commit: r1029365 - in /cassandra/trunk/src/java/org/apache/cassandra/cql: QueryProcessor.java SelectExpression.java Term.java
Author: eevans
Date: Sun Oct 31 14:37:27 2010
New Revision: 1029365
URL: http://svn.apache.org/viewvc?rev=1029365&view=rev
Log:
refactor QueryProcessor
- encapsulate the different reads better
- eliminate duplicate code
Patch by eevans
Modified:
cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java
cassandra/trunk/src/java/org/apache/cassandra/cql/SelectExpression.java
cassandra/trunk/src/java/org/apache/cassandra/cql/Term.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java?rev=1029365&r1=1029364&r2=1029365&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java Sun Oct 31 14:37:27 2010
@@ -25,9 +25,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.antlr.runtime.ANTLRStringStream;
@@ -42,8 +40,6 @@ import org.apache.cassandra.avro.CqlRow;
import org.apache.cassandra.avro.InvalidRequestException;
import org.apache.cassandra.avro.TimedOutException;
import org.apache.cassandra.avro.UnavailableException;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.RangeSliceCommand;
import org.apache.cassandra.db.ReadCommand;
@@ -57,7 +53,6 @@ import org.apache.cassandra.dht.IPartiti
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.slf4j.Logger;
@@ -69,15 +64,43 @@ public class QueryProcessor
{
private static final Logger logger = LoggerFactory.getLogger(QueryProcessor.class);
- public static Map<DecoratedKey<?>, ColumnFamily> readColumnFamily(List<ReadCommand> commands, ConsistencyLevel cLevel)
- throws UnavailableException, InvalidRequestException, TimedOutException
+ private static List<org.apache.cassandra.db.Row> multiSlice(String keyspace, SelectStatement select)
+ throws InvalidRequestException, TimedOutException, UnavailableException
{
- Map<DecoratedKey<?>, ColumnFamily> columnFamilyKeyMap = new HashMap<DecoratedKey<?>, ColumnFamily>();
- List<org.apache.cassandra.db.Row> rows;
+ List<org.apache.cassandra.db.Row> rows = null;
+ QueryPath queryPath = new QueryPath(select.getColumnFamily());
+ List<ReadCommand> commands = new ArrayList<ReadCommand>();
+ for (Term keyName : select.getKeyPredicates().getTerms())
+ {
+ ByteBuffer key = keyName.getByteBuffer();
+ validateKey(key);
+
+ // ...of a list of column names
+ if ((!select.getColumnPredicates().isRange()) && select.getColumnPredicates().isInitialized())
+ {
+ Collection<ByteBuffer> columnNames = new ArrayList<ByteBuffer>();
+ for (Term column : select.getColumnPredicates().getTerms())
+ columnNames.add(column.getByteBuffer());
+
+ commands.add(new SliceByNamesReadCommand(keyspace, key, queryPath, columnNames));
+ }
+ // ...a range (slice) of column names
+ else
+ {
+ commands.add(new SliceFromReadCommand(keyspace,
+ key,
+ queryPath,
+ select.getColumnPredicates().getStart().getByteBuffer(),
+ select.getColumnPredicates().getFinish().getByteBuffer(),
+ select.reversed(),
+ select.getNumColumns()));
+ }
+ }
+
try
{
- rows = StorageProxy.readProtocol(commands, cLevel);
+ rows = StorageProxy.readProtocol(commands, select.getConsistencyLevel());
}
catch (TimeoutException e)
{
@@ -100,10 +123,63 @@ public class QueryProcessor
throw error;
}
- for (org.apache.cassandra.db.Row row: rows)
- columnFamilyKeyMap.put(row.key, row.cf);
+ return rows;
+ }
+
+ private static List<org.apache.cassandra.db.Row> multiRangeSlice(String keyspace, SelectStatement select)
+ throws TimedOutException, UnavailableException
+ {
+ List<org.apache.cassandra.db.Row> rows = null;
- return columnFamilyKeyMap;
+ // FIXME: ranges can be open-ended, but a start must exist. Assert so here.
+
+ IPartitioner<?> p = StorageService.getPartitioner();
+ AbstractBounds bounds = new Bounds(p.getToken(select.getKeyPredicates().getStart().getByteBuffer()),
+ p.getToken(select.getKeyPredicates().getFinish().getByteBuffer()));
+
+ // XXX: Our use of Thrift structs internally makes me Sad. :(
+ SlicePredicate thriftSlicePredicate = new SlicePredicate();
+ if (select.getColumnPredicates().isRange() || select.getColumnPredicates().getTerms().size() == 0)
+ {
+ SliceRange sliceRange = new SliceRange();
+ sliceRange.start = select.getColumnPredicates().getStart().getByteBuffer();
+ sliceRange.finish = select.getColumnPredicates().getFinish().getByteBuffer();
+ sliceRange.reversed = false; // FIXME: hard-coded
+ sliceRange.count = select.getNumColumns();
+ thriftSlicePredicate.slice_range = sliceRange;
+ }
+ else
+ {
+ List<ByteBuffer> columnNames = new ArrayList<ByteBuffer>();
+ for (Term column : select.getColumnPredicates().getTerms())
+ columnNames.add(column.getByteBuffer());
+ thriftSlicePredicate.column_names = columnNames;
+ }
+
+ try
+ {
+ rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace,
+ select.getColumnFamily(),
+ null,
+ thriftSlicePredicate,
+ bounds,
+ select.getNumRecords()),
+ select.getConsistencyLevel());
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (org.apache.cassandra.thrift.UnavailableException e)
+ {
+ throw new UnavailableException();
+ }
+ catch (TimeoutException e)
+ {
+ throw new TimedOutException();
+ }
+
+ return rows;
}
public static CqlResult process(String queryString, ClientState clientState)
@@ -122,132 +198,37 @@ public class QueryProcessor
case SELECT:
SelectStatement select = (SelectStatement)statement.statement;
- QueryPath queryPath = new QueryPath(select.getColumnFamily());
- List<ReadCommand> commands = new ArrayList<ReadCommand>();
-
List<CqlRow> avroRows = new ArrayList<CqlRow>();
avroResult.type = CqlResultType.ROWS;
+ List<org.apache.cassandra.db.Row> rows = null;
- // It's a multiget...
if (!select.getKeyPredicates().isRange())
+ rows = multiSlice(keyspace, select);
+ else
+ rows = multiRangeSlice(keyspace, select);
+
+ // Create the result set
+ for (org.apache.cassandra.db.Row row : rows)
{
+ /// No results for this row
+ if (row.cf == null)
+ continue;
- for (Term keyName : select.getKeyPredicates().getTerms())
- {
- ByteBuffer key = keyName.getByteBuffer(); // FIXME: surely not good enough
- validateKey(key);
-
- // ...of a list of column names
- if (!select.getColumnPredicates().isRange())
- {
- Collection<ByteBuffer> columnNames = new ArrayList<ByteBuffer>();
- for (Term column : select.getColumnPredicates().getTerms())
- columnNames.add(column.getByteBuffer());
-
- commands.add(new SliceByNamesReadCommand(keyspace, key, queryPath, columnNames));
- }
- // ...a range (slice) of column names
- else
- {
- commands.add(new SliceFromReadCommand(keyspace,
- key,
- queryPath,
- select.getColumnPredicates().getStart().getByteBuffer(),
- select.getColumnPredicates().getFinish().getByteBuffer(),
- select.reversed(),
- select.getNumColumns()));
- }
-
- Map<DecoratedKey<?>, ColumnFamily> columnFamilies = readColumnFamily(commands,
- select.getConsistencyLevel());
- List<Column> avroColumns = new ArrayList<Column>();
-
- for (ReadCommand cmd : commands)
- {
- ColumnFamily cf = columnFamilies.get(StorageService.getPartitioner().decorateKey(cmd.key));
- // TODO: handle reversing order
- for (IColumn column : cf.getSortedColumns())
- {
- Column avroColumn = new Column();
- avroColumn.name = column.name();
- avroColumn.value = column.value();
- avroColumns.add(avroColumn);
- }
- }
-
- // Create a new row, add the columns to it, and then add it to the list of rows
- CqlRow avroRow = new CqlRow();
- avroRow.key = key;
- avroRow.columns = avroColumns;
- avroRows.add(avroRow);
- }
- }
- else // It is a range query (range of keys).
- {
- // FIXME: ranges can be open-ended, but a start must exist. Assert so here.
-
- List<org.apache.cassandra.db.Row> rows = null;
- IPartitioner<?> p = StorageService.getPartitioner();
- AbstractBounds bounds = new Bounds(p.getToken(select.getKeyPredicates().getStart().getByteBuffer()),
- p.getToken(select.getKeyPredicates().getFinish().getByteBuffer()));
+ List<Column> avroColumns = new ArrayList<Column>();
- // XXX: Our use of Thrift structs internally makes me Sad. :(
- SlicePredicate thriftSlicePredicate = new SlicePredicate();
- if (select.getColumnPredicates().isRange() || select.getColumnPredicates().getTerms().size() == 0)
- {
- SliceRange sliceRange = new SliceRange();
- sliceRange.start = select.getColumnPredicates().getStart().getByteBuffer();
- sliceRange.finish = select.getColumnPredicates().getFinish().getByteBuffer();
- sliceRange.reversed = false; // FIXME: hard-coded
- sliceRange.count = select.getNumColumns();
- thriftSlicePredicate.slice_range = sliceRange;
- }
- else
- {
- List<ByteBuffer> columnNames = new ArrayList<ByteBuffer>();
- for (Term column : select.getColumnPredicates().getTerms())
- columnNames.add(column.getByteBuffer());
- thriftSlicePredicate.column_names = columnNames;
- }
-
- try
- {
- rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace,
- select.getColumnFamily(),
- null,
- thriftSlicePredicate,
- bounds,
- select.getNumRecords()),
- select.getConsistencyLevel());
- }
- catch (IOException e)
+ for (IColumn column : row.cf.getSortedColumns())
{
- throw new RuntimeException(e);
- }
- catch (org.apache.cassandra.thrift.UnavailableException e)
- {
- throw new UnavailableException();
- }
- catch (TimeoutException e)
- {
- throw new TimedOutException();
+ Column avroColumn = new Column();
+ avroColumn.name = column.name();
+ avroColumn.value = column.value();
+ avroColumns.add(avroColumn);
}
- for (org.apache.cassandra.db.Row row : rows)
- {
- CqlRow avroRow = new CqlRow();
- avroRow.key = row.key.key;
- avroRow.columns = new ArrayList<Column>();
-
- for (IColumn column : row.cf.getSortedColumns())
- {
- Column avroColumn = new Column();
- avroColumn.name = column.name();
- avroColumn.value = column.value();
- avroRow.columns.add(avroColumn);
- }
- avroRows.add(avroRow);
- }
+ // Create a new row, add the columns to it, and then add it to the list of rows
+ CqlRow avroRow = new CqlRow();
+ avroRow.key = row.key.key;
+ avroRow.columns = avroColumns;
+ avroRows.add(avroRow);
}
avroResult.rows = avroRows;
Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/SelectExpression.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/SelectExpression.java?rev=1029365&r1=1029364&r2=1029365&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/SelectExpression.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/SelectExpression.java Sun Oct 31 14:37:27 2010
@@ -132,4 +132,9 @@ class Predicates
{
return isRange;
}
+
+ boolean isInitialized()
+ {
+ return initialized;
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/Term.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/Term.java?rev=1029365&r1=1029364&r2=1029365&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/Term.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/Term.java Sun Oct 31 14:37:27 2010
@@ -67,7 +67,7 @@ public class Term
}
/**
- * Get the typed value, serialized to a byte[].
+ * Get the typed value, serialized to a ByteBuffer.
*
* @return
*/