You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:48:06 UTC

[42/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index bed5d39..8fb83af 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -47,19 +47,14 @@ import org.apache.cassandra.cache.*;
 import org.apache.cassandra.concurrent.*;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.config.CFMetaData.SpeculativeRetry;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.compaction.*;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.db.filter.ExtendedFilter;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -76,7 +71,6 @@ import org.apache.cassandra.metrics.ColumnFamilyMetrics.Sampler;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.StreamLockfile;
-import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.concurrent.*;
 import org.apache.cassandra.utils.TopKSampler.SamplerResult;
@@ -592,12 +586,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         {
             if (def.isIndexed())
             {
-                CellNameType indexComparator = SecondaryIndex.getIndexComparator(metadata, def);
-                if (indexComparator != null)
-                {
-                    CFMetaData indexMetadata = CFMetaData.newIndexMetadata(metadata, def, indexComparator);
+                CFMetaData indexMetadata = SecondaryIndex.newIndexMetadata(metadata, def);
+                if (indexMetadata != null)
                     scrubDataDirectories(indexMetadata);
-                }
             }
         }
     }
@@ -1220,7 +1211,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             return;
 
         RowCacheKey cacheKey = new RowCacheKey(metadata.cfId, key);
-        invalidateCachedRow(cacheKey);
+        invalidateCachedPartition(cacheKey);
     }
 
     /**
@@ -1230,11 +1221,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      * param @ key - key for update/insert
      * param @ columnFamily - columnFamily changes
      */
-    public void apply(DecoratedKey key, ColumnFamily columnFamily, SecondaryIndexManager.Updater indexer, OpOrder.Group opGroup, ReplayPosition replayPosition)
+    public void apply(PartitionUpdate update, SecondaryIndexManager.Updater indexer, OpOrder.Group opGroup, ReplayPosition replayPosition)
     {
         long start = System.nanoTime();
         Memtable mt = data.getMemtableFor(opGroup, replayPosition);
-        final long timeDelta = mt.put(key, columnFamily, indexer, opGroup);
+        long timeDelta = mt.put(update, indexer, opGroup);
+        DecoratedKey key = update.partitionKey();
         maybeUpdateRowCache(key);
         metric.samplers.get(Sampler.WRITES).addSample(key.getKey(), key.hashCode(), 1);
         metric.writeLatency.addNano(System.nanoTime() - start);
@@ -1243,93 +1235,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     }
 
     /**
-     * Purges gc-able top-level and range tombstones, returning `cf` if there are any columns or tombstones left,
-     * null otherwise.
-     * @param gcBefore a timestamp (in seconds); tombstones with a localDeletionTime before this will be purged
-     */
-    public static ColumnFamily removeDeletedCF(ColumnFamily cf, int gcBefore)
-    {
-        // purge old top-level and range tombstones
-        cf.purgeTombstones(gcBefore);
-
-        // if there are no columns or tombstones left, return null
-        return !cf.hasColumns() && !cf.isMarkedForDelete() ? null : cf;
-    }
-
-    /**
-     * Removes deleted columns and purges gc-able tombstones.
-     * @return an updated `cf` if any columns or tombstones remain, null otherwise
-     */
-    public static ColumnFamily removeDeleted(ColumnFamily cf, int gcBefore)
-    {
-        return removeDeleted(cf, gcBefore, SecondaryIndexManager.nullUpdater);
-    }
-
-    /*
-     This is complicated because we need to preserve deleted columns and columnfamilies
-     until they have been deleted for at least GC_GRACE_IN_SECONDS.  But, we do not need to preserve
-     their contents; just the object itself as a "tombstone" that can be used to repair other
-     replicas that do not know about the deletion.
-     */
-    public static ColumnFamily removeDeleted(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer)
-    {
-        if (cf == null)
-        {
-            return null;
-        }
-
-        return removeDeletedCF(removeDeletedColumnsOnly(cf, gcBefore, indexer), gcBefore);
-    }
-
-    /**
-     * Removes only per-cell tombstones, cells that are shadowed by a row-level or range tombstone, or
-     * columns that have been dropped from the schema (for CQL3 tables only).
-     * @return the updated ColumnFamily
-     */
-    public static ColumnFamily removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer)
-    {
-        BatchRemoveIterator<Cell> iter = cf.batchRemoveIterator();
-        DeletionInfo.InOrderTester tester = cf.inOrderDeletionTester();
-        boolean hasDroppedColumns = !cf.metadata.getDroppedColumns().isEmpty();
-        while (iter.hasNext())
-        {
-            Cell c = iter.next();
-            // remove columns if
-            // (a) the column itself is gcable or
-            // (b) the column is shadowed by a CF tombstone
-            // (c) the column has been dropped from the CF schema (CQL3 tables only)
-            if (c.getLocalDeletionTime() < gcBefore || tester.isDeleted(c) || (hasDroppedColumns && isDroppedColumn(c, cf.metadata())))
-            {
-                iter.remove();
-                indexer.remove(c);
-            }
-        }
-        iter.commit();
-        return cf;
-    }
-
-    // returns true if
-    // 1. this column has been dropped from schema and
-    // 2. if it has been re-added since then, this particular column was inserted before the last drop
-    private static boolean isDroppedColumn(Cell c, CFMetaData meta)
-    {
-        Long droppedAt = meta.getDroppedColumns().get(c.name().cql3ColumnName(meta));
-        return droppedAt != null && c.timestamp() <= droppedAt;
-    }
-
-    private void removeDroppedColumns(ColumnFamily cf)
-    {
-        if (cf == null || cf.metadata.getDroppedColumns().isEmpty())
-            return;
-
-        BatchRemoveIterator<Cell> iter = cf.batchRemoveIterator();
-        while (iter.hasNext())
-            if (isDroppedColumn(iter.next(), metadata))
-                iter.remove();
-        iter.commit();
-    }
-
-    /**
      * @param sstables
      * @return sstables whose key range overlaps with that of the given sstables, not including itself.
      * (The given sstables may or may not overlap with each other.)
@@ -1348,7 +1253,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         Set<SSTableReader> results = null;
         for (SSTableReader sstable : sstables)
         {
-            Set<SSTableReader> overlaps = ImmutableSet.copyOf(tree.search(Interval.<RowPosition, SSTableReader>create(sstable.first, sstable.last)));
+            Set<SSTableReader> overlaps = ImmutableSet.copyOf(tree.search(Interval.<PartitionPosition, SSTableReader>create(sstable.first, sstable.last)));
             results = results == null ? overlaps : Sets.union(results, overlaps).immutableCopy();
         }
         results = Sets.difference(results, ImmutableSet.copyOf(sstables));
@@ -1532,9 +1437,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return valid;
     }
 
-
-
-
     /**
      * Package protected for access from the CompactionManager.
      */
@@ -1553,249 +1455,30 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return data.getUncompacting();
     }
 
