You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2014/02/14 13:57:54 UTC

git commit: Lift limitation that order by columns must be selected for IN queries

Updated Branches:
  refs/heads/trunk c36656c06 -> 0b42b0e76


Lift limitation that order by columns must be selected for IN queries

patch by slebresne; reviewed by thobbs for CASSANDRA-4911


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0b42b0e7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0b42b0e7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0b42b0e7

Branch: refs/heads/trunk
Commit: 0b42b0e76267aa4101b1634bd5d96c9b85997b56
Parents: c36656c
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue Dec 17 16:57:08 2013 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri Feb 14 13:57:10 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/cql3/ResultSet.java    | 46 +++++++---
 .../cql3/statements/SelectStatement.java        | 90 +++++++++-----------
 .../cassandra/cql3/statements/Selection.java    | 15 +++-
 4 files changed, 85 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b42b0e7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d08cb93..c306b43 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -32,6 +32,7 @@
  * Avoid repairing already repaired data (CASSANDRA-5351)
  * Reject counter updates with USING TTL/TIMESTAMP (CASSANDRA-6649)
  * Replace index_interval with min/max_index_interval (CASSANDRA-6379)
+ * Lift limitation that order by columns must be selected for IN queries (CASSANDRA-4911)
 
 
 2.0.6

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b42b0e7/src/java/org/apache/cassandra/cql3/ResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ResultSet.java b/src/java/org/apache/cassandra/cql3/ResultSet.java
index ece56a9..d49a3be 100644
--- a/src/java/org/apache/cassandra/cql3/ResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/ResultSet.java
@@ -60,14 +60,14 @@ public class ResultSet
 
     public void addRow(List<ByteBuffer> row)
     {
-        assert row.size() == metadata.columnCount;
+        assert row.size() == metadata.valueCount();
         rows.add(row);
     }
 
     public void addColumnValue(ByteBuffer value)
     {
-        if (rows.isEmpty() || lastRow().size() == metadata.columnCount)
-            rows.add(new ArrayList<ByteBuffer>(metadata.columnCount));
+        if (rows.isEmpty() || lastRow().size() == metadata.valueCount())
+            rows.add(new ArrayList<ByteBuffer>(metadata.valueCount()));
 
         lastRow().add(value);
     }
@@ -123,8 +123,9 @@ public class ResultSet
                 // The 2 following ones shouldn't be needed in CQL3
                 UTF8, UTF8);
 
-        for (ColumnSpecification spec : metadata.names)
+        for (int i = 0; i < metadata.columnCount; i++)
         {
+            ColumnSpecification spec = metadata.names.get(i);
             ByteBuffer colName = ByteBufferUtil.bytes(spec.name.toString());
             schema.name_types.put(colName, UTF8);
             AbstractType<?> normalizedType = spec.type instanceof ReversedType ? ((ReversedType)spec.type).baseType : spec.type;
@@ -135,8 +136,8 @@ public class ResultSet
         List<CqlRow> cqlRows = new ArrayList<CqlRow>(rows.size());
         for (List<ByteBuffer> row : rows)
         {
-            List<Column> thriftCols = new ArrayList<Column>(metadata.names.size());
-            for (int i = 0; i < metadata.names.size(); i++)
+            List<Column> thriftCols = new ArrayList<Column>(metadata.columnCount);
+            for (int i = 0; i < metadata.columnCount; i++)
             {
                 Column col = new Column(ByteBufferUtil.bytes(metadata.names.get(i).name.toString()));
                 col.setValue(row.get(i));
@@ -214,8 +215,10 @@ public class ResultSet
             dest.writeInt(rs.rows.size());
             for (List<ByteBuffer> row : rs.rows)
             {
-                for (ByteBuffer bb : row)
-                    CBUtil.writeValue(bb, dest);
+                // Note that we only want to serialize the first columnCount values, even if the row
+                // has more: see comment on Metadata.names field.
+                for (int i = 0; i < rs.metadata.columnCount; i++)
+                    CBUtil.writeValue(row.get(i), dest);
             }
         }
 
@@ -224,8 +227,8 @@ public class ResultSet
             int size = Metadata.codec.encodedSize(rs.metadata, version) + 4;
             for (List<ByteBuffer> row : rs.rows)
             {
-                for (ByteBuffer bb : row)
-                    size += CBUtil.sizeOfValue(bb);
+                for (int i = 0; i < rs.metadata.columnCount; i++)
+                    size += CBUtil.sizeOfValue(row.get(i));
             }
             return size;
         }
@@ -238,6 +241,10 @@ public class ResultSet
         public static final Metadata EMPTY = new Metadata(EnumSet.of(Flag.NO_METADATA), 0);
 
         public final EnumSet<Flag> flags;
+        // Please note that columnCount can actually be smaller than names.size(), even if names is not null. This is
+        // used to include columns in the resultSet that we need to do post-query re-orderings
+        // (SelectStatement.orderResults) but that shouldn't be sent to the user as they haven't been requested
+        // (CASSANDRA-4911). So the serialization code will exclude any columns in names whose index is >= columnCount.
         public final List<ColumnSpecification> names;
         public final int columnCount;
         public PagingState pagingState;
@@ -263,6 +270,19 @@ public class ResultSet
             this.columnCount = columnCount;
         }
 
