You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ee...@apache.org on 2010/11/12 20:32:54 UTC

svn commit: r1034536 - in /cassandra/trunk: src/java/org/apache/cassandra/cql/QueryProcessor.java src/java/org/apache/cassandra/cql/Relation.java src/java/org/apache/cassandra/cql/Term.java test/system/test_cql.py

Author: eevans
Date: Fri Nov 12 19:32:54 2010
New Revision: 1034536

URL: http://svn.apache.org/viewvc?rev=1034536&view=rev
Log:
add support for index scans

Patch by eevans

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java
    cassandra/trunk/src/java/org/apache/cassandra/cql/Relation.java
    cassandra/trunk/src/java/org/apache/cassandra/cql/Term.java
    cassandra/trunk/test/system/test_cql.py

Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java?rev=1034536&r1=1034535&r2=1034536&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java Fri Nov 12 19:32:54 2010
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeoutException;
 
 import org.antlr.runtime.ANTLRStringStream;
@@ -47,6 +48,7 @@ import org.apache.cassandra.db.ReadComma
 import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.db.SliceByNamesReadCommand;
 import org.apache.cassandra.db.SliceFromReadCommand;
+import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Bounds;
@@ -54,6 +56,9 @@ import org.apache.cassandra.dht.IPartiti
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.thrift.IndexClause;
+import org.apache.cassandra.thrift.IndexExpression;
+import org.apache.cassandra.thrift.IndexOperator;
 import org.apache.cassandra.thrift.SlicePredicate;
 import org.apache.cassandra.thrift.SliceRange;
 import org.slf4j.Logger;
@@ -137,25 +142,8 @@ public class QueryProcessor
         AbstractBounds bounds = new Bounds(p.getToken(select.getKeyStart().getByteBuffer()),
                                            p.getToken(select.getKeyFinish().getByteBuffer()));
         
-        
         // XXX: Our use of Thrift structs internally makes me Sad. :(
-        SlicePredicate thriftSlicePredicate = new SlicePredicate();
-        if (select.isColumnRange() || select.getColumnNames().size() == 0)
-        {
-            SliceRange sliceRange = new SliceRange();
-            sliceRange.start = select.getColumnStart().getByteBuffer();
-            sliceRange.finish = select.getColumnFinish().getByteBuffer();
-            sliceRange.reversed = false;    // FIXME: hard-coded
-            sliceRange.count = select.getColumnsLimit();
-            thriftSlicePredicate.slice_range = sliceRange;
-        }
-        else
-        {
-            List<ByteBuffer> columnNames = new ArrayList<ByteBuffer>();
-            for (Term column : select.getColumnNames())
-                columnNames.add(column.getByteBuffer());
-            thriftSlicePredicate.column_names = columnNames;
-        }
+        SlicePredicate thriftSlicePredicate = slicePredicateFromSelect(select);
 
         try
         {
@@ -182,6 +170,99 @@ public class QueryProcessor
         
         return rows;
     }
