You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:47:49 UTC

[25/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
index 0243b0d..add4445 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
@@ -18,15 +18,13 @@
 package org.apache.cassandra.db.index.composites;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.*;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.index.SecondaryIndex;
-import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
 /**
@@ -48,67 +46,90 @@ import org.apache.cassandra.utils.concurrent.OpOrder;
  */
 public class CompositesIndexOnClusteringKey extends CompositesIndex
 {
-    public static CellNameType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
+    public static void addClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition columnDef)
     {
-        // Index cell names are rk ck_0 ... ck_{i-1} ck_{i+1} ck_n, so n
-        // components total (where n is the number of clustering keys)
-        int ckCount = baseMetadata.clusteringColumns().size();
-        List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(ckCount);
-        types.add(SecondaryIndex.keyComparator);
+        indexMetadata.addClusteringColumn("partition_key", SecondaryIndex.keyComparator);
+
+        List<ColumnDefinition> cks = baseMetadata.clusteringColumns();
         for (int i = 0; i < columnDef.position(); i++)
-            types.add(baseMetadata.clusteringColumns().get(i).type);
-        for (int i = columnDef.position() + 1; i < ckCount; i++)
-            types.add(baseMetadata.clusteringColumns().get(i).type);
-        return new CompoundDenseCellNameType(types);
+        {
+            ColumnDefinition def = cks.get(i);
+            indexMetadata.addClusteringColumn(def.name, def.type);
+        }
+        for (int i = columnDef.position() + 1; i < cks.size(); i++)
+        {
+            ColumnDefinition def = cks.get(i);
+            indexMetadata.addClusteringColumn(def.name, def.type);
+        }
     }
 
-    protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Cell cell)
+    protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path)
     {
-        return cell.name().get(columnDef.position());
+        return clustering.get(columnDef.position());
     }
 
-    protected Composite makeIndexColumnPrefix(ByteBuffer rowKey, Composite columnName)
+    protected CBuilder buildIndexClusteringPrefix(ByteBuffer rowKey, ClusteringPrefix prefix, CellPath path)
     {
-        int count = Math.min(baseCfs.metadata.clusteringColumns().size(), columnName.size());
-        CBuilder builder = getIndexComparator().prefixBuilder();
+        CBuilder builder = CBuilder.create(getIndexComparator());
         builder.add(rowKey);
-        for (int i = 0; i < Math.min(columnDef.position(), count); i++)
-            builder.add(columnName.get(i));
-        for (int i = columnDef.position() + 1; i < count; i++)
-            builder.add(columnName.get(i));
-        return builder.build();
+        for (int i = 0; i < Math.min(columnDef.position(), prefix.size()); i++)
+            builder.add(prefix.get(i));
+        for (int i = columnDef.position() + 1; i < prefix.size(); i++)
+            builder.add(prefix.get(i));
+        return builder;
     }
 
-    public IndexedEntry decodeEntry(DecoratedKey indexedValue, Cell indexEntry)
+    public IndexedEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
     {
         int ckCount = baseCfs.metadata.clusteringColumns().size();
 
-        CBuilder builder = baseCfs.getComparator().builder();
+        Clustering clustering = indexEntry.clustering();
+        CBuilder builder = CBuilder.create(baseCfs.getComparator());
         for (int i = 0; i < columnDef.position(); i++)
-            builder.add(indexEntry.name().get(i + 1));
+            builder.add(clustering.get(i + 1));
 
         builder.add(indexedValue.getKey());
 
         for (int i = columnDef.position() + 1; i < ckCount; i++)
-            builder.add(indexEntry.name().get(i));
+            builder.add(clustering.get(i));
 
-        return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), indexEntry.name().get(0), builder.build());
+        return new IndexedEntry(indexedValue, clustering, indexEntry.primaryKeyLivenessInfo().timestamp(), clustering.get(0), builder.build());
     }
 
     @Override
-    public boolean indexes(CellName name)
+    protected boolean indexPrimaryKeyColumn()
     {
-        // For now, assume this is only used in CQL3 when we know name has enough component.
         return true;
     }
 
-    public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
+    @Override
+    public boolean indexes(ColumnDefinition c)
+    {
+        // Actual indexing for this index type is done through maybeIndex
+        return false;
+    }
+
+    public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
+    {
+        return !data.hasLiveData(nowInSec);
+    }
+
+    @Override
+    public void maybeIndex(ByteBuffer partitionKey, Clustering clustering, long timestamp, int ttl, OpOrder.Group opGroup, int nowInSec)
+    {
+        if (clustering != Clustering.STATIC_CLUSTERING && clustering.get(columnDef.position()) != null)
+            insert(partitionKey, clustering, null, SimpleLivenessInfo.forUpdate(timestamp, ttl, nowInSec, indexCfs.metadata), opGroup);
+    }
+
+    @Override
+    public void maybeDelete(ByteBuffer partitionKey, Clustering clustering, DeletionTime deletion, OpOrder.Group opGroup)
     {
-        return data.hasOnlyTombstones(now);
+        if (clustering.get(columnDef.position()) != null && !deletion.isLive())
+            delete(partitionKey, clustering, null, null, deletion, opGroup);
     }
 
     @Override
