You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:48:07 UTC

[43/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ClusteringPrefix.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ClusteringPrefix.java b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
new file mode 100644
index 0000000..73cedb8
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.util.*;
+
+import org.apache.cassandra.cache.IMeasurableMemory;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * A clustering prefix is basically the unit of what a {@link ClusteringComparator} can compare.
+ * <p>
+ * It holds values for the clustering columns of a table (potentially only a prefix of all of them) and it has
+ * a "kind" that allows us to implement slices with inclusive and exclusive bounds.
+ * <p>
+ * In practice, {@code ClusteringPrefix} is just the common parts to its 2 main subtype: {@link Clustering} and
+ * {@link Slice.Bound}, where:
+ *   1) {@code Clustering} represents the clustering values for a row, i.e. the values for it's clustering columns.
+ *   2) {@code Slice.Bound} represents a bound (start or end) of a slice (of rows).
+ * See those classes for more details.
+ */
+public interface ClusteringPrefix extends Aliasable<ClusteringPrefix>, IMeasurableMemory, Clusterable
+{
+    public static final Serializer serializer = new Serializer();
+
+    /**
+     * The kind of clustering prefix this actually is.
+     *
+     * The kind {@code STATIC_CLUSTERING} is only implemented by {@link Clustering.STATIC_CLUSTERING} and {@code CLUSTERING} is
+     * implemented by the {@link Clustering} class. The rest is used by {@link Slice.Bound} and {@link RangeTombstone.Bound}.
+     */
+    public enum Kind
+    {
+        // WARNING: the ordering of that enum matters because we use ordinal() in the serialization
+
+        EXCL_END_BOUND(0, -1),
+        INCL_START_BOUND(1, -1),
+        EXCL_END_INCL_START_BOUNDARY(1, -1),
+        STATIC_CLUSTERING(2, -1),
+        CLUSTERING(3, 0),
+        INCL_END_EXCL_START_BOUNDARY(4, -1),
+        INCL_END_BOUND(4, 1),
+        EXCL_START_BOUND(5, 1);
+
+        private final int comparison;
+
+        // If clusterable c1 has this Kind and is a strict prefix of clusterable c2, then this
+        // is the result of compare(c1, c2). Basically, this is the same as comparing the kind of c1 to
+        // CLUSTERING.
+        public final int prefixComparisonResult;
+
+        private Kind(int comparison, int prefixComparisonResult)
+        {
+            this.comparison = comparison;
+            this.prefixComparisonResult = prefixComparisonResult;
+        }
+
+        /**
+         * Compares the 2 provided kind.
+         * <p>
+         * Note: this should be used instead of {@link #compareTo} when comparing clustering prefixes. We do
+         * not override that latter method because it is final for an enum.
+         */
+        public static int compare(Kind k1, Kind k2)
+        {
+            return Integer.compare(k1.comparison, k2.comparison);
+        }
+
+        /**
+         * Returns the inverse of the current kind.
+         * <p>
+         * This invert both start into end (and vice-versa) and inclusive into exclusive (and vice-versa).
+         *
+         * @return the invert of this kind. For instance, if this kind is an exlusive start, this return
+         * an inclusive end.
+         */
+        public Kind invert()
+        {
+            switch (this)
+            {
+                case EXCL_START_BOUND:              return INCL_END_BOUND;
+                case INCL_START_BOUND:              return EXCL_END_BOUND;
+                case EXCL_END_BOUND:                return INCL_START_BOUND;
+                case INCL_END_BOUND:                return EXCL_START_BOUND;
+                case EXCL_END_INCL_START_BOUNDARY:  return INCL_END_EXCL_START_BOUNDARY;
+                case INCL_END_EXCL_START_BOUNDARY:  return EXCL_END_INCL_START_BOUNDARY;
+                default:                            return this;
+            }
+        }
+
+        public boolean isBound()
+        {
+            switch (this)
+            {
+                case INCL_START_BOUND:
+                case INCL_END_BOUND:
+                case EXCL_START_BOUND:
+                case EXCL_END_BOUND:
+                    return true;
+            }
+            return false;
+        }
+
+        public boolean isBoundary()
+        {
+            switch (this)
+            {
+                case INCL_END_EXCL_START_BOUNDARY:
+                case EXCL_END_INCL_START_BOUNDARY:
+                    return true;
+            }
+            return false;
+        }
+
+        public boolean isStart()
+        {
+            switch (this)
+            {
+                case INCL_START_BOUND:
+                case EXCL_END_INCL_START_BOUNDARY:
+                case INCL_END_EXCL_START_BOUNDARY:
+                case EXCL_START_BOUND:
+                    return true;
+                default:
+                    return false;
+            }
+        }
+
+        public boolean isEnd()
+        {
+            switch (this)
+            {
+                case INCL_END_BOUND:
+                case EXCL_END_INCL_START_BOUNDARY:
+                case INCL_END_EXCL_START_BOUNDARY:
+                case EXCL_END_BOUND:
+                    return true;
+                default:
+                    return false;
+            }
+        }
+
+        public boolean isOpen(boolean reversed)
+        {
+            return reversed ? isEnd() : isStart();
+        }
+
+        public boolean isClose(boolean reversed)
+        {
+            return reversed ? isStart() : isEnd();
+        }
+
+        public Kind closeBoundOfBoundary(boolean reversed)
+        {
+            assert isBoundary();
+            return reversed
+                 ? (this == INCL_END_EXCL_START_BOUNDARY ? EXCL_START_BOUND : INCL_START_BOUND)
+                 : (this == INCL_END_EXCL_START_BOUNDARY ? INCL_END_BOUND : EXCL_END_BOUND);
+        }
+
+        public Kind openBoundOfBoundary(boolean reversed)
+        {
+            assert isBoundary();
+            return reversed
+                 ? (this == INCL_END_EXCL_START_BOUNDARY ? INCL_END_BOUND : EXCL_END_BOUND)
+                 : (this == INCL_END_EXCL_START_BOUNDARY ? EXCL_START_BOUND : INCL_START_BOUND);
+        }
+    }
+
+    public Kind kind();
+
+    /**
+     * The number of values in this prefix.
+     *
+     * There can't be more values that the this is a prefix of has of clustering columns.
+     *
+     * @return the number of values in this prefix.
+     */
+    public int size();
+
+    /**
+     * Retrieves the ith value of this prefix.
+     *
+     * @param i the index of the value to retrieve. Must be such that {@code 0 <= i < size()}.
+     *
+     * @return the ith value of this prefix. Note that a value can be {@code null}.
+     */
+    public ByteBuffer get(int i);
+
+    public void digest(MessageDigest digest);
+
+    // Used to verify if batches goes over a given size
+    public int dataSize();
+
+    public String toString(CFMetaData metadata);
+
+    public void writeTo(Writer writer);
+
+    /**
+     * The values of this prefix as an array.
+     * <p>
+     * Please note that this may or may not require an array creation. So 1) you should *not*
+     * modify the returned array and 2) it's more efficient to use {@link #size()} and
+     * {@link #get} unless you actually need an array.
+     *
+     * @return the values for this prefix as an array.
+     */
+    public ByteBuffer[] getRawValues();
+
+    /**
+     * Interface for writing a clustering prefix.
+     * <p>
+     * Each value for the prefix should simply be written in order.
+     */
+    public interface Writer
+    {
+        /**
+         * Write the next value to the writer.
+         *
+         * @param value the value to write.
+         */
+        public void writeClusteringValue(ByteBuffer value);
+    }
+
+    public static class Serializer
+    {
+        public void serialize(ClusteringPrefix clustering, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException
+        {
+            // We shouldn't serialize static clusterings
+            assert clustering.kind() != Kind.STATIC_CLUSTERING;
+            if (clustering.kind() == Kind.CLUSTERING)
+            {
+                out.writeByte(clustering.kind().ordinal());
+                Clustering.serializer.serialize((Clustering)clustering, out, version, types);
+            }
+            else
+            {
+                Slice.Bound.serializer.serialize((Slice.Bound)clustering, out, version, types);
+            }
+        }
+
+        public ClusteringPrefix deserialize(DataInput in, int version, List<AbstractType<?>> types) throws IOException
+        {
+            Kind kind = Kind.values()[in.readByte()];
+            // We shouldn't serialize static clusterings
+            assert kind != Kind.STATIC_CLUSTERING;
+            if (kind == Kind.CLUSTERING)
+                return Clustering.serializer.deserialize(in, version, types);
+            else
+                return Slice.Bound.serializer.deserializeValues(in, kind, version, types);
+        }
+
+        public long serializedSize(ClusteringPrefix clustering, int version, List<AbstractType<?>> types, TypeSizes sizes)
+        {
+            // We shouldn't serialize static clusterings
+            assert clustering.kind() != Kind.STATIC_CLUSTERING;
+            if (clustering.kind() == Kind.CLUSTERING)
+                return 1 + Clustering.serializer.serializedSize((Clustering)clustering, version, types, sizes);
+            else
+                return Slice.Bound.serializer.serializedSize((Slice.Bound)clustering, version, types, sizes);
+        }
+
+        void serializeValuesWithoutSize(ClusteringPrefix clustering, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException
+        {
+            if (clustering.size() == 0)
+                return;
+
+            writeHeader(clustering, out);
+            for (int i = 0; i < clustering.size(); i++)
+            {
+                ByteBuffer v = clustering.get(i);
+                if (v == null || !v.hasRemaining())
+                    continue; // handled in the header
+
+                types.get(i).writeValue(v, out);
+            }
+        }
+
+        long valuesWithoutSizeSerializedSize(ClusteringPrefix clustering, int version, List<AbstractType<?>> types, TypeSizes sizes)
+        {
+            if (clustering.size() == 0)
+                return 0;
+
+            long size = headerBytesCount(clustering.size());
+            for (int i = 0; i < clustering.size(); i++)
+            {
+                ByteBuffer v = clustering.get(i);
+                if (v == null || !v.hasRemaining())
+                    continue; // handled in the header
+
+                size += types.get(i).writtenLength(v, sizes);
+            }
+            return size;
+        }
+
+        void deserializeValuesWithoutSize(DataInput in, int size, int version, List<AbstractType<?>> types, ClusteringPrefix.Writer writer) throws IOException
+        {
+            if (size == 0)
+                return;
+
+            int[] header = readHeader(size, in);
+            for (int i = 0; i < size; i++)
+            {
+                if (isNull(header, i))
+                    writer.writeClusteringValue(null);
+                else if (isEmpty(header, i))
+                    writer.writeClusteringValue(ByteBufferUtil.EMPTY_BYTE_BUFFER);
+                else
+                    writer.writeClusteringValue(types.get(i).readValue(in));
+            }
+        }
+
+        private int headerBytesCount(int size)
+        {
+            // For each component, we store 2 bit to know if the component is empty or null (or neither).
+            // We thus handle 4 component per byte
+            return size / 4 + (size % 4 == 0 ? 0 : 1);
+        }
+
+        /**
+         * Whatever the type of a given clustering column is, its value can always be either empty or null. So we at least need to distinguish those
+         * 2 values, and because we want to be able to store fixed width values without appending their (fixed) size first, we need a way to encode
+         * empty values too. So for that, every clustering prefix includes a "header" that contains 2 bits per element in the prefix. For each element,
+         * those 2 bits encode whether the element is null, empty, or none of those.
+         */
+        private void writeHeader(ClusteringPrefix clustering, DataOutputPlus out) throws IOException
+        {
+            int nbBytes = headerBytesCount(clustering.size());
+            for (int i = 0; i < nbBytes; i++)
+            {
+                int b = 0;
+                for (int j = 0; j < 4; j++)
+                {
+                    int c = i * 4 + j;
+                    if (c >= clustering.size())
+                        break;
+
+                    ByteBuffer v = clustering.get(c);
+                    if (v == null)
+                        b |= (1 << (j * 2) + 1);
+                    else if (!v.hasRemaining())
+                        b |= (1 << (j * 2));
+                }
+                out.writeByte((byte)b);
+            }
+        }
+
+        private int[] readHeader(int size, DataInput in) throws IOException
+        {
+            int nbBytes = headerBytesCount(size);
+            int[] header = new int[nbBytes];
+            for (int i = 0; i < nbBytes; i++)
+                header[i] = in.readUnsignedByte();
+            return header;
+        }
+
+        private boolean isNull(int[] header, int i)
+        {
+            int b = header[i / 4];
+            int mask = 1 << ((i % 4) * 2) + 1;
+            return (b & mask) != 0;
+        }
+
+        private boolean isEmpty(int[] header, int i)
+        {
+            int b = header[i / 4];
+            int mask = 1 << ((i % 4) * 2);
+            return (b & mask) != 0;
+        }
+    }
+
+    /**
+     * Helper class that makes the deserialization of clustering prefixes faster.
+     * <p>
+     * The main reason for this is that when we deserialize rows from sstables, there is many cases where we have
+     * a bunch of rows to skip at the beginning of an index block because those rows are before the requested slice.
+     * This class make sure we can answer the question "is the next row on disk before the requested slice" with as
+     * little work as possible. It does that by providing a comparison method that deserialize only what is needed
+     * to decide of the comparison.
+     */
+    public static class Deserializer
+    {
+        private final ClusteringComparator comparator;
+        private final DataInput in;
+        private final SerializationHeader serializationHeader;
+
+        private boolean nextIsRow;
+        private int[] nextHeader;
+
+        private int nextSize;
+        private ClusteringPrefix.Kind nextKind;
+        private int deserializedSize;
+        private final ByteBuffer[] nextValues;
+
+        public Deserializer(ClusteringComparator comparator, DataInput in, SerializationHeader header)
+        {
+            this.comparator = comparator;
+            this.in = in;
+            this.serializationHeader = header;
+            this.nextValues = new ByteBuffer[comparator.size()];
+        }
+
+        public void prepare(int flags) throws IOException
+        {
+            assert !UnfilteredSerializer.isStatic(flags) : "Flags = " + flags;
+            this.nextIsRow = UnfilteredSerializer.kind(flags) == Unfiltered.Kind.ROW;
+            this.nextKind = nextIsRow ? Kind.CLUSTERING : ClusteringPrefix.Kind.values()[in.readByte()];
+            this.nextSize = nextIsRow ? comparator.size() : in.readUnsignedShort();
+            this.nextHeader = serializer.readHeader(nextSize, in);
+            this.deserializedSize = 0;
+        }
+
+        public int compareNextTo(Slice.Bound bound) throws IOException
+        {
+            if (bound == Slice.Bound.TOP)
+                return -1;
+
+            for (int i = 0; i < bound.size(); i++)
+            {
+                if (!hasComponent(i))
+                    return nextKind.prefixComparisonResult;
+
+                int cmp = comparator.compareComponent(i, nextValues[i], bound.get(i));
+                if (cmp != 0)
+                    return cmp;
+            }
+
+            if (bound.size() == nextSize)
+                return nextKind.compareTo(bound.kind());
+
+            // We know that we'll have exited already if nextSize < bound.size
+            return -bound.kind().prefixComparisonResult;
+        }
+
+        private boolean hasComponent(int i) throws IOException
+        {
+            if (i >= nextSize)
+                return false;
+
+            while (deserializedSize <= i)
+                deserializeOne();
+
+            return true;
+        }
+
+        private boolean deserializeOne() throws IOException
+        {
+            if (deserializedSize == nextSize)
+                return false;
+
+            int i = deserializedSize++;
+            nextValues[i] = serializer.isNull(nextHeader, i)
+                          ? null
+                          : (serializer.isEmpty(nextHeader, i) ? ByteBufferUtil.EMPTY_BYTE_BUFFER : serializationHeader.clusteringTypes().get(i).readValue(in));
+            return true;
+        }
+
+        private void deserializeAll() throws IOException
+        {
+            while (deserializeOne())
+                continue;
+        }
+
+        public RangeTombstone.Bound.Kind deserializeNextBound(RangeTombstone.Bound.Writer writer) throws IOException
+        {
+            assert !nextIsRow;
+            deserializeAll();
+            for (int i = 0; i < nextSize; i++)
+                writer.writeClusteringValue(nextValues[i]);
+            writer.writeBoundKind(nextKind);
+            return nextKind;
+        }
+
+        public void deserializeNextClustering(Clustering.Writer writer) throws IOException
+        {
+            assert nextIsRow && nextSize == nextValues.length;
+            deserializeAll();
+            for (int i = 0; i < nextSize; i++)
+                writer.writeClusteringValue(nextValues[i]);
+        }
+
+        public ClusteringPrefix.Kind skipNext() throws IOException
+        {
+            for (int i = deserializedSize; i < nextSize; i++)
+                if (!serializer.isNull(nextHeader, i) && !serializer.isEmpty(nextHeader, i))
+                    serializationHeader.clusteringTypes().get(i).skipValue(in);
+            return nextKind;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/CollationController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java
deleted file mode 100644
index 7f6d439..0000000
--- a/src/java/org/apache/cassandra/db/CollationController.java
+++ /dev/null
@@ -1,334 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.Closeable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.TreeSet;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
-
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.filter.NamesQueryFilter;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.marshal.CounterColumnType;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.SearchIterator;
-import org.apache.cassandra.utils.memory.HeapAllocator;
-
-public class CollationController
-{
-    private final ColumnFamilyStore cfs;
-    private final QueryFilter filter;
-    private final int gcBefore;
-
-    private int sstablesIterated = 0;
-
-    public CollationController(ColumnFamilyStore cfs, QueryFilter filter, int gcBefore)
-    {
-        this.cfs = cfs;
-        this.filter = filter;
-        this.gcBefore = gcBefore;
-    }
-
-    public ColumnFamily getTopLevelColumns(boolean copyOnHeap)
-    {
-        return filter.filter instanceof NamesQueryFilter
-               && cfs.metadata.getDefaultValidator() != CounterColumnType.instance
-               ? collectTimeOrderedData(copyOnHeap)
-               : collectAllData(copyOnHeap);
-    }
-
-    /**
-     * Collects data in order of recency, using the sstable maxtimestamp data.
-     * Once we have data for all requests columns that is newer than the newest remaining maxtimestamp,
-     * we stop.
-     */
-    private ColumnFamily collectTimeOrderedData(boolean copyOnHeap)
-    {
-        final ColumnFamily container = ArrayBackedSortedColumns.factory.create(cfs.metadata, filter.filter.isReversed());
-        List<OnDiskAtomIterator> iterators = new ArrayList<>();
-        boolean isEmpty = true;
-        Tracing.trace("Acquiring sstable references");
-        ColumnFamilyStore.ViewFragment view = cfs.select(cfs.viewFilter(filter.key));
-        DeletionInfo returnDeletionInfo = container.deletionInfo();
-
-        try
-        {
-            Tracing.trace("Merging memtable contents");
-            for (Memtable memtable : view.memtables)
-            {
-                ColumnFamily cf = memtable.getColumnFamily(filter.key);
-                if (cf != null)
-                {
-                    filter.delete(container.deletionInfo(), cf);
-                    isEmpty = false;
-                    Iterator<Cell> iter = filter.getIterator(cf);
-                    while (iter.hasNext())
-                    {
-                        Cell cell = iter.next();
-                        if (copyOnHeap)
-                            cell = cell.localCopy(cfs.metadata, HeapAllocator.instance);
-                        container.addColumn(cell);
-                    }
-                }
-            }
-
-            // avoid changing the filter columns of the original filter
-            // (reduceNameFilter removes columns that are known to be irrelevant)
-            NamesQueryFilter namesFilter = (NamesQueryFilter) filter.filter;
-            TreeSet<CellName> filterColumns = new TreeSet<>(namesFilter.columns);
-            QueryFilter reducedFilter = new QueryFilter(filter.key, filter.cfName, namesFilter.withUpdatedColumns(filterColumns), filter.timestamp);
-
-            /* add the SSTables on disk */
-            Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
-
-            // read sorted sstables
-            for (SSTableReader sstable : view.sstables)
-            {
-                // if we've already seen a row tombstone with a timestamp greater
-                // than the most recent update to this sstable, we're done, since the rest of the sstables
-                // will also be older
-                if (sstable.getMaxTimestamp() < returnDeletionInfo.getTopLevelDeletion().markedForDeleteAt)
-                    break;
-
-                long currentMaxTs = sstable.getMaxTimestamp();
-                reduceNameFilter(reducedFilter, container, currentMaxTs);
-                if (((NamesQueryFilter) reducedFilter.filter).columns.isEmpty())
-                    break;
-
-                Tracing.trace("Merging data from sstable {}", sstable.descriptor.generation);
-                sstable.incrementReadCount();
-                OnDiskAtomIterator iter = reducedFilter.getSSTableColumnIterator(sstable);
-                iterators.add(iter);
-                isEmpty = false;
-                if (iter.getColumnFamily() != null)
-                {
-                    container.delete(iter.getColumnFamily());
-                    sstablesIterated++;
-                    while (iter.hasNext())
-                        container.addAtom(iter.next());
-                }
-            }
-
-            // we need to distinguish between "there is no data at all for this row" (BF will let us rebuild that efficiently)
-            // and "there used to be data, but it's gone now" (we should cache the empty CF so we don't need to rebuild that slower)
-            if (isEmpty)
-                return null;
-
-            // do a final collate.  toCollate is boilerplate required to provide a CloseableIterator
-            ColumnFamily returnCF = container.cloneMeShallow();
-            Tracing.trace("Collating all results");
-            filter.collateOnDiskAtom(returnCF, container.iterator(), gcBefore);
-
-            // "hoist up" the requested data into a more recent sstable
-            if (sstablesIterated > cfs.getMinimumCompactionThreshold()
-                && !cfs.isAutoCompactionDisabled()
-                && cfs.getCompactionStrategyManager().shouldDefragment())
-            {
-                // !!WARNING!!   if we stop copying our data to a heap-managed object,
-                //               we will need to track the lifetime of this mutation as well
-                Tracing.trace("Defragmenting requested data");
-                final Mutation mutation = new Mutation(cfs.keyspace.getName(), filter.key.getKey(), returnCF.cloneMe());
-                StageManager.getStage(Stage.MUTATION).execute(new Runnable()
-                {
-                    public void run()
-                    {
-                        // skipping commitlog and index updates is fine since we're just de-fragmenting existing data
-                        Keyspace.open(mutation.getKeyspaceName()).apply(mutation, false, false);
-                    }
-                });
-            }
-
-            // Caller is responsible for final removeDeletedCF.  This is important for cacheRow to work correctly:
-            return returnCF;
-        }
-        finally
-        {
-            for (OnDiskAtomIterator iter : iterators)
-                FileUtils.closeQuietly(iter);
-        }
-    }
-
-    /**
-     * remove columns from @param filter where we already have data in @param container newer than @param sstableTimestamp
-     */
-    private void reduceNameFilter(QueryFilter filter, ColumnFamily container, long sstableTimestamp)
-    {
-        if (container == null)
-            return;
-
-        SearchIterator<CellName, Cell> searchIter = container.searchIterator();
-        for (Iterator<CellName> iterator = ((NamesQueryFilter) filter.filter).columns.iterator(); iterator.hasNext() && searchIter.hasNext(); )
-        {
-            CellName filterColumn = iterator.next();
-            Cell cell = searchIter.next(filterColumn);
-            if (cell != null && cell.timestamp() > sstableTimestamp)
-                iterator.remove();
-        }
-    }
-
-    /**
-     * Collects data the brute-force way: gets an iterator for the filter in question
-     * from every memtable and sstable, then merges them together.
-     */
-    private ColumnFamily collectAllData(boolean copyOnHeap)
-    {
-        Tracing.trace("Acquiring sstable references");
-        ColumnFamilyStore.ViewFragment view = cfs.select(cfs.viewFilter(filter.key));
-        List<Iterator<? extends OnDiskAtom>> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size());
-        ColumnFamily returnCF = ArrayBackedSortedColumns.factory.create(cfs.metadata, filter.filter.isReversed());
-        DeletionInfo returnDeletionInfo = returnCF.deletionInfo();
-        try
-        {
-            Tracing.trace("Merging memtable tombstones");
-            for (Memtable memtable : view.memtables)
-            {
-                final ColumnFamily cf = memtable.getColumnFamily(filter.key);
-                if (cf != null)
-                {
-                    filter.delete(returnDeletionInfo, cf);
-                    Iterator<Cell> iter = filter.getIterator(cf);
-                    if (copyOnHeap)
-                    {
-                        iter = Iterators.transform(iter, new Function<Cell, Cell>()
-                        {
-                            public Cell apply(Cell cell)
-                            {
-                                return cell.localCopy(cf.metadata, HeapAllocator.instance);
-                            }
-                        });
-                    }
-                    iterators.add(iter);
-                }
-            }
-
-            /*
-             * We can't eliminate full sstables based on the timestamp of what we've already read like
-             * in collectTimeOrderedData, but we still want to eliminate sstable whose maxTimestamp < mostRecentTombstone
-             * we've read. We still rely on the sstable ordering by maxTimestamp since if
-             *   maxTimestamp_s1 > maxTimestamp_s0,
-             * we're guaranteed that s1 cannot have a row tombstone such that
-             *   timestamp(tombstone) > maxTimestamp_s0
-             * since we necessarily have
-             *   timestamp(tombstone) <= maxTimestamp_s1
-             * In other words, iterating in maxTimestamp order allow to do our mostRecentTombstone elimination
-             * in one pass, and minimize the number of sstables for which we read a rowTombstone.
-             */
-            Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
-            List<SSTableReader> skippedSSTables = null;
-            long minTimestamp = Long.MAX_VALUE;
-            int nonIntersectingSSTables = 0;
-
-            for (SSTableReader sstable : view.sstables)
-            {
-                minTimestamp = Math.min(minTimestamp, sstable.getMinTimestamp());
-                // if we've already seen a row tombstone with a timestamp greater
-                // than the most recent update to this sstable, we can skip it
-                if (sstable.getMaxTimestamp() < returnDeletionInfo.getTopLevelDeletion().markedForDeleteAt)
-                    break;
-
-                if (!filter.shouldInclude(sstable))
-                {
-                    nonIntersectingSSTables++;
-                    // sstable contains no tombstone if maxLocalDeletionTime == Integer.MAX_VALUE, so we can safely skip those entirely
-                    if (sstable.getSSTableMetadata().maxLocalDeletionTime != Integer.MAX_VALUE)
-                    {
-                        if (skippedSSTables == null)
-                            skippedSSTables = new ArrayList<>();
-                        skippedSSTables.add(sstable);
-                    }
-                    continue;
-                }
-
-                sstable.incrementReadCount();
-                OnDiskAtomIterator iter = filter.getSSTableColumnIterator(sstable);
-                iterators.add(iter);
-                if (iter.getColumnFamily() != null)
-                {
-                    ColumnFamily cf = iter.getColumnFamily();
-                    returnCF.delete(cf);
-                    sstablesIterated++;
-                }
-            }
-
-            int includedDueToTombstones = 0;
-            // Check for row tombstone in the skipped sstables
-            if (skippedSSTables != null)
-            {
-                for (SSTableReader sstable : skippedSSTables)
-                {
-                    if (sstable.getMaxTimestamp() <= minTimestamp)
-                        continue;
-
-                    sstable.incrementReadCount();
-                    OnDiskAtomIterator iter = filter.getSSTableColumnIterator(sstable);
-                    ColumnFamily cf = iter.getColumnFamily();
-                    // we are only interested in row-level tombstones here, and only if markedForDeleteAt is larger than minTimestamp
-                    if (cf != null && cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt > minTimestamp)
-                    {
-                        includedDueToTombstones++;
-                        iterators.add(iter);
-                        returnCF.delete(cf.deletionInfo().getTopLevelDeletion());
-                        sstablesIterated++;
-                    }
-                    else
-                    {
-                        FileUtils.closeQuietly(iter);
-                    }
-                }
-            }
-
-            if (Tracing.isTracing())
-                Tracing.trace("Skipped {}/{} non-slice-intersecting sstables, included {} due to tombstones",
-                              nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones);
-
-            // we need to distinguish between "there is no data at all for this row" (BF will let us rebuild that efficiently)
-            // and "there used to be data, but it's gone now" (we should cache the empty CF so we don't need to rebuild that slower)
-            if (iterators.isEmpty())
-                return null;
-
-            Tracing.trace("Merging data from memtables and {} sstables", sstablesIterated);
-            filter.collateOnDiskAtom(returnCF, iterators, gcBefore);
-
-            // Caller is responsible for final removeDeletedCF.  This is important for cacheRow to work correctly:
-            return returnCF;
-        }
-        finally
-        {
-            for (Object iter : iterators)
-                if (iter instanceof Closeable)
-                    FileUtils.closeQuietly((Closeable) iter);
-        }
-    }
-
-    public int getSstablesIterated()
-    {
-        return sstablesIterated;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
deleted file mode 100644
index a7243a2..0000000
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ /dev/null
@@ -1,565 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-import org.apache.cassandra.cache.IRowCacheEntry;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.CellNames;
-import org.apache.cassandra.db.filter.ColumnCounter;
-import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.io.sstable.ColumnNameHelper;
-import org.apache.cassandra.io.sstable.ColumnStats;
-import org.apache.cassandra.io.sstable.SSTable;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.*;
-
-/**
- * A sorted map of columns.
- * This represents the backing map of a colum family.
- *
- * Whether the implementation is thread safe or not is left to the
- * implementing classes.
- */
-public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry
-{
-    /* The column serializer for this Column Family. Create based on config. */
-    public static final ColumnFamilySerializer serializer = new ColumnFamilySerializer();
-
-    protected final CFMetaData metadata;
-
-    protected ColumnFamily(CFMetaData metadata)
-    {
-        assert metadata != null;
-        this.metadata = metadata;
-    }
-
-    public <T extends ColumnFamily> T cloneMeShallow(ColumnFamily.Factory<T> factory, boolean reversedInsertOrder)
-    {
-        T cf = factory.create(metadata, reversedInsertOrder);
-        cf.delete(this);
-        return cf;
-    }
-
-    public ColumnFamily cloneMeShallow()
-    {
-        return cloneMeShallow(false);
-    }
-
-    public ColumnFamily cloneMeShallow(boolean reversed)
-    {
-        return cloneMeShallow(getFactory(), reversed);
-    }
-
-    public ColumnFamilyType getType()
-    {
-        return metadata.cfType;
-    }
-
-    public int liveCQL3RowCount(long now)
-    {
-        ColumnCounter counter = getComparator().isDense()
-                              ? new ColumnCounter(now)
-                              : new ColumnCounter.GroupByPrefix(now, getComparator(), metadata.clusteringColumns().size());
-        return counter.countAll(this).live();
-    }
-
-    /**
-     * Clones the column map.
-     */
-    public abstract ColumnFamily cloneMe();
-
-    public UUID id()
-    {
-        return metadata.cfId;
-    }
-
-    /**
-     * @return The CFMetaData for this row
-     */
-    public CFMetaData metadata()
-    {
-        return metadata;
-    }
-
-    public void addColumn(CellName name, ByteBuffer value, long timestamp)
-    {
-        addColumn(name, value, timestamp, 0);
-    }
-
-    public void addColumn(CellName name, ByteBuffer value, long timestamp, int timeToLive)
-    {
-        assert !metadata().isCounter();
-        Cell cell = AbstractCell.create(name, value, timestamp, timeToLive, metadata());
-        addColumn(cell);
-    }
-
-    public void addCounter(CellName name, long value)
-    {
-        addColumn(new BufferCounterUpdateCell(name, value, FBUtilities.timestampMicros()));
-    }
-
-    public void addTombstone(CellName name, ByteBuffer localDeletionTime, long timestamp)
-    {
-        addColumn(new BufferDeletedCell(name, localDeletionTime, timestamp));
-    }
-
-    public void addTombstone(CellName name, int localDeletionTime, long timestamp)
-    {
-        addColumn(new BufferDeletedCell(name, localDeletionTime, timestamp));
-    }
-
-    public void addAtom(OnDiskAtom atom)
-    {
-        if (atom instanceof Cell)
-        {
-            addColumn((Cell)atom);
-        }
-        else
-        {
-            assert atom instanceof RangeTombstone;
-            delete((RangeTombstone)atom);
-        }
-    }
-
-    /**
-     * Clear this column family, removing all columns and deletion info.
-     */
-    public abstract void clear();
-
-    /**
-     * Returns a {@link DeletionInfo.InOrderTester} for the deletionInfo() of
-     * this column family. Please note that for ThreadSafe implementation of ColumnFamily,
-     * this tester will remain valid even if new tombstones are added to this ColumnFamily
-     * *as long as said addition is done in comparator order*. For AtomicSortedColumns,
-     * the tester will correspond to the state of when this method is called.
-     */
-    public DeletionInfo.InOrderTester inOrderDeletionTester()
-    {
-        return deletionInfo().inOrderTester();
-    }
-
-    /**
-     * Returns the factory used for this ISortedColumns implementation.
-     */
-    public abstract Factory getFactory();
-
-    public abstract DeletionInfo deletionInfo();
-    public abstract void setDeletionInfo(DeletionInfo info);
-
-    public abstract void delete(DeletionInfo info);
-    public abstract void delete(DeletionTime deletionTime);
-    protected abstract void delete(RangeTombstone tombstone);
-
-    public abstract SearchIterator<CellName, Cell> searchIterator();
-
-    /**
-     * Purges top-level and range tombstones whose localDeletionTime is older than gcBefore.
-     * @param gcBefore a timestamp (in seconds) before which tombstones should be purged
-     */
-    public abstract void purgeTombstones(int gcBefore);
-
-    /**
-     * Adds a cell to this cell map.
-     * If a cell with the same name is already present in the map, it will
-     * be replaced by the newly added cell.
-     */
-    public abstract void addColumn(Cell cell);
-
-    /**
-     * Adds a cell if it's non-gc-able and isn't shadowed by a partition/range tombstone with a higher timestamp.
-     * Requires that the cell to add is sorted strictly after the last cell in the container.
-     */
-    public abstract void maybeAppendColumn(Cell cell, DeletionInfo.InOrderTester tester, int gcBefore);
-
-    /**
-     * Appends a cell. Requires that the cell to add is sorted strictly after the last cell in the container.
-     */
-    public abstract void appendColumn(Cell cell);
-
-    /**
-     * Adds all the columns of a given column map to this column map.
-     * This is equivalent to:
-     *   <code>
-     *   for (Cell c : cm)
-     *      addColumn(c, ...);
-     *   </code>
-     *  but is potentially faster.
-     */
-    public abstract void addAll(ColumnFamily cm);
-
-    /**
-     * Get a column given its name, returning null if the column is not
-     * present.
-     */
-    public abstract Cell getColumn(CellName name);
-
-    /**
-     * Returns an iterable with the names of columns in this column map in the same order
-     * as the underlying columns themselves.
-     */
-    public abstract Iterable<CellName> getColumnNames();
-
-    /**
-     * Returns the columns of this column map as a collection.
-     * The columns in the returned collection should be sorted as the columns
-     * in this map.
-     */
-    public abstract Collection<Cell> getSortedColumns();
-
-    /**
-     * Returns the columns of this column map as a collection.
-     * The columns in the returned collection should be sorted in reverse
-     * order of the columns in this map.
-     */
-    public abstract Collection<Cell> getReverseSortedColumns();
-
-    /**
-     * Returns the number of columns in this map.
-     */
-    public abstract int getColumnCount();
-
-    /**
-     * Returns whether or not there are any columns present.
-     */
-    public abstract boolean hasColumns();
-
-    /**
-     * Returns true if this contains no columns or deletion info
-     */
-    public boolean isEmpty()
-    {
-        return deletionInfo().isLive() && !hasColumns();
-    }
-
-    /**
-     * Returns an iterator over the columns of this map that returns only the matching @param slices.
-     * The provided slices must be in order and must be non-overlapping.
-     */
-    public abstract Iterator<Cell> iterator(ColumnSlice[] slices);
-
-    /**
-     * Returns a reversed iterator over the columns of this map that returns only the matching @param slices.
-     * The provided slices must be in reversed order and must be non-overlapping.
-     */
-    public abstract Iterator<Cell> reverseIterator(ColumnSlice[] slices);
-
-    /**
-     * Returns if this map only support inserts in reverse order.
-     */
-    public abstract boolean isInsertReversed();
-
-    /**
-     * If `columns` has any tombstones (top-level or range tombstones), they will be applied to this set of columns.
-     */
-    public void delete(ColumnFamily columns)
-    {
-        delete(columns.deletionInfo());
-    }
-
-    /*
-     * This function will calculate the difference between 2 column families.
-     * The external input is assumed to be a superset of internal.
-     */
-    public ColumnFamily diff(ColumnFamily cfComposite)
-    {
-        assert cfComposite.id().equals(id());
-        ColumnFamily cfDiff = ArrayBackedSortedColumns.factory.create(metadata);
-        cfDiff.delete(cfComposite.deletionInfo());
-
-        // (don't need to worry about cfNew containing Columns that are shadowed by
-        // the delete tombstone, since cfNew was generated by CF.resolve, which
-        // takes care of those for us.)
-        for (Cell cellExternal : cfComposite)
-        {
-            CellName cName = cellExternal.name();
-            Cell cellInternal = getColumn(cName);
-            if (cellInternal == null)
-            {
-                cfDiff.addColumn(cellExternal);
-            }
-            else
-            {
-                Cell cellDiff = cellInternal.diff(cellExternal);
-                if (cellDiff != null)
-                {
-                    cfDiff.addColumn(cellDiff);
-                }
-            }
-        }
-
-        cfDiff.setDeletionInfo(deletionInfo().diff(cfComposite.deletionInfo()));
-
-        if (!cfDiff.isEmpty())
-            return cfDiff;
-        
-        return null;
-    }
-
-    public long dataSize()
-    {
-        long size = 0;
-        for (Cell cell : this)
-            size += cell.cellDataSize();
-        return size;
-    }
-
-    public long maxTimestamp()
-    {
-        long maxTimestamp = deletionInfo().maxTimestamp();
-        for (Cell cell : this)
-            maxTimestamp = Math.max(maxTimestamp, cell.timestamp());
-        return maxTimestamp;
-    }
-
-    @Override
-    public int hashCode()
-    {
-        HashCodeBuilder builder = new HashCodeBuilder(373, 75437)
-                .append(metadata)
-                .append(deletionInfo());
-        for (Cell cell : this)
-            builder.append(cell);
-        return builder.toHashCode();
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        if (this == o)
-            return true;
-        if (o == null || !(o instanceof ColumnFamily))
-            return false;
-
-        ColumnFamily comparison = (ColumnFamily) o;
-
-        return metadata.equals(comparison.metadata)
-               && deletionInfo().equals(comparison.deletionInfo())
-               && ByteBufferUtil.compareUnsigned(digest(this), digest(comparison)) == 0;
-    }
-
-    @Override
-    public String toString()
-    {
-        StringBuilder sb = new StringBuilder("ColumnFamily(");
-        sb.append(metadata.cfName);
-
-        if (isMarkedForDelete())
-            sb.append(" -").append(deletionInfo()).append("-");
-
-        sb.append(" [").append(CellNames.getColumnsString(getComparator(), this)).append("])");
-        return sb.toString();
-    }
-
-    public static ByteBuffer digest(ColumnFamily cf)
-    {
-        MessageDigest digest = FBUtilities.threadLocalMD5Digest();
-        if (cf != null)
-            cf.updateDigest(digest);
-        return ByteBuffer.wrap(digest.digest());
-    }
-
-    public void updateDigest(MessageDigest digest)
-    {
-        for (Cell cell : this)
-            cell.updateDigest(digest);
-
-        deletionInfo().updateDigest(digest);
-    }
-
-    public static ColumnFamily diff(ColumnFamily cf1, ColumnFamily cf2)
-    {
-        if (cf1 == null)
-            return cf2;
-        return cf1.diff(cf2);
-    }
-
-    public ColumnStats getColumnStats()
-    {
-        // note that we default to MIN_VALUE/MAX_VALUE here to be able to override them later in this method
-        // we are checking row/range tombstones and actual cells - there should always be data that overrides
-        // these with actual values
-        ColumnStats.MinLongTracker minTimestampTracker = new ColumnStats.MinLongTracker(Long.MIN_VALUE);
-        ColumnStats.MaxLongTracker maxTimestampTracker = new ColumnStats.MaxLongTracker(Long.MAX_VALUE);
-        StreamingHistogram tombstones = new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE);
-        ColumnStats.MaxIntTracker maxDeletionTimeTracker = new ColumnStats.MaxIntTracker(Integer.MAX_VALUE);
-        List<ByteBuffer> minColumnNamesSeen = Collections.emptyList();
-        List<ByteBuffer> maxColumnNamesSeen = Collections.emptyList();
-        boolean hasLegacyCounterShards = false;
-
-        if (deletionInfo().getTopLevelDeletion().localDeletionTime < Integer.MAX_VALUE)
-        {
-            tombstones.update(deletionInfo().getTopLevelDeletion().localDeletionTime);
-            maxDeletionTimeTracker.update(deletionInfo().getTopLevelDeletion().localDeletionTime);
-            minTimestampTracker.update(deletionInfo().getTopLevelDeletion().markedForDeleteAt);
-            maxTimestampTracker.update(deletionInfo().getTopLevelDeletion().markedForDeleteAt);
-        }
-        Iterator<RangeTombstone> it = deletionInfo().rangeIterator();
-        while (it.hasNext())
-        {
-            RangeTombstone rangeTombstone = it.next();
-            tombstones.update(rangeTombstone.getLocalDeletionTime());
-            minTimestampTracker.update(rangeTombstone.timestamp());
-            maxTimestampTracker.update(rangeTombstone.timestamp());
-            maxDeletionTimeTracker.update(rangeTombstone.getLocalDeletionTime());
-            minColumnNamesSeen = ColumnNameHelper.minComponents(minColumnNamesSeen, rangeTombstone.min, metadata.comparator);
-            maxColumnNamesSeen = ColumnNameHelper.maxComponents(maxColumnNamesSeen, rangeTombstone.max, metadata.comparator);
-        }
-
-        for (Cell cell : this)
-        {
-            minTimestampTracker.update(cell.timestamp());
-            maxTimestampTracker.update(cell.timestamp());
-            maxDeletionTimeTracker.update(cell.getLocalDeletionTime());
-
-            int deletionTime = cell.getLocalDeletionTime();
-            if (deletionTime < Integer.MAX_VALUE)
-                tombstones.update(deletionTime);
-            minColumnNamesSeen = ColumnNameHelper.minComponents(minColumnNamesSeen, cell.name(), metadata.comparator);
-            maxColumnNamesSeen = ColumnNameHelper.maxComponents(maxColumnNamesSeen, cell.name(), metadata.comparator);
-            if (cell instanceof CounterCell)
-                hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) cell).hasLegacyShards();
-        }
-        return new ColumnStats(getColumnCount(),
-                               minTimestampTracker.get(),
-                               maxTimestampTracker.get(),
-                               maxDeletionTimeTracker.get(),
-                               tombstones,
-                               minColumnNamesSeen,
-                               maxColumnNamesSeen,
-                               hasLegacyCounterShards);
-    }
-
-    public boolean isMarkedForDelete()
-    {
-        return !deletionInfo().isLive();
-    }
-
-    /**
-     * @return the comparator whose sorting order the contained columns conform to
-     */
-    public CellNameType getComparator()
-    {
-        return metadata.comparator;
-    }
-
-    public boolean hasOnlyTombstones(long now)
-    {
-        for (Cell cell : this)
-            if (cell.isLive(now))
-                return false;
-        return true;
-    }
-
-    public Iterator<Cell> iterator()
-    {
-        return getSortedColumns().iterator();
-    }
-
-    public Iterator<Cell> reverseIterator()
-    {
-        return getReverseSortedColumns().iterator();
-    }
-
-    public Map<CellName, ByteBuffer> asMap()
-    {
-        ImmutableMap.Builder<CellName, ByteBuffer> builder = ImmutableMap.builder();
-        for (Cell cell : this)
-            builder.put(cell.name(), cell.value());
-        return builder.build();
-    }
-
-    public static ColumnFamily fromBytes(ByteBuffer bytes)
-    {
-        if (bytes == null)
-            return null;
-
-        try
-        {
-            return serializer.deserialize(new DataInputStream(ByteBufferUtil.inputStream(bytes)),
-                                                              ArrayBackedSortedColumns.factory,
-                                                              ColumnSerializer.Flag.LOCAL,
-                                                              MessagingService.current_version);
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public ByteBuffer toBytes()
-    {
-        try (DataOutputBuffer out = new DataOutputBuffer())
-        {
-            serializer.serialize(this, out, MessagingService.current_version);
-            return ByteBuffer.wrap(out.getData(), 0, out.getLength());
-        }
-    }
-
-
-    /**
-     * @return an iterator where the removes are carried out once everything has been iterated
-     */
-    public abstract BatchRemoveIterator<Cell> batchRemoveIterator();
-
-    public abstract static class Factory <T extends ColumnFamily>
-    {
-        /**
-         * Returns a (initially empty) column map whose columns are sorted
-         * according to the provided comparator.
-         * The {@code insertReversed} flag is an hint on how we expect insertion to be perfomed,
-         * either in sorted or reverse sorted order. This is used by ArrayBackedSortedColumns to
-         * allow optimizing for both forward and reversed slices. This does not matter for ThreadSafeSortedColumns.
-         * Note that this is only an hint on how we expect to do insertion, this does not change the map sorting.
-         */
-        public abstract T create(CFMetaData metadata, boolean insertReversed, int initialCapacity);
-
-        public T create(CFMetaData metadata, boolean insertReversed)
-        {
-            return create(metadata, insertReversed, 0);
-        }
-
-        public T create(CFMetaData metadata)
-        {
-            return create(metadata, false);
-        }
-
-        public T create(String keyspace, String cfName)
-        {
-            return create(Schema.instance.getCFMetaData(keyspace, cfName));
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
deleted file mode 100644
index 928c21f..0000000
--- a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.util.UUID;
-
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.io.ISSTableSerializer;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.sstable.format.Version;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.UUIDSerializer;
-
-public class ColumnFamilySerializer implements IVersionedSerializer<ColumnFamily>, ISSTableSerializer<ColumnFamily>
-{
-    /*
-     * Serialized ColumnFamily format:
-     *
-     * [serialized for intra-node writes only, e.g. returning a query result]
-     * <cf nullability boolean: false if the cf is null>
-     * <cf id>
-     *
-     * [in sstable only]
-     * <column bloom filter>
-     * <sparse column index, start/finish columns every ColumnIndexSizeInKB of data>
-     *
-     * [always present]
-     * <local deletion time>
-     * <client-provided deletion time>
-     * <column count>
-     * <columns, serialized individually>
-    */
-    public void serialize(ColumnFamily cf, DataOutputPlus out, int version)
-    {
-        try
-        {
-            if (cf == null)
-            {
-                out.writeBoolean(false);
-                return;
-            }
-
-            out.writeBoolean(true);
-            serializeCfId(cf.id(), out, version);
-            cf.getComparator().deletionInfoSerializer().serialize(cf.deletionInfo(), out, version);
-            ColumnSerializer columnSerializer = cf.getComparator().columnSerializer();
-            int count = cf.getColumnCount();
-            out.writeInt(count);
-            int written = 0;
-            for (Cell cell : cf)
-            {
-                columnSerializer.serialize(cell, out);
-                written++;
-            }
-            assert count == written: "Table had " + count + " columns, but " + written + " written";
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public ColumnFamily deserialize(DataInput in, int version) throws IOException
-    {
-        return deserialize(in, ColumnSerializer.Flag.LOCAL, version);
-    }
-
-    public ColumnFamily deserialize(DataInput in, ColumnSerializer.Flag flag, int version) throws IOException
-    {
-        return deserialize(in, ArrayBackedSortedColumns.factory, flag, version);
-    }
-
-    public ColumnFamily deserialize(DataInput in, ColumnFamily.Factory factory, ColumnSerializer.Flag flag, int version) throws IOException
-    {
-        if (!in.readBoolean())
-            return null;
-
-        ColumnFamily cf = factory.create(Schema.instance.getCFMetaData(deserializeCfId(in, version)));
-
-        if (cf.metadata().isSuper() && version < MessagingService.VERSION_20)
-        {
-            SuperColumns.deserializerSuperColumnFamily(in, cf, flag, version);
-        }
-        else
-        {
-            cf.delete(cf.getComparator().deletionInfoSerializer().deserialize(in, version));
-
-            ColumnSerializer columnSerializer = cf.getComparator().columnSerializer();
-            int size = in.readInt();
-            for (int i = 0; i < size; ++i)
-                cf.addColumn(columnSerializer.deserialize(in, flag));
-        }
-        return cf;
-    }
-
-    public long contentSerializedSize(ColumnFamily cf, TypeSizes typeSizes, int version)
-    {
-        long size = cf.getComparator().deletionInfoSerializer().serializedSize(cf.deletionInfo(), typeSizes, version);
-        size += typeSizes.sizeof(cf.getColumnCount());
-        ColumnSerializer columnSerializer = cf.getComparator().columnSerializer();
-        for (Cell cell : cf)
-            size += columnSerializer.serializedSize(cell, typeSizes);
-        return size;
-    }
-
-    public long serializedSize(ColumnFamily cf, TypeSizes typeSizes, int version)
-    {
-        if (cf == null)
-        {
-            return typeSizes.sizeof(false);
-        }
-        else
-        {
-            return typeSizes.sizeof(true)  /* nullness bool */
-                 + cfIdSerializedSize(cf.id(), typeSizes, version)  /* id */
-                 + contentSerializedSize(cf, typeSizes, version);
-        }
-    }
-
-    public long serializedSize(ColumnFamily cf, int version)
-    {
-        return serializedSize(cf, TypeSizes.NATIVE, version);
-    }
-
-    public void serializeForSSTable(ColumnFamily cf, DataOutputPlus out)
-    {
-        // Column families shouldn't be written directly to disk, use ColumnIndex.Builder instead
-        throw new UnsupportedOperationException();
-    }
-
-    public ColumnFamily deserializeFromSSTable(DataInput in, Version version)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public void serializeCfId(UUID cfId, DataOutputPlus out, int version) throws IOException
-    {
-        UUIDSerializer.serializer.serialize(cfId, out, version);
-    }
-
-    public UUID deserializeCfId(DataInput in, int version) throws IOException
-    {
-        UUID cfId = UUIDSerializer.serializer.deserialize(in, version);
-        if (Schema.instance.getCF(cfId) == null)
-            throw new UnknownColumnFamilyException("Couldn't find cfId=" + cfId, cfId);
-
-        return cfId;
-    }
-
-    public int cfIdSerializedSize(UUID cfId, TypeSizes typeSizes, int version)
-    {
-        return typeSizes.sizeof(cfId);
-    }
-}