-    public ColumnFamily getColumnFamily(DecoratedKey key,
-                                        Composite start,
-                                        Composite finish,
-                                        boolean reversed,
-                                        int limit,
-                                        long timestamp)
-    {
-        return getColumnFamily(QueryFilter.getSliceFilter(key, name, start, finish, reversed, limit, timestamp));
-    }
-
-    /**
-     * Fetch the row and columns given by filter.key if it is in the cache; if not, read it from disk and cache it
-     *
-     * If row is cached, and the filter given is within its bounds, we return from cache, otherwise from disk
-     *
-     * If row is not cached, we figure out what filter is "biggest", read that from disk, then
-     * filter the result and either cache that or return it.
-     *
-     * @param cfId the column family to read the row from
-     * @param filter the columns being queried.
-     * @return the requested data for the filter provided
-     */
-    private ColumnFamily getThroughCache(UUID cfId, QueryFilter filter)
-    {
-        assert isRowCacheEnabled()
-               : String.format("Row cache is not enabled on table [" + name + "]");
-
-        RowCacheKey key = new RowCacheKey(cfId, filter.key);
-
-        // attempt a sentinel-read-cache sequence.  if a write invalidates our sentinel, we'll return our
-        // (now potentially obsolete) data, but won't cache it. see CASSANDRA-3862
-        // TODO: don't evict entire rows on writes (#2864)
-        IRowCacheEntry cached = CacheService.instance.rowCache.get(key);
-        if (cached != null)
-        {
-            if (cached instanceof RowCacheSentinel)
-            {
-                // Some other read is trying to cache the value, just do a normal non-caching read
-                Tracing.trace("Row cache miss (race)");
-                metric.rowCacheMiss.inc();
-                return getTopLevelColumns(filter, Integer.MIN_VALUE);
-            }
-
-            ColumnFamily cachedCf = (ColumnFamily)cached;
-            if (isFilterFullyCoveredBy(filter.filter, cachedCf, filter.timestamp))
-            {
-                metric.rowCacheHit.inc();
-                Tracing.trace("Row cache hit");
-                return filterColumnFamily(cachedCf, filter);
-            }
-
-            metric.rowCacheHitOutOfRange.inc();
-            Tracing.trace("Ignoring row cache as cached value could not satisfy query");
-            return getTopLevelColumns(filter, Integer.MIN_VALUE);
-        }
-
-        metric.rowCacheMiss.inc();
-        Tracing.trace("Row cache miss");
-        RowCacheSentinel sentinel = new RowCacheSentinel();
-        boolean sentinelSuccess = CacheService.instance.rowCache.putIfAbsent(key, sentinel);
-        ColumnFamily data = null;
-        ColumnFamily toCache = null;
-        try
-        {
-            // If we are explicitely asked to fill the cache with full partitions, we go ahead and query the whole thing
-            if (metadata.getCaching().rowCache.cacheFullPartitions())
-            {
-                data = getTopLevelColumns(QueryFilter.getIdentityFilter(filter.key, name, filter.timestamp), Integer.MIN_VALUE);
-                toCache = data;
-                Tracing.trace("Populating row cache with the whole partition");
-                if (sentinelSuccess && toCache != null)
-                    CacheService.instance.rowCache.replace(key, sentinel, toCache);
-                return filterColumnFamily(data, filter);
-            }
-
-            // Otherwise, if we want to cache the result of the query we're about to do, we must make sure this query
-            // covers what needs to be cached. And if the user filter does not satisfy that, we sometimes extend said
-            // filter so we can populate the cache but only if:
-            //   1) we can guarantee it is a strict extension, i.e. that we will still fetch the data asked by the user.
-            //   2) the extension does not make us query more than getRowsPerPartitionToCache() (as a mean to limit the
-            //      amount of extra work we'll do on a user query for the purpose of populating the cache).
-            //
-            // In practice, we can only guarantee those 2 points if the filter is one that queries the head of the
-            // partition (and if that filter actually counts CQL3 rows since that's what we cache and it would be
-            // bogus to compare the filter count to the 'rows to cache' otherwise).
-            if (filter.filter.isHeadFilter() && filter.filter.countCQL3Rows(metadata.comparator))
-            {
-                SliceQueryFilter sliceFilter = (SliceQueryFilter)filter.filter;
-                int rowsToCache = metadata.getCaching().rowCache.rowsToCache;
-
-                SliceQueryFilter cacheSlice = readFilterForCache();
-                QueryFilter cacheFilter = new QueryFilter(filter.key, name, cacheSlice, filter.timestamp);
-
-                // If the filter count is less than the number of rows cached, we simply extend it to make sure we do cover the
-                // number of rows to cache, and if that count is greater than the number of rows to cache, we simply filter what
-                // needs to be cached afterwards.
-                if (sliceFilter.count < rowsToCache)
-                {
-                    toCache = getTopLevelColumns(cacheFilter, Integer.MIN_VALUE);
-                    if (toCache != null)
-                    {
-                        Tracing.trace("Populating row cache ({} rows cached)", cacheSlice.lastCounted());
-                        data = filterColumnFamily(toCache, filter);
-                    }
-                }
-                else
-                {
-                    data = getTopLevelColumns(filter, Integer.MIN_VALUE);
-                    if (data != null)
-                    {
-                        // The filter limit was greater than the number of rows to cache. But, if the filter had a non-empty
-                        // finish bound, we may have gotten less than what needs to be cached, in which case we shouldn't cache it
-                        // (otherwise a cache hit would assume the whole partition is cached which is not the case).
-                        if (sliceFilter.finish().isEmpty() || sliceFilter.lastCounted() >= rowsToCache)
-                        {
-                            toCache = filterColumnFamily(data, cacheFilter);
-                            Tracing.trace("Caching {} rows (out of {} requested)", cacheSlice.lastCounted(), sliceFilter.count);
-                        }
-                        else
-                        {
-                            Tracing.trace("Not populating row cache, not enough rows fetched ({} fetched but {} required for the cache)", sliceFilter.lastCounted(), rowsToCache);
-                        }
-                    }
-                }
-
-                if (sentinelSuccess && toCache != null)
-                    CacheService.instance.rowCache.replace(key, sentinel, toCache);
-                return data;
-            }
-            else
-            {
-                Tracing.trace("Fetching data but not populating cache as query does not query from the start of the partition");
-                return getTopLevelColumns(filter, Integer.MIN_VALUE);
-            }
-        }
-        finally
-        {
-            if (sentinelSuccess && toCache == null)
-                invalidateCachedRow(key);
-        }
-    }
-
-    public SliceQueryFilter readFilterForCache()
-    {
-        // We create a new filter everytime before for now SliceQueryFilter is unfortunatly mutable.
-        return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, metadata.getCaching().rowCache.rowsToCache, metadata.clusteringColumns().size());
-    }
-
-    public boolean isFilterFullyCoveredBy(IDiskAtomFilter filter, ColumnFamily cachedCf, long now)
+    public boolean isFilterFullyCoveredBy(ClusteringIndexFilter filter, DataLimits limits, CachedPartition cached, int nowInSec)
     {
         // We can use the cached value only if we know that no data it doesn't contain could be covered
         // by the query filter, that is if:
         //   1) either the whole partition is cached
-        //   2) or we can ensure than any data the filter selects are in the cached partition
-
-        // When counting rows to decide if the whole row is cached, we should be careful with expiring
-        // columns: if we use a timestamp newer than the one that was used when populating the cache, we might
-        // end up deciding the whole partition is cached when it's really not (just some rows expired since the
-        // cf was cached). This is the reason for Integer.MIN_VALUE below.
-        boolean wholePartitionCached = cachedCf.liveCQL3RowCount(Integer.MIN_VALUE) < metadata.getCaching().rowCache.rowsToCache;
-
-        // Contrarily to the "wholePartitionCached" check above, we do want isFullyCoveredBy to take the
-        // timestamp of the query into account when dealing with expired columns. Otherwise, we could think
-        // the cached partition has enough live rows to satisfy the filter when it doesn't because some
-        // are now expired.
-        return wholePartitionCached || filter.isFullyCoveredBy(cachedCf, now);
-    }
-
-    public int gcBefore(long now)
-    {
-        return (int) (now / 1000) - metadata.getGcGraceSeconds();
-    }
-
-    /**
-     * get a list of columns starting from a given column, in a specified order.
-     * only the latest version of a column is returned.
-     * @return null if there is no data and no tombstones; otherwise a ColumnFamily
-     */
-    public ColumnFamily getColumnFamily(QueryFilter filter)
-    {
-        assert name.equals(filter.getColumnFamilyName()) : filter.getColumnFamilyName();
-
-        ColumnFamily result = null;
-
-        long start = System.nanoTime();
-        try
-        {
-            int gcBefore = gcBefore(filter.timestamp);
-            if (isRowCacheEnabled())
-            {
-                assert !isIndex(); // CASSANDRA-5732
-                UUID cfId = metadata.cfId;
-
-                ColumnFamily cached = getThroughCache(cfId, filter);
-                if (cached == null)
-                {
-                    logger.trace("cached row is empty");
-                    return null;
-                }
-
-                result = cached;
-            }
-            else
-            {
-                ColumnFamily cf = getTopLevelColumns(filter, gcBefore);
+        //   2) or we can ensure than any data the filter selects is in the cached partition
 
-                if (cf == null)
-                    return null;
-
-                result = removeDeletedCF(cf, gcBefore);
-            }
-
-            removeDroppedColumns(result);
-
-            if (filter.filter instanceof SliceQueryFilter)
-            {
-                // Log the number of tombstones scanned on single key queries
-                metric.tombstoneScannedHistogram.update(((SliceQueryFilter) filter.filter).lastTombstones());
-                metric.liveScannedHistogram.update(((SliceQueryFilter) filter.filter).lastLive());
-            }
-        }
-        finally
-        {
-            metric.readLatency.addNano(System.nanoTime() - start);
-        }
+        // We can guarantee that a partition is fully cached if the number of rows it contains is less than
+        // what we're caching. Wen doing that, we should be careful about expiring cells: we should count
+        // something expired that wasn't when the partition was cached, or we could decide that the whole
+        // partition is cached when it's not. This is why we use CachedPartition#cachedLiveRows.
+        if (cached.cachedLiveRows() < metadata.getCaching().rowCache.rowsToCache)
+            return true;
 
-        return result;
+        // If the whole partition isn't cached, then we must guarantee that the filter cannot select data that
+        // is not in the cache. We can guarantee that if either the filter is a "head filter" and the cached
+        // partition has more live rows that queried (where live rows refers to the rows that are live now),
+        // or if we can prove that everything the filter selects is in the cached partition based on its content.
+        return (filter.isHeadFilter() && limits.hasEnoughLiveData(cached, nowInSec)) || filter.isFullyCoveredBy(cached);
     }
 