-    public void delete(ByteBuffer rowKey, Cell cell, OpOrder.Group opGroup)
+    public void delete(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup, int nowInSec)
     {
         // We only know that one column of the CQL row has been updated/deleted, but we don't know if the
         // full row has been deleted so we should not do anything. If it ends up that the whole row has

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java
index 1e40710..50e81c4 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer;
 
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.marshal.*;
 
 /**
@@ -38,22 +38,21 @@ public class CompositesIndexOnCollectionKey extends CompositesIndexIncludingColl
         return ((CollectionType)columnDef.type).nameComparator();
     }
 
-    protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Cell cell)
+    protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path)
     {
-        return cell.name().get(columnDef.position() + 1);
+        return path.get(0);
     }
 
     @Override
     public boolean supportsOperator(Operator operator)
     {
         return operator == Operator.CONTAINS_KEY ||
-                operator == Operator.CONTAINS && columnDef.type instanceof SetType;
+               operator == Operator.CONTAINS && columnDef.type instanceof SetType;
     }
 
-    public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
+    public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
     {
-        CellName name = data.getComparator().create(entry.indexedEntryPrefix, columnDef, entry.indexValue.getKey());
-        Cell cell = data.getColumn(name);
-        return cell == null || !cell.isLive(now);
+        Cell cell = data.getCell(columnDef, CellPath.create(indexValue));
+        return cell == null || !cell.isLive(nowInSec);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKeyAndValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKeyAndValue.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKeyAndValue.java
index 0b7f579..766f803 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKeyAndValue.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKeyAndValue.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.db.index.composites;
 import java.nio.ByteBuffer;
 
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.marshal.*;
 
 /**
@@ -38,50 +38,22 @@ public class CompositesIndexOnCollectionKeyAndValue extends CompositesIndexInclu
         return CompositeType.getInstance(colType.nameComparator(), colType.valueComparator());
     }
 
-    @Override
-    protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Cell cell)
-    {
-        final ByteBuffer key = cell.name().get(columnDef.position() + 1);
-        final ByteBuffer value = cell.value();
-        return CompositeType.build(key, value);
-    }
-
-    @Override
-    public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
+    protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path)
     {
-        Cell cell = extractTargetCell(entry, data);
-        if (cellIsDead(cell, now))
-            return true;
-        ByteBuffer indexCollectionValue = extractCollectionValue(entry);
-        ByteBuffer targetCollectionValue = cell.value();
-        AbstractType<?> valueComparator = ((CollectionType)columnDef.type).valueComparator();
-        return valueComparator.compare(indexCollectionValue, targetCollectionValue) != 0;
+        return CompositeType.build(path.get(0), cellValue);
     }
 
-    private Cell extractTargetCell(IndexedEntry entry, ColumnFamily data)
+    public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
     {
-        ByteBuffer collectionKey = extractCollectionKey(entry);
-        CellName name = data.getComparator().create(entry.indexedEntryPrefix, columnDef, collectionKey);
-        return data.getColumn(name);
-    }
+        ByteBuffer[] components = ((CompositeType)getIndexKeyComparator()).split(indexValue);
+        ByteBuffer mapKey = components[0];
+        ByteBuffer mapValue = components[1];
 
-    private ByteBuffer extractCollectionKey(IndexedEntry entry)
-    {
-        return extractIndexKeyComponent(entry, 0);
-    }
-
-    private ByteBuffer extractIndexKeyComponent(IndexedEntry entry, int component)
-    {
-        return CompositeType.extractComponent(entry.indexValue.getKey(), component);
-    }
-
-    private ByteBuffer extractCollectionValue(IndexedEntry entry)
-    {
-        return extractIndexKeyComponent(entry, 1);
-    }
+        Cell cell = data.getCell(columnDef, CellPath.create(mapKey));
+        if (cell == null || !cell.isLive(nowInSec))
+            return true;
 
-    private boolean cellIsDead(Cell cell, long now)
-    {
-        return cell == null || !cell.isLive(now);
+        AbstractType<?> valueComparator = ((CollectionType)columnDef.type).valueComparator();
+        return valueComparator.compare(mapValue, cell.value()) != 0;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
index a11a0d9..5af842c 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
@@ -18,19 +18,13 @@
 package org.apache.cassandra.db.index.composites;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Iterator;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.CBuilder;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.composites.CompoundDenseCellNameType;
-import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.marshal.*;
 
 /**
@@ -46,15 +40,12 @@ import org.apache.cassandra.db.marshal.*;
  */
 public class CompositesIndexOnCollectionValue extends CompositesIndex
 {
-    public static CellNameType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
+    public static void addClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition columnDef)
     {
-        int prefixSize = columnDef.position();
-        List<AbstractType<?>> types = new ArrayList<>(prefixSize + 2);
-        types.add(SecondaryIndex.keyComparator);
-        for (int i = 0; i < prefixSize; i++)
-            types.add(baseMetadata.comparator.subtype(i));
-        types.add(((CollectionType)columnDef.type).nameComparator()); // collection key
-        return new CompoundDenseCellNameType(types);
+        addGenericClusteringColumns(indexMetadata, baseMetadata, columnDef);
+
+        // collection key
+        indexMetadata.addClusteringColumn("cell_path", ((CollectionType)columnDef.type).nameComparator());
     }
 
     @Override
@@ -63,36 +54,32 @@ public class CompositesIndexOnCollectionValue extends CompositesIndex
         return ((CollectionType)columnDef.type).valueComparator();
     }
 
-    protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Cell cell)
+    protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path)
     {
-        return cell.value();
+        return cellValue;
     }
 
-    protected Composite makeIndexColumnPrefix(ByteBuffer rowKey, Composite cellName)
+    protected CBuilder buildIndexClusteringPrefix(ByteBuffer rowKey, ClusteringPrefix prefix, CellPath path)
     {
-        CBuilder builder = getIndexComparator().prefixBuilder();
+        CBuilder builder = CBuilder.create(getIndexComparator());
         builder.add(rowKey);
-        for (int i = 0; i < Math.min(columnDef.position(), cellName.size()); i++)
-            builder.add(cellName.get(i));
+        for (int i = 0; i < prefix.size(); i++)
+            builder.add(prefix.get(i));
+
+        // When indexing, cell will be present, but when searching, it won't  (CASSANDRA-7525)
+        if (prefix.size() == baseCfs.metadata.clusteringColumns().size() && path != null)
+            builder.add(path.get(0));
 
-        // When indexing, cellName is a full name including the collection
-        // key. When searching, restricted clustering columns are included
-        // but the collection key is not. In this case, don't try to add an
-        // element to the builder for it, as it will just end up null and
-        // error out when retrieving cells from the index cf (CASSANDRA-7525)
-        if (cellName.size() >= columnDef.position() + 1)
-            builder.add(cellName.get(columnDef.position() + 1));
-        return builder.build();
+        return builder;
     }
 
-    public IndexedEntry decodeEntry(DecoratedKey indexedValue, Cell indexEntry)
+    public IndexedEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
     {
-        int prefixSize = columnDef.position();
-        CellName name = indexEntry.name();
-        CBuilder builder = baseCfs.getComparator().builder();
-        for (int i = 0; i < prefixSize; i++)
-            builder.add(name.get(i + 1));
-        return new IndexedEntry(indexedValue, name, indexEntry.timestamp(), name.get(0), builder.build(), name.get(prefixSize + 1));
+        Clustering clustering = indexEntry.clustering();
+        CBuilder builder = CBuilder.create(baseCfs.getComparator());
+        for (int i = 0; i < baseCfs.getComparator().size(); i++)
+            builder.add(clustering.get(i + 1));
+        return new IndexedEntry(indexedValue, clustering, indexEntry.primaryKeyLivenessInfo().timestamp(), clustering.get(0), builder.build());
     }
 
     @Override
@@ -101,18 +88,15 @@ public class CompositesIndexOnCollectionValue extends CompositesIndex
         return operator == Operator.CONTAINS && !(columnDef.type instanceof SetType);
     }
 
-    @Override
-    public boolean indexes(CellName name)
-    {
-        AbstractType<?> comp = baseCfs.metadata.getColumnDefinitionComparator(columnDef);
-        return name.size() > columnDef.position()
-            && comp.compare(name.get(columnDef.position()), columnDef.name.bytes) == 0;
-    }
-
-    public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
+    public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
     {
-        CellName name = data.getComparator().create(entry.indexedEntryPrefix, columnDef, entry.indexedEntryCollectionKey);
-        Cell cell = data.getColumn(name);
-        return cell == null || !cell.isLive(now) || ((CollectionType) columnDef.type).valueComparator().compare(entry.indexValue.getKey(), cell.value()) != 0;
+        Iterator<Cell> iter = data.getCells(columnDef);
+        while (iter.hasNext())
+        {
+            Cell cell = iter.next();
+            if (cell.isLive(nowInSec) && ((CollectionType) columnDef.type).valueComparator().compare(indexValue, cell.value()) == 0)
+                return false;
+        }
+        return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
index df43057..d48e58b 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
@@ -18,14 +18,10 @@
 package org.apache.cassandra.db.index.composites;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.*;
-import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
@@ -49,57 +45,59 @@ import org.apache.cassandra.utils.concurrent.OpOrder;
  */
 public class CompositesIndexOnPartitionKey extends CompositesIndex
 {
-    public static CellNameType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
-    {
-        int ckCount = baseMetadata.clusteringColumns().size();
-        List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(ckCount + 1);
-        types.add(SecondaryIndex.keyComparator);
-        for (int i = 0; i < ckCount; i++)
-            types.add(baseMetadata.comparator.subtype(i));
-        return new CompoundDenseCellNameType(types);
-    }
-
-    protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Cell cell)
+    protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path)
     {
         CompositeType keyComparator = (CompositeType)baseCfs.metadata.getKeyValidator();
         ByteBuffer[] components = keyComparator.split(rowKey);
         return components[columnDef.position()];
     }
 
-    protected Composite makeIndexColumnPrefix(ByteBuffer rowKey, Composite columnName)
+    protected CBuilder buildIndexClusteringPrefix(ByteBuffer rowKey, ClusteringPrefix prefix, CellPath path)
     {
-        int count = Math.min(baseCfs.metadata.clusteringColumns().size(), columnName.size());
-        CBuilder builder = getIndexComparator().prefixBuilder();
+        CBuilder builder = CBuilder.create(getIndexComparator());
         builder.add(rowKey);
-        for (int i = 0; i < count; i++)
-            builder.add(columnName.get(i));
-        return builder.build();
+        for (int i = 0; i < prefix.size(); i++)
+            builder.add(prefix.get(i));
+        return builder;
     }
 
