You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ee...@apache.org on 2010/10/31 15:37:27 UTC

svn commit: r1029365 - in /cassandra/trunk/src/java/org/apache/cassandra/cql: QueryProcessor.java SelectExpression.java Term.java

Author: eevans
Date: Sun Oct 31 14:37:27 2010
New Revision: 1029365

URL: http://svn.apache.org/viewvc?rev=1029365&view=rev
Log:
refactor QueryProcessor

- encapsulate the different reads better
- eliminate duplicate code

Patch by eevans

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java
    cassandra/trunk/src/java/org/apache/cassandra/cql/SelectExpression.java
    cassandra/trunk/src/java/org/apache/cassandra/cql/Term.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java?rev=1029365&r1=1029364&r2=1029365&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java Sun Oct 31 14:37:27 2010
@@ -25,9 +25,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.TimeoutException;
 
 import org.antlr.runtime.ANTLRStringStream;
@@ -42,8 +40,6 @@ import org.apache.cassandra.avro.CqlRow;
 import org.apache.cassandra.avro.InvalidRequestException;
 import org.apache.cassandra.avro.TimedOutException;
 import org.apache.cassandra.avro.UnavailableException;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.RangeSliceCommand;
 import org.apache.cassandra.db.ReadCommand;
@@ -57,7 +53,6 @@ import org.apache.cassandra.dht.IPartiti
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.thrift.SlicePredicate;
 import org.apache.cassandra.thrift.SliceRange;
 import org.slf4j.Logger;
@@ -69,15 +64,43 @@ public class QueryProcessor
 {
     private static final Logger logger = LoggerFactory.getLogger(QueryProcessor.class);
     
-    public static Map<DecoratedKey<?>, ColumnFamily> readColumnFamily(List<ReadCommand> commands, ConsistencyLevel cLevel)
-    throws UnavailableException, InvalidRequestException, TimedOutException
+    private static List<org.apache.cassandra.db.Row> multiSlice(String keyspace, SelectStatement select)
+    throws InvalidRequestException, TimedOutException, UnavailableException
     {
-        Map<DecoratedKey<?>, ColumnFamily> columnFamilyKeyMap = new HashMap<DecoratedKey<?>, ColumnFamily>();
-        List<org.apache.cassandra.db.Row> rows;
+        List<org.apache.cassandra.db.Row> rows = null;
+        QueryPath queryPath = new QueryPath(select.getColumnFamily());
+        List<ReadCommand> commands = new ArrayList<ReadCommand>();
         
+        for (Term keyName : select.getKeyPredicates().getTerms())
+        {
+            ByteBuffer key = keyName.getByteBuffer();
+            validateKey(key);
+            
+            // ...of a list of column names
+            if ((!select.getColumnPredicates().isRange()) && select.getColumnPredicates().isInitialized())
+            {
+                Collection<ByteBuffer> columnNames = new ArrayList<ByteBuffer>();
+                for (Term column : select.getColumnPredicates().getTerms())
+                    columnNames.add(column.getByteBuffer());
+                
+                commands.add(new SliceByNamesReadCommand(keyspace, key, queryPath, columnNames));
+            }
+            // ...a range (slice) of column names
+            else
+            {
+                commands.add(new SliceFromReadCommand(keyspace,
+                                                      key,
+                                                      queryPath,
+                                                      select.getColumnPredicates().getStart().getByteBuffer(),
+                                                      select.getColumnPredicates().getFinish().getByteBuffer(),
+                                                      select.reversed(),
+                                                      select.getNumColumns()));
+            }
+        }
+
         try
         {
-            rows = StorageProxy.readProtocol(commands, cLevel);
+            rows = StorageProxy.readProtocol(commands, select.getConsistencyLevel());
         }
         catch (TimeoutException e)
         {
@@ -100,10 +123,63 @@ public class QueryProcessor
             throw error;
         }
         
-        for (org.apache.cassandra.db.Row row: rows)
-            columnFamilyKeyMap.put(row.key, row.cf);
+        return rows;
+    }
+    
+    private static List<org.apache.cassandra.db.Row> multiRangeSlice(String keyspace, SelectStatement select)
+    throws TimedOutException, UnavailableException
+    {
+        List<org.apache.cassandra.db.Row> rows = null;
         
-        return columnFamilyKeyMap;
+        // FIXME: ranges can be open-ended, but a start must exist.  Assert so here.
+        
+        IPartitioner<?> p = StorageService.getPartitioner();
+        AbstractBounds bounds = new Bounds(p.getToken(select.getKeyPredicates().getStart().getByteBuffer()),
+                                           p.getToken(select.getKeyPredicates().getFinish().getByteBuffer()));
+        
+        // XXX: Our use of Thrift structs internally makes me Sad. :(
+        SlicePredicate thriftSlicePredicate = new SlicePredicate();
+        if (select.getColumnPredicates().isRange() || select.getColumnPredicates().getTerms().size() == 0)
+        {
+            SliceRange sliceRange = new SliceRange();
+            sliceRange.start = select.getColumnPredicates().getStart().getByteBuffer();
+            sliceRange.finish = select.getColumnPredicates().getFinish().getByteBuffer();
+            sliceRange.reversed = false;    // FIXME: hard-coded
+            sliceRange.count = select.getNumColumns();
+            thriftSlicePredicate.slice_range = sliceRange;
+        }
+        else
+        {
+            List<ByteBuffer> columnNames = new ArrayList<ByteBuffer>();
+            for (Term column : select.getColumnPredicates().getTerms())
+                columnNames.add(column.getByteBuffer());
+            thriftSlicePredicate.column_names = columnNames;
+        }
+
+        try
+        {
+            rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace,
+                                                                    select.getColumnFamily(),
+                                                                    null,
+                                                                    thriftSlicePredicate,
+                                                                    bounds,
+                                                                    select.getNumRecords()),
+                                              select.getConsistencyLevel());
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+        catch (org.apache.cassandra.thrift.UnavailableException e)
+        {
+            throw new UnavailableException();
+        }
+        catch (TimeoutException e)
+        {
+            throw new TimedOutException();
+        }
+        
+        return rows;
     }
 
     public static CqlResult process(String queryString, ClientState clientState)