-    /**
-     *  Filter a cached row, which will not be modified by the filter, but may be modified by throwing out
-     *  tombstones that are no longer relevant.
-     *  The returned column family won't be thread safe.
-     */
-    ColumnFamily filterColumnFamily(ColumnFamily cached, QueryFilter filter)
+    public int gcBefore(int nowInSec)
     {
-        if (cached == null)
-            return null;
-
-        ColumnFamily cf = cached.cloneMeShallow(ArrayBackedSortedColumns.factory, filter.filter.isReversed());
-        int gcBefore = gcBefore(filter.timestamp);
-        filter.collateOnDiskAtom(cf, filter.getIterator(cached), gcBefore);
-        return removeDeletedCF(cf, gcBefore);
+        return nowInSec - metadata.getGcGraceSeconds();
     }
 
     public Set<SSTableReader> getUnrepairedSSTables()
@@ -1881,7 +1564,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      * @return a ViewFragment containing the sstables and memtables that may need to be merged
      * for rows within @param rowBounds, inclusive, according to the interval tree.
      */
-    public Function<View, List<SSTableReader>> viewFilter(final AbstractBounds<RowPosition> rowBounds)
+    public Function<View, List<SSTableReader>> viewFilter(final AbstractBounds<PartitionPosition> rowBounds)
     {
         return new Function<View, List<SSTableReader>>()
         {
@@ -1896,14 +1579,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      * @return a ViewFragment containing the sstables and memtables that may need to be merged
      * for rows for all of @param rowBoundsCollection, inclusive, according to the interval tree.
      */
-    public Function<View, List<SSTableReader>> viewFilter(final Collection<AbstractBounds<RowPosition>> rowBoundsCollection, final boolean includeRepaired)
+    public Function<View, List<SSTableReader>> viewFilter(final Collection<AbstractBounds<PartitionPosition>> rowBoundsCollection, final boolean includeRepaired)
     {
         return new Function<View, List<SSTableReader>>()
         {
             public List<SSTableReader> apply(View view)
             {
                 Set<SSTableReader> sstables = Sets.newHashSet();
-                for (AbstractBounds<RowPosition> rowBounds : rowBoundsCollection)
+                for (AbstractBounds<PartitionPosition> rowBounds : rowBoundsCollection)
                 {
                     for (SSTableReader sstable : view.sstablesInBounds(rowBounds))
                     {
@@ -1934,20 +1617,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
-    public ColumnFamily getTopLevelColumns(QueryFilter filter, int gcBefore)
-    {
-        Tracing.trace("Executing single-partition query on {}", name);
-        CollationController controller = new CollationController(this, filter, gcBefore);
-        ColumnFamily columns;
-        try (OpOrder.Group op = readOrdering.start())
-        {
-            columns = controller.getTopLevelColumns(Memtable.MEMORY_POOL.needToCopyOnHeap());
-        }
-        if (columns != null)
-            metric.samplers.get(Sampler.READS).addSample(filter.key.getKey(), filter.key.hashCode(), 1);
-        metric.updateSSTableIterated(controller.getSstablesIterated());
-        return columns;
-    }
 
     public void beginLocalSampling(String sampler, int capacity)
     {
@@ -1982,7 +1651,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             RowCacheKey key = keyIter.next();
             DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.key));
             if (key.cfId.equals(metadata.cfId) && !Range.isInRanges(dk.getToken(), ranges))
-                invalidateCachedRow(dk);
+                invalidateCachedPartition(dk);
         }
 
         if (metadata.isCounter())
@@ -1998,247 +1667,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
-    public static abstract class AbstractScanIterator extends AbstractIterator<Row> implements CloseableIterator<Row>
-    {
-        public boolean needsFiltering()
-        {
-            return true;
-        }
-    }
-
-    /**
-      * Iterate over a range of rows and columns from memtables/sstables.
-      *
-      * @param range The range of keys and columns within those keys to fetch
-     */
-    @SuppressWarnings("resource")
-    private AbstractScanIterator getSequentialIterator(final DataRange range, long now)
-    {
-        assert !(range.keyRange() instanceof Range) || !((Range<?>)range.keyRange()).isWrapAround() || range.keyRange().right.isMinimum() : range.keyRange();
-
-        final ViewFragment view = select(viewFilter(range.keyRange()));
-        Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), range.keyRange().getString(metadata.getKeyValidator()));
-
-        final CloseableIterator<Row> iterator = RowIteratorFactory.getIterator(view.memtables, view.sstables, range, this, now);
-
-        // todo this could be pushed into SSTableScanner
-        return new AbstractScanIterator()
-        {
-            protected Row computeNext()
-            {
-                while (true)
-                {
-                    // pull a row out of the iterator
-                    if (!iterator.hasNext())
-                        return endOfData();
-
-                    Row current = iterator.next();
-                    DecoratedKey key = current.key;
-
-                    if (!range.stopKey().isMinimum() && range.stopKey().compareTo(key) < 0)
-                        return endOfData();
-
-                    // skipping outside of assigned range
-                    if (!range.contains(key))
-                        continue;
-
-                    if (logger.isTraceEnabled())
-                        logger.trace("scanned {}", metadata.getKeyValidator().getString(key.getKey()));
-
-                    return current;
-                }
-            }
-
-            public void close() throws IOException
-            {
-                iterator.close();
-            }
-        };
-    }
-
-    @VisibleForTesting
-    public List<Row> getRangeSlice(final AbstractBounds<RowPosition> range,
-                                   List<IndexExpression> rowFilter,
-                                   IDiskAtomFilter columnFilter,
-                                   int maxResults)
-    {
-        return getRangeSlice(range, rowFilter, columnFilter, maxResults, System.currentTimeMillis());
-    }
-
-    public List<Row> getRangeSlice(final AbstractBounds<RowPosition> range,
-                                   List<IndexExpression> rowFilter,
-                                   IDiskAtomFilter columnFilter,
-                                   int maxResults,
-                                   long now)
-    {
-        return getRangeSlice(makeExtendedFilter(range, columnFilter, rowFilter, maxResults, false, false, now));
-    }
-
-    /**
-     * Allows generic range paging with the slice column filter.
-     * Typically, suppose we have rows A, B, C ... Z having each some columns in [1, 100].
-     * And suppose we want to page through the query that for all rows returns the columns
-     * within [25, 75]. For that, we need to be able to do a range slice starting at (row r, column c)
-     * and ending at (row Z, column 75), *but* that only return columns in [25, 75].
-     * That is what this method allows. The columnRange is the "window" of  columns we are interested
-     * in each row, and columnStart (resp. columnEnd) is the start (resp. end) for the first
-     * (resp. last) requested row.
-     */
-    public ExtendedFilter makeExtendedFilter(AbstractBounds<RowPosition> keyRange,
-                                             SliceQueryFilter columnRange,
-                                             Composite columnStart,
-                                             Composite columnStop,
-                                             List<IndexExpression> rowFilter,
-                                             int maxResults,
-                                             boolean countCQL3Rows,
-                                             long now)
-    {
-        DataRange dataRange = new DataRange.Paging(keyRange, columnRange, columnStart, columnStop, metadata);
-        return ExtendedFilter.create(this, dataRange, rowFilter, maxResults, countCQL3Rows, now);
-    }
-
-    public List<Row> getRangeSlice(AbstractBounds<RowPosition> range,
-                                   List<IndexExpression> rowFilter,
-                                   IDiskAtomFilter columnFilter,
-                                   int maxResults,
-                                   long now,
-                                   boolean countCQL3Rows,
-                                   boolean isPaging)
-    {
-        return getRangeSlice(makeExtendedFilter(range, columnFilter, rowFilter, maxResults, countCQL3Rows, isPaging, now));
-    }
-
-    public ExtendedFilter makeExtendedFilter(AbstractBounds<RowPosition> range,
-                                             IDiskAtomFilter columnFilter,
-                                             List<IndexExpression> rowFilter,
-                                             int maxResults,
-                                             boolean countCQL3Rows,
-                                             boolean isPaging,
-                                             long timestamp)
-    {
-        DataRange dataRange;
-        if (isPaging)
-        {
-            assert columnFilter instanceof SliceQueryFilter;
-            SliceQueryFilter sfilter = (SliceQueryFilter)columnFilter;
-            assert sfilter.slices.length == 1;
-            // create a new SliceQueryFilter that selects all cells, but pass the original slice start and finish
-            // through to DataRange.Paging to be used on the first and last partitions
-            SliceQueryFilter newFilter = new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, sfilter.isReversed(), sfilter.count);
-            dataRange = new DataRange.Paging(range, newFilter, sfilter.start(), sfilter.finish(), metadata);
-        }
-        else
-        {
-            dataRange = new DataRange(range, columnFilter);
-        }
-        return ExtendedFilter.create(this, dataRange, rowFilter, maxResults, countCQL3Rows, timestamp);
-    }
-
-    public List<Row> getRangeSlice(ExtendedFilter filter)
-    {
-        long start = System.nanoTime();
-        try (OpOrder.Group op = readOrdering.start())
-        {
-            return filter(getSequentialIterator(filter.dataRange, filter.timestamp), filter);
-        }
-        finally
-        {
-            metric.rangeLatency.addNano(System.nanoTime() - start);
-        }
-    }
-
-    @VisibleForTesting
-    public List<Row> search(AbstractBounds<RowPosition> range,
-                            List<IndexExpression> clause,
-                            IDiskAtomFilter dataFilter,
-                            int maxResults)
-    {
-        return search(range, clause, dataFilter, maxResults, System.currentTimeMillis());
-    }
-
-    public List<Row> search(AbstractBounds<RowPosition> range,
-                            List<IndexExpression> clause,
-                            IDiskAtomFilter dataFilter,
-                            int maxResults,
-                            long now)
-    {
-        return search(makeExtendedFilter(range, dataFilter, clause, maxResults, false, false, now));
-    }
-
-    public List<Row> search(ExtendedFilter filter)
-    {
-        Tracing.trace("Executing indexed scan for {}", filter.dataRange.keyRange().getString(metadata.getKeyValidator()));
-        return indexManager.search(filter);
-    }
-
-    public List<Row> filter(AbstractScanIterator rowIterator, ExtendedFilter filter)
-    {
-        logger.trace("Filtering {} for rows matching {}", rowIterator, filter);
-        List<Row> rows = new ArrayList<Row>();
-        int columnsCount = 0;
-        int total = 0, matched = 0;
-        boolean ignoreTombstonedPartitions = filter.ignoreTombstonedPartitions();
-
-        try
-        {
-            while (rowIterator.hasNext() && matched < filter.maxRows() && columnsCount < filter.maxColumns())
-            {
-                // get the raw columns requested, and additional columns for the expressions if necessary
-                Row rawRow = rowIterator.next();
-                total++;
-                ColumnFamily data = rawRow.cf;
-
-                if (rowIterator.needsFiltering())
-                {
-                    IDiskAtomFilter extraFilter = filter.getExtraFilter(rawRow.key, data);
-                    if (extraFilter != null)
-                    {
-                        ColumnFamily cf = filter.cfs.getColumnFamily(new QueryFilter(rawRow.key, name, extraFilter, filter.timestamp));
-                        if (cf != null)
-                            data.addAll(cf);
-                    }
-
-                    removeDroppedColumns(data);
-
-                    if (!filter.isSatisfiedBy(rawRow.key, data, null, null))
-                        continue;
-
-                    logger.trace("{} satisfies all filter expressions", data);
-                    // cut the resultset back to what was requested, if necessary
-                    data = filter.prune(rawRow.key, data);
-                }
-                else
-                {
-                    removeDroppedColumns(data);
-                }
-
-                rows.add(new Row(rawRow.key, data));
-                if (!ignoreTombstonedPartitions || !data.hasOnlyTombstones(filter.timestamp))
-                    matched++;
-
-                if (data != null)
-                    columnsCount += filter.lastCounted(data);
-                // Update the underlying filter to avoid querying more columns per slice than necessary and to handle paging
-                filter.updateFilter(columnsCount);
-            }
-
-            return rows;
-        }
-        finally
-        {
-            try
-            {
-                rowIterator.close();
-                Tracing.trace("Scanned {} rows and matched {}", total, matched);
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
-    public CellNameType getComparator()
+    public ClusteringComparator getComparator()
     {
         return metadata.comparator;
     }
@@ -2388,20 +1817,20 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     }
 
     /**
-     * @return the cached row for @param key if it is already present in the cache.
-     * That is, unlike getThroughCache, it will not readAndCache the row if it is not present, nor
+     * @return the cached partition for @param key if it is already present in the cache.
+     * Not that this will not readAndCache the parition if it is not present, nor
      * are these calls counted in cache statistics.
      *
-     * Note that this WILL cause deserialization of a SerializingCache row, so if all you
-     * need to know is whether a row is present or not, use containsCachedRow instead.
+     * Note that this WILL cause deserialization of a SerializingCache partition, so if all you
+     * need to know is whether a partition is present or not, use containsCachedParition instead.
      */
-    public ColumnFamily getRawCachedRow(DecoratedKey key)
+    public CachedPartition getRawCachedPartition(DecoratedKey key)
     {
         if (!isRowCacheEnabled())
             return null;
 
         IRowCacheEntry cached = CacheService.instance.rowCache.getInternal(new RowCacheKey(metadata.cfId, key));
-        return cached == null || cached instanceof RowCacheSentinel ? null : (ColumnFamily)cached;
+        return cached == null || cached instanceof RowCacheSentinel ? null : (CachedPartition)cached;
     }
 
     private void invalidateCaches()
@@ -2415,37 +1844,37 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     /**
      * @return true if @param key is contained in the row cache
      */
-    public boolean containsCachedRow(DecoratedKey key)
+    public boolean containsCachedParition(DecoratedKey key)
     {
         return CacheService.instance.rowCache.getCapacity() != 0 && CacheService.instance.rowCache.containsKey(new RowCacheKey(metadata.cfId, key));
     }
 
-    public void invalidateCachedRow(RowCacheKey key)
+    public void invalidateCachedPartition(RowCacheKey key)
     {
         CacheService.instance.rowCache.remove(key);
     }
 
-    public void invalidateCachedRow(DecoratedKey key)
+    public void invalidateCachedPartition(DecoratedKey key)
     {
         UUID cfId = Schema.instance.getId(keyspace.getName(), this.name);
         if (cfId == null)
             return; // secondary index
 
-        invalidateCachedRow(new RowCacheKey(cfId, key));
+        invalidateCachedPartition(new RowCacheKey(cfId, key));
     }
 
-    public ClockAndCount getCachedCounter(ByteBuffer partitionKey, CellName cellName)
+    public ClockAndCount getCachedCounter(ByteBuffer partitionKey, Clustering clustering, ColumnDefinition column, CellPath path)
     {
         if (CacheService.instance.counterCache.getCapacity() == 0L) // counter cache disabled.
             return null;
-        return CacheService.instance.counterCache.get(CounterCacheKey.create(metadata.cfId, partitionKey, cellName));
+        return CacheService.instance.counterCache.get(CounterCacheKey.create(metadata.cfId, partitionKey, clustering, column, path));
     }
 
-    public void putCachedCounter(ByteBuffer partitionKey, CellName cellName, ClockAndCount clockAndCount)
+    public void putCachedCounter(ByteBuffer partitionKey, Clustering clustering, ColumnDefinition column, CellPath path, ClockAndCount clockAndCount)
     {
         if (CacheService.instance.counterCache.getCapacity() == 0L) // counter cache disabled.
             return;
-        CacheService.instance.counterCache.put(CounterCacheKey.create(metadata.cfId, partitionKey, cellName), clockAndCount);
+        CacheService.instance.counterCache.put(CounterCacheKey.create(metadata.cfId, partitionKey, clustering, column, path), clockAndCount);
     }
 
     public void forceMajorCompaction() throws InterruptedException, ExecutionException
@@ -2830,7 +2259,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return view.sstables.isEmpty() && view.getCurrentMemtable().getOperations() == 0 && view.liveMemtables.size() <= 1 && view.flushingMemtables.size() == 0;
     }
 
-    private boolean isRowCacheEnabled()
+    public boolean isRowCacheEnabled()
     {
         return metadata.getCaching().rowCache.isEnabled() && CacheService.instance.rowCache.getCapacity() > 0;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ColumnFamilyType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyType.java b/src/java/org/apache/cassandra/db/ColumnFamilyType.java
deleted file mode 100644
index 51e8b63..0000000
--- a/src/java/org/apache/cassandra/db/ColumnFamilyType.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-/**
- * column family type enum
- */
-public enum ColumnFamilyType
-{
-    Standard,
-    Super;
-
-    public static ColumnFamilyType create(String name)
-    {
-        try
-        {
-            // TODO thrift optional parameter in CfDef is leaking down here which it shouldn't
-            return name == null ? ColumnFamilyType.Standard : ColumnFamilyType.valueOf(name);
-        }
-        catch (IllegalArgumentException e)
-        {
-            return null;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index d9d6a9c..1a9b92d 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -18,15 +18,15 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.sstable.IndexHelper;
-import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.util.SequentialWriter;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class ColumnIndex
@@ -42,6 +42,14 @@ public class ColumnIndex
         this.columnsIndex = columnsIndex;
     }
 
+    public static ColumnIndex writeAndBuildIndex(UnfilteredRowIterator iterator, SequentialWriter output, SerializationHeader header, Version version) throws IOException
+    {
+        assert !iterator.isEmpty() && version.storeRows();
+
+        Builder builder = new Builder(iterator, output, header, version.correspondingMessagingVersion());
+        return builder.build();
+    }
+
     @VisibleForTesting
     public static ColumnIndex nothing()
     {
@@ -52,192 +60,114 @@ public class ColumnIndex
      * Help to create an index for a column family based on size of columns,
      * and write said columns to disk.
      */
-    public static class Builder
+    private static class Builder
     {
+        private final UnfilteredRowIterator iterator;
+        private final SequentialWriter writer;
+        private final SerializationHeader header;
+        private final int version;
+
         private final ColumnIndex result;
-        private final long indexOffset;
+        private final long initialPosition;
         private long startPosition = -1;
-        private long endPosition = 0;
-        private long blockSize;
-        private OnDiskAtom firstColumn;
-        private OnDiskAtom lastColumn;
-        private OnDiskAtom lastBlockClosing;
-        private final DataOutputPlus output;
-        private final RangeTombstone.Tracker tombstoneTracker;
-        private int atomCount;
-        private final ByteBuffer key;
-        private final DeletionInfo deletionInfo; // only used for serializing and calculating row header size
-
-        private final OnDiskAtom.Serializer atomSerializer;
-
-        public Builder(ColumnFamily cf,
-                       ByteBuffer key,
-                       DataOutputPlus output)
-        {
-            assert cf != null;
-            assert key != null;
-            assert output != null;
 
-            this.key = key;
-            deletionInfo = cf.deletionInfo();
-            this.indexOffset = rowHeaderSize(key, deletionInfo);
-            this.result = new ColumnIndex(new ArrayList<IndexHelper.IndexInfo>());
-            this.output = output;
-            this.tombstoneTracker = new RangeTombstone.Tracker(cf.getComparator());
-            this.atomSerializer = cf.getComparator().onDiskAtomSerializer();
-        }
+        private int written;
 
-        /**
-         * Returns the number of bytes between the beginning of the row and the
-         * first serialized column.
-         */
-        private static long rowHeaderSize(ByteBuffer key, DeletionInfo delInfo)
-        {
-            TypeSizes typeSizes = TypeSizes.NATIVE;
-            // TODO fix constantSize when changing the nativeconststs.
-            int keysize = key.remaining();
-            return typeSizes.sizeof((short) keysize) + keysize          // Row key
-                 + DeletionTime.serializer.serializedSize(delInfo.getTopLevelDeletion(), typeSizes);
-        }
+        private ClusteringPrefix firstClustering;
+        private final ReusableClusteringPrefix lastClustering;
+
+        private DeletionTime openMarker;
 
-        public RangeTombstone.Tracker tombstoneTracker()
+        public Builder(UnfilteredRowIterator iterator,
+                       SequentialWriter writer,
+                       SerializationHeader header,
+                       int version)
         {
-            return tombstoneTracker;
+            this.iterator = iterator;
+            this.writer = writer;
+            this.header = header;
+            this.version = version;
+
+            this.result = new ColumnIndex(new ArrayList<IndexHelper.IndexInfo>());
+            this.initialPosition = writer.getFilePointer();
+            this.lastClustering = new ReusableClusteringPrefix(iterator.metadata().clusteringColumns().size());
         }
 
-        public int writtenAtomCount()
+        private void writePartitionHeader(UnfilteredRowIterator iterator) throws IOException
         {
-            return atomCount + tombstoneTracker.writtenAtom();
+            ByteBufferUtil.writeWithShortLength(iterator.partitionKey().getKey(), writer.stream);
+            DeletionTime.serializer.serialize(iterator.partitionLevelDeletion(), writer.stream);
+            if (header.hasStatic())
+                UnfilteredSerializer.serializer.serialize(iterator.staticRow(), header, writer.stream, version);
         }
 
-        /**
-         * Serializes the index into in-memory structure with all required components
-         * such as Bloom Filter, index block size, IndexInfo list
-         *
-         * @param cf Column family to create index for
-         *
-         * @return information about index - it's Bloom Filter, block size and IndexInfo list
-         */
-        public ColumnIndex build(ColumnFamily cf) throws IOException
+        public ColumnIndex build() throws IOException
         {
-            // cf has disentangled the columns and range tombstones, we need to re-interleave them in comparator order
-            Comparator<Composite> comparator = cf.getComparator();
-            DeletionInfo.InOrderTester tester = cf.deletionInfo().inOrderTester();
-            Iterator<RangeTombstone> rangeIter = cf.deletionInfo().rangeIterator();
-            RangeTombstone tombstone = rangeIter.hasNext() ? rangeIter.next() : null;
-
-            for (Cell c : cf)
-            {
-                while (tombstone != null && comparator.compare(c.name(), tombstone.min) >= 0)
-                {
-                    // skip range tombstones that are shadowed by partition tombstones
-                    if (!cf.deletionInfo().getTopLevelDeletion().isDeleted(tombstone))
-                        add(tombstone);
-                    tombstone = rangeIter.hasNext() ? rangeIter.next() : null;
-                }
-
-                // We can skip any cell if it's shadowed by a tombstone already. This is a more
-                // general case than was handled by CASSANDRA-2589.
-                if (!tester.isDeleted(c))
-                    add(c);
-            }
-
-            while (tombstone != null)
-            {
-                add(tombstone);
-                tombstone = rangeIter.hasNext() ? rangeIter.next() : null;
-            }
-            ColumnIndex index = build();
+            writePartitionHeader(iterator);
 
-            maybeWriteEmptyRowHeader();
+            while (iterator.hasNext())
+                add(iterator.next());
 
-            return index;
+            return close();
         }
 
-        /**
-         * The important distinction wrt build() is that we may be building for a row that ends up
-         * being compacted away entirely, i.e., the input consists only of expired tombstones (or
-         * columns shadowed by expired tombstone).  Thus, it is the caller's responsibility
-         * to decide whether to write the header for an empty row.
-         */
-        public ColumnIndex buildForCompaction(Iterator<OnDiskAtom> columns) throws IOException
+        private long currentPosition()
         {
-            while (columns.hasNext())
-            {
-                OnDiskAtom c =  columns.next();
-                add(c);
-            }
+            return writer.getFilePointer() - initialPosition;
+        }
 
-            return build();
+        private void addIndexBlock()
+        {
+            IndexHelper.IndexInfo cIndexInfo = new IndexHelper.IndexInfo(firstClustering,
+                                                                         lastClustering.get().takeAlias(),
+                                                                         startPosition,
+                                                                         currentPosition() - startPosition,
+                                                                         openMarker);
+            result.columnsIndex.add(cIndexInfo);
+            firstClustering = null;
         }
 
-        public void add(OnDiskAtom column) throws IOException
+        private void add(Unfiltered unfiltered) throws IOException
         {
-            atomCount++;
+            lastClustering.copy(unfiltered.clustering());
+            boolean isMarker = unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER;
 
-            if (firstColumn == null)
+            if (firstClustering == null)
             {
-                firstColumn = column;
-                startPosition = endPosition;
-                // TODO: have that use the firstColumn as min + make sure we optimize that on read
-                endPosition += tombstoneTracker.writeOpenedMarker(firstColumn, output, atomSerializer);
-                blockSize = 0; // We don't count repeated tombstone marker in the block size, to avoid a situation
-                               // where we wouldn't make any progress because a block is filled by said marker
+                // Beginning of an index block. Remember the start and position
+                firstClustering = lastClustering.get().takeAlias();
+                startPosition = currentPosition();
             }
 
-            long size = atomSerializer.serializedSizeForSSTable(column);
-            endPosition += size;
-            blockSize += size;
+            UnfilteredSerializer.serializer.serialize(unfiltered, header, writer.stream, version);
+            ++written;
 
-            // if we hit the column index size that we have to index after, go ahead and index it.
-            if (blockSize >= DatabaseDescriptor.getColumnIndexSize())
+            if (isMarker)
             {
-                IndexHelper.IndexInfo cIndexInfo = new IndexHelper.IndexInfo(firstColumn.name(), column.name(), indexOffset + startPosition, endPosition - startPosition);
-                result.columnsIndex.add(cIndexInfo);
-                firstColumn = null;
-                lastBlockClosing = column;
+                RangeTombstoneMarker marker = (RangeTombstoneMarker) unfiltered;
+                openMarker = marker.isOpen(false) ? marker.openDeletionTime(false) : null;
             }
 
-            maybeWriteRowHeader();
-            atomSerializer.serializeForSSTable(column, output);
-
-            // TODO: Should deal with removing unneeded tombstones
-            tombstoneTracker.update(column, false);
-
-            lastColumn = column;
+            // if we hit the column index size that we have to index after, go ahead and index it.
+            if (currentPosition() - startPosition >= DatabaseDescriptor.getColumnIndexSize())
+                addIndexBlock();
         }
 
-        private void maybeWriteRowHeader() throws IOException
+        private ColumnIndex close() throws IOException
         {
-            if (lastColumn == null)
-            {
-                ByteBufferUtil.writeWithShortLength(key, output);
-                DeletionTime.serializer.serialize(deletionInfo.getTopLevelDeletion(), output);
-            }
-        }
+            UnfilteredSerializer.serializer.writeEndOfPartition(writer.stream);
 
-        public ColumnIndex build()
-        {
-            // all columns were GC'd after all
-            if (lastColumn == null)
+            // It's possible we add no rows, just a top level deletion
+            if (written == 0)
                 return ColumnIndex.EMPTY;
 
             // the last column may have fallen on an index boundary already.  if not, index it explicitly.
-            if (result.columnsIndex.isEmpty() || lastBlockClosing != lastColumn)
-            {
-                IndexHelper.IndexInfo cIndexInfo = new IndexHelper.IndexInfo(firstColumn.name(), lastColumn.name(), indexOffset + startPosition, endPosition - startPosition);
-                result.columnsIndex.add(cIndexInfo);
-            }
+            if (firstClustering != null)
+                addIndexBlock();
 
             // we should always have at least one computed index block, but we only write it out if there is more than that.
             assert result.columnsIndex.size() > 0;
             return result;
         }
-
-        public void maybeWriteEmptyRowHeader() throws IOException
-        {
-            if (!deletionInfo.isLive())
-                maybeWriteRowHeader();
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ColumnSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnSerializer.java b/src/java/org/apache/cassandra/db/ColumnSerializer.java
deleted file mode 100644
index 8e7026c..0000000
--- a/src/java/org/apache/cassandra/db/ColumnSerializer.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.io.ISerializer;
-import org.apache.cassandra.io.FSReadError;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-public class ColumnSerializer implements ISerializer<Cell>
-{
-    public final static int DELETION_MASK        = 0x01;
-    public final static int EXPIRATION_MASK      = 0x02;
-    public final static int COUNTER_MASK         = 0x04;
-    public final static int COUNTER_UPDATE_MASK  = 0x08;
-    public final static int RANGE_TOMBSTONE_MASK = 0x10;
-
-    /**
-     * Flag affecting deserialization behavior.
-     *  - LOCAL: for deserialization of local data (Expired columns are
-     *      converted to tombstones (to gain disk space)).
-     *  - FROM_REMOTE: for deserialization of data received from remote hosts
-     *      (Expired columns are converted to tombstone and counters have
-     *      their delta cleared)
-     *  - PRESERVE_SIZE: used when no transformation must be performed, i.e,
-     *      when we must ensure that deserializing and reserializing the
-     *      result yield the exact same bytes. Streaming uses this.
-     */
-    public static enum Flag
-    {
-        LOCAL, FROM_REMOTE, PRESERVE_SIZE;
-    }
-
-    private final CellNameType type;
-
-    public ColumnSerializer(CellNameType type)
-    {
-        this.type = type;
-    }
-
-    public void serialize(Cell cell, DataOutputPlus out) throws IOException
-    {
-        assert !cell.name().isEmpty();
-        type.cellSerializer().serialize(cell.name(), out);
-        try
-        {
-            out.writeByte(cell.serializationFlags());
-            if (cell instanceof CounterCell)
-            {
-                out.writeLong(((CounterCell) cell).timestampOfLastDelete());
-            }
-            else if (cell instanceof ExpiringCell)
-            {
-                out.writeInt(((ExpiringCell) cell).getTimeToLive());
-                out.writeInt(cell.getLocalDeletionTime());
-            }
-            out.writeLong(cell.timestamp());
-            ByteBufferUtil.writeWithLength(cell.value(), out);
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public Cell deserialize(DataInput in) throws IOException
-    {
-        return deserialize(in, Flag.LOCAL);
-    }
-
-    /*
-     * For counter columns, we must know when we deserialize them if what we
-     * deserialize comes from a remote host. If it does, then we must clear
-     * the delta.
-     */
-    public Cell deserialize(DataInput in, ColumnSerializer.Flag flag) throws IOException
-    {
-        return deserialize(in, flag, Integer.MIN_VALUE);
-    }
-
-    public Cell deserialize(DataInput in, ColumnSerializer.Flag flag, int expireBefore) throws IOException
-    {
-        CellName name = type.cellSerializer().deserialize(in);
-
-        int b = in.readUnsignedByte();
-        return deserializeColumnBody(in, name, b, flag, expireBefore);
-    }
-
-    Cell deserializeColumnBody(DataInput in, CellName name, int mask, ColumnSerializer.Flag flag, int expireBefore) throws IOException
-    {
-        if ((mask & COUNTER_MASK) != 0)
-        {
-            long timestampOfLastDelete = in.readLong();
-            long ts = in.readLong();
-            ByteBuffer value = ByteBufferUtil.readWithLength(in);
-            return BufferCounterCell.create(name, value, ts, timestampOfLastDelete, flag);
-        }
-        else if ((mask & EXPIRATION_MASK) != 0)
-        {
-            int ttl = in.readInt();
-            int expiration = in.readInt();
-            long ts = in.readLong();
-            ByteBuffer value = ByteBufferUtil.readWithLength(in);
-            return BufferExpiringCell.create(name, value, ts, ttl, expiration, expireBefore, flag);
-        }
-        else
-        {
-            long ts = in.readLong();
-            ByteBuffer value = ByteBufferUtil.readWithLength(in);
-            return (mask & COUNTER_UPDATE_MASK) != 0
-                   ? new BufferCounterUpdateCell(name, value, ts)
-                   : ((mask & DELETION_MASK) == 0
-                      ? new BufferCell(name, value, ts)
-                      : new BufferDeletedCell(name, value, ts));
-        }
-    }
-
-    void skipColumnBody(DataInput in, int mask) throws IOException
-    {
-        if ((mask & COUNTER_MASK) != 0)
-            FileUtils.skipBytesFully(in, 16);
-        else if ((mask & EXPIRATION_MASK) != 0)
-            FileUtils.skipBytesFully(in, 16);
-        else
-            FileUtils.skipBytesFully(in, 8);
-
-        int length = in.readInt();
-        FileUtils.skipBytesFully(in, length);
-    }
-
-    public long serializedSize(Cell cell, TypeSizes typeSizes)
-    {
-        return cell.serializedSize(type, typeSizes);
-    }
-
-    public static class CorruptColumnException extends IOException
-    {
-        public CorruptColumnException(String s)
-        {
-            super(s);
-        }
-
-        public static CorruptColumnException create(DataInput in, ByteBuffer name)
-        {
-            assert name.remaining() <= 0;
-            String format = "invalid column name length %d%s";
-            String details = "";
-            if (in instanceof FileDataInput)
-            {
-                FileDataInput fdis = (FileDataInput)in;
-                long remaining;
-                try
-                {
-                    remaining = fdis.bytesRemaining();
-                }
-                catch (IOException e)
-                {
-                    throw new FSReadError(e, fdis.getPath());
-                }
-                details = String.format(" (%s, %d bytes remaining)", fdis.getPath(), remaining);
-            }
-            return new CorruptColumnException(String.format(format, name.remaining(), details));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/Columns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Columns.java b/src/java/org/apache/cassandra/db/Columns.java
new file mode 100644
index 0000000..83d39db
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/Columns.java
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.*;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * An immutable and sorted list of (non-PK) columns for a given table.
+ * <p>
+ * Note that in practice, it will either store only static columns, or only regular ones. When
+ * we need both type of columns, we use a {@link PartitionColumns} object.
+ */
+public class Columns implements Iterable<ColumnDefinition>
+{
+    public static final Serializer serializer = new Serializer();
+    public static final Columns NONE = new Columns(new ColumnDefinition[0], 0);
+
+    public final ColumnDefinition[] columns;
+    public final int complexIdx; // Index of the first complex column
+
+    private Columns(ColumnDefinition[] columns, int complexIdx)
+    {
+        assert complexIdx <= columns.length;
+        this.columns = columns;
+        this.complexIdx = complexIdx;
+    }
+
+    private Columns(ColumnDefinition[] columns)
+    {
+        this(columns, findFirstComplexIdx(columns));
+    }
+
+    /**
+     * Creates a {@code Columns} holding only the one column provided.
+     *
+     * @param c the column for which to create a {@code Columns} object.
+     *
+     * @return the newly created {@code Columns} containing only {@code c}.
+     */
+    public static Columns of(ColumnDefinition c)
+    {
+        ColumnDefinition[] columns = new ColumnDefinition[]{ c };
+        return new Columns(columns, c.isComplex() ? 0 : 1);
+    }
+
+    /**
+     * Returns a new {@code Columns} object holing the same columns than the provided set.
+     *
+     * @param param s the set from which to create the new {@code Columns}.
+     *
+     * @return the newly created {@code Columns} containing the columns from {@code s}.
+     */
+    public static Columns from(Set<ColumnDefinition> s)
+    {
+        ColumnDefinition[] columns = s.toArray(new ColumnDefinition[s.size()]);
+        Arrays.sort(columns);
+        return new Columns(columns, findFirstComplexIdx(columns));
+    }
+
+    private static int findFirstComplexIdx(ColumnDefinition[] columns)
+    {
+        for (int i = 0; i < columns.length; i++)
+            if (columns[i].isComplex())
+                return i;
+        return columns.length;
+    }
+
+    /**
+     * Whether this columns is empty.
+     *
+     * @return whether this columns is empty.
+     */
+    public boolean isEmpty()
+    {
+        return columns.length == 0;
+    }
+
+    /**
+     * The number of simple columns in this object.
+     *
+     * @return the number of simple columns in this object.
+     */
+    public int simpleColumnCount()
+    {
+        return complexIdx;
+    }
+
+    /**
+     * The number of complex columns (non-frozen collections, udts, ...) in this object.
+     *
+     * @return the number of complex columns in this object.
+     */
+    public int complexColumnCount()
+    {
+        return columns.length - complexIdx;
+    }
+
+    /**
+     * The total number of columns in this object.
+     *
+     * @return the total number of columns in this object.
+     */
+    public int columnCount()
+    {
+        return columns.length;
+    }
+
+    /**
+     * Whether this objects contains simple columns.
+     *
+     * @return whether this objects contains simple columns.
+     */
+    public boolean hasSimple()
+    {
+        return complexIdx > 0;
+    }
+
+    /**
+     * Whether this objects contains complex columns.
+     *
+     * @return whether this objects contains complex columns.
+     */
+    public boolean hasComplex()
+    {
+        return complexIdx < columns.length;
+    }
+
+    /**
+     * Returns the ith simple column of this object.
+     *
+     * @param i the index for the simple column to fectch. This must
+     * satisfy {@code 0 <= i < simpleColumnCount()}.
+     *
+     * @return the {@code i}th simple column in this object.
+     */
+    public ColumnDefinition getSimple(int i)
+    {
+        return columns[i];
+    }
+
+    /**
+     * Returns the ith complex column of this object.
+     *
+     * @param i the index for the complex column to fectch. This must
+     * satisfy {@code 0 <= i < complexColumnCount()}.
+     *
+     * @return the {@code i}th complex column in this object.
+     */
+    public ColumnDefinition getComplex(int i)
+    {
+        return columns[complexIdx + i];
+    }
+
+    /**
+     * The index of the provided simple column in this object (if it contains
+     * the provided column).
+     *
+     * @param c the simple column for which to return the index of.
+     * @param from the index to start the search from.
+     *
+     * @return the index for simple column {@code c} if it is contains in this
+     * object (starting from index {@code from}), {@code -1} otherwise.
+     */
+    public int simpleIdx(ColumnDefinition c, int from)
+    {
+        assert !c.isComplex();
+        for (int i = from; i < complexIdx; i++)
+            // We know we only use "interned" ColumnIdentifier so == is ok.
+            if (columns[i].name == c.name)
+                return i;
+        return -1;
+    }
+
+    /**
+     * The index of the provided complex column in this object (if it contains
+     * the provided column).
+     *
+     * @param c the complex column for which to return the index of.
+     * @param from the index to start the search from.
+     *
+     * @return the index for complex column {@code c} if it is contains in this
+     * object (starting from index {@code from}), {@code -1} otherwise.
+     */
+    public int complexIdx(ColumnDefinition c, int from)
+    {
+        assert c.isComplex();
+        for (int i = complexIdx + from; i < columns.length; i++)
+            // We know we only use "interned" ColumnIdentifier so == is ok.
+            if (columns[i].name == c.name)
+                return i - complexIdx;
+        return -1;
+    }
+
+    /**
+     * Whether the provided column is contained by this object.
+     *
+     * @param c the column to check presence of.
+     *
+     * @return whether {@code c} is contained by this object.
+     */
+    public boolean contains(ColumnDefinition c)
+    {
+        return c.isComplex() ? complexIdx(c, 0) >= 0 : simpleIdx(c, 0) >= 0;
+    }
+
+    /**
+     * Whether or not there is some counter columns within those columns.
+     *
+     * @return whether or not there is some counter columns within those columns.
+     */
+    public boolean hasCounters()
+    {
+        for (int i = 0; i < complexIdx; i++)
+        {
+            if (columns[i].type.isCounter())
+                return true;
+        }
+
+        for (int i = complexIdx; i < columns.length; i++)
+        {
+            // We only support counter in maps because that's all we need for now (and we need it for the sake of thrift super columns of counter)
+            if (columns[i].type instanceof MapType && (((MapType)columns[i].type).valueComparator().isCounter()))
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Returns the result of merging this {@code Columns} object with the
+     * provided one.
+     *
+     * @param other the other {@code Columns} to merge this object with.
+     *
+     * @return the result of merging/taking the union of {@code this} and
+     * {@code other}. The returned object may be one of the operand and that
+     * operand is a subset of the other operand.
+     */
+    public Columns mergeTo(Columns other)
+    {
+        if (this == other || other == NONE)
+            return this;
+        if (this == NONE)
+            return other;
+
+        int i = 0, j = 0;
+        int size = 0;
+        while (i < columns.length && j < other.columns.length)
+        {
+            ++size;
+            int cmp = columns[i].compareTo(other.columns[j]);
+            if (cmp == 0)
+            {
+                ++i;
+                ++j;
+            }
+            else if (cmp < 0)
+            {
+                ++i;
+            }
+            else
+            {
+                ++j;
+            }
+        }
+
+        // If every element was always counted on both array, we have the same
+        // arrays for the first min elements
+        if (i == size && j == size)
+        {
+            // We've exited because of either c1 or c2 (or both). The array that
+            // made us stop is thus a subset of the 2nd one, return that array.
+            return i == columns.length ? other : this;
+        }
+
+        size += i == columns.length ? other.columns.length - j : columns.length - i;
+        ColumnDefinition[] result = new ColumnDefinition[size];
+        i = 0;
+        j = 0;
+        for (int k = 0; k < size; k++)
+        {
+            int cmp = i >= columns.length ? 1
+                    : (j >= other.columns.length ? -1 : columns[i].compareTo(other.columns[j]));
+            if (cmp == 0)
+            {
+                result[k] = columns[i];
+                ++i;
+                ++j;
+            }
+            else if (cmp < 0)
+            {
+                result[k] = columns[i++];
+            }
+            else
+            {
+                result[k] = other.columns[j++];
+            }
+        }
+        return new Columns(result, findFirstComplexIdx(result));
+    }
+
+    /**
+     * Whether this object is a subset of the provided other {@code Columns object}.
+     *
+     * @param other the othere object to test for inclusion in this object.
+     *
+     * @return whether all the columns of {@code other} are contained by this object.
+     */
+    public boolean contains(Columns other)
+    {
+        if (other.columns.length > columns.length)
+            return false;
+
+        int j = 0;
+        int cmp = 0;
+        for (ColumnDefinition def : other.columns)
+        {
+            while (j < columns.length && (cmp = columns[j].compareTo(def)) < 0)
+                j++;
+
+            if (j >= columns.length || cmp > 0)
+                return false;
+
+            // cmp == 0, we've found the definition. Ce can bump j once more since
+            // we know we won't need to compare that element again
+            j++;
+        }
+        return true;
+    }
+
+    /**
+     * Iterator over the simple columns of this object.
+     *
+     * @return an iterator over the simple columns of this object.
+     */
+    public Iterator<ColumnDefinition> simpleColumns()
+    {
+        return new ColumnIterator(0, complexIdx);
+    }
+
+    /**
+     * Iterator over the complex columns of this object.
+     *
+     * @return an iterator over the complex columns of this object.
+     */
+    public Iterator<ColumnDefinition> complexColumns()
+    {
+        return new ColumnIterator(complexIdx, columns.length);
+    }
+
+    /**
+     * Iterator over all the columns of this object.
+     *
+     * @return an iterator over all the columns of this object.
+     */
+    public Iterator<ColumnDefinition> iterator()
+    {
+        return Iterators.forArray(columns);
+    }
+
+    /**
+     * An iterator that returns the columns of this object in "select" order (that
+     * is in global alphabetical order, where the "normal" iterator returns simple
+     * columns first and the complex second).
+     *
+     * @return an iterator returning columns in alphabetical order.
+     */
+    public Iterator<ColumnDefinition> selectOrderIterator()
+    {
+        // In wildcard selection, we want to return all columns in alphabetical order,
+        // irregarding of whether they are complex or not
+        return new AbstractIterator<ColumnDefinition>()
+        {
+            private int regular;
+            private int complex = complexIdx;
+
+            protected ColumnDefinition computeNext()
+            {
+                if (complex >= columns.length)
+                    return regular >= complexIdx ? endOfData() : columns[regular++];
+                if (regular >= complexIdx)
+                    return columns[complex++];
+
+                return ByteBufferUtil.compareUnsigned(columns[regular].name.bytes, columns[complex].name.bytes) < 0
+                     ? columns[regular++]
+                     : columns[complex++];
+            }
+        };
+    }
+
+    /**
+     * Returns the equivalent of those columns but with the provided column removed.
+     *
+     * @param column the column to remove.
+     *
+     * @return newly allocated columns containing all the columns of {@code this} expect
+     * for {@code column}.
+     */
+    public Columns without(ColumnDefinition column)
+    {
+        int idx = column.isComplex() ? complexIdx(column, 0) : simpleIdx(column, 0);
+        if (idx < 0)
+            return this;
+
+        int realIdx = column.isComplex() ? complexIdx + idx : idx;
+
+        ColumnDefinition[] newColumns = new ColumnDefinition[columns.length - 1];
+        System.arraycopy(columns, 0, newColumns, 0, realIdx);
+        System.arraycopy(columns, realIdx + 1, newColumns, realIdx, newColumns.length - realIdx);
+        return new Columns(newColumns);
+    }
+
+    public void digest(MessageDigest digest)
+    {
+        for (ColumnDefinition c : this)
+            digest.update(c.name.bytes.duplicate());
+    }
+
+    @Override
+    public boolean equals(Object other)
+    {
+        if (!(other instanceof Columns))
+            return false;
+
+        Columns that = (Columns)other;
+        return this.complexIdx == that.complexIdx && Arrays.equals(this.columns, that.columns);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(complexIdx, Arrays.hashCode(columns));
+    }
+
+    @Override
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder();
+        boolean first = true;
+        for (ColumnDefinition def : this)
+        {
+            if (first) first = false; else sb.append(" ");
+            sb.append(def.name);
+        }
+        return sb.toString();
+    }
+
+    private class ColumnIterator extends AbstractIterator<ColumnDefinition>
+    {
+        private final int to;
+        private int idx;
+
+        private ColumnIterator(int from, int to)
+        {
+            this.idx = from;
+            this.to = to;
+        }
+
+        protected ColumnDefinition computeNext()
+        {
+            if (idx >= to)
+                return endOfData();
+            return columns[idx++];
+        }
+    }
+
+    public static class Serializer
+    {
+        public void serialize(Columns columns, DataOutputPlus out) throws IOException
+        {
+            out.writeShort(columns.columnCount());
+            for (ColumnDefinition column : columns)
+                ByteBufferUtil.writeWithShortLength(column.name.bytes, out);
+        }
+
+        public long serializedSize(Columns columns, TypeSizes sizes)
+        {
+            long size = sizes.sizeof((short)columns.columnCount());
+            for (ColumnDefinition column : columns)
+                size += sizes.sizeofWithShortLength(column.name.bytes);
+            return size;
+        }
+
+        public Columns deserialize(DataInput in, CFMetaData metadata) throws IOException
+        {
+            int length = in.readUnsignedShort();
+            ColumnDefinition[] columns = new ColumnDefinition[length];
+            for (int i = 0; i < length; i++)
+            {
+                ByteBuffer name = ByteBufferUtil.readWithShortLength(in);
+                ColumnDefinition column = metadata.getColumnDefinition(name);
+                if (column == null)
+                {
+                    // If we don't find the definition, it could be we have data for a dropped column, and we shouldn't
+                    // fail deserialization because of that. So we grab a "fake" ColumnDefinition that ensure proper
+                    // deserialization. The column will be ignore later on anyway.
+                    column = metadata.getDroppedColumnDefinition(name);
+                    if (column == null)
+                        throw new RuntimeException("Unknown column " + UTF8Type.instance.getString(name) + " during deserialization");
+                }
+                columns[i] = column;
+            }
+            return new Columns(columns);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/CompactTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CompactTables.java b/src/java/org/apache/cassandra/db/CompactTables.java
new file mode 100644
index 0000000..a72e7f2
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/CompactTables.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Small utility methods pertaining to the encoding of COMPACT STORAGE tables.
+ *
+ * COMPACT STORAGE tables exists mainly for the sake of encoding internally thrift tables (as well as
+ * exposing those tables through CQL). Note that due to these constraints, the internal representation
+ * of compact tables does *not* correspond exactly to their CQL definition.
+ *
+ * The internal layout of such tables is such that it can encode any thrift table. That layout is as follow:
+ *   CREATE TABLE compact (
+ *      key [key_validation_class],
+ *      [column_metadata_1] [type1] static,
+ *      ...,
+ *      [column_metadata_n] [type1] static,
+ *      column [comparator],
+ *      value [default_validation_class]
+ *      PRIMARY KEY (key, column)
+ *   )
+ * More specifically, the table:
+ *  - always has a clustering column and a regular value, which are used to store the "dynamic" thrift columns name and value.
+ *    Those are always present because we have no way to know in advance if "dynamic" columns will be inserted or not. Note
+ *    that when declared from CQL, compact tables may not have any clustering: in that case, we still have a clustering
+ *    defined internally, it is just ignored as far as interacting from CQL is concerned.
+ *  - have a static column for every "static" column defined in the thrift "column_metadata". Note that when declaring a compact
+ *    table from CQL without any clustering (but some non-PK columns), the columns ends up static internally even though they are
+ *    not in the declaration
+ *
+ * On variation is that if the table comparator is a CompositeType, then the underlying table will have one clustering column by
+ * element of the CompositeType, but the rest of the layout is as above.
+ *
+ * As far as thrift is concerned, one exception to this is super column families, which have a different layout. Namely, a super
+ * column families is encoded with:
+ *   CREATE TABLE super (
+ *      key [key_validation_class],
+ *      super_column_name [comparator],
+ *      [column_metadata_1] [type1],
+ *      ...,
+ *      [column_metadata_n] [type1],
+ *      "" map<[sub_comparator], [default_validation_class]>
+ *      PRIMARY KEY (key, super_column_name)
+ *   )
+ * In other words, every super column is encoded by a row. That row has one column for each defined "column_metadata", but it also
+ * has a special map column (whose name is the empty string as this is guaranteed to never conflict with a user-defined
+ * "column_metadata") which stores the super column "dynamic" sub-columns.
+ */
+public abstract class CompactTables
+{
+    // We use an empty value for the 1) this can't conflict with a user-defined column and 2) this actually
+    // validate with any comparator which makes it convenient for columnDefinitionComparator().
+    public static final ByteBuffer SUPER_COLUMN_MAP_COLUMN = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+    public static final String SUPER_COLUMN_MAP_COLUMN_STR = UTF8Type.instance.compose(SUPER_COLUMN_MAP_COLUMN);
+
+    private CompactTables() {}
+
+    public static ColumnDefinition getCompactValueColumn(PartitionColumns columns, boolean isSuper)
+    {
+        if (isSuper)
+        {
+            for (ColumnDefinition column : columns.regulars)
+                if (column.name.bytes.equals(SUPER_COLUMN_MAP_COLUMN))
+                    return column;
+            throw new AssertionError("Invalid super column table definition, no 'dynamic' map column");
+        }
+        assert columns.regulars.simpleColumnCount() == 1 && columns.regulars.complexColumnCount() == 0;
+        return columns.regulars.getSimple(0);
+    }
+
+    public static AbstractType<?> columnDefinitionComparator(ColumnDefinition.Kind kind, boolean isSuper, AbstractType<?> rawComparator, AbstractType<?> rawSubComparator)
+    {
+        if (isSuper)
+            return kind == ColumnDefinition.Kind.REGULAR ? rawSubComparator : UTF8Type.instance;
+        else
+            return kind == ColumnDefinition.Kind.STATIC ? rawComparator : UTF8Type.instance;
+    }
+
+    public static boolean hasEmptyCompactValue(CFMetaData metadata)
+    {
+        return metadata.compactValueColumn().type instanceof EmptyType;
+    }
+
+    public static boolean isSuperColumnMapColumn(ColumnDefinition column)
+    {
+        return column.kind == ColumnDefinition.Kind.REGULAR && column.name.bytes.equals(SUPER_COLUMN_MAP_COLUMN);
+    }
+
+    public static DefaultNames defaultNameGenerator(Set<String> usedNames)
+    {
+        return new DefaultNames(new HashSet<String>(usedNames));
+    }
+
+    public static DefaultNames defaultNameGenerator(Iterable<ColumnDefinition> defs)
+    {
+        Set<String> usedNames = new HashSet<>();
+        for (ColumnDefinition def : defs)
+            usedNames.add(def.name.toString());
+        return new DefaultNames(usedNames);
+    }
+
+    public static class DefaultNames
+    {
+        private static final String DEFAULT_PARTITION_KEY_NAME = "key";
+        private static final String DEFAULT_CLUSTERING_NAME = "column";
+        private static final String DEFAULT_COMPACT_VALUE_NAME = "value";
+
+        private final Set<String> usedNames;
+        private int partitionIndex = 0;
+        private int clusteringIndex = 1;
+        private int compactIndex = 0;
+
+        private DefaultNames(Set<String> usedNames)
+        {
+            this.usedNames = usedNames;
+        }
+
+        public String defaultPartitionKeyName()
+        {
+            while (true)
+            {
+                // For compatibility sake, we call the first alias 'key' rather than 'key1'. This
+                // is inconsistent with column alias, but it's probably not worth risking breaking compatibility now.
+                String candidate = partitionIndex == 0 ? DEFAULT_PARTITION_KEY_NAME : DEFAULT_PARTITION_KEY_NAME + (partitionIndex + 1);
+                ++partitionIndex;
+                if (usedNames.add(candidate))
+                    return candidate;
+            }
+        }
+
+        public String defaultClusteringName()
+        {
+            while (true)
+            {
+                String candidate = DEFAULT_CLUSTERING_NAME + clusteringIndex;
+                ++clusteringIndex;
+                if (usedNames.add(candidate))
+                    return candidate;
+            }
+        }
+
+        public String defaultCompactValueName()
+        {
+            while (true)
+            {
+                String candidate = compactIndex == 0 ? DEFAULT_COMPACT_VALUE_NAME : DEFAULT_COMPACT_VALUE_NAME + compactIndex;
+                ++compactIndex;
+                if (usedNames.add(candidate))
+                    return candidate;
+            }
+        }
+    }
+}