+        // The maximum number of values that the ResultSet can hold. This can be bigger than columnCount due to CASSANDRA-4911
+        public int valueCount()
+        {
+            return names == null ? columnCount : names.size();
+        }
+
+        public void addNonSerializedColumn(ColumnSpecification name)
+        {
+            // See comment above. Because columnCount doesn't account for the newly added name, that
+            // column won't be serialized.
+            names.add(name);
+        }
+
         private boolean allInSameCF()
         {
             if (names == null)
@@ -381,8 +401,9 @@ public class ResultSet
                         CBUtil.writeString(m.names.get(0).cfName, dest);
                     }
 
-                    for (ColumnSpecification name : m.names)
+                    for (int i = 0; i < m.columnCount; i++)
                     {
+                        ColumnSpecification name = m.names.get(i);
                         if (!globalTablesSpec)
                         {
                             CBUtil.writeString(name.ksName, dest);
@@ -412,8 +433,9 @@ public class ResultSet
                         size += CBUtil.sizeOfString(m.names.get(0).cfName);
                     }
 
-                    for (ColumnSpecification name : m.names)
+                    for (int i = 0; i < m.columnCount; i++)
                     {
+                        ColumnSpecification name = m.names.get(i);
                         if (!globalTablesSpec)
                         {
                             size += CBUtil.sizeOfString(name.ksName);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b42b0e7/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 60d13f4..e2ccda1 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -77,7 +77,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
     private boolean isKeyRange;
     private boolean keyIsInRelation;
     private boolean usesSecondaryIndexing;
-    private boolean lastClusteringIsIn;
+    private boolean needOrderOnLastClustering;
 
     private Map<ColumnIdentifier, Integer> orderingIndexes;
 
@@ -939,10 +939,11 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
 
         /*
          * We need to do post-query ordering in 2 cases:
-         *   1) if the last clustering key is restricted by a IN.
-         *   2) if the row key is restricted by a IN and there is some ORDER BY values
+         *   1) if the last clustering column is restricted by an IN and has no explicit ORDER BY on it.
+         *   2) if the partition key is restricted by an IN and there are some ORDER BY values
          */
-        if (!(lastClusteringIsIn || (keyIsInRelation && parameters.orderings.size() > 0)))
+        boolean needOrderOnPartitionKey = keyIsInRelation && !parameters.orderings.isEmpty();
+        if (!needOrderOnLastClustering && !needOrderOnPartitionKey)
             return;
 
         assert orderingIndexes != null;
@@ -950,16 +951,8 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         List<Integer> idToSort = new ArrayList<Integer>();
         List<Comparator<ByteBuffer>> sorters = new ArrayList<Comparator<ByteBuffer>>();
 
-        // If the restriction for the last clustering key is an IN, respect requested order
-        if (lastClusteringIsIn)
-        {
-            List<ColumnDefinition> cc = cfm.clusteringColumns();
-            idToSort.add(orderingIndexes.get(cc.get(cc.size() - 1).name));
-            Restriction last = columnRestrictions[columnRestrictions.length - 1];
-            sorters.add(makeComparatorFor(last.values(variables), isReversed));
-        }
-
-        // Then add the order by
+        // Note that we add the ORDER BY sorters first as they should prevail over ordering
+        // on the last clustering restriction.
         for (ColumnIdentifier identifier : parameters.orderings.keySet())
         {
             ColumnDefinition orderingColumn = cfm.getColumnDefinition(identifier);
@@ -967,6 +960,14 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             sorters.add(orderingColumn.type);
         }
 
+        if (needOrderOnLastClustering)
+        {
+            List<ColumnDefinition> cc = cfm.clusteringColumns();
+            idToSort.add(orderingIndexes.get(cc.get(cc.size() - 1).name));
+            Restriction last = columnRestrictions[columnRestrictions.length - 1];
+            sorters.add(makeComparatorFor(last.values(variables), isReversed));
+        }
+
         Comparator<List<ByteBuffer>> comparator = idToSort.size() == 1
                                                 ? new SingleColumnComparator(idToSort.get(0), sorters.get(0))
                                                 : new CompositeComparator(sorters, idToSort);
@@ -1234,7 +1235,16 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                         throw new InvalidRequestException(String.format("PRIMARY KEY part %s cannot be restricted by IN relation", cdef.name));
                     else if (stmt.selectACollection())
                         throw new InvalidRequestException(String.format("Cannot restrict PRIMARY KEY part %s by IN relation as a collection is selected by the query", cdef.name));
-                    stmt.lastClusteringIsIn = true;
+                    // We will return rows in the order of the IN, unless that clustering column has a specific order set on.
+                    if (parameters.orderings.get(cdef.name) == null)
+                    {
+                        stmt.needOrderOnLastClustering = true;
+                        stmt.orderingIndexes = new HashMap<ColumnIdentifier, Integer>();
+                        int index = indexOf(cdef, stmt.selection);
+                        if (index < 0)
+                            index = stmt.selection.addColumnForOrdering(cdef);
+                        stmt.orderingIndexes.put(cdef.name, index);
+                    }
                 }
 
                 previous = cdef;
@@ -1267,12 +1277,12 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                 if (stmt.isKeyRange)
                     throw new InvalidRequestException("ORDER BY is only supported when the partition key is restricted by an EQ or an IN.");
 
-                // If we order an IN query, we'll have to do a manual sort post-query. Currently, this sorting requires that we
-                // have queried the column on which we sort (TODO: we should update it to add the column on which we sort to the one
-                // queried automatically, and then removing it from the resultSet afterwards if needed)
-                if (stmt.keyIsInRelation)
+                // If we order post-query (see orderResults), the sorted columns need to be in the ResultSet for sorting, even if we don't
+                // ultimately ship them to the client (CASSANDRA-4911).
+                if (stmt.keyIsInRelation || stmt.needOrderOnLastClustering)
                 {
-                    stmt.orderingIndexes = new HashMap<ColumnIdentifier, Integer>();
+                    if (stmt.orderingIndexes == null)
+                        stmt.orderingIndexes = new HashMap<ColumnIdentifier, Integer>();
                     for (ColumnIdentifier column : stmt.parameters.orderings.keySet())
                     {
                         final ColumnDefinition def = cfm.getColumnDefinition(column);
@@ -1284,27 +1294,10 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                                 throw new InvalidRequestException(String.format("Order by on unknown column %s", column));
                         }
 
-                        if (selectClause.isEmpty()) // wildcard
-                        {
-                            stmt.orderingIndexes.put(def.name, indexOf(def, cfm.allColumnsInSelectOrder()));
-                        }
-                        else
-                        {
-                            boolean hasColumn = false;
-                            for (int i = 0; i < selectClause.size(); i++)
-                            {
-                                RawSelector selector = selectClause.get(i);
-                                if (def.name.equals(selector.selectable))
-                                {
-                                    stmt.orderingIndexes.put(def.name, i);
-                                    hasColumn = true;
-                                    break;
-                                }
-                            }
-
-                            if (!hasColumn)
-                                throw new InvalidRequestException("ORDER BY could not be used on columns missing in select clause.");
-                        }
+                        int index = indexOf(def, stmt.selection);
+                        if (index < 0)
+                            index = stmt.selection.addColumnForOrdering(def);
+                        stmt.orderingIndexes.put(def.name, index);
                     }
                 }
 
@@ -1353,16 +1346,6 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                 stmt.isReversed = isReversed;
             }
 
