You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:47:51 UTC

[27/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
deleted file mode 100644
index a541d5e..0000000
--- a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.filter;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.util.Comparator;
-import java.util.Iterator;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.composites.CType;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.FileDataInput;
-
-/**
- * Given an implementation-specific description of what columns to look for, provides methods
- * to extract the desired columns from a Memtable, SSTable, or SuperColumn.  Either the get*ColumnIterator
- * methods will be called, or filterSuperColumn, but not both on the same object.  QueryFilter
- * takes care of putting the two together if subcolumn filtering needs to be done, based on the
- * querypath that it knows (but that IFilter implementations are oblivious to).
- */
-public interface IDiskAtomFilter
-{
-    /**
-     * returns an iterator that returns columns from the given columnFamily
-     * matching the Filter criteria in sorted order.
-     */
-    public Iterator<Cell> getColumnIterator(ColumnFamily cf);
-
-    public OnDiskAtomIterator getColumnIterator(DecoratedKey key, ColumnFamily cf);
-
-    /**
-     * Get an iterator that returns columns from the given SSTable using the opened file
-     * matching the Filter criteria in sorted order.
-     * @param sstable
-     * @param file Already opened file data input, saves us opening another one
-     * @param key The key of the row we are about to iterate over
-     */
-    public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry);
-
-    /**
-     * returns an iterator that returns columns from the given SSTable
-     * matching the Filter criteria in sorted order.
-     */
-    public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, DecoratedKey key);
-
-    /**
-     * collects columns from reducedColumns into returnCF.  Termination is determined
-     * by the filter code, which should have some limit on the number of columns
-     * to avoid running out of memory on large rows.
-     */
-    public void collectReducedColumns(ColumnFamily container, Iterator<Cell> reducedColumns, DecoratedKey key, int gcBefore, long now);
-
-    public Comparator<Cell> getColumnComparator(CellNameType comparator);
-
-    public boolean isReversed();
-    public void updateColumnsLimit(int newLimit);
-
-    public int getLiveCount(ColumnFamily cf, long now);
-    public ColumnCounter columnCounter(CellNameType comparator, long now);
-
-    public IDiskAtomFilter cloneShallow();
-    public boolean maySelectPrefix(CType type, Composite prefix);
-
-    public boolean shouldInclude(SSTableReader sstable);
-
-    public boolean countCQL3Rows(CellNameType comparator);
-
-    public boolean isHeadFilter();
-
-    /**
-     * Whether the provided cf, that is assumed to contain the head of the
-     * partition, contains enough data to cover this filter.
-     */
-    public boolean isFullyCoveredBy(ColumnFamily cf, long now);
-
-    public static class Serializer implements IVersionedSerializer<IDiskAtomFilter>
-    {
-        private final CellNameType type;
-
-        public Serializer(CellNameType type)
-        {
-            this.type = type;
-        }
-
-        public void serialize(IDiskAtomFilter filter, DataOutputPlus out, int version) throws IOException
-        {
-            if (filter instanceof SliceQueryFilter)
-            {
-                out.writeByte(0);
-                type.sliceQueryFilterSerializer().serialize((SliceQueryFilter)filter, out, version);
-            }
-            else
-            {
-                out.writeByte(1);
-                type.namesQueryFilterSerializer().serialize((NamesQueryFilter)filter, out, version);
-            }
-        }
-
-        public IDiskAtomFilter deserialize(DataInput in, int version) throws IOException
-        {
-            int b = in.readByte();
-            if (b == 0)
-            {
-                return type.sliceQueryFilterSerializer().deserialize(in, version);
-            }
-            else
-            {
-                assert b == 1;
-                return type.namesQueryFilterSerializer().deserialize(in, version);
-            }
-        }
-
-        public long serializedSize(IDiskAtomFilter filter, int version)
-        {
-            int size = 1;
-            if (filter instanceof SliceQueryFilter)
-                size += type.sliceQueryFilterSerializer().serializedSize((SliceQueryFilter)filter, version);
-            else
-                size += type.namesQueryFilterSerializer().serializedSize((NamesQueryFilter)filter, version);
-            return size;
-        }
-    }
-
-    public Iterator<RangeTombstone> getRangeTombstoneIterator(ColumnFamily source);
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
deleted file mode 100644
index c8f63bb..0000000
--- a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.filter;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.commons.lang3.StringUtils;
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Iterators;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.composites.CType;
-import org.apache.cassandra.io.ISerializer;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.utils.SearchIterator;
-
-public class NamesQueryFilter implements IDiskAtomFilter
-{
-    public final SortedSet<CellName> columns;
-
-    // If true, getLiveCount will always return either 0 or 1. This uses the fact that we know 
-    // CQL3 will never use a name filter with cell names spanning multiple CQL3 rows.
-    private final boolean countCQL3Rows;
-
-    public NamesQueryFilter(SortedSet<CellName> columns)
-    {
-        this(columns, false);
-    }
-
-    public NamesQueryFilter(SortedSet<CellName> columns, boolean countCQL3Rows)
-    {
-        this.columns = columns;
-        this.countCQL3Rows = countCQL3Rows;
-    }
-
-    public NamesQueryFilter cloneShallow()
-    {
-        // NQF is immutable as far as shallow cloning is concerned, so save the allocation.
-        return this;
-    }
-
-    public NamesQueryFilter withUpdatedColumns(SortedSet<CellName> newColumns)
-    {
-       return new NamesQueryFilter(newColumns, countCQL3Rows);
-    }
-
-    @SuppressWarnings("unchecked")
-    public Iterator<Cell> getColumnIterator(ColumnFamily cf)
-    {
-        assert cf != null;
-        return (Iterator<Cell>) (Iterator<?>) new ByNameColumnIterator(columns.iterator(), null, cf);
-    }
-
-    public OnDiskAtomIterator getColumnIterator(DecoratedKey key, ColumnFamily cf)
-    {
-        assert cf != null;
-        return new ByNameColumnIterator(columns.iterator(), key, cf);
-    }
-
-    public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, DecoratedKey key)
-    {
-        return sstable.iterator(key, columns);
-    }
-
-    public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry)
-    {
-        return sstable.iterator(file, key, columns, indexEntry);
-    }
-
-    public void collectReducedColumns(ColumnFamily container, Iterator<Cell> reducedColumns, DecoratedKey key, int gcBefore, long now)
-    {
-        DeletionInfo.InOrderTester tester = container.inOrderDeletionTester();
-        while (reducedColumns.hasNext())
-            container.maybeAppendColumn(reducedColumns.next(), tester, gcBefore);
-    }
-
-    public Comparator<Cell> getColumnComparator(CellNameType comparator)
-    {
-        return comparator.columnComparator(false);
-    }
-
-    @Override
-    public String toString()
-    {
-        return "NamesQueryFilter(" +
-               "columns=" + StringUtils.join(columns, ",") +
-               ')';
-    }
-
-    public boolean isReversed()
-    {
-        return false;
-    }
-
-    public void updateColumnsLimit(int newLimit)
-    {
-    }
-
-    public int getLiveCount(ColumnFamily cf, long now)
-    {
-        // Note: we could use columnCounter() but we save the object allocation as it's simple enough
-
-        if (countCQL3Rows)
-            return cf.hasOnlyTombstones(now) ? 0 : 1;
-
-        int count = 0;
-        for (Cell cell : cf)
-        {
-            if (cell.isLive(now))
-                count++;
-        }
-        return count;
-    }
-
-    public boolean maySelectPrefix(CType type, Composite prefix)
-    {
-        for (CellName column : columns)
-        {
-            if (prefix.isPrefixOf(type, column))
-                return true;
-        }
-        return false;
-    }
-
-    public boolean shouldInclude(SSTableReader sstable)
-    {
-        return true;
-    }
-
-    public boolean isFullyCoveredBy(ColumnFamily cf, long now)
-    {
-        // cf will cover all the requested columns if the range it covers include
-        // all said columns
-        CellName first = cf.iterator(ColumnSlice.ALL_COLUMNS_ARRAY).next().name();
-        CellName last = cf.reverseIterator(ColumnSlice.ALL_COLUMNS_ARRAY).next().name();
-
-        return cf.getComparator().compare(first, columns.first()) <= 0
-            && cf.getComparator().compare(columns.last(), last) <= 0;
-    }
-
-    public boolean isHeadFilter()
-    {
-        return false;
-    }
-
-    public boolean countCQL3Rows(CellNameType comparator)
-    {
-        return countCQL3Rows;
-    }
-
-    public boolean countCQL3Rows()
-    {
-        return countCQL3Rows(null);
-    }
-
-    public ColumnCounter columnCounter(CellNameType comparator, long now)
-    {
-        return countCQL3Rows
-             ? new ColumnCounter.GroupByPrefix(now, null, 0)
-             : new ColumnCounter(now);
-    }
-
-    private static class ByNameColumnIterator extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator
-    {
-        private final ColumnFamily cf;
-        private final DecoratedKey key;
-        private final Iterator<CellName> names;
-        private final SearchIterator<CellName, Cell> cells;
-
-        public ByNameColumnIterator(Iterator<CellName> names, DecoratedKey key, ColumnFamily cf)
-        {
-            this.names = names;
-            this.cf = cf;
-            this.key = key;
-            this.cells = cf.searchIterator();
-        }
-
-        protected OnDiskAtom computeNext()
-        {
-            while (names.hasNext() && cells.hasNext())
-            {
-                CellName current = names.next();
-                Cell cell = cells.next(current);
-                if (cell != null)
-                    return cell;
-            }
-            return endOfData();
-        }
-
-        public ColumnFamily getColumnFamily()
-        {
-            return cf;
-        }
-
-        public DecoratedKey getKey()
-        {
-            return key;
-        }
-
-        public void close() throws IOException { }
-    }
-
-    public static class Serializer implements IVersionedSerializer<NamesQueryFilter>
-    {
-        private CellNameType type;
-
-        public Serializer(CellNameType type)
-        {
-            this.type = type;
-        }
-
-        public void serialize(NamesQueryFilter f, DataOutputPlus out, int version) throws IOException
-        {
-            out.writeInt(f.columns.size());
-            ISerializer<CellName> serializer = type.cellSerializer();
-            for (CellName cName : f.columns)
-            {
-                serializer.serialize(cName, out);
-            }
-            out.writeBoolean(f.countCQL3Rows);
-        }
-
-        public NamesQueryFilter deserialize(DataInput in, int version) throws IOException
-        {
-            int size = in.readInt();
-            SortedSet<CellName> columns = new TreeSet<>(type);
-            ISerializer<CellName> serializer = type.cellSerializer();
-            for (int i = 0; i < size; ++i)
-                columns.add(serializer.deserialize(in));
-            boolean countCQL3Rows = in.readBoolean();
-            return new NamesQueryFilter(columns, countCQL3Rows);
-        }
-
-        public long serializedSize(NamesQueryFilter f, int version)
-        {
-            TypeSizes sizes = TypeSizes.NATIVE;
-            int size = sizes.sizeof(f.columns.size());
-            ISerializer<CellName> serializer = type.cellSerializer();
-            for (CellName cName : f.columns)
-                size += serializer.serializedSize(cName, sizes);
-            size += sizes.sizeof(f.countCQL3Rows);
-            return size;
-        }
-    }
-
-    public Iterator<RangeTombstone> getRangeTombstoneIterator(final ColumnFamily source)
-    {
-        if (!source.deletionInfo().hasRanges())
-            return Iterators.emptyIterator();
-
-        return new AbstractIterator<RangeTombstone>()
-        {
-            private final Iterator<CellName> names = columns.iterator();
-            private RangeTombstone lastFindRange;
-
-            protected RangeTombstone computeNext()
-            {
-                while (names.hasNext())
-                {
-                    CellName next = names.next();
-                    if (lastFindRange != null && lastFindRange.includes(source.getComparator(), next))
-                        return lastFindRange;
-
-                    // We keep the last range around as since names are in sort order, it's
-                    // possible it will match the next name too.
-                    lastFindRange = source.deletionInfo().rangeCovering(next);
-                    if (lastFindRange != null)
-                        return lastFindRange;
-                }
-                return endOfData();
-            }
-        };
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/filter/QueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/QueryFilter.java b/src/java/org/apache/cassandra/db/filter/QueryFilter.java
deleted file mode 100644
index 15ee33d..0000000
--- a/src/java/org/apache/cassandra/db/filter/QueryFilter.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.filter;
-
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.SortedSet;
-
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.DeletionInfo;
-import org.apache.cassandra.db.OnDiskAtom;
-import org.apache.cassandra.db.RangeTombstone;
-import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.utils.MergeIterator;
-
-public class QueryFilter
-{
-    public final DecoratedKey key;
-    public final String cfName;
-    public final IDiskAtomFilter filter;
-    public final long timestamp;
-
-    public QueryFilter(DecoratedKey key, String cfName, IDiskAtomFilter filter, long timestamp)
-    {
-        this.key = key;
-        this.cfName = cfName;
-        this.filter = filter;
-        this.timestamp = timestamp;
-    }
-
-    public Iterator<Cell> getIterator(ColumnFamily cf)
-    {
-        assert cf != null;
-        return filter.getColumnIterator(cf);
-    }
-
-    public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable)
-    {
-        return filter.getSSTableColumnIterator(sstable, key);
-    }
-
-    public void collateOnDiskAtom(ColumnFamily returnCF,
-                                  List<? extends Iterator<? extends OnDiskAtom>> toCollate,
-                                  int gcBefore)
-    {
-        collateOnDiskAtom(returnCF, toCollate, filter, this.key, gcBefore, timestamp);
-    }
-
-    public static void collateOnDiskAtom(ColumnFamily returnCF,
-                                         List<? extends Iterator<? extends OnDiskAtom>> toCollate,
-                                         IDiskAtomFilter filter,
-                                         DecoratedKey key,
-                                         int gcBefore,
-                                         long timestamp)
-    {
-        List<Iterator<Cell>> filteredIterators = new ArrayList<>(toCollate.size());
-        for (Iterator<? extends OnDiskAtom> iter : toCollate)
-            filteredIterators.add(gatherTombstones(returnCF, iter));
-        collateColumns(returnCF, filteredIterators, filter, key, gcBefore, timestamp);
-    }
-
-    // When there is only a single source of atoms, we can skip the collate step
-    public void collateOnDiskAtom(ColumnFamily returnCF, Iterator<? extends OnDiskAtom> toCollate, int gcBefore)
-    {
-        filter.collectReducedColumns(returnCF, gatherTombstones(returnCF, toCollate), this.key, gcBefore, timestamp);
-    }
-
-    public void collateColumns(ColumnFamily returnCF, List<? extends Iterator<Cell>> toCollate, int gcBefore)
-    {
-        collateColumns(returnCF, toCollate, filter, this.key, gcBefore, timestamp);
-    }
-
-    public static void collateColumns(ColumnFamily returnCF,
-                                      List<? extends Iterator<Cell>> toCollate,
-                                      IDiskAtomFilter filter,
-                                      DecoratedKey key,
-                                      int gcBefore,
-                                      long timestamp)
-    {
-        Comparator<Cell> comparator = filter.getColumnComparator(returnCF.getComparator());
-
-        Iterator<Cell> reduced = toCollate.size() == 1
-                               ? toCollate.get(0)
-                               : MergeIterator.get(toCollate, comparator, getReducer(comparator));
-
-        filter.collectReducedColumns(returnCF, reduced, key, gcBefore, timestamp);
-    }
-
-    private static MergeIterator.Reducer<Cell, Cell> getReducer(final Comparator<Cell> comparator)
-    {
-        // define a 'reduced' iterator that merges columns w/ the same name, which
-        // greatly simplifies computing liveColumns in the presence of tombstones.
-        return new MergeIterator.Reducer<Cell, Cell>()
-        {
-            Cell current;
-
-            public void reduce(Cell next)
-            {
-                assert current == null || comparator.compare(current, next) == 0;
-                current = current == null ? next : current.reconcile(next);
-            }
-
-            protected Cell getReduced()
-            {
-                assert current != null;
-                Cell toReturn = current;
-                current = null;
-                return toReturn;
-            }
-
-            @Override
-            public boolean trivialReduceIsTrivial()
-            {
-                return true;
-            }
-        };
-    }
-
-    /**
-     * Given an iterator of on disk atom, returns an iterator that filters the tombstone range
-     * markers adding them to {@code returnCF} and returns the normal column.
-     */
-    public static Iterator<Cell> gatherTombstones(final ColumnFamily returnCF, final Iterator<? extends OnDiskAtom> iter)
-    {
-        return new Iterator<Cell>()
-        {
-            private Cell next;
-
-            public boolean hasNext()
-            {
-                if (next != null)
-                    return true;
-
-                getNext();
-                return next != null;
-            }
-
-            public Cell next()
-            {
-                if (next == null)
-                    getNext();
-
-                assert next != null;
-                Cell toReturn = next;
-                next = null;
-                return toReturn;
-            }
-
-            private void getNext()
-            {
-                while (iter.hasNext())
-                {
-                    OnDiskAtom atom = iter.next();
-
-                    if (atom instanceof Cell)
-                    {
-                        next = (Cell)atom;
-                        break;
-                    }
-                    else
-                    {
-                        returnCF.addAtom(atom);
-                    }
-                }
-            }
-
-            public void remove()
-            {
-                throw new UnsupportedOperationException();
-            }
-        };
-    }
-
-    public String getColumnFamilyName()
-    {
-        return cfName;
-    }
-
-    /**
-     * @return a QueryFilter object to satisfy the given slice criteria:
-     * @param key the row to slice
-     * @param cfName column family to query
-     * @param start column to start slice at, inclusive; empty for "the first column"
-     * @param finish column to stop slice at, inclusive; empty for "the last column"
-     * @param reversed true to start with the largest column (as determined by configured sort order) instead of smallest
-     * @param limit maximum number of non-deleted columns to return
-     * @param timestamp time to use for determining expiring columns' state
-     */
-    public static QueryFilter getSliceFilter(DecoratedKey key,
-                                             String cfName,
-                                             Composite start,
-                                             Composite finish,
-                                             boolean reversed,
-                                             int limit,
-                                             long timestamp)
-    {
-        return new QueryFilter(key, cfName, new SliceQueryFilter(start, finish, reversed, limit), timestamp);
-    }
-
-    /**
-     * return a QueryFilter object that includes every column in the row.
-     * This is dangerous on large rows; avoid except for test code.
-     */
-    public static QueryFilter getIdentityFilter(DecoratedKey key, String cfName, long timestamp)
-    {
-        return new QueryFilter(key, cfName, new IdentityQueryFilter(), timestamp);
-    }
-
-    /**
-     * @return a QueryFilter object that will return columns matching the given names
-     * @param key the row to slice
-     * @param cfName column family to query
-     * @param columns the column names to restrict the results to, sorted in comparator order
-     */
-    public static QueryFilter getNamesFilter(DecoratedKey key, String cfName, SortedSet<CellName> columns, long timestamp)
-    {
-        return new QueryFilter(key, cfName, new NamesQueryFilter(columns), timestamp);
-    }
-
-    @Override
-    public String toString()
-    {
-        return getClass().getSimpleName() + "(key=" + key + ", cfName=" + cfName + (filter == null ? "" : ", filter=" + filter) + ")";
-    }
-
-    public boolean shouldInclude(SSTableReader sstable)
-    {
-        return filter.shouldInclude(sstable);
-    }
-
-    public void delete(DeletionInfo target, ColumnFamily source)
-    {
-        target.add(source.deletionInfo().getTopLevelDeletion());
-        // source is the CF currently in the memtable, and it can be large compared to what the filter selects,
-        // so only consider those range tombstones that the filter do select.
-        for (Iterator<RangeTombstone> iter = filter.getRangeTombstoneIterator(source); iter.hasNext(); )
-            target.add(iter.next(), source.getComparator());
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/filter/RowFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java
new file mode 100644
index 0000000..aff8d16
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java
@@ -0,0 +1,784 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.filter;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.base.Objects;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.cql3.statements.RequestValidations.*;
+
+/**
+ * A filter on which rows a given query should include or exclude.
+ * <p>
+ * This corresponds to the restrictions on rows that are not handled by the query
+ * {@link ClusteringIndexFilter}. Some of the expressions of this filter may
+ * be handled by a 2ndary index, and the rest is simply filtered out from the
+ * result set (the later can only happen if the query was using ALLOW FILTERING).
+ */
+public abstract class RowFilter implements Iterable<RowFilter.Expression>
+{
+    public static final Serializer serializer = new Serializer();
+    public static final RowFilter NONE = new CQLFilter(Collections.<Expression>emptyList());
+
+    protected final List<Expression> expressions;
+
+    protected RowFilter(List<Expression> expressions)
+    {
+        this.expressions = expressions;
+    }
+
+    public static RowFilter create()
+    {
+        return new CQLFilter(new ArrayList<Expression>());
+    }
+
+    public static RowFilter create(int capacity)
+    {
+        return new CQLFilter(new ArrayList<Expression>(capacity));
+    }
+
+    public static RowFilter forThrift(int capacity)
+    {
+        return new ThriftFilter(new ArrayList<Expression>(capacity));
+    }
+
+    public void add(ColumnDefinition def, Operator op, ByteBuffer value)
+    {
+        expressions.add(new SimpleExpression(def, op, value));
+    }
+
+    public void addMapEquality(ColumnDefinition def, ByteBuffer key, Operator op, ByteBuffer value)
+    {
+        expressions.add(new MapEqualityExpression(def, key, op, value));
+    }
+
+    public void addThriftExpression(CFMetaData metadata, ByteBuffer name, Operator op, ByteBuffer value)
+    {
+        assert (this instanceof ThriftFilter);
+        expressions.add(new ThriftExpression(metadata, name, op, value));
+    }
+
+    /**
+     * Filters the provided iterator so that only the row satisfying the expression of this filter
+     * are included in the resulting iterator.
+     *
+     * @param iter the iterator to filter
+     * @param nowInSec the time of query in seconds.
+     * @return the filtered iterator.
+     */
+    public abstract UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec);
+
+    /**
+     * Returns this filter but without the provided expression. This method
+     * *assumes* that the filter contains the provided expression.
+     */
+    public RowFilter without(Expression expression)
+    {
+        assert expressions.contains(expression);
+        if (expressions.size() == 1)
+            return RowFilter.NONE;
+
+        List<Expression> newExpressions = new ArrayList<>(expressions.size() - 1);
+        for (Expression e : expressions)
+            if (!e.equals(expression))
+                newExpressions.add(e);
+
+        return withNewExpressions(newExpressions);
+    }
+
+    protected abstract RowFilter withNewExpressions(List<Expression> expressions);
+
+    public boolean isEmpty()
+    {
+        return expressions.isEmpty();
+    }
+
+    public Iterator<Expression> iterator()
+    {
+        return expressions.iterator();
+    }
+
+    private static Clustering makeCompactClustering(CFMetaData metadata, ByteBuffer name)
+    {
+        assert metadata.isCompactTable();
+        if (metadata.isCompound())
+        {
+            List<ByteBuffer> values = CompositeType.splitName(name);
+            return new SimpleClustering(values.toArray(new ByteBuffer[metadata.comparator.size()]));
+        }
+        else
+        {
+            return new SimpleClustering(name);
+        }
+    }
+
+    @Override
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < expressions.size(); i++)
+        {
+            if (i > 0)
+                sb.append(" AND ");
+            sb.append(expressions.get(i));
+        }
+        return sb.toString();
+    }
+
+    private static class CQLFilter extends RowFilter
+    {
+        private CQLFilter(List<Expression> expressions)
+        {
+            super(expressions);
+        }
+
+        public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, final int nowInSec)
+        {
+            if (expressions.isEmpty())
+                return iter;
+
+            return new WrappingUnfilteredPartitionIterator(iter)
+            {
+                @Override
+                public UnfilteredRowIterator computeNext(final UnfilteredRowIterator iter)
+                {
+                    return new FilteringRowIterator(iter)
+                    {
+                        // We filter tombstones when passing the row to isSatisfiedBy so that the method doesn't have to bother with them.
+                        // (we should however not filter them in the output of the method, hence it's not used as row filter for the
+                        // FilteringRowIterator)
+                        private final TombstoneFilteringRow filter = new TombstoneFilteringRow(nowInSec);
+
+                        protected boolean includeRow(Row row)
+                        {
+                            return CQLFilter.this.isSatisfiedBy(iter.partitionKey(), filter.setTo(row));
+                        }
+                    };
+                }
+            };
+        }
+
+        /**
+         * Returns whether the provided row (with it's partition key) satisfies
+         * this row filter or not (that is, if it satisfies all of its expressions).
+         */
+        private boolean isSatisfiedBy(DecoratedKey partitionKey, Row row)
+        {
+            for (Expression e : expressions)
+                if (!e.isSatisfiedBy(partitionKey, row))
+                    return false;
+
+            return true;
+        }
+
+        protected RowFilter withNewExpressions(List<Expression> expressions)
+        {
+            return new CQLFilter(expressions);
+        }
+    }
+
+    private static class ThriftFilter extends RowFilter
+    {
+        private ThriftFilter(List<Expression> expressions)
+        {
+            super(expressions);
+        }
+
+        public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, final int nowInSec)
+        {
+            if (expressions.isEmpty())
+                return iter;
+
+            return new WrappingUnfilteredPartitionIterator(iter)
+            {
+                @Override
+                public UnfilteredRowIterator computeNext(final UnfilteredRowIterator iter)
+                {
+                    // Thrift does not filter rows, it filters entire partition if any of the expression is not
+                    // satisfied, which forces us to materialize the result (in theory we could materialize only
+                    // what we need which might or might not be everything, but we keep it simple since in practice
+                    // it's not worth that it has ever been).
+                    ArrayBackedPartition result = ArrayBackedPartition.create(iter);
+
+                    // The partition needs to have a row for every expression, and the expression needs to be valid.
+                    for (Expression expr : expressions)
+                    {
+                        assert expr instanceof ThriftExpression;
+                        Row row = result.getRow(makeCompactClustering(iter.metadata(), expr.column().name.bytes));
+                        if (row == null || !expr.isSatisfiedBy(iter.partitionKey(), row))
+                            return null;
+                    }
+                    // If we get there, it means all expressions where satisfied, so return the original result
+                    return result.unfilteredIterator();
+                }
+            };
+        }
+
+        protected RowFilter withNewExpressions(List<Expression> expressions)
+        {
+            return new ThriftFilter(expressions);
+        }
+    }
+
+    public static abstract class Expression
+    {
+        private static final Serializer serializer = new Serializer();
+
+        // Note: the order of this enum matter, it's used for serialization
+        protected enum Kind { SIMPLE, MAP_EQUALITY, THRIFT_DYN_EXPR }
+
+        abstract Kind kind();
+        protected final ColumnDefinition column;
+        protected final Operator operator;
+        protected final ByteBuffer value;
+
+        protected Expression(ColumnDefinition column, Operator operator, ByteBuffer value)
+        {
+            this.column = column;
+            this.operator = operator;
+            this.value = value;
+        }
+
+        public ColumnDefinition column()
+        {
+            return column;
+        }
+
+        public Operator operator()
+        {
+            return operator;
+        }
+
+        /**
+         * Checks if the operator of this <code>IndexExpression</code> is a <code>CONTAINS</code> operator.
+         *
+         * @return <code>true</code> if the operator of this <code>IndexExpression</code> is a <code>CONTAINS</code>
+         * operator, <code>false</code> otherwise.
+         */
+        public boolean isContains()
+        {
+            return Operator.CONTAINS == operator;
+        }
+
+        /**
+         * Checks if the operator of this <code>IndexExpression</code> is a <code>CONTAINS_KEY</code> operator.
+         *
+         * @return <code>true</code> if the operator of this <code>IndexExpression</code> is a <code>CONTAINS_KEY</code>
+         * operator, <code>false</code> otherwise.
+         */
+        public boolean isContainsKey()
+        {
+            return Operator.CONTAINS_KEY == operator;
+        }
+
+        /**
+         * If this expression is used to query an index, the value to use as
+         * partition key for that index query.
+         */
+        public ByteBuffer getIndexValue()
+        {
+            return value;
+        }
+
+        public void validateForIndexing() throws InvalidRequestException
+        {
+            checkNotNull(value, "Unsupported null value for indexed column %s", column.name);
+            checkBindValueSet(value, "Unsupported unset value for indexed column %s", column.name);
+            checkFalse(value.remaining() > FBUtilities.MAX_UNSIGNED_SHORT, "Index expression values may not be larger than 64K");
+        }
+
+        /**
+         * Returns whether the provided row satisfied this expression or not.
+         *
+         * @param partitionKey the partition key for row to check.
+         * @param row the row to check. It should *not* contain deleted cells
+         * (i.e. it should come from a RowIterator).
+         * @return whether the row is satisfied by this expression.
+         */
+        public abstract boolean isSatisfiedBy(DecoratedKey partitionKey, Row row);
+
+        protected ByteBuffer getValue(DecoratedKey partitionKey, Row row)
+        {
+            switch (column.kind)
+            {
+                case PARTITION_KEY:
+                    return column.isOnAllComponents()
+                         ? partitionKey.getKey()
+                         : CompositeType.extractComponent(partitionKey.getKey(), column.position());
+                case CLUSTERING_COLUMN:
+                    return row.clustering().get(column.position());
+                default:
+                    Cell cell = row.getCell(column);
+                    return cell == null ? null : cell.value();
+            }
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o)
+                return true;
+
+            if (!(o instanceof Expression))
+                return false;
+
+            Expression that = (Expression)o;
+
+            return Objects.equal(this.kind(), that.kind())
+                && Objects.equal(this.column.name, that.column.name)
+                && Objects.equal(this.operator, that.operator)
+                && Objects.equal(this.value, that.value);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hashCode(column.name, operator, value);
+        }
+
+        private static class Serializer
+        {
+            public void serialize(Expression expression, DataOutputPlus out, int version) throws IOException
+            {
+                ByteBufferUtil.writeWithShortLength(expression.column.name.bytes, out);
+                expression.operator.writeTo(out);
+
+                if (version >= MessagingService.VERSION_30)
+                    out.writeByte(expression.kind().ordinal());
+
+                switch (expression.kind())
+                {
+                    case SIMPLE:
+                        ByteBufferUtil.writeWithShortLength(((SimpleExpression)expression).value, out);
+                        break;
+                    case MAP_EQUALITY:
+                        MapEqualityExpression mexpr = (MapEqualityExpression)expression;
+                        if (version < MessagingService.VERSION_30)
+                        {
+                            ByteBufferUtil.writeWithShortLength(mexpr.getIndexValue(), out);
+                        }
+                        else
+                        {
+                            ByteBufferUtil.writeWithShortLength(mexpr.key, out);
+                            ByteBufferUtil.writeWithShortLength(mexpr.value, out);
+                        }
+                        break;
+                    case THRIFT_DYN_EXPR:
+                        ByteBufferUtil.writeWithShortLength(((ThriftExpression)expression).value, out);
+                        break;
+                }
+            }
+
+            public Expression deserialize(DataInput in, int version, CFMetaData metadata) throws IOException
+            {
+                ByteBuffer name = ByteBufferUtil.readWithShortLength(in);
+                Operator operator = Operator.readFrom(in);
+
+                ColumnDefinition column = metadata.getColumnDefinition(name);
+                if (!metadata.isCompactTable() && column == null)
+                    throw new RuntimeException("Unknown (or dropped) column " + UTF8Type.instance.getString(name) + " during deserialization");
+
+                Kind kind;
+                if (version >= MessagingService.VERSION_30)
+                {
+                    kind = Kind.values()[in.readByte()];
+                }
+                else
+                {
+                    if (column == null)
+                        kind = Kind.THRIFT_DYN_EXPR;
+                    else if (column.type instanceof MapType && operator == Operator.EQ)
+                        kind = Kind.MAP_EQUALITY;
+                    else
+                        kind = Kind.SIMPLE;
+                }
+
+                switch (kind)
+                {
+                    case SIMPLE:
+                        return new SimpleExpression(column, operator, ByteBufferUtil.readWithShortLength(in));
+                    case MAP_EQUALITY:
+                        ByteBuffer key, value;
+                        if (version < MessagingService.VERSION_30)
+                        {
+                            ByteBuffer composite = ByteBufferUtil.readWithShortLength(in);
+                            key = CompositeType.extractComponent(composite, 0);
+                            value = CompositeType.extractComponent(composite, 0);
+                        }
+                        else
+                        {
+                            key = ByteBufferUtil.readWithShortLength(in);
+                            value = ByteBufferUtil.readWithShortLength(in);
+                        }
+                        return new MapEqualityExpression(column, key, operator, value);
+                    case THRIFT_DYN_EXPR:
+                        return new ThriftExpression(metadata, name, operator, ByteBufferUtil.readWithShortLength(in));
+                }
+                throw new AssertionError();
+            }
+
+            public long serializedSize(Expression expression, int version)
+            {
+                TypeSizes sizes = TypeSizes.NATIVE;
+                long size = ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes, sizes)
+                          + expression.operator.serializedSize();
+
+                switch (expression.kind())
+                {
+                    case SIMPLE:
+                        size += ByteBufferUtil.serializedSizeWithShortLength(((SimpleExpression)expression).value, sizes);
+                        break;
+                    case MAP_EQUALITY:
+                        MapEqualityExpression mexpr = (MapEqualityExpression)expression;
+                        if (version < MessagingService.VERSION_30)
+                            size += ByteBufferUtil.serializedSizeWithShortLength(mexpr.getIndexValue(), sizes);
+                        else
+                            size += ByteBufferUtil.serializedSizeWithShortLength(mexpr.key, sizes)
+                                  + ByteBufferUtil.serializedSizeWithShortLength(mexpr.value, sizes);
+                        break;
+                    case THRIFT_DYN_EXPR:
+                        size += ByteBufferUtil.serializedSizeWithShortLength(((ThriftExpression)expression).value, sizes);
+                        break;
+                }
+                return size;
+            }
+        }
+    }
+
+    /**
+     * An expression of the form 'column' 'op' 'value'.
+     */
+    private static class SimpleExpression extends Expression
+    {
+        public SimpleExpression(ColumnDefinition column, Operator operator, ByteBuffer value)
+        {
+            super(column, operator, value);
+        }
+
+        public boolean isSatisfiedBy(DecoratedKey partitionKey, Row row)
+        {
+            // We support null conditions for LWT (in ColumnCondition) but not for RowFilter.
+            // TODO: we should try to merge both code someday.
+            assert value != null;
+
+            if (row.isStatic() != column.isStatic())
+                return true;
+
+            switch (operator)
+            {
+                case EQ:
+                case LT:
+                case LTE:
+                case GTE:
+                case GT:
+                case NEQ:
+                    {
+                        assert !column.isComplex() : "Only CONTAINS and CONTAINS_KEY are supported for 'complex' types";
+                        ByteBuffer foundValue = getValue(partitionKey, row);
+                        // Note that CQL expression are always of the form 'x < 4', i.e. the tested value is on the left.
+                        return foundValue != null && operator.isSatisfiedBy(column.type, foundValue, value);
+                    }
+                case CONTAINS:
+                    assert column.type.isCollection();
+                    CollectionType<?> type = (CollectionType<?>)column.type;
+                    if (column.isComplex())
+                    {
+                        Iterator<Cell> iter = row.getCells(column);
+                        while (iter.hasNext())
+                        {
+                            Cell cell = iter.next();
+                            if (type.kind == CollectionType.Kind.SET)
+                            {
+                                if (type.nameComparator().compare(cell.path().get(0), value) == 0)
+                                    return true;
+                            }
+                            else
+                            {
+                                if (type.valueComparator().compare(cell.value(), value) == 0)
+                                    return true;
+                            }
+                        }
+                        return false;
+                    }
+                    else
+                    {
+                        ByteBuffer foundValue = getValue(partitionKey, row);
+                        if (foundValue == null)
+                            return false;
+
+                        switch (type.kind)
+                        {
+                            case LIST:
+                                ListType<?> listType = (ListType<?>)type;
+                                return listType.compose(foundValue).contains(listType.getElementsType().compose(value));
+                            case SET:
+                                SetType<?> setType = (SetType<?>)type;
+                                return setType.compose(foundValue).contains(setType.getElementsType().compose(value));
+                            case MAP:
+                                MapType<?,?> mapType = (MapType<?, ?>)type;
+                                return mapType.compose(foundValue).containsValue(mapType.getValuesType().compose(value));
+                        }
+                        throw new AssertionError();
+                    }
+                case CONTAINS_KEY:
+                    assert column.type.isCollection() && column.type instanceof MapType;
+                    MapType<?, ?> mapType = (MapType<?, ?>)column.type;
+                    if (column.isComplex())
+                    {
+                         return row.getCell(column, CellPath.create(value)) != null;
+                    }
+                    else
+                    {
+                        ByteBuffer foundValue = getValue(partitionKey, row);
+                        return foundValue != null && mapType.getSerializer().getSerializedValue(foundValue, value, mapType.getKeysType()) != null;
+                    }
+
+                case IN:
+                    // It wouldn't be terribly hard to support this (though doing so would imply supporting
+                    // IN for 2ndary index) but currently we don't.
+                    throw new AssertionError();
+            }
+            throw new AssertionError();
+        }
+
+        @Override
+        public String toString()
+        {
+            AbstractType<?> type = column.type;
+            switch (operator)
+            {
+                case CONTAINS:
+                    assert type instanceof CollectionType;
+                    CollectionType<?> ct = (CollectionType<?>)type;
+                    type = ct.kind == CollectionType.Kind.SET ? ct.nameComparator() : ct.valueComparator();
+                    break;
+                case CONTAINS_KEY:
+                    assert type instanceof MapType;
+                    type = ((MapType<?, ?>)type).nameComparator();
+                    break;
+                case IN:
+                    type = ListType.getInstance(type, false);
+                    break;
+                default:
+                    break;
+            }
+            return String.format("%s %s %s", column.name, operator, type.getString(value));
+        }
+
+        @Override
+        Kind kind()
+        {
+            return Kind.SIMPLE;
+        }
+    }
+
+    /**
+     * An expression of the form 'column' ['key'] = 'value' (which is only
+     * supported when 'column' is a map).
+     */
+    private static class MapEqualityExpression extends Expression
+    {
+        private final ByteBuffer key;
+
+        public MapEqualityExpression(ColumnDefinition column, ByteBuffer key, Operator operator, ByteBuffer value)
+        {
+            super(column, operator, value);
+            assert column.type instanceof MapType && operator == Operator.EQ;
+            this.key = key;
+        }
+
+        @Override
+        public void validateForIndexing() throws InvalidRequestException
+        {
+            super.validateForIndexing();
+            checkNotNull(key, "Unsupported null value for key of map column %s", column.name);
+            checkBindValueSet(key, "Unsupported unset value for key of map column %s", column.name);
+        }
+
+        @Override
+        public ByteBuffer getIndexValue()
+        {
+            return CompositeType.build(key, value);
+        }
+
+        public boolean isSatisfiedBy(DecoratedKey partitionKey, Row row)
+        {
+            assert key != null;
+            // We support null conditions for LWT (in ColumnCondition) but not for RowFilter.
+            // TODO: we should try to merge both code someday.
+            assert value != null;
+
+            if (row.isStatic() != column.isStatic())
+                return true;
+
+            MapType<?, ?> mt = (MapType<?, ?>)column.type;
+            if (column.isComplex())
+            {
+                Cell cell = row.getCell(column, CellPath.create(key));
+                return cell != null && mt.valueComparator().compare(cell.value(), value) == 0;
+            }
+            else
+            {
+                ByteBuffer serializedMap = getValue(partitionKey, row);
+                if (serializedMap == null)
+                    return false;
+
+                ByteBuffer foundValue = mt.getSerializer().getSerializedValue(serializedMap, key, mt.getKeysType());
+                return foundValue != null && mt.valueComparator().compare(foundValue, value) == 0;
+            }
+        }
+
+        @Override
+        public String toString()
+        {
+            MapType<?, ?> mt = (MapType<?, ?>)column.type;
+            return String.format("%s[%s] = %s", column.name, mt.nameComparator().getString(key), mt.valueComparator().getString(value));
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o)
+                return true;
+
+            if (!(o instanceof MapEqualityExpression))
+                return false;
+
+            MapEqualityExpression that = (MapEqualityExpression)o;
+
+            return Objects.equal(this.column.name, that.column.name)
+                && Objects.equal(this.operator, that.operator)
+                && Objects.equal(this.key, that.key)
+                && Objects.equal(this.value, that.value);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hashCode(column.name, operator, key, value);
+        }
+
+        @Override
+        Kind kind()
+        {
+            return Kind.MAP_EQUALITY;
+        }
+    }
+
+    /**
+     * An expression of the form 'name' = 'value', but where 'name' is actually the
+     * clustering value for a compact table. This is only for thrift.
+     */
+    private static class ThriftExpression extends Expression
+    {
+        private final CFMetaData metadata;
+
+        public ThriftExpression(CFMetaData metadata, ByteBuffer name, Operator operator, ByteBuffer value)
+        {
+            super(makeDefinition(metadata, name), operator, value);
+            assert metadata.isCompactTable();
+            this.metadata = metadata;
+        }
+
+        private static ColumnDefinition makeDefinition(CFMetaData metadata, ByteBuffer name)
+        {
+            ColumnDefinition def = metadata.getColumnDefinition(name);
+            if (def != null)
+                return def;
+
+            // In thrift, we actually allow expression on non-defined columns for the sake of filtering. To accomodate
+            // this we create a "fake" definition. This is messy but it works so is probably good enough.
+            return ColumnDefinition.regularDef(metadata, name, metadata.compactValueColumn().type, null);
+        }
+
+        public boolean isSatisfiedBy(DecoratedKey partitionKey, Row row)
+        {
+            assert value != null;
+
+            // On thrift queries, even if the column expression is a "static" one, we'll have convert it as a "dynamic"
+            // one in ThriftResultsMerger, so we always expect it to be a dynamic one. Further, we expect this is only
+            // called when the row clustering does match the column (see ThriftFilter above).
+            assert row.clustering().equals(makeCompactClustering(metadata, column.name.bytes));
+            Cell cell = row.getCell(metadata.compactValueColumn());
+            return cell != null && operator.isSatisfiedBy(column.type, cell.value(), value);
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("%s %s %s", column.name, operator, column.type.getString(value));
+        }
+
+        @Override
+        Kind kind()
+        {
+            return Kind.THRIFT_DYN_EXPR;
+        }
+    }
+
+    public static class Serializer
+    {
+        public void serialize(RowFilter filter, DataOutputPlus out, int version) throws IOException
+        {
+            out.writeBoolean(filter instanceof ThriftFilter);
+            out.writeShort(filter.expressions.size());
+            for (Expression expr : filter.expressions)
+                Expression.serializer.serialize(expr, out, version);
+        }
+
+        public RowFilter deserialize(DataInput in, int version, CFMetaData metadata) throws IOException
+        {
+            boolean forThrift = in.readBoolean();
+            int size = in.readUnsignedShort();
+            List<Expression> expressions = new ArrayList<>(size);
+            for (int i = 0; i < size; i++)
+                expressions.add(Expression.serializer.deserialize(in, version, metadata));
+            return forThrift
+                 ? new ThriftFilter(expressions)
+                 : new CQLFilter(expressions);
+        }
+
+        public long serializedSize(RowFilter filter, int version)
+        {
+            TypeSizes sizes = TypeSizes.NATIVE;
+            long size = 1 // forThrift
+                      + sizes.sizeof((short)filter.expressions.size());
+            for (Expression expr : filter.expressions)
+                size += Expression.serializer.serializedSize(expr, version);
+            return size;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
deleted file mode 100644
index 4571161..0000000
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ /dev/null
@@ -1,583 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.filter;
-
-import java.nio.ByteBuffer;
-import java.io.DataInput;
-import java.io.IOException;
-import java.util.*;
-
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Iterators;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.composites.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.service.ClientWarn;
-import org.apache.cassandra.tracing.Tracing;
-
-public class SliceQueryFilter implements IDiskAtomFilter
-{
-    private static final Logger logger = LoggerFactory.getLogger(SliceQueryFilter.class);
-
-    /**
-     * A special value for compositesToGroup that indicates that partitioned tombstones should not be included in results
-     * or count towards the limit.  See CASSANDRA-8490 for more details on why this is needed (and done this way).
-     **/
-    public static final int IGNORE_TOMBSTONED_PARTITIONS = -2;
-
-    public final ColumnSlice[] slices;
-    public final boolean reversed;
-    public volatile int count;
-    public final int compositesToGroup;
-
-    // Not serialized, just a ack for range slices to find the number of live column counted, even when we group
-    private ColumnCounter columnCounter;
-
-    public SliceQueryFilter(Composite start, Composite finish, boolean reversed, int count)
-    {
-        this(new ColumnSlice(start, finish), reversed, count);
-    }
-
-    public SliceQueryFilter(Composite start, Composite finish, boolean reversed, int count, int compositesToGroup)
-    {
-        this(new ColumnSlice(start, finish), reversed, count, compositesToGroup);
-    }
-
-    public SliceQueryFilter(ColumnSlice slice, boolean reversed, int count)
-    {
-        this(new ColumnSlice[]{ slice }, reversed, count);
-    }
-
-    public SliceQueryFilter(ColumnSlice slice, boolean reversed, int count, int compositesToGroup)
-    {
-        this(new ColumnSlice[]{ slice }, reversed, count, compositesToGroup);
-    }
-
-    /**
-     * Constructor that accepts multiple slices. All slices are assumed to be in the same direction (forward or
-     * reversed).
-     */
-    public SliceQueryFilter(ColumnSlice[] slices, boolean reversed, int count)
-    {
-        this(slices, reversed, count, -1);
-    }
-
-    public SliceQueryFilter(ColumnSlice[] slices, boolean reversed, int count, int compositesToGroup)
-    {
-        this.slices = slices;
-        this.reversed = reversed;
-        this.count = count;
-        this.compositesToGroup = compositesToGroup;
-    }
-
-    public SliceQueryFilter cloneShallow()
-    {
-        return new SliceQueryFilter(slices, reversed, count, compositesToGroup);
-    }
-
-    public SliceQueryFilter withUpdatedCount(int newCount)
-    {
-        return new SliceQueryFilter(slices, reversed, newCount, compositesToGroup);
-    }
-
-    public SliceQueryFilter withUpdatedSlices(ColumnSlice[] newSlices)
-    {
-        return new SliceQueryFilter(newSlices, reversed, count, compositesToGroup);
-    }
-
-    /** Returns true if the slice includes static columns, false otherwise. */
-    private boolean sliceIncludesStatics(ColumnSlice slice, CFMetaData cfm)
-    {
-        return cfm.hasStaticColumns() &&
-                slice.includes(reversed ? cfm.comparator.reverseComparator() : cfm.comparator, cfm.comparator.staticPrefix().end());
-    }
-
-    public boolean hasStaticSlice(CFMetaData cfm)
-    {
-        for (ColumnSlice slice : slices)
-            if (sliceIncludesStatics(slice, cfm))
-                return true;
-
-        return false;
-    }
-
-    /**
-     * Splits this filter into two SliceQueryFilters: one that slices only the static columns, and one that slices the
-     * remainder of the normal data.
-     *
-     * This should only be called when the filter is reversed and the filter is known to cover static columns (through
-     * hasStaticSlice()).
-     *
-     * @return a pair of (static, normal) SliceQueryFilters
-     */
-    public Pair<SliceQueryFilter, SliceQueryFilter> splitOutStaticSlice(CFMetaData cfm)
-    {
-        assert reversed;
-
-        Composite staticSliceEnd = cfm.comparator.staticPrefix().end();
-        List<ColumnSlice> nonStaticSlices = new ArrayList<>(slices.length);
-        for (ColumnSlice slice : slices)
-        {
-            if (sliceIncludesStatics(slice, cfm))
-                nonStaticSlices.add(new ColumnSlice(slice.start, staticSliceEnd));
-            else
-                nonStaticSlices.add(slice);
-        }
-
-        return Pair.create(
-            new SliceQueryFilter(staticSliceEnd, Composites.EMPTY, true, count, compositesToGroup),
-            new SliceQueryFilter(nonStaticSlices.toArray(new ColumnSlice[nonStaticSlices.size()]), true, count, compositesToGroup));
-    }
-
-    public SliceQueryFilter withUpdatedStart(Composite newStart, CFMetaData cfm)
-    {
-        Comparator<Composite> cmp = reversed ? cfm.comparator.reverseComparator() : cfm.comparator;
-
-        // Check our slices to see if any fall before the new start (in which case they can be removed) or
-        // if they contain the new start (in which case they should start from the page start).  However, if the
-        // slices would include static columns, we need to ensure they are also fetched, and so a separate
-        // slice for the static columns may be required.
-        // Note that if the query is reversed, we can't handle statics by simply adding a separate slice here, so
-        // the reversed case is handled by SliceFromReadCommand instead. See CASSANDRA-8502 for more details.
-        List<ColumnSlice> newSlices = new ArrayList<>();
-        boolean pastNewStart = false;
-        for (ColumnSlice slice : slices)
-        {
-            if (pastNewStart)
-            {
-                newSlices.add(slice);
-                continue;
-            }
-
-            if (slice.isBefore(cmp, newStart))
-            {
-                if (!reversed && sliceIncludesStatics(slice, cfm))
-                    newSlices.add(new ColumnSlice(Composites.EMPTY, cfm.comparator.staticPrefix().end()));
-
-                continue;
-            }
-            else if (slice.includes(cmp, newStart))
-            {
-                if (!reversed && sliceIncludesStatics(slice, cfm) && !newStart.isEmpty())
-                    newSlices.add(new ColumnSlice(Composites.EMPTY, cfm.comparator.staticPrefix().end()));
-
-                newSlices.add(new ColumnSlice(newStart, slice.finish));
-            }
-            else
-            {
-                newSlices.add(slice);
-            }
-
-            pastNewStart = true;
-        }
-        return withUpdatedSlices(newSlices.toArray(new ColumnSlice[newSlices.size()]));
-    }
-
-    public Iterator<Cell> getColumnIterator(ColumnFamily cf)
-    {
-        assert cf != null;
-        return reversed ? cf.reverseIterator(slices) : cf.iterator(slices);
-    }
-
-    public OnDiskAtomIterator getColumnIterator(final DecoratedKey key, final ColumnFamily cf)
-    {
-        assert cf != null;
-        final Iterator<Cell> iter = getColumnIterator(cf);
-
-        return new OnDiskAtomIterator()
-        {
-            public ColumnFamily getColumnFamily()
-            {
-                return cf;
-            }
-
-            public DecoratedKey getKey()
-            {
-                return key;
-            }
-
-            public boolean hasNext()
-            {
-                return iter.hasNext();
-            }
-
-            public OnDiskAtom next()
-            {
-                return iter.next();
-            }
-
-            public void close() throws IOException { }
-
-            public void remove()
-            {
-                throw new UnsupportedOperationException();
-            }
-        };
-    }
-
-    public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, DecoratedKey key)
-    {
-        return sstable.iterator(key, slices, reversed);
-    }
-
-    public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry)
-    {
-        return sstable.iterator(file, key, slices, reversed, indexEntry);
-    }
-
-    public Comparator<Cell> getColumnComparator(CellNameType comparator)
-    {
-        return reversed ? comparator.columnReverseComparator() : comparator.columnComparator(false);
-    }
-
-    public void collectReducedColumns(ColumnFamily container, Iterator<Cell> reducedColumns, DecoratedKey key, int gcBefore, long now)
-    {
-        columnCounter = columnCounter(container.getComparator(), now);
-        DeletionInfo.InOrderTester tester = container.deletionInfo().inOrderTester(reversed);
-
-        while (reducedColumns.hasNext())
-        {
-            Cell cell = reducedColumns.next();
-
-            if (logger.isTraceEnabled())
-                logger.trace("collecting {} of {}: {}", columnCounter.live(), count, cell.getString(container.getComparator()));
-
-            // An expired tombstone will be immediately discarded in memory, and needn't be counted.
-            // Neither should be any cell shadowed by a range- or a partition tombstone.
-            if (cell.getLocalDeletionTime() < gcBefore || !columnCounter.count(cell, tester))
-                continue;
-
-            if (columnCounter.live() > count)
-                break;
-
-            if (respectTombstoneThresholds() && columnCounter.tombstones() > DatabaseDescriptor.getTombstoneFailureThreshold())
-            {
-                Tracing.trace("Scanned over {} tombstones; query aborted (see tombstone_failure_threshold); slices={}",
-                              DatabaseDescriptor.getTombstoneFailureThreshold(), getSlicesInfo(container));
-
-                throw new TombstoneOverwhelmingException(columnCounter.tombstones(),
-                                                         count,
-                                                         container.metadata().ksName,
-                                                         container.metadata().cfName,
-                                                         container.getComparator().getString(cell.name()),
-                                                         getSlicesInfo(container));
-            }
-
-            container.appendColumn(cell);
-        }
-
-        boolean warnTombstones = logger.isWarnEnabled() && respectTombstoneThresholds() && columnCounter.tombstones() > DatabaseDescriptor.getTombstoneWarnThreshold();
-        if (warnTombstones)
-        {
-            String msg = String.format("Read %d live and %d tombstone cells in %s.%s for key: %1.512s (see tombstone_warn_threshold). %d columns were requested, slices=%1.512s",
-                                       columnCounter.live(),
-                                       columnCounter.tombstones(),
-                                       container.metadata().ksName,
-                                       container.metadata().cfName,
-                                       container.metadata().getKeyValidator().getString(key.getKey()),
-                                       count,
-                                       getSlicesInfo(container));
-            ClientWarn.warn(msg);
-            logger.warn(msg);
-        }
-        Tracing.trace("Read {} live and {} tombstone cells{}",
-                      columnCounter.live(),
-                      columnCounter.tombstones(),
-                      warnTombstones ? " (see tombstone_warn_threshold)" : "");
-    }
-
-    private String getSlicesInfo(ColumnFamily container)
-    {
-        StringBuilder sb = new StringBuilder();
-        CellNameType type = container.metadata().comparator;
-        for (ColumnSlice sl : slices)
-        {
-            assert sl != null;
-
-            sb.append('[');
-            sb.append(type.getString(sl.start));
-            sb.append('-');
-            sb.append(type.getString(sl.finish));
-            sb.append(']');
-        }
-        return sb.toString();
-    }
-
-    protected boolean respectTombstoneThresholds()
-    {
-        return true;
-    }
-
-    public int getLiveCount(ColumnFamily cf, long now)
-    {
-        return columnCounter(cf.getComparator(), now).countAll(cf).live();
-    }
-
-    public ColumnCounter columnCounter(CellNameType comparator, long now)
-    {
-        if (compositesToGroup < 0)
-            return new ColumnCounter(now);
-        else if (compositesToGroup == 0)
-            return new ColumnCounter.GroupByPrefix(now, null, 0);
-        else if (reversed)
-            return new ColumnCounter.GroupByPrefixReversed(now, comparator, compositesToGroup);
-        else
-            return new ColumnCounter.GroupByPrefix(now, comparator, compositesToGroup);
-    }
-
-    public void trim(ColumnFamily cf, int trimTo, long now)
-    {
-        // each cell can increment the count by at most one, so if we have fewer cells than trimTo, we can skip trimming
-        if (cf.getColumnCount() < trimTo)
-            return;
-
-        ColumnCounter counter = columnCounter(cf.getComparator(), now);
-
-        Collection<Cell> cells = reversed
-                                   ? cf.getReverseSortedColumns()
-                                   : cf.getSortedColumns();
-
-        DeletionInfo.InOrderTester tester = cf.deletionInfo().inOrderTester(reversed);
-
-        for (Iterator<Cell> iter = cells.iterator(); iter.hasNext(); )
-        {
-            Cell cell = iter.next();
-            counter.count(cell, tester);
-
-            if (counter.live() > trimTo)
-            {
-                iter.remove();
-                while (iter.hasNext())
-                {
-                    iter.next();
-                    iter.remove();
-                }
-            }
-        }
-    }
-
-    public Composite start()
-    {
-        return this.slices[0].start;
-    }
-
-    public Composite finish()
-    {
-        return this.slices[slices.length - 1].finish;
-    }
-
-    public void setStart(Composite start)
-    {
-        assert slices.length == 1;
-        this.slices[0] = new ColumnSlice(start, this.slices[0].finish);
-    }
-
-    public int lastCounted()
-    {
-        // If we have a slice limit set, columnCounter.live() can overcount by one because we have to call
-        // columnCounter.count() before we can tell if we've exceeded the slice limit (and accordingly, should not
-        // add the cells to returned container).  To deal with this overcounting, we take the min of the slice
-        // limit and the counter's count.
-        return columnCounter == null ? 0 : Math.min(columnCounter.live(), count);
-    }
-
-    public int lastTombstones()
-    {
-        return columnCounter == null ? 0 : columnCounter.tombstones();
-    }
-
-    public int lastLive()
-    {
-        return columnCounter == null ? 0 : columnCounter.live();
-    }
-
-    @Override
-    public String toString()
-    {
-        return "SliceQueryFilter [reversed=" + reversed + ", slices=" + Arrays.toString(slices) + ", count=" + count + ", toGroup = " + compositesToGroup + "]";
-    }
-
-    public boolean isReversed()
-    {
-        return reversed;
-    }
-
-    public void updateColumnsLimit(int newLimit)
-    {
-        count = newLimit;
-    }
-
-    public boolean maySelectPrefix(CType type, Composite prefix)
-    {
-        for (ColumnSlice slice : slices)
-            if (slice.includes(type, prefix))
-                return true;
-        return false;
-    }
-
-    public boolean shouldInclude(SSTableReader sstable)
-    {
-        List<ByteBuffer> minColumnNames = sstable.getSSTableMetadata().minColumnNames;
-        List<ByteBuffer> maxColumnNames = sstable.getSSTableMetadata().maxColumnNames;
-        CellNameType comparator = sstable.metadata.comparator;
-
-        if (minColumnNames.isEmpty() || maxColumnNames.isEmpty())
-            return true;
-
-        for (ColumnSlice slice : slices)
-            if (slice.intersects(minColumnNames, maxColumnNames, comparator, reversed))
-                return true;
-
-        return false;
-    }
-
-    public boolean isHeadFilter()
-    {
-        return slices.length == 1 && slices[0].start.isEmpty() && !reversed;
-    }
-
-    public boolean countCQL3Rows(CellNameType comparator)
-    {
-        // If comparator is dense a cell == a CQL3 rows so we're always counting CQL3 rows
-        // in particular. Otherwise, we do so only if we group the cells into CQL rows.
-        return comparator.isDense() || compositesToGroup >= 0;
-    }
-
-    public boolean isFullyCoveredBy(ColumnFamily cf, long now)
-    {
-        // cf is the beginning of a partition. It covers this filter if:
-        //   1) either this filter requests the head of the partition and request less
-        //      than what cf has to offer (note: we do need to use getLiveCount() for that
-        //      as it knows if the filter count cells or CQL3 rows).
-        //   2) the start and finish bound of this filter are included in cf.
-        if (isHeadFilter() && count <= getLiveCount(cf, now))
-            return true;
-
-        if (start().isEmpty() || finish().isEmpty() || !cf.hasColumns())
-            return false;
-
-        Composite low = isReversed() ? finish() : start();
-        Composite high = isReversed() ? start() : finish();
-
-        CellName first = cf.iterator(ColumnSlice.ALL_COLUMNS_ARRAY).next().name();
-        CellName last = cf.reverseIterator(ColumnSlice.ALL_COLUMNS_ARRAY).next().name();
-
-        return cf.getComparator().compare(first, low) <= 0
-            && cf.getComparator().compare(high, last) <= 0;
-    }
-
-    public static class Serializer implements IVersionedSerializer<SliceQueryFilter>
-    {
-        private CType type;
-
-        public Serializer(CType type)
-        {
-            this.type = type;
-        }
-
-        public void serialize(SliceQueryFilter f, DataOutputPlus out, int version) throws IOException
-        {
-            out.writeInt(f.slices.length);
-            for (ColumnSlice slice : f.slices)
-                type.sliceSerializer().serialize(slice, out, version);
-            out.writeBoolean(f.reversed);
-            int count = f.count;
-            out.writeInt(count);
-
-            out.writeInt(f.compositesToGroup);
-        }
-
-        public SliceQueryFilter deserialize(DataInput in, int version) throws IOException
-        {
-            ColumnSlice[] slices;
-            slices = new ColumnSlice[in.readInt()];
-            for (int i = 0; i < slices.length; i++)
-                slices[i] = type.sliceSerializer().deserialize(in, version);
-            boolean reversed = in.readBoolean();
-            int count = in.readInt();
-            int compositesToGroup = in.readInt();
-
-            return new SliceQueryFilter(slices, reversed, count, compositesToGroup);
-        }
-
-        public long serializedSize(SliceQueryFilter f, int version)
-        {
-            TypeSizes sizes = TypeSizes.NATIVE;
-
-            int size = 0;
-            size += sizes.sizeof(f.slices.length);
-            for (ColumnSlice slice : f.slices)
-                size += type.sliceSerializer().serializedSize(slice, version);
-            size += sizes.sizeof(f.reversed);
-            size += sizes.sizeof(f.count);
-
-            size += sizes.sizeof(f.compositesToGroup);
-            return size;
-        }
-    }
-
-    public Iterator<RangeTombstone> getRangeTombstoneIterator(final ColumnFamily source)
-    {
-        final DeletionInfo delInfo = source.deletionInfo();
-        if (!delInfo.hasRanges() || slices.length == 0)
-            return Iterators.emptyIterator();
-
-        return new AbstractIterator<RangeTombstone>()
-        {
-            private int sliceIdx = 0;
-            private Iterator<RangeTombstone> sliceIter = currentRangeIter();
-
-            protected RangeTombstone computeNext()
-            {
-                while (true)
-                {
-                    if (sliceIter.hasNext())
-                        return sliceIter.next();
-
-                    if (!nextSlice())
-                        return endOfData();
-
-                    sliceIter = currentRangeIter();
-                }
-            }
-
-            private Iterator<RangeTombstone> currentRangeIter()
-            {
-                ColumnSlice slice = slices[reversed ? (slices.length - 1 - sliceIdx) : sliceIdx];
-                return reversed ? delInfo.rangeIterator(slice.finish, slice.start)
-                                : delInfo.rangeIterator(slice.start, slice.finish);
-            }
-
-            private boolean nextSlice()
-            {
-                return ++sliceIdx < slices.length;
-            }
-        };
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java b/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java
index 7624e1b..98b539e 100644
--- a/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java
+++ b/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java
@@ -18,49 +18,51 @@
  */
 package org.apache.cassandra.db.filter;
 
-import org.apache.cassandra.db.DecoratedKey;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.*;
 
 public class TombstoneOverwhelmingException extends RuntimeException
 {
-    private final int numTombstones;
-    private final int numRequested;
-    private final String ksName;
-    private final String cfName;
-    private final String lastCellName;
-    private final String slicesInfo;
-    private String partitionKey = null;
-
-    public TombstoneOverwhelmingException(int numTombstones,
-                                          int numRequested,
-                                          String ksName,
-                                          String cfName,
-                                          String lastCellName,
-                                          String slicesInfo)
+    public TombstoneOverwhelmingException(int numTombstones, String query, CFMetaData metadata, DecoratedKey lastPartitionKey, ClusteringPrefix lastClustering)
     {
-        this.numTombstones = numTombstones;
-        this.numRequested = numRequested;
-        this.ksName = ksName;
-        this.cfName = cfName;
-        this.lastCellName = lastCellName;
-        this.slicesInfo = slicesInfo;
+        super(String.format("Scanned over %d tombstones during query '%s' (last scanned row partion key was (%s)); query aborted",
+                            numTombstones, query, makePKString(metadata, lastPartitionKey.getKey(), lastClustering)));
     }
 
-    public void setKey(DecoratedKey key)
+    private static String makePKString(CFMetaData metadata, ByteBuffer partitionKey, ClusteringPrefix clustering)
     {
-        if (key != null)
-            partitionKey = key.toString();
-    }
+        StringBuilder sb = new StringBuilder();
 
-    public String getLocalizedMessage()
-    {
-        return getMessage();
-    }
+        if (clustering.size() > 0)
+            sb.append("(");
 
-    public String getMessage()
-    {
-        return String.format(
-                "Scanned over %d tombstones in %s.%s; %d columns were requested; query aborted " +
-                "(see tombstone_failure_threshold); partitionKey=%s; lastCell=%s; slices=%s",
-                numTombstones, ksName, cfName, numRequested, partitionKey, lastCellName, slicesInfo);
+        // TODO: We should probably make that a lot easier/transparent for partition keys
+        AbstractType<?> pkType = metadata.getKeyValidator();
+        if (pkType instanceof CompositeType)
+        {
+            CompositeType ct = (CompositeType)pkType;
+            ByteBuffer[] values = ct.split(partitionKey);
+            for (int i = 0; i < values.length; i++)
+            {
+                if (i > 0)
+                    sb.append(", ");
+                sb.append(ct.types.get(i).getString(values[i]));
+            }
+        }
+        else
+        {
+            sb.append(pkType.getString(partitionKey));
+        }
+
+        if (clustering.size() > 0)
+            sb.append(")");
+
+        for (int i = 0; i < clustering.size(); i++)
+            sb.append(", ").append(metadata.comparator.subtype(i).getString(clustering.get(i)));
+
+        return sb.toString();
     }
 }