@@ -122,132 +198,37 @@ public class QueryProcessor
             case SELECT:
                 SelectStatement select = (SelectStatement)statement.statement;
                 
-                QueryPath queryPath = new QueryPath(select.getColumnFamily());
-                List<ReadCommand> commands = new ArrayList<ReadCommand>();
-                
                 List<CqlRow> avroRows = new ArrayList<CqlRow>();
                 avroResult.type = CqlResultType.ROWS;
+                List<org.apache.cassandra.db.Row> rows = null;
                 
-                // It's a multiget...
                 if (!select.getKeyPredicates().isRange())
+                    rows = multiSlice(keyspace, select);
+                else
+                    rows = multiRangeSlice(keyspace, select);
+                
+                // Create the result set
+                for (org.apache.cassandra.db.Row row : rows)
                 {
+                    /// No results for this row
+                    if (row.cf == null)
+                        continue;
                     
-                    for (Term keyName : select.getKeyPredicates().getTerms())
-                    {
-                        ByteBuffer key = keyName.getByteBuffer();  // FIXME: surely not good enough
-                        validateKey(key);
-                        
-                        // ...of a list of column names
-                        if (!select.getColumnPredicates().isRange())
-                        {
-                            Collection<ByteBuffer> columnNames = new ArrayList<ByteBuffer>();
-                            for (Term column : select.getColumnPredicates().getTerms())
-                                columnNames.add(column.getByteBuffer());
-                            
-                            commands.add(new SliceByNamesReadCommand(keyspace, key, queryPath, columnNames));
-                        }
-                        // ...a range (slice) of column names
-                        else
-                        {
-                            commands.add(new SliceFromReadCommand(keyspace,
-                                                                  key,
-                                                                  queryPath,
-                                                                  select.getColumnPredicates().getStart().getByteBuffer(),
-                                                                  select.getColumnPredicates().getFinish().getByteBuffer(),
-                                                                  select.reversed(),
-                                                                  select.getNumColumns()));
-                        }
-                        
-                        Map<DecoratedKey<?>, ColumnFamily> columnFamilies = readColumnFamily(commands,
-                                                                                             select.getConsistencyLevel());
-                        List<Column> avroColumns = new ArrayList<Column>();
-                        
-                        for (ReadCommand cmd : commands)
-                        {
-                            ColumnFamily cf = columnFamilies.get(StorageService.getPartitioner().decorateKey(cmd.key));
-                            // TODO: handle reversing order
-                            for (IColumn column : cf.getSortedColumns())
-                            {
-                                Column avroColumn = new Column();
-                                avroColumn.name = column.name();
-                                avroColumn.value = column.value();
-                                avroColumns.add(avroColumn);
-                            }
-                        }
-                        
-                        // Create a new row, add the columns to it, and then add it to the list of rows
-                        CqlRow avroRow = new CqlRow();
-                        avroRow.key = key;
-                        avroRow.columns = avroColumns;
-                        avroRows.add(avroRow);
-                    }
-                }
-                else    // It is a range query (range of keys).
-                {
-                    // FIXME: ranges can be open-ended, but a start must exist.  Assert so here.
-                    
-                    List<org.apache.cassandra.db.Row> rows = null;
-                    IPartitioner<?> p = StorageService.getPartitioner();
-                    AbstractBounds bounds = new Bounds(p.getToken(select.getKeyPredicates().getStart().getByteBuffer()),
-                                                       p.getToken(select.getKeyPredicates().getFinish().getByteBuffer()));
+                    List<Column> avroColumns = new ArrayList<Column>();
                     
-                    // XXX: Our use of Thrift structs internally makes me Sad. :(
-                    SlicePredicate thriftSlicePredicate = new SlicePredicate();
-                    if (select.getColumnPredicates().isRange() || select.getColumnPredicates().getTerms().size() == 0)
-                    {
-                        SliceRange sliceRange = new SliceRange();
-                        sliceRange.start = select.getColumnPredicates().getStart().getByteBuffer();
-                        sliceRange.finish = select.getColumnPredicates().getFinish().getByteBuffer();
-                        sliceRange.reversed = false;    // FIXME: hard-coded
-                        sliceRange.count = select.getNumColumns();
-                        thriftSlicePredicate.slice_range = sliceRange;
-                    }
-                    else
-                    {
-                        List<ByteBuffer> columnNames = new ArrayList<ByteBuffer>();
-                        for (Term column : select.getColumnPredicates().getTerms())
-                            columnNames.add(column.getByteBuffer());
-                        thriftSlicePredicate.column_names = columnNames;
-                    }
-
-                    try
-                    {
-                        rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace,
-                                                                                select.getColumnFamily(),
-                                                                                null,
-                                                                                thriftSlicePredicate,
-                                                                                bounds,
-                                                                                select.getNumRecords()),
-                                                          select.getConsistencyLevel());
-                    }
-                    catch (IOException e)
+                    for (IColumn column : row.cf.getSortedColumns())
                     {
-                        throw new RuntimeException(e);
-                    }
-                    catch (org.apache.cassandra.thrift.UnavailableException e)
-                    {
-                        throw new UnavailableException();
-                    }
-                    catch (TimeoutException e)
-                    {
-                        throw new TimedOutException();
+                        Column avroColumn = new Column();
+                        avroColumn.name = column.name();
+                        avroColumn.value = column.value();
+                        avroColumns.add(avroColumn);
                     }
                     