-    public IndexedEntry decodeEntry(DecoratedKey indexedValue, Cell indexEntry)
+    public IndexedEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
     {
         int ckCount = baseCfs.metadata.clusteringColumns().size();
-        CBuilder builder = baseCfs.getComparator().builder();
+        Clustering clustering = indexEntry.clustering();
+        CBuilder builder = CBuilder.create(baseCfs.getComparator());
         for (int i = 0; i < ckCount; i++)
-            builder.add(indexEntry.name().get(i + 1));
+            builder.add(clustering.get(i + 1));
 
-        return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), indexEntry.name().get(0), builder.build());
+        return new IndexedEntry(indexedValue, clustering, indexEntry.primaryKeyLivenessInfo().timestamp(), clustering.get(0), builder.build());
     }
 
     @Override
-    public boolean indexes(CellName name)
+    protected boolean indexPrimaryKeyColumn()
     {
-        // Since a partition key is always full, we always index it
         return true;
     }
 
-    public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
+    @Override
+    public boolean indexes(ColumnDefinition c)
+    {
+        // Actual indexing for this index type is done through maybeIndex
+        return false;
+    }
+
+    public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
+    {
+        return !data.hasLiveData(nowInSec);
+    }
+
+    @Override
+    public void maybeIndex(ByteBuffer partitionKey, Clustering clustering, long timestamp, int ttl, OpOrder.Group opGroup, int nowInSec)
     {
-        return data.hasOnlyTombstones(now);
+        insert(partitionKey, clustering, null, SimpleLivenessInfo.forUpdate(timestamp, ttl, nowInSec, indexCfs.metadata), opGroup);
     }
 
     @Override
-    public void delete(ByteBuffer rowKey, Cell cell, OpOrder.Group opGroup)
+    public void delete(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup, int nowInSec)
     {
         // We only know that one column of the CQL row has been updated/deleted, but we don't know if the
         // full row has been deleted so we should not do anything. If it ends up that the whole row has

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
index b9dc07f..a88502a 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
@@ -18,15 +18,9 @@
 package org.apache.cassandra.db.index.composites;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.*;
-import org.apache.cassandra.db.index.SecondaryIndex;
-import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.rows.*;
 
 /**
  * Index on a REGULAR column definition on a composite type.
@@ -47,50 +41,35 @@ import org.apache.cassandra.db.marshal.*;
  */
 public class CompositesIndexOnRegular extends CompositesIndex
 {
-    public static CellNameType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
+    protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path)
     {
-        int prefixSize = columnDef.position();
-        List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(prefixSize + 1);
-        types.add(SecondaryIndex.keyComparator);
-        for (int i = 0; i < prefixSize; i++)
-            types.add(baseMetadata.comparator.subtype(i));
-        return new CompoundDenseCellNameType(types);
+        return cellValue;
     }
 
-    protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Cell cell)
+    protected CBuilder buildIndexClusteringPrefix(ByteBuffer rowKey, ClusteringPrefix prefix, CellPath path)
     {
-        return cell.value();
-    }
-
-    protected Composite makeIndexColumnPrefix(ByteBuffer rowKey, Composite cellName)
-    {
-        CBuilder builder = getIndexComparator().prefixBuilder();
+        CBuilder builder = CBuilder.create(getIndexComparator());
         builder.add(rowKey);
-        for (int i = 0; i < Math.min(columnDef.position(), cellName.size()); i++)
-            builder.add(cellName.get(i));
-        return builder.build();
-    }
-
-    public IndexedEntry decodeEntry(DecoratedKey indexedValue, Cell indexEntry)
-    {
-        CBuilder builder = baseCfs.getComparator().builder();
-        for (int i = 0; i < columnDef.position(); i++)
-            builder.add(indexEntry.name().get(i + 1));
-        return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), indexEntry.name().get(0), builder.build());
+        for (int i = 0; i < prefix.size(); i++)
+            builder.add(prefix.get(i));
+        return builder;
     }
 
-    @Override
-    public boolean indexes(CellName name)
+    public IndexedEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
     {
-        AbstractType<?> comp = baseCfs.metadata.getColumnDefinitionComparator(columnDef);
-        return name.size() > columnDef.position()
-            && comp.compare(name.get(columnDef.position()), columnDef.name.bytes) == 0;
+        Clustering clustering = indexEntry.clustering();
+        ClusteringComparator baseComparator = baseCfs.getComparator();
+        CBuilder builder = CBuilder.create(baseComparator);
+        for (int i = 0; i < baseComparator.size(); i++)
+            builder.add(clustering.get(i + 1));
+        return new IndexedEntry(indexedValue, clustering, indexEntry.primaryKeyLivenessInfo().timestamp(), clustering.get(0), builder.build());
     }
 