+    
+    private static List<org.apache.cassandra.db.Row> getIndexedSlices(String keyspace, SelectStatement select)
+    throws TimedOutException    // thrown in place of the TimeoutException raised by StorageProxy.scan()
+    {
+        // XXX: Our use of Thrift structs internally (still) makes me Sad. :~(
+        SlicePredicate thriftSlicePredicate = slicePredicateFromSelect(select);
+        // One Thrift IndexExpression (entity bytes, operator, value bytes) per by-column relation of the SELECT.
+        List<IndexExpression> expressions = new ArrayList<IndexExpression>();
+        for (Relation columnRelation : select.getColumnRelations())
+        {
+            expressions.add(new IndexExpression(columnRelation.getEntity().getByteBuffer(),
+                                                IndexOperator.valueOf(columnRelation.operator().toString()), // NOTE(review): assumes operator names match IndexOperator constants -- confirm
+                                                columnRelation.getValue().getByteBuffer()));
+        }
+        // Without a key range the scan starts at a default-constructed Term (presumably the empty key -- TODO confirm).
+        ByteBuffer startKey = (!select.isKeyRange()) ? (new Term()).getByteBuffer() : select.getKeyStart().getByteBuffer();
+        IndexClause thriftIndexClause = new IndexClause(expressions, startKey, select.getNumRecords());
+        
+        List<org.apache.cassandra.db.Row> rows;
+        try
+        {
+            rows = StorageProxy.scan(keyspace,
+                                     select.getColumnFamily(),
+                                     thriftIndexClause,
+                                     thriftSlicePredicate,
+                                     select.getConsistencyLevel());
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);    // IOException is not declared by this method; rethrown unchecked with its cause
+        }
+        catch (TimeoutException e)
+        {
+            throw new TimedOutException();    // note: the original TimeoutException is not attached as a cause
+        }
+        
+        return rows;
+    }
+    
+    private static SlicePredicate slicePredicateFromSelect(SelectStatement select)
+    {
+        SlicePredicate thriftSlicePredicate = new SlicePredicate();
+        // Builds the Thrift predicate for a SELECT: slice_range for a column range or when no names are given, else column_names.
+        if (select.isColumnRange() || select.getColumnNames().size() == 0)
+        {
+            SliceRange sliceRange = new SliceRange();
+            sliceRange.start = select.getColumnStart().getByteBuffer();
+            sliceRange.finish = select.getColumnFinish().getByteBuffer();
+            sliceRange.reversed = select.isColumnsReversed();
+            sliceRange.count = select.getColumnsLimit();
+            thriftSlicePredicate.slice_range = sliceRange;
+        }
+        else
+        {
+            // Explicit column list: copy each requested column name's bytes.
+            List<ByteBuffer> columnNames = new ArrayList<ByteBuffer>();
+            for (Term column : select.getColumnNames())
+                columnNames.add(column.getByteBuffer());
+            thriftSlicePredicate.column_names = columnNames;
+        }
+        
+        return thriftSlicePredicate;
+    }
+    
+    /* Test for SELECT-specific taboos; throws InvalidRequestException for the first violation found, otherwise returns. */
+    private static void validateSelect(String keyspace, SelectStatement select) throws InvalidRequestException
+    {
+        // Finish key w/o start key (KEY < foo)
+        if (!select.isKeyRange() && (select.getKeyFinish() != null))
+            throw newInvalidRequestException("Key range clauses must include a start key (i.e. KEY > term)");
+        
+        // Key range and by-key(s) combined (KEY > foo AND KEY = bar)
+        if (select.isKeyRange() && select.getKeys().size() > 0)
+            throw newInvalidRequestException("You cannot combine key range and by-key clauses in a SELECT");
+        
+        // Start and finish keys, *and* column relations (KEY > foo AND KEY < bar and name1 = value1).
+        if (select.isKeyRange() && (select.getKeyFinish() != null) && (select.getColumnRelations().size() > 0))
+            throw newInvalidRequestException("You cannot combine key range and by-column clauses in a SELECT");
+        
+        // Multiget scenario (KEY = foo AND KEY = bar ...)
+        if (select.getKeys().size() > 1)
+            throw newInvalidRequestException("SELECTs can contain only one by-key clause");
+        // By-column clauses need at least one "equals" relation on a column the column family reports as indexed.
+        if (select.getColumnRelations().size() > 0)
+        {
+            Set<ByteBuffer> indexed = Table.open(keyspace).getColumnFamilyStore(select.getColumnFamily()).getIndexedColumns();
+            for (Relation relation : select.getColumnRelations())
+            {
+                if ((relation.operator().equals(RelationType.EQ)) && indexed.contains(relation.getEntity().getByteBuffer()))
+                    return;    // one indexed EQ relation is sufficient; remaining relations are not inspected
+            }
+            throw newInvalidRequestException("No indexed columns present in by-columns clause with \"equals\" operator");
+        }
+    }
 
     public static CqlResult process(String queryString, ClientState clientState)
     throws RecognitionException, UnavailableException, InvalidRequestException, TimedOutException