-                    for (org.apache.cassandra.db.Row row : rows)
-                    {
-                        CqlRow avroRow = new CqlRow();
-                        avroRow.key = row.key.key;
-                        avroRow.columns = new ArrayList<Column>();
-                        
-                        for (IColumn column : row.cf.getSortedColumns())
-                        {
-                            Column avroColumn = new Column();
-                            avroColumn.name = column.name();
-                            avroColumn.value = column.value();
-                            avroRow.columns.add(avroColumn);
-                        }
-                        avroRows.add(avroRow);
-                    }
+                    // Create a new row, add the columns to it, and then add it to the list of rows
+                    CqlRow avroRow = new CqlRow();
+                    avroRow.key = row.key.key;
+                    avroRow.columns = avroColumns;
+                    avroRows.add(avroRow);
                 }
                 
                 avroResult.rows = avroRows;

Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/SelectExpression.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/SelectExpression.java?rev=1029365&r1=1029364&r2=1029365&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/SelectExpression.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/SelectExpression.java Sun Oct 31 14:37:27 2010
@@ -132,4 +132,9 @@ class Predicates
     {
         return isRange;
     }
+
+    boolean isInitialized()
+    {
+        return initialized;
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/Term.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/Term.java?rev=1029365&r1=1029364&r2=1029365&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/Term.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/Term.java Sun Oct 31 14:37:27 2010
@@ -67,7 +67,7 @@ public class Term
     }
     
     /**
-     * Get the typed value, serialized to a byte[].
+     * Get the typed value, serialized to a ByteBuffer.
      * 
      * @return
      */