-    public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
+    public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
     {
-        CellName name = data.getComparator().create(entry.indexedEntryPrefix, columnDef);
-        Cell cell = data.getColumn(name);
-        return cell == null || !cell.isLive(now) || columnDef.type.compare(entry.indexValue.getKey(), cell.value()) != 0;
+        Cell cell = data.getCell(columnDef);
+        return cell == null
+            || !cell.isLive(nowInSec)
+            || columnDef.type.compare(indexValue, cell.value()) != 0;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
index 88453df..f838ff1 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
@@ -17,296 +17,224 @@
  */
 package org.apache.cassandra.db.index.composites;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.db.ArrayBackedSortedColumns;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.IndexExpression;
-import org.apache.cassandra.db.Row;
-import org.apache.cassandra.db.RowPosition;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.composites.Composites;
-import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.db.filter.ExtendedFilter;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
-import org.apache.cassandra.db.index.SecondaryIndexManager;
-import org.apache.cassandra.db.index.SecondaryIndexSearcher;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.index.*;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
+
 public class CompositesSearcher extends SecondaryIndexSearcher
 {
     private static final Logger logger = LoggerFactory.getLogger(CompositesSearcher.class);
 
-    public CompositesSearcher(SecondaryIndexManager indexManager, Set<ByteBuffer> columns)
+    public CompositesSearcher(SecondaryIndexManager indexManager, Set<ColumnDefinition> columns)
     {
         super(indexManager, columns);
     }
 
-    @Override
-    public List<Row> search(ExtendedFilter filter)
+    private boolean isMatchingEntry(DecoratedKey partitionKey, CompositesIndex.IndexedEntry entry, ReadCommand command)
     {
-        assert filter.getClause() != null && !filter.getClause().isEmpty();
-        final IndexExpression primary = highestSelectivityPredicate(filter.getClause(), true);
-        final CompositesIndex index = (CompositesIndex)indexManager.getIndexForColumn(primary.column);
-        // TODO: this should perhaps not open and maintain a writeOp for the full duration, but instead only *try* to delete stale entries, without blocking if there's no room
-        // as it stands, we open a writeOp and keep it open for the duration to ensure that should this CF get flushed to make room we don't block the reclamation of any room being made
-        try (OpOrder.Group writeOp = baseCfs.keyspace.writeOrder.start(); OpOrder.Group baseOp = baseCfs.readOrdering.start(); OpOrder.Group indexOp = index.getIndexCfs().readOrdering.start())
-        {
-            return baseCfs.filter(getIndexedIterator(writeOp, filter, primary, index), filter);
-        }
+        return command.selects(partitionKey, entry.indexedEntryClustering);
     }
 
-    private Composite makePrefix(CompositesIndex index, ByteBuffer key, ExtendedFilter filter, boolean isStart)
+    protected UnfilteredPartitionIterator queryDataFromIndex(AbstractSimplePerColumnSecondaryIndex secondaryIdx,
+                                                             final DecoratedKey indexKey,
+                                                             final RowIterator indexHits,
+                                                             final ReadCommand command,
+                                                             final ReadOrderGroup orderGroup)
     {
-        if (key.remaining() == 0)
-            return Composites.EMPTY;
+        assert indexHits.staticRow() == Rows.EMPTY_STATIC_ROW;
 
-        Composite prefix;
-        IDiskAtomFilter columnFilter = filter.columnFilter(key);
-        if (columnFilter instanceof SliceQueryFilter)
-        {
-            SliceQueryFilter sqf = (SliceQueryFilter)columnFilter;
-            Composite columnName = isStart ? sqf.start() : sqf.finish();
-            prefix = columnName.isEmpty() ? index.getIndexComparator().make(key) : index.makeIndexColumnPrefix(key, columnName);
-        }
-        else
-        {
-            prefix = index.getIndexComparator().make(key);
-        }
-        return isStart ? prefix.start() : prefix.end();
-    }
+        assert secondaryIdx instanceof CompositesIndex;
+        final CompositesIndex index = (CompositesIndex)secondaryIdx;
 
-    private ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final OpOrder.Group writeOp, final ExtendedFilter filter, final IndexExpression primary, final CompositesIndex index)
-    {
-        // Start with the most-restrictive indexed clause, then apply remaining clauses
-        // to each row matching that clause.
-        // TODO: allow merge join instead of just one index + loop
-        assert index != null;
-        assert index.getIndexCfs() != null;
-        final DecoratedKey indexKey = index.getIndexKeyFor(primary.value);
-
-        if (logger.isDebugEnabled())
-            logger.debug("Most-selective indexed predicate is {}", index.expressionString(primary));
-
-        /*
-         * XXX: If the range requested is a token range, we'll have to start at the beginning (and stop at the end) of
-         * the indexed row unfortunately (which will be inefficient), because we have not way to intuit the smallest
-         * possible key having a given token. A fix would be to actually store the token along the key in the
-         * indexed row.
-         */
-        final AbstractBounds<RowPosition> range = filter.dataRange.keyRange();
-        ByteBuffer startKey = range.left instanceof DecoratedKey ? ((DecoratedKey)range.left).getKey() : ByteBufferUtil.EMPTY_BYTE_BUFFER;
-        ByteBuffer endKey = range.right instanceof DecoratedKey ? ((DecoratedKey)range.right).getKey() : ByteBufferUtil.EMPTY_BYTE_BUFFER;
-
-        final CellNameType baseComparator = baseCfs.getComparator();
-        final CellNameType indexComparator = index.getIndexCfs().getComparator();
-
-        final Composite startPrefix = makePrefix(index, startKey, filter, true);
-        final Composite endPrefix = makePrefix(index, endKey, filter, false);
-
-        return new ColumnFamilyStore.AbstractScanIterator()
+        return new UnfilteredPartitionIterator()
         {
-            private Composite lastSeenPrefix = startPrefix;
-            private Deque<Cell> indexCells;
-            private int columnsRead = Integer.MAX_VALUE;
-            private int limit = filter.currentLimit();
-            private int columnsCount = 0;
+            private CompositesIndex.IndexedEntry nextEntry;
 
-            // We have to fetch at least two rows to avoid breaking paging if the first row doesn't satisfy all clauses
-            private int indexCellsPerQuery = Math.max(2, Math.min(filter.maxColumns(), filter.maxRows()));
+            private UnfilteredRowIterator next;
 
-            public boolean needsFiltering()
+            public boolean isForThrift()
             {
-                return false;
+                return command.isForThrift();
+            }
+
+            public boolean hasNext()
+            {
+                return prepareNext();
             }
 
-            private Row makeReturn(DecoratedKey key, ColumnFamily data)
+            public UnfilteredRowIterator next()
             {
-                if (data == null)
-                    return endOfData();
+                if (next == null)
+                    prepareNext();
 
-                assert key != null;
-                return new Row(key, data);
+                UnfilteredRowIterator toReturn = next;
+                next = null;
+                return toReturn;
             }
 
-            protected Row computeNext()
+            private boolean prepareNext()
             {
-                /*
-                 * Our internal index code is wired toward internal rows. So we need to accumulate all results for a given
-                 * row before returning from this method. Which unfortunately means that this method has to do what
-                 * CFS.filter does for KeysIndex.
-                 */
-                DecoratedKey currentKey = null;
-                ColumnFamily data = null;
-                Composite previousPrefix = null;
-
-                while (true)
-                {
-                    // Did we get more columns that needed to respect the user limit?
-                    // (but we still need to return what has been fetched already)
-                    if (columnsCount >= limit)
-                        return makeReturn(currentKey, data);
+                if (next != null)
+                    return true;
 
-                    if (indexCells == null || indexCells.isEmpty())
+                if (nextEntry == null)
+                {
+                    if (!indexHits.hasNext())
+                        return false;
+
+                    nextEntry = index.decodeEntry(indexKey, indexHits.next());
+                }
+
+                // Gather all index hits belonging to the same partition and query the data for those hits.
+                // TODO: it's much more efficient to do 1 read for all hits to the same partition than doing
+                // 1 read per index hit. However, this basically mean materializing all hits for a partition
+                // in memory so we should consider adding some paging mechanism. However, index hits should
+                // be relatively small so it's much better than the previous code that was materializing all
+                // *data* for a given partition.
+                NavigableSet<Clustering> clusterings = new TreeSet<>(baseCfs.getComparator());
+                List<CompositesIndex.IndexedEntry> entries = new ArrayList<>();
+                DecoratedKey partitionKey = baseCfs.partitioner.decorateKey(nextEntry.indexedKey);
+
+                while (nextEntry != null && partitionKey.getKey().equals(nextEntry.indexedKey))
+                {
+                    // We're queried a slice of the index, but some hits may not match some of the clustering column constraints
+                    if (isMatchingEntry(partitionKey, nextEntry, command))
                     {
-                        if (columnsRead < indexCellsPerQuery)
-                        {
-                            logger.trace("Read only {} (< {}) last page through, must be done", columnsRead, indexCellsPerQuery);
-                            return makeReturn(currentKey, data);
-                        }
-
-                        if (logger.isTraceEnabled())
-                            logger.trace("Scanning index {} starting with {}",
-                                         index.expressionString(primary), indexComparator.getString(startPrefix));
-
-                        QueryFilter indexFilter = QueryFilter.getSliceFilter(indexKey,
-                                                                             index.getIndexCfs().name,
-                                                                             lastSeenPrefix,
-                                                                             endPrefix,
-                                                                             false,
-                                                                             indexCellsPerQuery,
-                                                                             filter.timestamp);
-                        ColumnFamily indexRow = index.getIndexCfs().getColumnFamily(indexFilter);
-                        if (indexRow == null || !indexRow.hasColumns())
-                            return makeReturn(currentKey, data);
-
-                        Collection<Cell> sortedCells = indexRow.getSortedColumns();
-                        columnsRead = sortedCells.size();
-                        indexCells = new ArrayDeque<>(sortedCells);
-                        Cell firstCell = sortedCells.iterator().next();
-
-                        // Paging is racy, so it is possible the first column of a page is not the last seen one.
-                        if (lastSeenPrefix != startPrefix && lastSeenPrefix.equals(firstCell.name()))
-                        {
-                            // skip the row we already saw w/ the last page of results
-                            indexCells.poll();
-                            logger.trace("Skipping {}", indexComparator.getString(firstCell.name()));
-                        }
+                        clusterings.add(nextEntry.indexedEntryClustering);
+                        entries.add(nextEntry);
                     }
 
-                    while (!indexCells.isEmpty() && columnsCount <= limit)
-                    {
-                        Cell cell = indexCells.poll();
-                        lastSeenPrefix = cell.name();
-                        if (!cell.isLive(filter.timestamp))
-                        {
-                            logger.trace("skipping {}", cell.name());
-                            continue;
-                        }
-
-                        CompositesIndex.IndexedEntry entry = index.decodeEntry(indexKey, cell);
-                        DecoratedKey dk = baseCfs.partitioner.decorateKey(entry.indexedKey);
-
-                        // Are we done for this row?
-                        if (currentKey == null)
-                        {
-                            currentKey = dk;
-                        }
-                        else if (!currentKey.equals(dk))
-                        {
-                            DecoratedKey previousKey = currentKey;
-                            currentKey = dk;
-                            previousPrefix = null;
-
-                            // We're done with the previous row, return it if it had data, continue otherwise
-                            indexCells.addFirst(cell);
-                            if (data == null)
-                                continue;
-                            else
-                                return makeReturn(previousKey, data);
-                        }
-
-                        if (!range.contains(dk))
-                        {
-                            // Either we're not yet in the range cause the range is start excluding, or we're
-                            // past it.
-                            if (!range.right.isMinimum() && range.right.compareTo(dk) < 0)
-                            {
-                                logger.trace("Reached end of assigned scan range");
-                                return endOfData();
-                            }
-                            else
-                            {
-                                logger.debug("Skipping entry {} before assigned scan range", dk.getToken());
-                                continue;
-                            }
-                        }
-
-                        // Check if this entry cannot be a hit due to the original cell filter
-                        Composite start = entry.indexedEntryPrefix;
-                        if (!filter.columnFilter(dk.getKey()).maySelectPrefix(baseComparator, start))
-                            continue;
-
-                        // If we've record the previous prefix, it means we're dealing with an index on the collection value. In
-                        // that case, we can have multiple index prefix for the same CQL3 row. In that case, we want to only add
-                        // the CQL3 row once (because requesting the data multiple time would be inefficient but more importantly
-                        // because we shouldn't count the columns multiple times with the lastCounted() call at the end of this
-                        // method).
-                        if (previousPrefix != null && previousPrefix.equals(start))
-                            continue;
-                        else
-                            previousPrefix = null;
-
-                        if (logger.isTraceEnabled())
-                            logger.trace("Adding index hit to current row for {}", indexComparator.getString(cell.name()));
-
-                        // We always query the whole CQL3 row. In the case where the original filter was a name filter this might be
-                        // slightly wasteful, but this probably doesn't matter in practice and it simplify things.
-                        ColumnSlice dataSlice = new ColumnSlice(start, entry.indexedEntryPrefix.end());
-                        // If the table has static columns, we must fetch them too as they may need to be returned too.
-                        // Note that this is potentially wasteful for 2 reasons:
-                        //  1) we will retrieve the static parts for each indexed row, even if we have more than one row in
-                        //     the same partition. If we were to group data queries to rows on the same slice, which would
-                        //     speed up things in general, we would also optimize here since we would fetch static columns only
-                        //     once for each group.
-                        //  2) at this point we don't know if the user asked for static columns or not, so we might be fetching
-                        //     them for nothing. We would however need to ship the list of "CQL3 columns selected" with getRangeSlice
-                        //     to be able to know that.
-                        // TODO: we should improve both point above
-                        ColumnSlice[] slices = baseCfs.metadata.hasStaticColumns()
-                                             ? new ColumnSlice[]{ baseCfs.metadata.comparator.staticPrefix().slice(), dataSlice }
-                                             : new ColumnSlice[]{ dataSlice };
-                        SliceQueryFilter dataFilter = new SliceQueryFilter(slices, false, Integer.MAX_VALUE, baseCfs.metadata.clusteringColumns().size());
-                        ColumnFamily newData = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, dataFilter, filter.timestamp));
-                        if (newData == null || index.isStale(entry, newData, filter.timestamp))
-                        {
-                            index.delete(entry, writeOp);
-                            continue;
-                        }
-
-                        assert newData != null : "An entry with no data should have been considered stale";
-
-                        // We know the entry is not stale and so the entry satisfy the primary clause. So whether
-                        // or not the data satisfies the other clauses, there will be no point to re-check the
-                        // same CQL3 row if we run into another collection value entry for this row.
-                        if (entry.indexedEntryCollectionKey != null)
-                            previousPrefix = start;
-
-                        if (!filter.isSatisfiedBy(dk, newData, entry.indexedEntryPrefix, entry.indexedEntryCollectionKey))
-                            continue;
-
-                        if (data == null)
-                            data = ArrayBackedSortedColumns.factory.create(baseCfs.metadata);
-                        data.addAll(newData);
-                        columnsCount += dataFilter.lastCounted();
-                    }
-                 }
-             }
+                    nextEntry = indexHits.hasNext() ? index.decodeEntry(indexKey, indexHits.next()) : null;
+                }
+
+                // Because we've eliminated entries that don't match the clustering columns, it's possible we added nothing
+                if (clusterings.isEmpty())
+                    return prepareNext();
+
+                // Query the gathered index hits. We still need to filter stale hits from the resulting query.
+                ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(clusterings, false);
+                SinglePartitionReadCommand dataCmd = new SinglePartitionNamesCommand(baseCfs.metadata,
+                                                                                     command.nowInSec(),
+                                                                                     command.columnFilter(),
+                                                                                     command.rowFilter(),
+                                                                                     DataLimits.NONE,
+                                                                                     partitionKey,
+                                                                                     filter);
+                @SuppressWarnings("resource") // We close right away if empty, and if it's assign to next it will be called either
+                                              // by the next caller of next, or through closing this iterator is this come before.
+                UnfilteredRowIterator dataIter = filterStaleEntries(dataCmd.queryMemtableAndDisk(baseCfs, orderGroup.baseReadOpOrderGroup()),
+                                                                    index,
+                                                                    indexKey.getKey(),
+                                                                    entries,
+                                                                    orderGroup.writeOpOrderGroup(),
+                                                                    command.nowInSec());
+                if (dataIter.isEmpty())
+                {
+                    dataIter.close();
+                    return prepareNext();
+                }
+
+                next = dataIter;
+                return true;
+            }
 