-            if (stmt.lastClusteringIsIn)
-            {
-                // This means we'll have to do post-query reordering, so update the orderingIndexes
-                if (stmt.orderingIndexes == null)
-                    stmt.orderingIndexes = new HashMap<ColumnIdentifier, Integer>();
-
-                ColumnDefinition last = cfm.clusteringColumns().get(cfm.clusteringColumns().size() - 1);
-                stmt.orderingIndexes.put(last.name, indexOf(last, stmt.selection.getColumnsList().iterator()));
-            }
-
             // Make sure this queries is allowed (note: non key range non indexed cannot involve filtering underneath)
             if (!parameters.allowFiltering && (stmt.isKeyRange || stmt.usesSecondaryIndexing))
             {
@@ -1377,6 +1360,11 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             return new ParsedStatement.Prepared(stmt, names);
         }
 
+        private int indexOf(ColumnDefinition def, Selection selection)
+        {
+            return indexOf(def, selection.getColumnsList().iterator());
+        }
+
         private int indexOf(final ColumnDefinition def, Iterator<ColumnDefinition> defs)
         {
             return Iterators.indexOf(defs, new Predicate<ColumnDefinition>()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b42b0e7/src/java/org/apache/cassandra/cql3/statements/Selection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/Selection.java b/src/java/org/apache/cassandra/cql3/statements/Selection.java
index 0d9c355..9c2eb0c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/Selection.java
+++ b/src/java/org/apache/cassandra/cql3/statements/Selection.java
@@ -43,21 +43,21 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 public abstract class Selection
 {
     private final List<ColumnDefinition> columnsList;
-    private final List<ColumnSpecification> metadata;
+    private final ResultSet.Metadata metadata;
     private final boolean collectTimestamps;
     private final boolean collectTTLs;
 
     protected Selection(List<ColumnDefinition> columnsList, List<ColumnSpecification> metadata, boolean collectTimestamps, boolean collectTTLs)
     {
         this.columnsList = columnsList;
-        this.metadata = metadata;
+        this.metadata = new ResultSet.Metadata(metadata);
         this.collectTimestamps = collectTimestamps;
         this.collectTTLs = collectTTLs;
     }
 
     public ResultSet.Metadata getResultMetadata()
     {
-        return new ResultSet.Metadata(metadata);
+        return metadata;
     }
 
     public static Selection wildcard(CFMetaData cfm)
@@ -72,6 +72,13 @@ public abstract class Selection
         return new SimpleSelection(columnsList);
     }
 
+    public int addColumnForOrdering(ColumnDefinition c)
+    {
+        columnsList.add(c);
+        metadata.addNonSerializedColumn(c);
+        return columnsList.size() - 1;
+    }
+
     private static boolean isUsingFunction(List<RawSelector> rawSelectors)
     {
         for (RawSelector rawSelector : rawSelectors)
@@ -273,7 +280,7 @@ public abstract class Selection
 
         private ResultSetBuilder(long now)
         {
-            this.resultSet = new ResultSet(metadata);
+            this.resultSet = new ResultSet(getResultMetadata(), new ArrayList<List<ByteBuffer>>());
             this.timestamps = collectTimestamps ? new long[columnsList.size()] : null;
             this.ttls = collectTTLs ? new int[columnsList.size()] : null;
             this.now = now;