You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:47:53 UTC

[29/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/composites/CompoundSparseCellNameType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/composites/CompoundSparseCellNameType.java b/src/java/org/apache/cassandra/db/composites/CompoundSparseCellNameType.java
deleted file mode 100644
index c88c6f4..0000000
--- a/src/java/org/apache/cassandra/db/composites/CompoundSparseCellNameType.java
+++ /dev/null
@@ -1,330 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.composites;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.CQL3Row;
-import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-
-public class CompoundSparseCellNameType extends AbstractCompoundCellNameType
-{
-    public static final ColumnIdentifier rowMarkerId = new ColumnIdentifier(ByteBufferUtil.EMPTY_BYTE_BUFFER, UTF8Type.instance);
-    private static final CellName rowMarkerNoPrefix = new CompoundSparseCellName(rowMarkerId, false);
-
-    // For CQL3 columns, this is always UTF8Type. However, for compatibility with super columns, we need to allow it to be non-UTF8.
-    private final AbstractType<?> columnNameType;
-    protected final Map<ByteBuffer, ColumnIdentifier> internedIds;
-
-    private final Composite staticPrefix;
-
-    public CompoundSparseCellNameType(List<AbstractType<?>> types)
-    {
-        this(types, UTF8Type.instance);
-    }
-
-    public CompoundSparseCellNameType(List<AbstractType<?>> types, AbstractType<?> columnNameType)
-    {
-        this(new CompoundCType(types), columnNameType);
-    }
-
-    private CompoundSparseCellNameType(CompoundCType clusteringType, AbstractType<?> columnNameType)
-    {
-        this(clusteringType, columnNameType, makeCType(clusteringType, columnNameType, null), new HashMap<ByteBuffer, ColumnIdentifier>());
-    }
-
-    private CompoundSparseCellNameType(CompoundCType clusteringType, AbstractType<?> columnNameType, CompoundCType fullType, Map<ByteBuffer, ColumnIdentifier> internedIds)
-    {
-        super(clusteringType, fullType);
-        this.columnNameType = columnNameType;
-        this.internedIds = internedIds;
-        this.staticPrefix = makeStaticPrefix(clusteringType.size());
-    }
-
-    private static Composite makeStaticPrefix(int size)
-    {
-        ByteBuffer[] elements = new ByteBuffer[size];
-        for (int i = 0; i < size; i++)
-            elements[i] = ByteBufferUtil.EMPTY_BYTE_BUFFER;
-
-        return new CompoundComposite(elements, size, true)
-        {
-            @Override
-            public boolean isStatic()
-            {
-                return true;
-            }
-
-            @Override
-            public long unsharedHeapSize()
-            {
-                // We'll share this for a given type.
-                return 0;
-            }
-
-            @Override
-            public Composite copy(CFMetaData cfm, AbstractAllocator allocator)
-            {
-                return this;
-            }
-        };
-    }
-
-    protected static CompoundCType makeCType(CompoundCType clusteringType, AbstractType<?> columnNameType, ColumnToCollectionType collectionType)
-    {
-        List<AbstractType<?>> allSubtypes = new ArrayList<AbstractType<?>>(clusteringType.size() + (collectionType == null ? 1 : 2));
-        for (int i = 0; i < clusteringType.size(); i++)
-            allSubtypes.add(clusteringType.subtype(i));
-        allSubtypes.add(columnNameType);
-        if (collectionType != null)
-            allSubtypes.add(collectionType);
-        return new CompoundCType(allSubtypes);
-    }
-
-    public CellNameType setSubtype(int position, AbstractType<?> newType)
-    {
-        if (position < clusteringSize)
-            return new CompoundSparseCellNameType(clusteringType.setSubtype(position, newType), columnNameType, fullType.setSubtype(position, newType), internedIds);
-
-        if (position == clusteringSize)
-            throw new IllegalArgumentException();
-
-        throw new IndexOutOfBoundsException();
-    }
-
-    @Override
-    public CellNameType addOrUpdateCollection(ColumnIdentifier columnName, CollectionType newCollection)
-    {
-        return new WithCollection(clusteringType, ColumnToCollectionType.getInstance(Collections.singletonMap(columnName.bytes, newCollection)), internedIds);
-    }
-
-    public boolean isDense()
-    {
-        return false;
-    }
-
-    public boolean supportCollections()
-    {
-        return true;
-    }
-
-    public Composite staticPrefix()
-    {
-        return staticPrefix;
-    }
-
-    public CellName create(Composite prefix, ColumnDefinition column)
-    {
-        return create(prefix, column.name, column.isStatic());
-    }
-
-    private CellName create(Composite prefix, ColumnIdentifier columnName, boolean isStatic)
-    {
-        if (isStatic)
-            prefix = staticPrefix();
-
-        assert prefix.size() == clusteringSize;
-
-        if (prefix.isEmpty())
-            return new CompoundSparseCellName(columnName, isStatic);
-
-        assert prefix instanceof CompoundComposite;
-        CompoundComposite lc = (CompoundComposite)prefix;
-        return new CompoundSparseCellName(lc.elements, clusteringSize, columnName, isStatic);
-    }
-
-    public CellName rowMarker(Composite prefix)
-    {
-        assert !prefix.isStatic(); // static columns don't really create rows, they shouldn't have a row marker
-        if (prefix.isEmpty())
-            return rowMarkerNoPrefix;
-
-        return create(prefix, rowMarkerId, false);
-    }
-
-    protected ColumnIdentifier idFor(ByteBuffer bb)
-    {
-        ColumnIdentifier id = internedIds.get(bb);
-        return id == null ? new ColumnIdentifier(bb, columnNameType) : id;
-    }
-
-    protected Composite makeWith(ByteBuffer[] components, int size, Composite.EOC eoc, boolean isStatic)
-    {
-        if (size < clusteringSize + 1 || eoc != Composite.EOC.NONE)
-            return new CompoundComposite(components, size, isStatic).withEOC(eoc);
-
-        return new CompoundSparseCellName(components, clusteringSize, idFor(components[clusteringSize]), isStatic);
-    }
-
-    protected Composite copyAndMakeWith(ByteBuffer[] components, int size, Composite.EOC eoc, boolean isStatic)
-    {
-        if (size < clusteringSize + 1 || eoc != Composite.EOC.NONE)
-            return new CompoundComposite(Arrays.copyOfRange(components, 0, size), size, isStatic).withEOC(eoc);
-
-        ByteBuffer[] clusteringColumns = Arrays.copyOfRange(components, 0, clusteringSize);
-        return new CompoundSparseCellName(clusteringColumns, idFor(components[clusteringSize]), isStatic);
-    }
-
-    public void addCQL3Column(ColumnIdentifier id)
-    {
-        internedIds.put(id.bytes, id);
-    }
-
-    public void removeCQL3Column(ColumnIdentifier id)
-    {
-        internedIds.remove(id.bytes);
-    }
-
-    public CQL3Row.Builder CQL3RowBuilder(CFMetaData metadata, long now)
-    {
-        return makeSparseCQL3RowBuilder(metadata, this, now);
-    }
-
-    public static class WithCollection extends CompoundSparseCellNameType
-    {
-        private final ColumnToCollectionType collectionType;
-
-        public WithCollection(List<AbstractType<?>> types, ColumnToCollectionType collectionType)
-        {
-            this(new CompoundCType(types), collectionType);
-        }
-
-        WithCollection(CompoundCType clusteringType, ColumnToCollectionType collectionType)
-        {
-            this(clusteringType, collectionType, new HashMap<ByteBuffer, ColumnIdentifier>());
-        }
-
-        private WithCollection(CompoundCType clusteringType, ColumnToCollectionType collectionType, Map<ByteBuffer, ColumnIdentifier> internedIds)
-        {
-            this(clusteringType, makeCType(clusteringType, UTF8Type.instance, collectionType), collectionType, internedIds);
-        }
-
-        private WithCollection(CompoundCType clusteringType, CompoundCType fullCType, ColumnToCollectionType collectionType, Map<ByteBuffer, ColumnIdentifier> internedIds)
-        {
-            super(clusteringType, UTF8Type.instance, fullCType, internedIds);
-            this.collectionType = collectionType;
-        }
-
-        @Override
-        public CellNameType setSubtype(int position, AbstractType<?> newType)
-        {
-            if (position < clusteringSize)
-                return new WithCollection(clusteringType.setSubtype(position, newType), collectionType, internedIds);
-
-            throw position >= fullType.size() ? new IndexOutOfBoundsException() : new IllegalArgumentException();
-        }
-
-        @Override
-        public CellNameType addOrUpdateCollection(ColumnIdentifier columnName, CollectionType newCollection)
-        {
-            Map<ByteBuffer, CollectionType> newMap = new HashMap<>(collectionType.defined);
-            newMap.put(columnName.bytes, newCollection);
-            return new WithCollection(clusteringType, ColumnToCollectionType.getInstance(newMap), internedIds);
-        }
-
-        @Override
-        public CellName create(Composite prefix, ColumnDefinition column, ByteBuffer collectionElement)
-        {
-            if (column.isStatic())
-                prefix = staticPrefix();
-
-            assert prefix.size() == clusteringSize;
-
-            if (prefix.isEmpty())
-                return new CompoundSparseCellName.WithCollection(column.name, collectionElement, column.isStatic());
-
-            assert prefix instanceof CompoundComposite;
-            CompoundComposite lc = (CompoundComposite)prefix;
-            return new CompoundSparseCellName.WithCollection(lc.elements, clusteringSize, column.name, collectionElement, column.isStatic());
-        }
-
-        @Override
-        public int compare(Composite c1, Composite c2)
-        {
-            if (c1.isStatic() != c2.isStatic())
-            {
-                // Static sorts before non-static no matter what, except for empty which
-                // always sort first
-                if (c1.isEmpty())
-                    return c2.isEmpty() ? 0 : -1;
-                if (c2.isEmpty())
-                    return 1;
-                return c1.isStatic() ? -1 : 1;
-            }
-
-            int s1 = c1.size();
-            int s2 = c2.size();
-            int minSize = Math.min(s1, s2);
-
-            ByteBuffer previous = null;
-            for (int i = 0; i < minSize; i++)
-            {
-                AbstractType<?> comparator = subtype(i);
-                ByteBuffer value1 = c1.get(i);
-                ByteBuffer value2 = c2.get(i);
-
-                int cmp = comparator.compareCollectionMembers(value1, value2, previous);
-                if (cmp != 0)
-                    return cmp;
-
-                previous = value1;
-            }
-
-            if (s1 == s2)
-                return c1.eoc().compareTo(c2.eoc());
-            return s1 < s2 ? c1.eoc().prefixComparisonResult : -c2.eoc().prefixComparisonResult;
-        }
-
-        @Override
-        public boolean hasCollections()
-        {
-            return true;
-        }
-
-        @Override
-        public ColumnToCollectionType collectionType()
-        {
-            return collectionType;
-        }
-
-        @Override
-        protected Composite makeWith(ByteBuffer[] components, int size, Composite.EOC eoc, boolean isStatic)
-        {
-            if (size < fullSize)
-                return super.makeWith(components, size, eoc, isStatic);
-
-            return new CompoundSparseCellName.WithCollection(components, clusteringSize, idFor(components[clusteringSize]), components[fullSize - 1], isStatic);
-        }
-
-        protected Composite copyAndMakeWith(ByteBuffer[] components, int size, Composite.EOC eoc, boolean isStatic)
-        {
-            if (size < fullSize)
-                return super.copyAndMakeWith(components, size, eoc, isStatic);
-
-            ByteBuffer[] clusteringColumns = Arrays.copyOfRange(components, 0, clusteringSize);
-            return new CompoundSparseCellName.WithCollection(clusteringColumns, idFor(components[clusteringSize]), components[clusteringSize + 1], isStatic);
-        }
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/composites/SimpleCType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/composites/SimpleCType.java b/src/java/org/apache/cassandra/db/composites/SimpleCType.java
deleted file mode 100644
index 7ee45ac..0000000
--- a/src/java/org/apache/cassandra/db/composites/SimpleCType.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.composites;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.apache.cassandra.db.marshal.AbstractType;
-
-/**
- * A not truly-composite CType.
- */
-public class SimpleCType extends AbstractCType
-{
-    protected final AbstractType<?> type;
-
-    public SimpleCType(AbstractType<?> type)
-    {
-        super(type.isByteOrderComparable());
-        this.type = type;
-    }
-
-    public boolean isCompound()
-    {
-        return false;
-    }
-
-    public int size()
-    {
-        return 1;
-    }
-
-    public int compare(Composite c1, Composite c2)
-    {
-        if (isByteOrderComparable)
-            return AbstractSimpleCellNameType.compareUnsigned(c1, c2);
-
-        assert !(c1.isEmpty() | c2.isEmpty());
-        // This method assumes that simple composites never have an EOC != NONE. This assumption
-        // stands in particular on the fact that a Composites.EMPTY never has a non-NONE EOC. If
-        // this ever change, we'll need to update this.
-        return type.compare(c1.get(0), c2.get(0));
-    }
-
-    public AbstractType<?> subtype(int i)
-    {
-        if (i != 0)
-            throw new IndexOutOfBoundsException();
-        return type;
-    }
-
-    public Composite fromByteBuffer(ByteBuffer bytes)
-    {
-        return !bytes.hasRemaining() ? Composites.EMPTY : new SimpleComposite(bytes);
-    }
-
-    public CBuilder builder()
-    {
-        return new SimpleCBuilder(this);
-    }
-
-    public CType setSubtype(int position, AbstractType<?> newType)
-    {
-        if (position != 0)
-            throw new IndexOutOfBoundsException();
-        return new SimpleCType(newType);
-    }
-
-    // Use sparingly, it defeats the purpose
-    public AbstractType<?> asAbstractType()
-    {
-        return type;
-    }
-
-    public static class SimpleCBuilder implements CBuilder
-    {
-        private final CType type;
-        private ByteBuffer value;
-
-        public SimpleCBuilder(CType type)
-        {
-            this.type = type;
-        }
-
-        public int remainingCount()
-        {
-            return value == null ? 1 : 0;
-        }
-
-        public CBuilder add(ByteBuffer value)
-        {
-            if (this.value != null)
-                throw new IllegalStateException();
-            this.value = value;
-            return this;
-        }
-
-        public CBuilder add(Object value)
-        {
-            return add(((AbstractType)type.subtype(0)).decompose(value));
-        }
-
-        public Composite build()
-        {
-            if (value == null || !value.hasRemaining())
-                return Composites.EMPTY;
-
-            // If we're building a dense cell name, then we can directly allocate the
-            // CellName object as it's complete.
-            if (type instanceof CellNameType && ((CellNameType)type).isDense())
-                return new SimpleDenseCellName(value);
-
-            return new SimpleComposite(value);
-        }
-
-        public Composite buildWith(ByteBuffer value)
-        {
-            if (this.value != null)
-                throw new IllegalStateException();
-
-            if (value == null || !value.hasRemaining())
-                return Composites.EMPTY;
-
-            // If we're building a dense cell name, then we can directly allocate the
-            // CellName object as it's complete.
-            if (type instanceof CellNameType && ((CellNameType)type).isDense())
-                return new SimpleDenseCellName(value);
-
-            return new SimpleComposite(value);
-        }
-
-        public Composite buildWith(List<ByteBuffer> values)
-        {
-            if (values.size() > 1)
-                throw new IllegalStateException();
-            if (values.isEmpty())
-                return Composites.EMPTY;
-            return buildWith(values.get(0));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/composites/SimpleComposite.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/composites/SimpleComposite.java b/src/java/org/apache/cassandra/db/composites/SimpleComposite.java
deleted file mode 100644
index 3c80d9f..0000000
--- a/src/java/org/apache/cassandra/db/composites/SimpleComposite.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.composites;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.ObjectSizes;
-
-/**
- * A "simple" (not-truly-composite) Composite.
- */
-public class SimpleComposite extends AbstractComposite
-{
-    private static final long EMPTY_SIZE = ObjectSizes.measure(new SimpleComposite(ByteBuffer.allocate(1)));
-
-    protected final ByteBuffer element;
-
-    SimpleComposite(ByteBuffer element)
-    {
-        // We have to be careful with empty ByteBuffers as we shouldn't store them.
-        // To avoid errors (and so isEmpty() works as we intend), we don't allow simpleComposite with
-        // an empty element (but it's ok for CompoundComposite, it's a row marker in that case).
-        assert element.hasRemaining();
-        this.element = element;
-    }
-
-    public int size()
-    {
-        return 1;
-    }
-
-    public ByteBuffer get(int i)
-    {
-        if (i != 0)
-            throw new IndexOutOfBoundsException();
-
-        return element;
-    }
-
-    @Override
-    public Composite withEOC(EOC newEoc)
-    {
-        // EOC makes no sense for not truly composites.
-        return this;
-    }
-
-    @Override
-    public ByteBuffer toByteBuffer()
-    {
-        return element;
-    }
-
-    public long unsharedHeapSize()
-    {
-        return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(element);
-    }
-
-    public Composite copy(CFMetaData cfm, AbstractAllocator allocator)
-    {
-        return new SimpleComposite(allocator.clone(element));
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/composites/SimpleDenseCellName.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/composites/SimpleDenseCellName.java b/src/java/org/apache/cassandra/db/composites/SimpleDenseCellName.java
deleted file mode 100644
index 2ca7d23..0000000
--- a/src/java/org/apache/cassandra/db/composites/SimpleDenseCellName.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.composites;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.ObjectSizes;
-
-public class SimpleDenseCellName extends SimpleComposite implements CellName
-{
-    private static final long EMPTY_SIZE = ObjectSizes.measure(new SimpleDenseCellName(ByteBuffer.allocate(1)));
-
-    // Not meant to be used directly, you should use the CellNameType method instead
-    SimpleDenseCellName(ByteBuffer element)
-    {
-        super(element);
-    }
-
-    public int clusteringSize()
-    {
-        return 1;
-    }
-
-    public ColumnIdentifier cql3ColumnName(CFMetaData metadata)
-    {
-        return null;
-    }
-
-    public ByteBuffer collectionElement()
-    {
-        return null;
-    }
-
-    public boolean isCollectionCell()
-    {
-        return false;
-    }
-
-    public boolean isSameCQL3RowAs(CellNameType type, CellName other)
-    {
-        // Dense cell imply one cell by CQL row so no other cell will be the same row.
-        return type.compare(this, other) == 0;
-    }
-
-    @Override
-    public long unsharedHeapSize()
-    {
-        return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(element);
-    }
-
-    @Override
-    public long unsharedHeapSizeExcludingData()
-    {
-        return EMPTY_SIZE + ObjectSizes.sizeOnHeapExcludingData(element);
-    }
-
-    // If cellnames were sharing some prefix components, this will break it, so
-    // we might want to try to do better.
-    @Override
-    public CellName copy(CFMetaData cfm, AbstractAllocator allocator)
-    {
-        return new SimpleDenseCellName(allocator.clone(element));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/composites/SimpleDenseCellNameType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/composites/SimpleDenseCellNameType.java b/src/java/org/apache/cassandra/db/composites/SimpleDenseCellNameType.java
deleted file mode 100644
index 3db4bc4..0000000
--- a/src/java/org/apache/cassandra/db/composites/SimpleDenseCellNameType.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.composites;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.CQL3Row;
-import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.db.marshal.AbstractType;
-
-public class SimpleDenseCellNameType extends AbstractSimpleCellNameType
-{
-    public SimpleDenseCellNameType(AbstractType<?> type)
-    {
-        super(type);
-    }
-
-    public int clusteringPrefixSize()
-    {
-        return 1;
-    }
-
-    public CBuilder prefixBuilder()
-    {
-        // Simple dense is "all" prefix
-        return builder();
-    }
-
-    public CellNameType setSubtype(int position, AbstractType<?> newType)
-    {
-        if (position != 0)
-            throw new IllegalArgumentException();
-        return new SimpleDenseCellNameType(newType);
-    }
-
-    public boolean isDense()
-    {
-        return true;
-    }
-
-    public CellName create(Composite prefix, ColumnDefinition column)
-    {
-        assert prefix.size() == 1;
-        // We ignore the column because it's just the COMPACT_VALUE name which is not store in the cell name
-        return new SimpleDenseCellName(prefix.get(0));
-    }
-
-    @Override
-    public Composite fromByteBuffer(ByteBuffer bb)
-    {
-        return !bb.hasRemaining()
-             ? Composites.EMPTY
-             : new SimpleDenseCellName(bb);
-    }
-
-    public void addCQL3Column(ColumnIdentifier id) {}
-    public void removeCQL3Column(ColumnIdentifier id) {}
-
-    public CQL3Row.Builder CQL3RowBuilder(CFMetaData metadata, long now)
-    {
-        return makeDenseCQL3RowBuilder(now);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/composites/SimpleSparseCellName.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/composites/SimpleSparseCellName.java b/src/java/org/apache/cassandra/db/composites/SimpleSparseCellName.java
deleted file mode 100644
index c6351f1..0000000
--- a/src/java/org/apache/cassandra/db/composites/SimpleSparseCellName.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.composites;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.ObjectSizes;
-
-public class SimpleSparseCellName extends AbstractComposite implements CellName
-{
-    private static final long EMPTY_SIZE = ObjectSizes.measure(new SimpleSparseCellName(null));
-
-    private final ColumnIdentifier columnName;
-
-    // Not meant to be used directly, you should use the CellNameType method instead
-    SimpleSparseCellName(ColumnIdentifier columnName)
-    {
-        this.columnName = columnName;
-    }
-
-    public int size()
-    {
-        return 1;
-    }
-
-    public ByteBuffer get(int i)
-    {
-        if (i != 0)
-            throw new IndexOutOfBoundsException();
-
-        return columnName.bytes;
-    }
-
-    @Override
-    public Composite withEOC(EOC newEoc)
-    {
-        // EOC makes no sense for not truly composites.
-        return this;
-    }
-
-    @Override
-    public ByteBuffer toByteBuffer()
-    {
-        return columnName.bytes;
-    }
-
-    public int clusteringSize()
-    {
-        return 0;
-    }
-
-    public ColumnIdentifier cql3ColumnName(CFMetaData metadata)
-    {
-        return columnName;
-    }
-
-    public ByteBuffer collectionElement()
-    {
-        return null;
-    }
-
-    public boolean isCollectionCell()
-    {
-        return false;
-    }
-
-    public boolean isSameCQL3RowAs(CellNameType type, CellName other)
-    {
-        return true;
-    }
-
-    public long unsharedHeapSizeExcludingData()
-    {
-        return EMPTY_SIZE + columnName.unsharedHeapSizeExcludingData();
-    }
-
-    public long unsharedHeapSize()
-    {
-        return EMPTY_SIZE + columnName.unsharedHeapSize();
-    }
-
-    public CellName copy(CFMetaData cfm, AbstractAllocator allocator)
-    {
-        return new SimpleSparseCellName(columnName.clone(allocator));
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/composites/SimpleSparseCellNameType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/composites/SimpleSparseCellNameType.java b/src/java/org/apache/cassandra/db/composites/SimpleSparseCellNameType.java
deleted file mode 100644
index 5ce0deb..0000000
--- a/src/java/org/apache/cassandra/db/composites/SimpleSparseCellNameType.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.composites;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.CQL3Row;
-import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.db.marshal.AbstractType;
-
-public class SimpleSparseCellNameType extends AbstractSimpleCellNameType
-{
-    // Simple sparse means static thrift CF or non-clustered CQL3. This means that cell names will mainly
-    // be those that have been declared and we can intern the whole CellName instances.
-    private final Map<ByteBuffer, CellName> internedNames;
-
-    public SimpleSparseCellNameType(AbstractType<?> type)
-    {
-        this(type, new HashMap<ByteBuffer, CellName>());
-    }
-
-    private SimpleSparseCellNameType(AbstractType<?> type, Map<ByteBuffer, CellName> internedNames)
-    {
-        super(type);
-        this.internedNames = internedNames;
-    }
-
-    public int clusteringPrefixSize()
-    {
-        return 0;
-    }
-
-    public CellNameType setSubtype(int position, AbstractType<?> newType)
-    {
-        if (position != 0)
-            throw new IllegalArgumentException();
-        return new SimpleSparseCellNameType(newType, internedNames);
-    }
-
-    public CBuilder prefixBuilder()
-    {
-        return Composites.EMPTY_BUILDER;
-    }
-
-    public boolean isDense()
-    {
-        return false;
-    }
-
-    public CellName create(Composite prefix, ColumnDefinition column)
-    {
-        assert prefix.isEmpty();
-        CellName cn = internedNames.get(column.name.bytes);
-        return cn == null ? new SimpleSparseCellName(column.name) : cn;
-    }
-
-    @Override
-    public Composite fromByteBuffer(ByteBuffer bb)
-    {
-        if (!bb.hasRemaining())
-            return Composites.EMPTY;
-
-        CellName cn = internedNames.get(bb);
-        return cn == null ? new SimpleSparseCellName(new ColumnIdentifier(bb, type)) : cn;
-    }
-
-    public void addCQL3Column(ColumnIdentifier id)
-    {
-        internedNames.put(id.bytes, new SimpleSparseInternedCellName(id));
-    }
-
-    public void removeCQL3Column(ColumnIdentifier id)
-    {
-        internedNames.remove(id.bytes);
-    }
-
-    public CQL3Row.Builder CQL3RowBuilder(CFMetaData metadata, long now)
-    {
-        return makeSparseCQL3RowBuilder(metadata, this, now);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/composites/SimpleSparseInternedCellName.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/composites/SimpleSparseInternedCellName.java b/src/java/org/apache/cassandra/db/composites/SimpleSparseInternedCellName.java
deleted file mode 100644
index c613720..0000000
--- a/src/java/org/apache/cassandra/db/composites/SimpleSparseInternedCellName.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.composites;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-
-public class SimpleSparseInternedCellName extends SimpleSparseCellName
-{
-
-    // Not meant to be used directly, you should use the CellNameType method instead
-    SimpleSparseInternedCellName(ColumnIdentifier columnName)
-    {
-        super(columnName);
-    }
-
-    @Override
-    public long unsharedHeapSizeExcludingData()
-    {
-        return 0;
-    }
-
-    @Override
-    public long unsharedHeapSize()
-    {
-        return 0;
-    }
-
-    @Override
-    public CellName copy(CFMetaData cfm, AbstractAllocator allocator)
-    {
-        // We're interning those instance in SparceCellNameType so don't need to copy.
-        return this;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/context/CounterContext.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/context/CounterContext.java b/src/java/org/apache/cassandra/db/context/CounterContext.java
index ffffbb1..2a6c5ff 100644
--- a/src/java/org/apache/cassandra/db/context/CounterContext.java
+++ b/src/java/org/apache/cassandra/db/context/CounterContext.java
@@ -111,7 +111,12 @@ public class CounterContext
 
     /**
      * Creates a counter context with a single local shard.
-     * For use by tests of compatibility with pre-2.1 counters only.
+     * This is only used in a PartitionUpdate until the update has gone through
+     * CounterMutation.apply(), at which point all the local shards are replaced by
+     * global ones. In other words, local shards should never hit the disk or
+     * memtables. And we use this so that if an update statement has multiple increments
+     * of the same counter we properly add them rather than keeping only one of them.
+     * (this is also used for tests of compatibility with pre-2.1 counters)
      */
     public ByteBuffer createLocal(long count)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java b/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java
new file mode 100644
index 0000000..46d10df
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.filter;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.ReversedType;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+public abstract class AbstractClusteringIndexFilter implements ClusteringIndexFilter
+{
+    protected enum Kind
+    {
+        SLICE (ClusteringIndexSliceFilter.deserializer),
+        NAMES (ClusteringIndexNamesFilter.deserializer);
+
+        private final InternalDeserializer deserializer;
+
+        private Kind(InternalDeserializer deserializer)
+        {
+            this.deserializer = deserializer;
+        }
+    }
+
+    static final Serializer serializer = new FilterSerializer();
+
+    abstract Kind kind();
+
+    protected final boolean reversed;
+
+    protected AbstractClusteringIndexFilter(boolean reversed)
+    {
+        this.reversed = reversed;
+    }
+
+    public boolean isReversed()
+    {
+        return reversed;
+    }
+
+    protected abstract void serializeInternal(DataOutputPlus out, int version) throws IOException;
+    protected abstract long serializedSizeInternal(int version, TypeSizes sizes);
+
+    protected void appendOrderByToCQLString(CFMetaData metadata, StringBuilder sb)
+    {
+        if (reversed)
+        {
+            sb.append(" ORDER BY (");
+            int i = 0;
+            for (ColumnDefinition column : metadata.clusteringColumns())
+                sb.append(i++ == 0 ? "" : ", ").append(column.name).append(column.type instanceof ReversedType ? " ASC" : " DESC");
+            sb.append(")");
+        }
+    }
+
+    private static class FilterSerializer implements Serializer
+    {
+        public void serialize(ClusteringIndexFilter pfilter, DataOutputPlus out, int version) throws IOException
+        {
+            AbstractClusteringIndexFilter filter = (AbstractClusteringIndexFilter)pfilter;
+
+            out.writeByte(filter.kind().ordinal());
+            out.writeBoolean(filter.isReversed());
+
+            filter.serializeInternal(out, version);
+        }
+
+        public ClusteringIndexFilter deserialize(DataInput in, int version, CFMetaData metadata) throws IOException
+        {
+            Kind kind = Kind.values()[in.readUnsignedByte()];
+            boolean reversed = in.readBoolean();
+
+            return kind.deserializer.deserialize(in, version, metadata, reversed);
+        }
+
+        public long serializedSize(ClusteringIndexFilter pfilter, int version)
+        {
+            AbstractClusteringIndexFilter filter = (AbstractClusteringIndexFilter)pfilter;
+
+            TypeSizes sizes = TypeSizes.NATIVE;
+            return 1
+                 + sizes.sizeof(filter.isReversed())
+                 + filter.serializedSizeInternal(version, sizes);
+        }
+    }
+
+    protected static abstract class InternalDeserializer
+    {
+        public abstract ClusteringIndexFilter deserialize(DataInput in, int version, CFMetaData metadata, boolean reversed) throws IOException;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java
new file mode 100644
index 0000000..54feb85
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.filter;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.CachedPartition;
+import org.apache.cassandra.db.partitions.Partition;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+/**
+ * A filter that selects a subset of the rows of a given partition by using the "clustering index".
+ * <p>
+ * In CQL terms, this corresponds to the clustering columns selection and corresponds to what
+ * the storage engine can do without filtering (and without 2ndary indexes). This does not include
+ * the restrictions on non-PK columns which can be found in {@link RowFilter}.
+ */
+public interface ClusteringIndexFilter
+{
+    public static Serializer serializer = AbstractClusteringIndexFilter.serializer;
+
+    /**
+     * Whether the filter queries rows in reversed clustering order or not.
+     *
+     * @return whether the filter queries rows in reversed clustering order or not.
+     */
+    public boolean isReversed();
+
+    /**
+     * Returns a filter for continuing the paging of this filter given the last returned clustering prefix.
+     *
+     * @param comparator the comparator for the table this is a filter for.
+     * @param lastReturned the last clustering that was returned for the query we are paging for. The
+     * resulting filter will be such that results coming after {@code lastReturned} are returned
+     * (where coming after means "greater than" if the filter is not reversed, "lesser than" otherwise;
+     * further, whether the comparison is strict or not depends on {@code inclusive}).
+     * @param inclusive whether or not we want to include the {@code lastReturned} in the newly returned
+     * page of results.
+     *
+     * @return a new filter that selects results coming after {@code lastReturned}.
+     */
+    public ClusteringIndexFilter forPaging(ClusteringComparator comparator, Clustering lastReturned, boolean inclusive);
+
+    /**
+     * Returns whether we can guarantee that a given cached partition contains all the data selected by this filter.
+     *
+     * @param partition the cached partition. This method assumes that the rows of this partition contain all the table columns.
+     *
+     * @return whether we can guarantee that all data selected by this filter are in {@code partition}.
+     */
+    public boolean isFullyCoveredBy(CachedPartition partition);
+
+    /**
+     * Whether this filter selects the head of a partition (i.e. it isn't reversed and selects all rows up to a certain point).
+     *
+     * @return whether this filter selects the head of a partition.
+     */
+    public boolean isHeadFilter();
+
+    /**
+     * Whether this filter selects all the rows of a partition (it's an "identity" filter).
+     *
+     * @return whether this filter selects all the rows of a partition (it's an "identity" filter).
+     */
+    public boolean selectsAllPartition();
+
+    /**
+     * Whether a given row is selected by this filter.
+     *
+     * @param clustering the clustering of the row to test the selection of.
+     *
+     * @return whether the row with clustering {@code clustering} is selected by this filter.
+     */
+    public boolean selects(Clustering clustering);
+
+    /**
+     * Returns an iterator that only returns the rows of the provided iterator that this filter selects.
+     * <p>
+     * This method is the "dumb" counterpart to {@link #filter(SliceableUnfilteredRowIterator)} in that it has no way to quickly get
+     * to what is actually selected, so it simply iterates over it all and filters out what shouldn't be returned. This should
+     * be avoided in general, we should make sure to have {@code SliceableUnfilteredRowIterator} when we have filtering to do, but this is
+     * currently only used in {@link SinglePartitionReadCommand#getThroughCache} when we know this won't be a performance problem.
+     * Another difference with {@link #filter(SliceableUnfilteredRowIterator)} is that this method also filters the queried
+     * columns in the returned result, while the former assumes that the provided iterator has already done it.
+     *
+     * @param columnFilter the columns to include in the rows of the result iterator.
+     * @param iterator the iterator for which we should filter rows.
+     *
+     * @return an iterator that only returns the rows (or rather Unfiltered) from {@code iterator} that are selected by this filter.
+     */
+    public UnfilteredRowIterator filterNotIndexed(ColumnFilter columnFilter, UnfilteredRowIterator iterator);
+
+    /**
+     * Returns an iterator that only returns the rows of the provided sliceable iterator that this filter selects.
+     *
+     * @param iterator the sliceable iterator for which we should filter rows.
+     *
+     * @return an iterator that only returns the rows (or rather unfiltered) from {@code iterator} that are selected by this filter.
+     */
+    public UnfilteredRowIterator filter(SliceableUnfilteredRowIterator iterator);
+
+    /**
+     * Given a partition, returns a row iterator for the rows of this partition that are selected by this filter.
+     *
+     * @param columnFilter the columns to include in the rows of the result iterator.
+     * @param partition the partition containing the rows to filter.
+     *
+     * @return an unfiltered row iterator returning those rows (or rather Unfiltered) from {@code partition} that are selected by this filter.
+     */
+    // TODO: we could get rid of that if Partition was exposing a SliceableUnfilteredRowIterator (instead of the two searchIterator() and
+    // unfilteredIterator() methods). However, for AtomicBtreePartition this would require changes to Btree so we'll leave that for later.
+    public UnfilteredRowIterator getUnfilteredRowIterator(ColumnFilter columnFilter, Partition partition);
+
+    /**
+     * Whether the provided sstable may contain data that is selected by this filter (based on the sstable metadata).
+     *
+     * @param sstable the sstable for which we want to test the need for inclusion.
+     *
+     * @return whether {@code sstable} should be included to answer this filter.
+     */
+    public boolean shouldInclude(SSTableReader sstable);
+
+    public String toString(CFMetaData metadata);
+    public String toCQLString(CFMetaData metadata);
+
+    public interface Serializer
+    {
+        public void serialize(ClusteringIndexFilter filter, DataOutputPlus out, int version) throws IOException;
+        public ClusteringIndexFilter deserialize(DataInput in, int version, CFMetaData metadata) throws IOException;
+        public long serializedSize(ClusteringIndexFilter filter, int version);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
new file mode 100644
index 0000000..1839d3e
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.filter;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.SearchIterator;
+
+/**
+ * A filter selecting rows given their clustering value.
+ */
+public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter
+{
+    static final InternalDeserializer deserializer = new NamesDeserializer();
+
+    // This could be empty if selectedColumns only has static columns (in which case the filter still
+    // selects the static row)
+    private final NavigableSet<Clustering> clusterings;
+
+    // clusterings is always in clustering order (because we need it that way in some methods), but we also
+    // sometimes need those clusterings in "query" order (i.e. in reverse clustering order if the query is
+    // reversed), so we keep that too for simplicity.
+    private final NavigableSet<Clustering> clusteringsInQueryOrder;
+
+    public ClusteringIndexNamesFilter(NavigableSet<Clustering> clusterings, boolean reversed)
+    {
+        super(reversed);
+        assert !clusterings.contains(Clustering.STATIC_CLUSTERING);
+        this.clusterings = clusterings;
+        this.clusteringsInQueryOrder = reversed ? clusterings.descendingSet() : clusterings;
+    }
+
+    /**
+     * The set of requested rows.
+     *
+     * Please note that this can be empty if only the static row is requested.
+     *
+     * @return the set of requested clustering in clustering order (note that
+     * this is always in clustering order even if the query is reversed).
+     */
+    public NavigableSet<Clustering> requestedRows()
+    {
+        return clusterings;
+    }
+
+    public boolean selectsAllPartition()
+    {
+        return false;
+    }
+
+    public boolean selects(Clustering clustering)
+    {
+        return clusterings.contains(clustering);
+    }
+
+    public ClusteringIndexNamesFilter forPaging(ClusteringComparator comparator, Clustering lastReturned, boolean inclusive)
+    {
+        // No shortcut based on the first clustering in query order: comparing lastReturned against it in
+        // clustering order is wrong for reversed queries (it would keep rows that sort before lastReturned
+        // in query order, i.e. rows already returned), and first() throws on an empty set, which is legal
+        // when only the static row is selected. The head/tail views below are correct in every case.
+
+        NavigableSet<Clustering> newClusterings = reversed ?
+                                                  clusterings.headSet(lastReturned, inclusive) :
+                                                  clusterings.tailSet(lastReturned, inclusive);
+
+        return new ClusteringIndexNamesFilter(newClusterings, reversed);
+    }
+
+    public boolean isFullyCoveredBy(CachedPartition partition)
+    {
+        // 'partition' contains all columns, so it covers our filter if our last clusterings
+        // is smaller than the last in the cache
+        return clusterings.comparator().compare(clusterings.last(), partition.lastRow().clustering()) <= 0;
+    }
+
+    public boolean isHeadFilter()
+    {
+        return false;
+    }
+
+    // Given another iterator, only return the rows that match this filter
+    public UnfilteredRowIterator filterNotIndexed(ColumnFilter columnFilter, UnfilteredRowIterator iterator)
+    {
+        // Note that we don't filter markers because that's a bit trickier (we don't know in advance until when
+        // the range extends) and it's harmless to leave them.
+        return new FilteringRowIterator(iterator)
+        {
+            @Override
+            public FilteringRow makeRowFilter()
+            {
+                return FilteringRow.columnsFilteringRow(columnFilter);
+            }
+
+            @Override
+            protected boolean includeRow(Row row)
+            {
+                return clusterings.contains(row.clustering());
+            }
+        };
+    }
+
+    public UnfilteredRowIterator filter(final SliceableUnfilteredRowIterator iter)
+    {
+        // Please note that this method assumes that rows from 'iter' already have their columns filtered, i.e. that
+        // they only include columns that we select.
+        return new WrappingUnfilteredRowIterator(iter)
+        {
+            private final Iterator<Clustering> clusteringIter = clusteringsInQueryOrder.iterator();
+            private Iterator<Unfiltered> currentClustering;
+            private Unfiltered next;
+
+            @Override
+            public boolean hasNext()
+            {
+                if (next != null)
+                    return true;
+
+                if (currentClustering != null && currentClustering.hasNext())
+                {
+                    next = currentClustering.next();
+                    return true;
+                }
+
+                while (clusteringIter.hasNext())
+                {
+                    Clustering nextClustering = clusteringIter.next();
+                    currentClustering = iter.slice(Slice.make(nextClustering));
+                    if (currentClustering.hasNext())
+                    {
+                        next = currentClustering.next();
+                        return true;
+                    }
+                }
+                return false;
+            }
+
+            @Override
+            public Unfiltered next()
+            {
+                if (next == null && !hasNext())
+                    throw new NoSuchElementException();
+
+                Unfiltered toReturn = next;
+                next = null;
+                return toReturn;
+            }
+        };
+    }
+
+    public UnfilteredRowIterator getUnfilteredRowIterator(final ColumnFilter columnFilter, final Partition partition)
+    {
+        final SearchIterator<Clustering, Row> searcher = partition.searchIterator(columnFilter, reversed);
+        return new AbstractUnfilteredRowIterator(partition.metadata(),
+                                        partition.partitionKey(),
+                                        partition.partitionLevelDeletion(),
+                                        columnFilter.fetchedColumns(),
+                                        searcher.next(Clustering.STATIC_CLUSTERING),
+                                        reversed,
+                                        partition.stats())
+        {
+            private final Iterator<Clustering> clusteringIter = clusteringsInQueryOrder.iterator();
+
+            protected Unfiltered computeNext()
+            {
+                while (clusteringIter.hasNext() && searcher.hasNext())
+                {
+                    Row row = searcher.next(clusteringIter.next());
+                    if (row != null)
+                        return row;
+                }
+                return endOfData();
+            }
+        };
+    }
+
+    public boolean shouldInclude(SSTableReader sstable)
+    {
+        // TODO: we could actually exclude some sstables
+        return true;
+    }
+
+    public String toString(CFMetaData metadata)
+    {
+        StringBuilder sb = new StringBuilder();
+        sb.append("names(");
+        int i = 0;
+        for (Clustering clustering : clusterings)
+            sb.append(i++ == 0 ? "" : ", ").append(clustering.toString(metadata));
+        if (reversed)
+            sb.append(", reversed");
+        return sb.append(")").toString();
+    }
+
+    public String toCQLString(CFMetaData metadata)
+    {
+        if (clusterings.isEmpty())
+            return "";
+
+        StringBuilder sb = new StringBuilder();
+        sb.append("(").append(ColumnDefinition.toCQLString(metadata.clusteringColumns())).append(")");
+        sb.append(clusterings.size() == 1 ? " = " : " IN (");
+        int i = 0;
+        for (Clustering clustering : clusterings)
+            sb.append(i++ == 0 ? "" : ", ").append("(").append(clustering.toCQLString(metadata)).append(")");
+        sb.append(clusterings.size() == 1 ? "" : ")");
+
+        appendOrderByToCQLString(metadata, sb);
+        return sb.toString();
+    }
+
+    Kind kind()
+    {
+        return Kind.NAMES;
+    }
+
+    protected void serializeInternal(DataOutputPlus out, int version) throws IOException
+    {
+        ClusteringComparator comparator = (ClusteringComparator)clusterings.comparator();
+        out.writeInt(clusterings.size());
+        for (Clustering clustering : clusterings)
+            Clustering.serializer.serialize(clustering, out, version, comparator.subtypes());
+    }
+
+    protected long serializedSizeInternal(int version, TypeSizes sizes)
+    {
+        long size = sizes.sizeof(clusterings.size()); // the int element count that serializeInternal() writes first
+        ClusteringComparator comparator = (ClusteringComparator)clusterings.comparator();
+        for (Clustering clustering : clusterings)
+            size += Clustering.serializer.serializedSize(clustering, version, comparator.subtypes(), sizes);
+        return size;
+    }
+
+    private static class NamesDeserializer extends InternalDeserializer
+    {
+        public ClusteringIndexFilter deserialize(DataInput in, int version, CFMetaData metadata, boolean reversed) throws IOException
+        {
+            ClusteringComparator comparator = metadata.comparator;
+            NavigableSet<Clustering> clusterings = new TreeSet<>(comparator);
+            int size = in.readInt();
+            for (int i = 0; i < size; i++)
+                clusterings.add(Clustering.serializer.deserialize(in, version, comparator.subtypes()).takeAlias());
+
+            return new ClusteringIndexNamesFilter(clusterings, reversed);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java
new file mode 100644
index 0000000..9e58542
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.filter;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.List;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.CachedPartition;
+import org.apache.cassandra.db.partitions.Partition;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+/**
+ * A filter over a single partition.
+ */
+public class ClusteringIndexSliceFilter extends AbstractClusteringIndexFilter
+{
+    static final InternalDeserializer deserializer = new SliceDeserializer();
+
+    private final Slices slices;
+
+    public ClusteringIndexSliceFilter(Slices slices, boolean reversed)
+    {
+        super(reversed);
+        this.slices = slices;
+    }
+
+    public Slices requestedSlices()
+    {
+        return slices;
+    }
+
+    public boolean selectsAllPartition()
+    {
+        return slices.size() == 1 && !slices.hasLowerBound() && !slices.hasUpperBound();
+    }
+
+    public boolean selects(Clustering clustering)
+    {
+        return slices.selects(clustering);
+    }
+
+    public ClusteringIndexSliceFilter forPaging(ClusteringComparator comparator, Clustering lastReturned, boolean inclusive)
+    {
+        Slices newSlices = slices.forPaging(comparator, lastReturned, inclusive, reversed);
+        return slices == newSlices
+             ? this
+             : new ClusteringIndexSliceFilter(newSlices, reversed);
+    }
+
+    public boolean isFullyCoveredBy(CachedPartition partition)
+    {
+        // Partition is guaranteed to cover the whole filter if it includes the filter start and finish bounds.
+
+        // (note that since partition is the head of a partition, to have no lower bound is ok)
+        if (!slices.hasUpperBound() || partition.isEmpty())
+            return false;
+
+        return partition.metadata().comparator.compare(slices.get(slices.size() - 1).end(), partition.lastRow().clustering()) <= 0;
+    }
+
+    public boolean isHeadFilter()
+    {
+        return !reversed && slices.size() == 1 && !slices.hasLowerBound();
+    }
+
+    // Given another iterator, only return the rows that match this filter
+    public UnfilteredRowIterator filterNotIndexed(final ColumnFilter columnFilter, UnfilteredRowIterator iterator)
+    {
+        final Slices.InOrderTester tester = slices.inOrderTester(reversed);
+
+        // Note that we don't filter markers because that's a bit trickier (we don't know in advance until when
+        // the range extend) and it's harmless to leave them.
+        return new FilteringRowIterator(iterator)
+        {
+            @Override
+            public FilteringRow makeRowFilter()
+            {
+                return FilteringRow.columnsFilteringRow(columnFilter);
+            }
+
+            @Override
+            protected boolean includeRow(Row row)
+            {
+                return tester.includes(row.clustering());
+            }
+
+            @Override
+            public boolean hasNext()
+            {
+                return !tester.isDone() && super.hasNext();
+            }
+        };
+    }
+
+    public UnfilteredRowIterator filter(SliceableUnfilteredRowIterator iterator)
+    {
+        // Please note that this method assumes that rows from 'iter' already have their columns filtered, i.e. that
+        // they only include columns that we select.
+        return slices.makeSliceIterator(iterator);
+    }
+
+    public UnfilteredRowIterator getUnfilteredRowIterator(ColumnFilter columnFilter, Partition partition)
+    {
+        return partition.unfilteredIterator(columnFilter, slices, reversed);
+    }
+
+    public boolean shouldInclude(SSTableReader sstable)
+    {
+        List<ByteBuffer> minClusteringValues = sstable.getSSTableMetadata().minClusteringValues;
+        List<ByteBuffer> maxClusteringValues = sstable.getSSTableMetadata().maxClusteringValues;
+
+        if (minClusteringValues.isEmpty() || maxClusteringValues.isEmpty())
+            return true;
+
+        return slices.intersects(minClusteringValues, maxClusteringValues);
+    }
+
+    public String toString(CFMetaData metadata)
+    {
+        return String.format("slice(slices=%s, reversed=%b)", slices, reversed);
+    }
+
+    public String toCQLString(CFMetaData metadata)
+    {
+        StringBuilder sb = new StringBuilder();
+
+        if (!selectsAllPartition())
+            sb.append(slices.toCQLString(metadata));
+
+        appendOrderByToCQLString(metadata, sb);
+
+        return sb.toString();
+    }
+
+    Kind kind()
+    {
+        return Kind.SLICE;
+    }
+
+    protected void serializeInternal(DataOutputPlus out, int version) throws IOException
+    {
+        Slices.serializer.serialize(slices, out, version);
+    }
+
+    protected long serializedSizeInternal(int version, TypeSizes sizes)
+    {
+        return Slices.serializer.serializedSize(slices, version, sizes);
+    }
+
+    private static class SliceDeserializer extends InternalDeserializer
+    {
+        public ClusteringIndexFilter deserialize(DataInput in, int version, CFMetaData metadata, boolean reversed) throws IOException
+        {
+            Slices slices = Slices.serializer.deserialize(in, version, metadata);
+            return new ClusteringIndexSliceFilter(slices, reversed);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
deleted file mode 100644
index 0d5acd1..0000000
--- a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.cassandra.db.filter;
-
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.DeletionInfo;
-
-public class ColumnCounter
-{
-    protected int live;
-    protected int tombstones;
-    protected final long timestamp;
-
-    public ColumnCounter(long timestamp)
-    {
-        this.timestamp = timestamp;
-    }
-
-    /**
-     * @return true if the cell counted as a live cell or a valid tombstone; false if it got immediately discarded for
-     *         being shadowed by a range- or a partition tombstone
-     */
-    public boolean count(Cell cell, DeletionInfo.InOrderTester tester)
-    {
-        // The cell is shadowed by a higher-level deletion, and won't be retained.
-        // For the purposes of this counter, we don't care if it's a tombstone or not.
-        if (tester.isDeleted(cell))
-            return false;
-
-        if (cell.isLive(timestamp))
-            live++;
-        else
-            tombstones++;
-
-        return true;
-    }
-
-    public int live()
-    {
-        return live;
-    }
-
-    public int tombstones()
-    {
-        return tombstones;
-    }
-
-    public ColumnCounter countAll(ColumnFamily container)
-    {
-        if (container == null)
-            return this;
-
-        DeletionInfo.InOrderTester tester = container.inOrderDeletionTester();
-        for (Cell c : container)
-            count(c, tester);
-        return this;
-    }
-
-    public static class GroupByPrefix extends ColumnCounter
-    {
-        protected final CellNameType type;
-        protected final int toGroup;
-        protected CellName previous;
-
-        /**
-         * A column counter that count only 1 for all the columns sharing a
-         * given prefix of the key.
-         *
-         * @param type the type of the column name. This can be null if {@code
-         *             toGroup} is 0, otherwise it should be a composite.
-         * @param toGroup the number of composite components on which to group
-         *                column. If 0, all columns are grouped, otherwise we group
-         *                those for which the {@code toGroup} first component are equals.
-         */
-        public GroupByPrefix(long timestamp, CellNameType type, int toGroup)
-        {
-            super(timestamp);
-            this.type = type;
-            this.toGroup = toGroup;
-
-            assert toGroup == 0 || type != null;
-        }
-
-        @Override
-        public boolean count(Cell cell, DeletionInfo.InOrderTester tester)
-        {
-            if (tester.isDeleted(cell))
-                return false;
-
-            if (!cell.isLive(timestamp))
-            {
-                tombstones++;
-                return true;
-            }
-
-            if (toGroup == 0)
-            {
-                live = 1;
-                return true;
-            }
-
-            CellName current = cell.name();
-            assert current.size() >= toGroup;
-
-            if (previous != null)
-            {
-                boolean isSameGroup = previous.isStatic() == current.isStatic();
-                if (isSameGroup)
-                {
-                    for (int i = 0; i < toGroup; i++)
-                    {
-                        if (type.subtype(i).compare(previous.get(i), current.get(i)) != 0)
-                        {
-                            isSameGroup = false;
-                            break;
-                        }
-                    }
-                }
-
-                if (isSameGroup)
-                    return true;
-
-                // We want to count the static group as 1 (CQL) row only if it's the only
-                // group in the partition. So, since we have already counted it at this point,
-                // just don't count the 2nd group if there is one and the first one was static
-                if (previous.isStatic())
-                {
-                    previous = current;
-                    return true;
-                }
-            }
-
-            live++;
-            previous = current;
-
-            return true;
-        }
-    }
-
-    /**
-     * Similar to GroupByPrefix, but designed to handle counting cells in reverse order.
-     */
-    public static class GroupByPrefixReversed extends GroupByPrefix
-    {
-        public GroupByPrefixReversed(long timestamp, CellNameType type, int toGroup)
-        {
-            super(timestamp, type, toGroup);
-        }
-
-        @Override
-        public boolean count(Cell cell, DeletionInfo.InOrderTester tester)
-        {
-            if (tester.isDeleted(cell))
-                return false;
-
-            if (!cell.isLive(timestamp))
-            {
-                tombstones++;
-                return true;
-            }
-
-            if (toGroup == 0)
-            {
-                live = 1;
-                return true;
-            }
-
-            CellName current = cell.name();
-            assert current.size() >= toGroup;
-
-            if (previous == null)
-            {
-                // This is the first group we've seen.  If it happens to be static, we still want to increment the
-                // count because a) there are no-static rows (statics are always last in reversed order), and b) any
-                // static cells we see after this will not increment the count
-                previous = current;
-                live++;
-            }
-            else if (!current.isStatic())  // ignore statics if we've seen any other statics or any other groups
-            {
-                for (int i = 0; i < toGroup; i++)
-                {
-                    if (type.subtype(i).compare(previous.get(i), current.get(i)) != 0)
-                    {
-                        // it's a new group
-                        live++;
-                        previous = current;
-                        return true;
-                    }
-                }
-            }
-
-            return true;
-        }
-    }
-}