-            public void close() throws IOException {}
+            public void remove()
+            {
+                throw new UnsupportedOperationException();
+            }
+
+            public void close()
+            {
+                indexHits.close();
+                if (next != null)
+                    next.close();
+            }
+        };
+    }
+
+    private UnfilteredRowIterator filterStaleEntries(UnfilteredRowIterator dataIter,
+                                                     final CompositesIndex index,
+                                                     final ByteBuffer indexValue,
+                                                     final List<CompositesIndex.IndexedEntry> entries,
+                                                     final OpOrder.Group writeOp,
+                                                     final int nowInSec)
+    {
+        return new WrappingUnfilteredRowIterator(dataIter)
+        {
+            private int entriesIdx;
+            private Unfiltered next;
+
+            @Override
+            public boolean hasNext()
+            {
+                return prepareNext();
+            }
+
+            @Override
+            public Unfiltered next()
+            {
+                if (next == null)
+                    prepareNext();
+
+                Unfiltered toReturn = next;
+                next = null;
+                return toReturn;
+            }
+
+            private boolean prepareNext()
+            {
+                if (next != null)
+                    return true;
+
+                while (next == null && super.hasNext())
+                {
+                    next = super.next();
+                    if (next.kind() != Unfiltered.Kind.ROW)
+                        return true;
+
+                    Row row = (Row)next;
+                    CompositesIndex.IndexedEntry entry = findEntry(row.clustering(), writeOp, nowInSec);
+                    if (!index.isStale(row, indexValue, nowInSec))
+                        return true;
+
+                    // The entry is stale: delete the entry and ignore otherwise
+                    index.delete(entry, writeOp, nowInSec);
+                    next = null;
+                }
+                return false;
+            }
+
+            private CompositesIndex.IndexedEntry findEntry(Clustering clustering, OpOrder.Group writeOp, int nowInSec)
+            {
+                assert entriesIdx < entries.size();
+                while (entriesIdx < entries.size())
+                {
+                    CompositesIndex.IndexedEntry entry = entries.get(entriesIdx++);
+                    // The entries are in clustering order. So that the requested entry should be the
+                    // next entry, the one at 'entriesIdx'. However, we can have stale entries, entries
+                    // that have no corresponding row in the base table typically because of a range
+                    // tombstone or partition level deletion. Delete such stale entries.
+                    int cmp = metadata().comparator.compare(entry.indexedEntryClustering, clustering);
+                    assert cmp <= 0; // this would means entries are not in clustering order, which shouldn't happen
+                    if (cmp == 0)
+                        return entry;
+                    else
+                        index.delete(entry, writeOp, nowInSec);
+                }
+                // entries correspond to the rows we've queried, so we shouldn't have a row that has no corresponding entry.
+                throw new AssertionError();
+            }
         };
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
index e771d99..7930bd6 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
@@ -20,14 +20,15 @@ package org.apache.cassandra.db.index.keys;
 import java.nio.ByteBuffer;
 import java.util.Set;
 
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNames;
-import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexSearcher;
-import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.utils.concurrent.OpOrder;
 
 /**
  * Implements a secondary index for a column family using a second column family.
@@ -39,41 +40,51 @@ import org.apache.cassandra.exceptions.ConfigurationException;
  */
 public class KeysIndex extends AbstractSimplePerColumnSecondaryIndex
 {
-    protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Cell cell)
+    public static void addIndexClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition cfDef)
     {
-        return cell.value();
+        indexMetadata.addClusteringColumn("partition_key", SecondaryIndex.keyComparator);
     }
 