@@ -199,27 +280,30 @@ public class QueryProcessor
         {
             case SELECT:
                 SelectStatement select = (SelectStatement)statement.statement;
+                validateColumnFamily(keyspace, select.getColumnFamily());
+                validateSelect(keyspace, select);
                 
                 List<CqlRow> avroRows = new ArrayList<CqlRow>();
                 avroResult.type = CqlResultType.ROWS;
                 List<org.apache.cassandra.db.Row> rows = null;
                 
+                // By-key
                 if (!select.isKeyRange() && (select.getKeys().size() > 0))
                 {
-                    // Multiple keys (aka "multiget") is not allowed( any longer).
-                    if (select.getKeys().size() > 1)
-                        throw newInvalidRequestException("SELECTs can contain only one by-key clause (i.e. KEY = TERM)");
-                    
                     rows = getSlice(keyspace, select);
                 }
                 else
                 {
-                    // Combining key ranges and column index queries is not currently allowed
-                    if (select.getColumnRelations().size() > 0)
-                        throw newInvalidRequestException("You cannot combine key ranges and by-column clauses " +
-                                "(i.e. \"name\" = \"value\") in a SELECT statement");
-                    
-                    rows = multiRangeSlice(keyspace, select);
+                    // Range query
+                    if ((select.getKeyFinish() != null) || (select.getColumnRelations().size() == 0))
+                    {
+                        rows = multiRangeSlice(keyspace, select);
+                    }
+                    // Index scan
+                    else
+                    {
+                        rows = getIndexedSlices(keyspace, select);
+                    }
                 }
                 
                 // Create the result set

Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/Relation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/Relation.java?rev=1034536&r1=1034535&r2=1034536&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/Relation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/Relation.java Fri Nov 12 19:32:54 2010
@@ -27,8 +27,8 @@ package org.apache.cassandra.cql;
  */
 public class Relation
 {
-    public EntityType entityType = EntityType.COLUMN;
-    public Term entity;
+    private EntityType entityType = EntityType.COLUMN;
+    private Term entity;
     private RelationType relationType;
     private Term value;
     
@@ -64,10 +64,20 @@ public class Relation
         return relationType;
     }
     
+    public Term getEntity()
+    {
+        return entity;    // read accessor for the entity field, which this change makes private
+    }
+    
     public Term getValue()
     {
         return value;
     }
+    
+    public String toString()
+    {
+        return String.format("Relation(%s, %s, %s)", entity, relationType, value);    // entity, relation type, value
+    }
 }
 
 enum EntityType

Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/Term.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/Term.java?rev=1034536&r1=1034535&r2=1034536&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/Term.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/Term.java Fri Nov 12 19:32:54 2010
@@ -90,6 +90,11 @@ public class Term
         return type;
     }
     
+    public String toString()
+    {
+        return String.format("Term(%s, type=%s)", getText(), type);    // the term's text followed by its type
+    }
+    
 }
 
 enum TermType

Modified: cassandra/trunk/test/system/test_cql.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_cql.py?rev=1034536&r1=1034535&r2=1034536&view=diff
==============================================================================
--- cassandra/trunk/test/system/test_cql.py (original)
+++ cassandra/trunk/test/system/test_cql.py Fri Nov 12 19:32:54 2010
@@ -30,6 +30,16 @@ def load_sample(dbconn):
         ROW("af", COL(1L, "1"), COL(2L, "2"), COL(3L, "3"), COL(4L, "4")) AND
         ROW("ag", COL(5L, "5"), COL(6L, "6"), COL(7L, "8"), COL(9L, "9"));
     """)
+    dbconn.execute("""
+        UPDATE
+            Indexed1
+        WITH
+            ROW("asmith",   COL("birthdate", 100L), COL("unindexed", 250L)) AND
+            ROW("dozer",    COL("birthdate", 100L), COL("unindexed", 200L)) AND
+            ROW("morpheus", COL("birthdate", 175L), COL("unindexed", 200L)) AND
+            ROW("neo",      COL("birthdate", 150L), COL("unindexed", 250L)) AND
+            ROW("trinity",  COL("birthdate", 125L), COL("unindexed", 200L));
+    """)
 
 def init(keyspace="Keyspace1"):
     dbconn = Connection(keyspace, 'localhost', 9170)
@@ -104,3 +114,30 @@ class TestCql(AvroTester):
         query = 'SELECT "col" FROM Standard1 WHERE KEY = "ka" AND KEY = "kb";'
         assert_raises(CQLException, conn.execute, query)
 
+    def test_index_scan_equality(self):
+        "indexed scan where column equals value"
+        conn = init()
+        r = conn.execute('SELECT "birthdate" FROM Indexed1 WHERE "birthdate" = 100L')
+        assert len(r) == 2  # load_sample gives only asmith and dozer a birthdate of 100L
+        assert r[0]['key'] == "asmith"
+        assert r[1]['key'] == "dozer"
+        assert len(r[0]['columns']) == 1  # only the "birthdate" column was selected
+        assert len(r[1]['columns']) == 1
+
+    def test_index_scan_greater_than(self):
+        "indexed scan where a column is greater than a value"
+        conn = init()
+        r = conn.execute("""
+            SELECT "birthdate" FROM Indexed1 WHERE "birthdate" = 100L AND "unindexed" > 200L
+        """)
+        assert len(r) == 1  # of the two birthdate=100L rows, only asmith has "unindexed" (250L) above 200L
+        assert r[0]['key'] == "asmith"
+
+    def test_index_scan_with_start_key(self):
+        "indexed scan with a starting key"
+        conn = init()
+        r = conn.execute("""
+            SELECT "birthdate" FROM Indexed1 WHERE "birthdate" = 100L AND KEY > "asmithZ"
+        """)
+        assert len(r) == 1  # expects the start key to exclude asmith, leaving dozer as the only birthdate=100L row
+        assert r[0]['key'] == "dozer"