-    protected CellName makeIndexColumnName(ByteBuffer rowKey, Cell cell)
+    @Override
+    public void indexRow(DecoratedKey key, Row row, OpOrder.Group opGroup, int nowInSec)
     {
-        return CellNames.simpleDense(rowKey);
-    }
+        super.indexRow(key, row, opGroup, nowInSec);
 
-    public SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ByteBuffer> columns)
-    {
-        return new KeysSearcher(baseCfs.indexManager, columns);
+        // This is used when building indexes, in particular when the index is first created. On thrift, this
+        // potentially means the column definition just got created, and so we need to check if's not a "dynamic"
+        // row that actually correspond to the index definition.
+        assert baseCfs.metadata.isCompactTable();
+        if (!row.isStatic())
+        {
+            Clustering clustering = row.clustering();
+            if (clustering.get(0).equals(columnDef.name.bytes))
+            {
+                Cell cell = row.getCell(baseCfs.metadata.compactValueColumn());
+                if (cell != null && cell.isLive(nowInSec))
+                    insert(key.getKey(), clustering, cell, opGroup);
+            }
+        }
     }
 
-    public boolean isIndexEntryStale(ByteBuffer indexedValue, ColumnFamily data, long now)
+    protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path)
     {
-        Cell cell = data.getColumn(data.getComparator().makeCellName(columnDef.name.bytes));
-        return cell == null || !cell.isLive(now) || columnDef.type.compare(indexedValue, cell.value()) != 0;
+        return cellValue;
     }
 
-    public void validateOptions() throws ConfigurationException
+    protected CBuilder buildIndexClusteringPrefix(ByteBuffer rowKey, ClusteringPrefix prefix, CellPath path)
     {
-        // no options used
+        CBuilder builder = CBuilder.create(getIndexComparator());
+        builder.add(rowKey);
+        return builder;
     }
 
-    public boolean indexes(CellName name)
+    public SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ColumnDefinition> columns)
     {
-        // This consider the full cellName directly
-        AbstractType<?> comparator = baseCfs.metadata.getColumnDefinitionComparator(columnDef);
-        return comparator.compare(columnDef.name.bytes, name.toByteBuffer()) == 0;
+        return new KeysSearcher(baseCfs.indexManager, columns);
     }
 
-    protected AbstractType getExpressionComparator()
+    public void validateOptions() throws ConfigurationException
     {
-        return baseCfs.getComparator().asAbstractType();
+        // no options used
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index b4fd0ba..6b53640 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -17,190 +17,169 @@
  */
 package org.apache.cassandra.db.index.keys;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Set;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.composites.Composites;
-import org.apache.cassandra.db.filter.ExtendedFilter;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.index.*;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
 public class KeysSearcher extends SecondaryIndexSearcher
 {
     private static final Logger logger = LoggerFactory.getLogger(KeysSearcher.class);
 
-    public KeysSearcher(SecondaryIndexManager indexManager, Set<ByteBuffer> columns)
+    public KeysSearcher(SecondaryIndexManager indexManager, Set<ColumnDefinition> columns)
     {
         super(indexManager, columns);
     }
 
-    @Override
-    public List<Row> search(ExtendedFilter filter)
+    protected UnfilteredPartitionIterator queryDataFromIndex(final AbstractSimplePerColumnSecondaryIndex index,
+                                                             final DecoratedKey indexKey,
+                                                             final RowIterator indexHits,
+                                                             final ReadCommand command,
+                                                             final ReadOrderGroup orderGroup)
     {
-        assert filter.getClause() != null && !filter.getClause().isEmpty();
-        final IndexExpression primary = highestSelectivityPredicate(filter.getClause(), true);
-        final SecondaryIndex index = indexManager.getIndexForColumn(primary.column);
-        // TODO: this should perhaps not open and maintain a writeOp for the full duration, but instead only *try* to delete stale entries, without blocking if there's no room
-        // as it stands, we open a writeOp and keep it open for the duration to ensure that should this CF get flushed to make room we don't block the reclamation of any room  being made
-        try (OpOrder.Group writeOp = baseCfs.keyspace.writeOrder.start(); OpOrder.Group baseOp = baseCfs.readOrdering.start(); OpOrder.Group indexOp = index.getIndexCfs().readOrdering.start())
+        assert indexHits.staticRow() == Rows.EMPTY_STATIC_ROW;
+
+        return new UnfilteredPartitionIterator()
         {
-            return baseCfs.filter(getIndexedIterator(writeOp, filter, primary, index), filter);
-        }
-    }
+            private UnfilteredRowIterator next;
 
-    private ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final OpOrder.Group writeOp, final ExtendedFilter filter, final IndexExpression primary, final SecondaryIndex index)
-    {
+            public boolean isForThrift()
+            {
+                return command.isForThrift();
+            }
 
-        // Start with the most-restrictive indexed clause, then apply remaining clauses
-        // to each row matching that clause.
-        // TODO: allow merge join instead of just one index + loop
-        assert index != null;
-        assert index.getIndexCfs() != null;
-        final DecoratedKey indexKey = index.getIndexKeyFor(primary.value);
-
-        if (logger.isDebugEnabled())
-            logger.debug("Most-selective indexed predicate is {}",
-                         ((AbstractSimplePerColumnSecondaryIndex) index).expressionString(primary));
-
-        /*
-         * XXX: If the range requested is a token range, we'll have to start at the beginning (and stop at the end) of
-         * the indexed row unfortunately (which will be inefficient), because we have not way to intuit the small
-         * possible key having a given token. A fix would be to actually store the token along the key in the
-         * indexed row.
-         */
-        final AbstractBounds<RowPosition> range = filter.dataRange.keyRange();
-        CellNameType type = index.getIndexCfs().getComparator();
-        final Composite startKey = range.left instanceof DecoratedKey ? type.make(((DecoratedKey)range.left).getKey()) : Composites.EMPTY;
-        final Composite endKey = range.right instanceof DecoratedKey ? type.make(((DecoratedKey)range.right).getKey()) : Composites.EMPTY;
-
-        final CellName primaryColumn = baseCfs.getComparator().cellFromByteBuffer(primary.column);
-
-        return new ColumnFamilyStore.AbstractScanIterator()
-        {
-            private Composite lastSeenKey = startKey;
-            private Iterator<Cell> indexColumns;
-            private int columnsRead = Integer.MAX_VALUE;
+            public boolean hasNext()
+            {
+                return prepareNext();
+            }
 
-            protected Row computeNext()
+            public UnfilteredRowIterator next()
             {
-                int meanColumns = Math.max(index.getIndexCfs().getMeanColumns(), 1);
-                // We shouldn't fetch only 1 row as this provides buggy paging in case the first row doesn't satisfy all clauses
-                int rowsPerQuery = Math.max(Math.min(filter.maxRows(), filter.maxColumns() / meanColumns), 2);
-                while (true)
+                if (next == null)
+                    prepareNext();
+
+                UnfilteredRowIterator toReturn = next;
+                next = null;
+                return toReturn;
+            }
+
+            private boolean prepareNext()
+            {
+                while (next == null && indexHits.hasNext())
                 {
-                    if (indexColumns == null || !indexColumns.hasNext())
+                    Row hit = indexHits.next();
+                    DecoratedKey key = baseCfs.partitioner.decorateKey(hit.clustering().get(0));
+
+                    SinglePartitionReadCommand dataCmd = SinglePartitionReadCommand.create(isForThrift(),
+                                                                                           baseCfs.metadata,
+                                                                                           command.nowInSec(),
+                                                                                           command.columnFilter(),
+                                                                                           command.rowFilter(),
+                                                                                           DataLimits.NONE,
+                                                                                           key,
+                                                                                           command.clusteringIndexFilter(key));
+                    @SuppressWarnings("resource") // filterIfStale closes it's iterator if either it materialize it or if it returns null.
+                                                  // Otherwise, we close right away if empty, and if it's assigned to next it will be called either
+                                                  // by the next caller of next, or through closing this iterator is this come before.
+                    UnfilteredRowIterator dataIter = filterIfStale(dataCmd.queryMemtableAndDisk(baseCfs, orderGroup.baseReadOpOrderGroup()),
+                                                                   index,
+                                                                   hit,
+                                                                   indexKey.getKey(),
+                                                                   orderGroup.writeOpOrderGroup(),
+                                                                   isForThrift(),
+                                                                   command.nowInSec());
+
+                    if (dataIter != null)
                     {
-                        if (columnsRead < rowsPerQuery)
-                        {
-                            logger.trace("Read only {} (< {}) last page through, must be done", columnsRead, rowsPerQuery);
-                            return endOfData();
-                        }
-
-                        if (logger.isTraceEnabled() && (index instanceof AbstractSimplePerColumnSecondaryIndex))
-                            logger.trace("Scanning index {} starting with {}",
-                                         ((AbstractSimplePerColumnSecondaryIndex)index).expressionString(primary), index.getBaseCfs().metadata.getKeyValidator().getString(startKey.toByteBuffer()));
-
-                        QueryFilter indexFilter = QueryFilter.getSliceFilter(indexKey,
-                                                                             index.getIndexCfs().name,
-                                                                             lastSeenKey,
-                                                                             endKey,
-                                                                             false,
-                                                                             rowsPerQuery,
-                                                                             filter.timestamp);
-                        ColumnFamily indexRow = index.getIndexCfs().getColumnFamily(indexFilter);
-                        logger.trace("fetched {}", indexRow);
-                        if (indexRow == null)
-                        {
-                            logger.trace("no data, all done");
-                            return endOfData();
-                        }
-
-                        Collection<Cell> sortedCells = indexRow.getSortedColumns();
-                        columnsRead = sortedCells.size();
-                        indexColumns = sortedCells.iterator();
-                        Cell firstCell = sortedCells.iterator().next();
-
-                        // Paging is racy, so it is possible the first column of a page is not the last seen one.
-                        if (lastSeenKey != startKey && lastSeenKey.equals(firstCell.name()))
-                        {
-                            // skip the row we already saw w/ the last page of results
-                            indexColumns.next();
-                            logger.trace("Skipping {}", baseCfs.metadata.getKeyValidator().getString(firstCell.name().toByteBuffer()));
-                        }
-                        else if (range instanceof Range && indexColumns.hasNext() && firstCell.name().equals(startKey))
-                        {
-                            // skip key excluded by range
-                            indexColumns.next();
-                            logger.trace("Skipping first key as range excludes it");
-                        }
+                        if (dataIter.isEmpty())
+                            dataIter.close();
+                        else
+                            next = dataIter;
                     }
+                }
+                return next != null;
+            }
 
-                    while (indexColumns.hasNext())
-                    {
-                        Cell cell = indexColumns.next();
-                        lastSeenKey = cell.name();
-                        if (!cell.isLive(filter.timestamp))
-                        {
-                            logger.trace("skipping {}", cell.name());
-                            continue;
-                        }
-
-                        DecoratedKey dk = baseCfs.partitioner.decorateKey(lastSeenKey.toByteBuffer());
-                        if (!range.right.isMinimum() && range.right.compareTo(dk) < 0)
-                        {
-                            logger.trace("Reached end of assigned scan range");
-                            return endOfData();
-                        }
-                        if (!range.contains(dk))
-                        {
-                            logger.trace("Skipping entry {} outside of assigned scan range", dk.getToken());
-                            continue;
-                        }
-
-                        logger.trace("Returning index hit for {}", dk);
-                        ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, filter.columnFilter(lastSeenKey.toByteBuffer()), filter.timestamp));
-                        // While the column family we'll get in the end should contains the primary clause cell, the initialFilter may not have found it and can thus be null
-                        if (data == null)
-                            data = ArrayBackedSortedColumns.factory.create(baseCfs.metadata);
-
-                        // as in CFS.filter - extend the filter to ensure we include the columns
-                        // from the index expressions, just in case they weren't included in the initialFilter
-                        IDiskAtomFilter extraFilter = filter.getExtraFilter(dk, data);
-                        if (extraFilter != null)
-                        {
-                            ColumnFamily cf = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, extraFilter, filter.timestamp));
-                            if (cf != null)
-                                data.addAll(cf);
-                        }
-
-                        if (((KeysIndex)index).isIndexEntryStale(indexKey.getKey(), data, filter.timestamp))
-                        {
-                            // delete the index entry w/ its own timestamp
-                            Cell dummyCell = new BufferCell(primaryColumn, indexKey.getKey(), cell.timestamp());
-                            ((PerColumnSecondaryIndex)index).delete(dk.getKey(), dummyCell, writeOp);
-                            continue;
-                        }
-                        return new Row(dk, data);
-                    }
-                 }
-             }
+            public void remove()
+            {
+                throw new UnsupportedOperationException();
+            }
 
-            public void close() throws IOException {}
+            public void close()
+            {
+                indexHits.close();
+                if (next != null)
+                    next.close();
+            }
         };
     }
+
+    private UnfilteredRowIterator filterIfStale(UnfilteredRowIterator iterator,
+                                                AbstractSimplePerColumnSecondaryIndex index,
+                                                Row indexHit,
+                                                ByteBuffer indexedValue,
+                                                OpOrder.Group writeOp,
+                                                boolean isForThrift,
+                                                int nowInSec)
+    {
+        if (isForThrift)
+        {
+            // The data we got has gone though ThrifResultsMerger, so we're looking for the row whose clustering
+            // is the indexed name. Ans so we need to materialize the partition.
+            ArrayBackedPartition result = ArrayBackedPartition.create(iterator);
+            iterator.close();
+            Row data = result.getRow(new SimpleClustering(index.indexedColumn().name.bytes));
+            Cell cell = data == null ? null : data.getCell(baseCfs.metadata.compactValueColumn());
+            return deleteIfStale(iterator.partitionKey(), cell, index, indexHit, indexedValue, writeOp, nowInSec)
+                 ? null
+                 : result.unfilteredIterator();
+        }
+        else
+        {
+            assert iterator.metadata().isCompactTable();
+            Row data = iterator.staticRow();
+            Cell cell = data == null ? null : data.getCell(index.indexedColumn());
+            if (deleteIfStale(iterator.partitionKey(), cell, index, indexHit, indexedValue, writeOp, nowInSec))
+            {
+                iterator.close();
+                return null;
+            }
+            else
+            {
+                return iterator;
+            }
+        }
+    }
+
+    private boolean deleteIfStale(DecoratedKey partitionKey,
+                                  Cell cell,
+                                  AbstractSimplePerColumnSecondaryIndex index,
+                                  Row indexHit,
+                                  ByteBuffer indexedValue,
+                                  OpOrder.Group writeOp,
+                                  int nowInSec)
+    {
+        if (cell == null || !cell.isLive(nowInSec) || index.indexedColumn().type.compare(indexedValue, cell.value()) != 0)
+        {
+            // Index is stale, remove the index entry and ignore
+            index.delete(partitionKey.getKey(),
+                         new SimpleClustering(index.indexedColumn().name.bytes),
+                         indexedValue,
+                         null,
+                         new SimpleDeletionTime(indexHit.primaryKeyLivenessInfo().timestamp(), nowInSec),
+                         writeOp);
+            return true;
+        }
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/lifecycle/SSTableIntervalTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/SSTableIntervalTree.java b/src/java/org/apache/cassandra/db/lifecycle/SSTableIntervalTree.java
index ff2abcb..52ac227 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/SSTableIntervalTree.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/SSTableIntervalTree.java
@@ -6,16 +6,16 @@ import java.util.List;
 
 import com.google.common.collect.Iterables;
 
-import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.utils.Interval;
 import org.apache.cassandra.utils.IntervalTree;
 
-public class SSTableIntervalTree extends IntervalTree<RowPosition, SSTableReader, Interval<RowPosition, SSTableReader>>
+public class SSTableIntervalTree extends IntervalTree<PartitionPosition, SSTableReader, Interval<PartitionPosition, SSTableReader>>
 {
     private static final SSTableIntervalTree EMPTY = new SSTableIntervalTree(null);
 
-    SSTableIntervalTree(Collection<Interval<RowPosition, SSTableReader>> intervals)
+    SSTableIntervalTree(Collection<Interval<PartitionPosition, SSTableReader>> intervals)
     {
         super(intervals);
     }
@@ -30,11 +30,11 @@ public class SSTableIntervalTree extends IntervalTree<RowPosition, SSTableReader
         return new SSTableIntervalTree(buildIntervals(sstables));
     }
 
-    public static List<Interval<RowPosition, SSTableReader>> buildIntervals(Iterable<SSTableReader> sstables)
+    public static List<Interval<PartitionPosition, SSTableReader>> buildIntervals(Iterable<SSTableReader> sstables)
     {
-        List<Interval<RowPosition, SSTableReader>> intervals = new ArrayList<>(Iterables.size(sstables));
+        List<Interval<PartitionPosition, SSTableReader>> intervals = new ArrayList<>(Iterables.size(sstables));
         for (SSTableReader sstable : sstables)
-            intervals.add(Interval.<RowPosition, SSTableReader>create(sstable.first, sstable.last, sstable));
+            intervals.add(Interval.<PartitionPosition, SSTableReader>create(sstable.first, sstable.last, sstable));
         return intervals;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/lifecycle/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/View.java b/src/java/org/apache/cassandra/db/lifecycle/View.java
index 0d1100b..f710dda 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/View.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/View.java
@@ -25,7 +25,7 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.*;
 
 import org.apache.cassandra.db.Memtable;
-import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.utils.Interval;
@@ -126,12 +126,12 @@ public class View
         return String.format("View(pending_count=%d, sstables=%s, compacting=%s)", liveMemtables.size() + flushingMemtables.size() - 1, sstables, compacting);
     }
 
-    public List<SSTableReader> sstablesInBounds(AbstractBounds<RowPosition> rowBounds)
+    public List<SSTableReader> sstablesInBounds(AbstractBounds<PartitionPosition> rowBounds)
     {
         if (intervalTree.isEmpty())
             return Collections.emptyList();
-        RowPosition stopInTree = rowBounds.right.isMinimum() ? intervalTree.max() : rowBounds.right;
-        return intervalTree.search(Interval.<RowPosition, SSTableReader>create(rowBounds.left, stopInTree));
+        PartitionPosition stopInTree = rowBounds.right.isMinimum() ? intervalTree.max() : rowBounds.right;
+        return intervalTree.search(Interval.<PartitionPosition, SSTableReader>create(rowBounds.left, stopInTree));
     }
 
     // METHODS TO CONSTRUCT FUNCTIONS FOR MODIFYING A VIEW:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/AbstractType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractType.java b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
index aa25a81..b074b34 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.db.marshal;
 
+import java.io.DataInput;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -27,11 +29,15 @@ import java.util.Map;
 
 import org.apache.cassandra.cql3.CQL3Type;
 import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.serializers.TypeSerializer;
 import org.apache.cassandra.serializers.MarshalException;
 
 import org.github.jamm.Unmetered;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
  * Specifies a Comparator for a specific type of ByteBuffer.
@@ -87,6 +93,9 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
     /** get a string representation of the bytes suitable for log messages */
     public String getString(ByteBuffer bytes)
     {
+        if (bytes == null)
+            return "null";
+
         TypeSerializer<T> serializer = getSerializer();
         serializer.validate(bytes);
 
@@ -132,6 +141,17 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
         return new CQL3Type.Custom(this);
     }
 
+    /**
+     * Same as compare except that this ignore ReversedType. This is to be use when
+     * comparing 2 values to decide for a CQL condition (see Operator.isSatisfiedBy) as
+     * for CQL, ReversedType is simply an "hint" to the storage engine but it does not
+     * change the meaning of queries per-se.
+     */
+    public int compareForCQL(ByteBuffer v1, ByteBuffer v2)
+    {
+        return compare(v1, v2);
+    }
+
     public abstract TypeSerializer<T> getSerializer();
 
     /* convenience method */
@@ -291,6 +311,50 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
     }
 
     /**
+    * The length of values for this type if all values are of fixed length, -1 otherwise.
+     */
+    protected int valueLengthIfFixed()
+    {
+        return -1;
+    }
+
+    // This assumes that no empty values are passed
+    public void writeValue(ByteBuffer value, DataOutputPlus out) throws IOException
+    {
+        assert value.hasRemaining();
+        if (valueLengthIfFixed() >= 0)
+            out.write(value);
+        else
+            ByteBufferUtil.writeWithLength(value, out);
+    }
+
+    public long writtenLength(ByteBuffer value, TypeSizes sizes)
+    {
+        assert value.hasRemaining();
+        return valueLengthIfFixed() >= 0
+             ? value.remaining()
+             : sizes.sizeofWithLength(value);
+    }
+
+    public ByteBuffer readValue(DataInput in) throws IOException
+    {
+        int length = valueLengthIfFixed();
+        if (length >= 0)
+            return ByteBufferUtil.read(in, length);
+        else
+            return ByteBufferUtil.readWithLength(in);
+    }
+
+    public void skipValue(DataInput in) throws IOException
+    {
+        int length = valueLengthIfFixed();
+        if (length < 0)
+            length = in.readInt();
+
+        FileUtils.skipBytesFully(in, length);
+    }
+
+    /**
      * This must be overriden by subclasses if necessary so that for any
      * AbstractType, this == TypeParser.parse(toString()).
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/BooleanType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/BooleanType.java b/src/java/org/apache/cassandra/db/marshal/BooleanType.java
index bfe8c34..f87eb12 100644
--- a/src/java/org/apache/cassandra/db/marshal/BooleanType.java
+++ b/src/java/org/apache/cassandra/db/marshal/BooleanType.java
@@ -94,4 +94,10 @@ public class BooleanType extends AbstractType<Boolean>
     {
         return BooleanSerializer.instance;
     }
+
+    @Override
+    protected int valueLengthIfFixed()
+    {
+        return 1;
+    }
 }