You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/01/22 01:18:15 UTC

[1/2] cassandra git commit: Add batch remove iterator to ABSC

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 0c2eaa9cb -> 06cd494c1


Add batch remove iterator to ABSC

patch by Jimmy Mårdell; reviewed by Richard Low for CASSANDRA-8414


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cc5fb19e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cc5fb19e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cc5fb19e

Branch: refs/heads/cassandra-2.1
Commit: cc5fb19e5c864110b798375f43ec1597904d03ab
Parents: ae380da
Author: Jimmy Mårdell <ya...@spotify.com>
Authored: Thu Jan 22 02:16:10 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Thu Jan 22 02:16:10 2015 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/db/ArrayBackedSortedColumns.java  | 89 ++++++++++++++++++-
 .../org/apache/cassandra/db/ColumnFamily.java   | 32 +++++++
 .../apache/cassandra/db/ColumnFamilyStore.java  |  6 +-
 .../cassandra/utils/BatchRemoveIterator.java    | 32 +++++++
 .../db/ArrayBackedSortedColumnsTest.java        | 93 ++++++++++++++++++++
 6 files changed, 249 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc5fb19e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6604783..0d08cce 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 2.0.13:
  * Round up time deltas lower than 1ms in BulkLoader (CASSANDRA-8645)
+ * Add batch remove iterator to ABSC (CASSANDRA-8414)
 
 
 2.0.12:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc5fb19e/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
index 389e0f8..8d553be 100644
--- a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.filter.ColumnSlice;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.utils.Allocator;
+import org.apache.cassandra.utils.BatchRemoveIterator;
 
 /**
  * A ColumnFamily backed by an ArrayList.
@@ -54,14 +55,14 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
     {
         super(metadata);
         this.reversed = reversed;
-        this.columns = new ArrayList<Column>();
+        this.columns = new ArrayList<>();
     }
 
     private ArrayBackedSortedColumns(Collection<Column> columns, CFMetaData metadata, boolean reversed)
     {
         super(metadata);
         this.reversed = reversed;
-        this.columns = new ArrayList<Column>(columns);
+        this.columns = new ArrayList<>(columns);
     }
 
     public ColumnFamily.Factory getFactory()
@@ -292,6 +293,90 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
         return new SlicesIterator(columns, getComparator(), slices, !reversed);
     }
 
+    @Override
+    public BatchRemoveIterator<Column> batchRemoveIterator()
+    {
+        return new BatchRemoveIterator<Column>()
+        {
+            private Iterator<Column> iter = iterator();
+            private BitSet removedIndexes = new BitSet(columns.size());
+            private int idx = -1;
+            private boolean shouldCallNext = true;
+            private boolean isCommitted = false;
+            private boolean removedAnything = false;
+
+            public void commit()
+            {
+                if (isCommitted)
+                    throw new IllegalStateException();
+                isCommitted = true;
+
+                if (!removedAnything)
+                    return;
+
+                // the lowest index both not visited and known to be not removed
+                int keepIdx = removedIndexes.nextClearBit(0);
+                // the running total of kept items
+                int resultLength = 0;
+                // start from the first not-removed cell, and shift left.
+                int removeIdx = removedIndexes.nextSetBit(keepIdx + 1);
+                while (removeIdx >= 0)
+                {
+                    int length = removeIdx - keepIdx;
+                    if (length > 0)
+                    {
+                        copy(keepIdx, resultLength, length);
+                        resultLength += length;
+                    }
+                    keepIdx = removedIndexes.nextClearBit(removeIdx + 1);
+                    if (keepIdx < 0)
+                        keepIdx = columns.size();
+                    removeIdx = removedIndexes.nextSetBit(keepIdx + 1);
+                }
+                // Copy everything after the last deleted column
+                int length = columns.size() - keepIdx;
+                if (length > 0)
+                {
+                    copy(keepIdx, resultLength, length);
+                    resultLength += length;
+                }
+
+                columns.subList(resultLength, columns.size()).clear();
+            }
+
+            private void copy(int src, int dst, int len)
+            {
+                // [src, src+len) and [dst, dst+len) might overlap, but it's okay because we're going from left to right
+                assert dst <= src : "dst must not be greater than src";
+
+                if (dst < src)
+                    Collections.copy(columns.subList(dst, dst + len), columns.subList(src, src + len));
+            }
+
+            public boolean hasNext()
+            {
+                return iter.hasNext();
+            }
+
+            public Column next()
+            {
+                idx++;
+                shouldCallNext = false;
+                return iter.next();
+            }
+
+            public void remove()
+            {
+                if (shouldCallNext)
+                    throw new IllegalStateException();
+
+                removedIndexes.set(reversed ? columns.size() - idx - 1 : idx);
+                removedAnything = true;
+                shouldCallNext = true;
+            }
+        };
+    }
+
     private static class SlicesIterator extends AbstractIterator<Column>
     {
         private final List<Column> list;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc5fb19e/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index 7edf825..19f8c16 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -532,6 +532,38 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
         return ByteBuffer.wrap(out.getData(), 0, out.getLength());
     }
 
+
+    /**
+     * @return an iterator where the removes are carried out once everything has been iterated
+     */
+    public BatchRemoveIterator<Column> batchRemoveIterator()
+    {
+        // Default implementation is the ordinary iterator
+        return new BatchRemoveIterator<Column>()
+        {
+            private final Iterator<Column> iter = iterator();
+
+            public void commit()
+            {
+            }
+
+            public boolean hasNext()
+            {
+                return iter.hasNext();
+            }
+
+            public Column next()
+            {
+                return iter.next();
+            }
+
+            public void remove()
+            {
+                iter.remove();
+            }
+        };
+    }
+
     public abstract static class Factory <T extends ColumnFamily>
     {
         /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc5fb19e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index e936473..34d3f1d 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -953,7 +953,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      */
     public static long removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer)
     {
-        Iterator<Column> iter = cf.iterator();
+        BatchRemoveIterator<Column> iter = cf.batchRemoveIterator();
         DeletionInfo.InOrderTester tester = cf.inOrderDeletionTester();
         boolean hasDroppedColumns = !cf.metadata.getDroppedColumns().isEmpty();
         long removedBytes = 0;
@@ -971,6 +971,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 removedBytes += c.dataSize();
             }
         }
+        iter.commit();
         return removedBytes;
     }
 
@@ -993,10 +994,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         if (cf == null || cf.metadata.getDroppedColumns().isEmpty())
             return;
 
-        Iterator<Column> iter = cf.iterator();
+        BatchRemoveIterator<Column> iter = cf.batchRemoveIterator();
         while (iter.hasNext())
             if (isDroppedColumn(iter.next(), metadata))
                 iter.remove();
+        iter.commit();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc5fb19e/src/java/org/apache/cassandra/utils/BatchRemoveIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BatchRemoveIterator.java b/src/java/org/apache/cassandra/utils/BatchRemoveIterator.java
new file mode 100644
index 0000000..4377426
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/BatchRemoveIterator.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.Iterator;
+
+/**
+ * Iterator that allows us to more efficiently remove many items
+ */
+public interface BatchRemoveIterator<T> extends Iterator<T>
+{
+    /**
+     * Commits the remove operations in this batch iterator. After this no more
+     * deletes can be made. Any further calls to remove() or commit() will throw IllegalStateException.
+     */
+    void commit();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc5fb19e/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java b/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
index 06e2e75..90cd70f 100644
--- a/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
+++ b/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
@@ -28,10 +28,12 @@ import org.junit.Test;
 import static org.junit.Assert.*;
 
 import com.google.common.base.Functions;
+import com.google.common.collect.Sets;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.utils.BatchRemoveIterator;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.db.filter.ColumnSlice;
 import org.apache.cassandra.utils.HeapAllocator;
@@ -193,4 +195,95 @@ public class ArrayBackedSortedColumnsTest extends SchemaLoader
         iter.remove();
         assertTrue(!iter.hasNext());
     }
+
+    @Test(expected = IllegalStateException.class)
+    public void testBatchRemoveTwice()
+    {
+        ColumnFamily map = ArrayBackedSortedColumns.factory.create(metadata(), false);
+        map.addColumn(new Column(ByteBufferUtil.bytes(1)), HeapAllocator.instance);
+        map.addColumn(new Column(ByteBufferUtil.bytes(2)), HeapAllocator.instance);
+
+        BatchRemoveIterator<Column> batchIter = map.batchRemoveIterator();
+        batchIter.next();
+        batchIter.remove();
+        batchIter.remove();
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testBatchCommitTwice()
+    {
+        ColumnFamily map = ArrayBackedSortedColumns.factory.create(metadata(), false);
+        map.addColumn(new Column(ByteBufferUtil.bytes(1)), HeapAllocator.instance);
+        map.addColumn(new Column(ByteBufferUtil.bytes(2)), HeapAllocator.instance);
+
+        BatchRemoveIterator<Column> batchIter = map.batchRemoveIterator();
+        batchIter.next();
+        batchIter.remove();
+        batchIter.commit();
+        batchIter.commit();
+    }
+
+    @Test
+    public void testBatchRemove()
+    {
+        testBatchRemoveInternal(false);
+        testBatchRemoveInternal(true);
+    }
+
+    public void testBatchRemoveInternal(boolean reversed)
+    {
+        ColumnFamily map = ArrayBackedSortedColumns.factory.create(metadata(), reversed);
+        int[] values = new int[]{ 1, 2, 3, 5 };
+
+        for (int i = 0; i < values.length; ++i)
+            map.addColumn(new Column(ByteBufferUtil.bytes(values[reversed ? values.length - 1 - i : i])), HeapAllocator.instance);
+
+        BatchRemoveIterator<Column> batchIter = map.batchRemoveIterator();
+        batchIter.next();
+        batchIter.remove();
+        batchIter.next();
+        batchIter.remove();
+
+        assertEquals("1st column before commit", 1, map.iterator().next().name().getInt(0));
+
+        batchIter.commit();
+
+        assertEquals("1st column after commit", 3, map.iterator().next().name().getInt(0));
+    }
+
+    @Test
+    public void testBatchRemoveCopy()
+    {
+        // Test delete some random columns and check the result
+        ColumnFamily map = ArrayBackedSortedColumns.factory.create(metadata(), false);
+        int n = 127;
+        int[] values = new int[n];
+        for (int i = 0; i < n; i++) values[i] = i;
+        Set<Integer> toRemove = Sets.newHashSet(3, 12, 13, 15, 58, 103, 112);
+
+        for (int value : values)
+            map.addColumn(new Column(ByteBufferUtil.bytes(value)), HeapAllocator.instance);
+
+        BatchRemoveIterator<Column> batchIter = map.batchRemoveIterator();
+        while (batchIter.hasNext())
+            if (toRemove.contains(batchIter.next().name().getInt(0)))
+                batchIter.remove();
+
+        batchIter.commit();
+
+        int expected = 0;
+
+        while (toRemove.contains(expected))
+            expected++;
+
+        for (Column column : map)
+        {
+            assertEquals(expected, column.name().getInt(0));
+            expected++;
+            while (toRemove.contains(expected))
+                expected++;
+        }
+
+        assertEquals(expected, n);
+    }
 }


[2/2] cassandra git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Posted by al...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
	CHANGES.txt
	src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
	src/java/org/apache/cassandra/db/ColumnFamilyStore.java
	test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/06cd494c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/06cd494c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/06cd494c

Branch: refs/heads/cassandra-2.1
Commit: 06cd494c1496ec96886ed41ff3207847631986c9
Parents: 0c2eaa9 cc5fb19
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Thu Jan 22 03:17:55 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Thu Jan 22 03:17:55 2015 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/db/ArrayBackedSortedColumns.java  | 96 +++++++++++++++++--
 .../apache/cassandra/db/AtomicBTreeColumns.java |  5 +
 .../org/apache/cassandra/db/ColumnFamily.java   |  6 ++
 .../apache/cassandra/db/ColumnFamilyStore.java  |  7 +-
 .../cassandra/utils/BatchRemoveIterator.java    | 32 +++++++
 .../db/ArrayBackedSortedColumnsTest.java        | 99 +++++++++++++++++++-
 7 files changed, 236 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/06cd494c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 9cd8189,0d08cce..a94ca04
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,70 -1,9 +1,71 @@@
 -2.0.13:
 - * Round up time deltas lower than 1ms in BulkLoader (CASSANDRA-8645)
 +2.1.3
 + * Duplicate rows returned when in clause has repeated values (CASSANDRA-6707)
 + * Add tooling to detect hot partitions (CASSANDRA-7974)
 + * Fix cassandra-stress user-mode truncation of partition generation (CASSANDRA-8608)
 + * Only stream from unrepaired sstables during inc repair (CASSANDRA-8267)
 + * Don't allow starting multiple inc repairs on the same sstables (CASSANDRA-8316)
 + * Invalidate prepared BATCH statements when related tables
 +   or keyspaces are dropped (CASSANDRA-8652)
 + * Fix missing results in secondary index queries on collections
 +   with ALLOW FILTERING (CASSANDRA-8421)
 + * Expose EstimatedHistogram metrics for range slices (CASSANDRA-8627)
 + * (cqlsh) Escape clqshrc passwords properly (CASSANDRA-8618)
 + * Fix NPE when passing wrong argument in ALTER TABLE statement (CASSANDRA-8355)
 + * Pig: Refactor and deprecate CqlStorage (CASSANDRA-8599)
 + * Don't reuse the same cleanup strategy for all sstables (CASSANDRA-8537)
 + * Fix case-sensitivity of index name on CREATE and DROP INDEX
 +   statements (CASSANDRA-8365)
 + * Better detection/logging for corruption in compressed sstables (CASSANDRA-8192)
 + * Use the correct repairedAt value when closing writer (CASSANDRA-8570)
 + * (cqlsh) Handle a schema mismatch being detected on startup (CASSANDRA-8512)
 + * Properly calculate expected write size during compaction (CASSANDRA-8532)
 + * Invalidate affected prepared statements when a table's columns
 +   are altered (CASSANDRA-7910)
 + * Stress - user defined writes should populate sequentally (CASSANDRA-8524)
 + * Fix regression in SSTableRewriter causing some rows to become unreadable 
 +   during compaction (CASSANDRA-8429)
 + * Run major compactions for repaired/unrepaired in parallel (CASSANDRA-8510)
 + * (cqlsh) Fix compression options in DESCRIBE TABLE output when compression
 +   is disabled (CASSANDRA-8288)
 + * (cqlsh) Fix DESCRIBE output after keyspaces are altered (CASSANDRA-7623)
 + * Make sure we set lastCompactedKey correctly (CASSANDRA-8463)
 + * (cqlsh) Fix output of CONSISTENCY command (CASSANDRA-8507)
 + * (cqlsh) Fixed the handling of LIST statements (CASSANDRA-8370)
 + * Make sstablescrub check leveled manifest again (CASSANDRA-8432)
 + * Check first/last keys in sstable when giving out positions (CASSANDRA-8458)
 + * Disable mmap on Windows (CASSANDRA-6993)
 + * Add missing ConsistencyLevels to cassandra-stress (CASSANDRA-8253)
 + * Add auth support to cassandra-stress (CASSANDRA-7985)
 + * Fix ArrayIndexOutOfBoundsException when generating error message
 +   for some CQL syntax errors (CASSANDRA-8455)
 + * Scale memtable slab allocation logarithmically (CASSANDRA-7882)
 + * cassandra-stress simultaneous inserts over same seed (CASSANDRA-7964)
 + * Reduce cassandra-stress sampling memory requirements (CASSANDRA-7926)
 + * Ensure memtable flush cannot expire commit log entries from its future (CASSANDRA-8383)
 + * Make read "defrag" async to reclaim memtables (CASSANDRA-8459)
 + * Remove tmplink files for offline compactions (CASSANDRA-8321)
 + * Reduce maxHintsInProgress (CASSANDRA-8415)
 + * BTree updates may call provided update function twice (CASSANDRA-8018)
 + * Release sstable references after anticompaction (CASSANDRA-8386)
 + * Handle abort() in SSTableRewriter properly (CASSANDRA-8320)
 + * Fix high size calculations for prepared statements (CASSANDRA-8231)
 + * Centralize shared executors (CASSANDRA-8055)
 + * Fix filtering for CONTAINS (KEY) relations on frozen collection
 +   clustering columns when the query is restricted to a single
 +   partition (CASSANDRA-8203)
 + * Do more aggressive entire-sstable TTL expiry checks (CASSANDRA-8243)
 + * Add more log info if readMeter is null (CASSANDRA-8238)
 + * add check of the system wall clock time at startup (CASSANDRA-8305)
 + * Support for frozen collections (CASSANDRA-7859)
 + * Fix overflow on histogram computation (CASSANDRA-8028)
 + * Have paxos reuse the timestamp generation of normal queries (CASSANDRA-7801)
 + * Fix incremental repair not remove parent session on remote (CASSANDRA-8291)
 + * Improve JBOD disk utilization (CASSANDRA-7386)
 + * Log failed host when preparing incremental repair (CASSANDRA-8228)
 + * Force config client mode in CQLSSTableWriter (CASSANDRA-8281)
 +Merged from 2.0:
+  * Add batch remove iterator to ABSC (CASSANDRA-8414)
 -
 -
 -2.0.12:
 + * Round up time deltas lower than 1ms in BulkLoader (CASSANDRA-8645)
   * Use more efficient slice size for querying internal secondary
     index tables (CASSANDRA-8550)
   * Fix potentially returning deleted rows with range tombstone (CASSANDRA-8558)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06cd494c/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
index b5ed8d2,8d553be..64752e3
--- a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
@@@ -17,42 -17,31 +17,38 @@@
   */
  package org.apache.cassandra.db;
  
- import java.util.AbstractCollection;
- import java.util.Arrays;
- import java.util.Collection;
- import java.util.Comparator;
- import java.util.Iterator;
 -import java.nio.ByteBuffer;
+ import java.util.*;
  
  import com.google.common.base.Function;
  import com.google.common.collect.AbstractIterator;
  import com.google.common.collect.Iterables;
 -import com.google.common.collect.Lists;
  
- import net.nicoulaj.compilecommand.annotations.Inline;
  import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.db.composites.CellName;
 +import org.apache.cassandra.db.composites.Composite;
  import org.apache.cassandra.db.filter.ColumnSlice;
 -import org.apache.cassandra.db.marshal.AbstractType;
 -import org.apache.cassandra.utils.Allocator;
+ import org.apache.cassandra.utils.BatchRemoveIterator;
 +import org.apache.cassandra.utils.memory.AbstractAllocator;
  
  /**
 - * A ColumnFamily backed by an ArrayList.
 + * A ColumnFamily backed by an array.
   * This implementation is not synchronized and should only be used when
   * thread-safety is not required. This implementation makes sense when the
 - * main operations performed are iterating over the map and adding columns
 + * main operations performed are iterating over the cells and adding cells
   * (especially if insertion is in sorted order).
   */
 -public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
 +public class ArrayBackedSortedColumns extends ColumnFamily
  {
 +    private static final Cell[] EMPTY_ARRAY = new Cell[0];
 +    private static final int MINIMAL_CAPACITY = 10;
 +
      private final boolean reversed;
 -    private final ArrayList<Column> columns;
 +
 +    private DeletionInfo deletionInfo;
 +    private Cell[] cells;
 +    private int size;
 +    private int sortedSize;
 +    private volatile boolean isSorted;
  
      public static final ColumnFamily.Factory<ArrayBackedSortedColumns> factory = new Factory<ArrayBackedSortedColumns>()
      {
@@@ -114,15 -80,15 +110,103 @@@
          return reversed;
      }
  
 -    private Comparator<ByteBuffer> internalComparator()
++    public BatchRemoveIterator<Cell> batchRemoveIterator()
++    {
++        maybeSortCells();
++
++        return new BatchRemoveIterator<Cell>()
++        {
++            private final Iterator<Cell> iter = iterator();
++            private BitSet removedIndexes = new BitSet(size);
++            private int idx = -1;
++            private boolean shouldCallNext = false;
++            private boolean isCommitted = false;
++            private boolean removedAnything = false;
++
++            public void commit()
++            {
++                if (isCommitted)
++                    throw new IllegalStateException();
++                isCommitted = true;
++
++                if (!removedAnything)
++                    return;
++
++                // the lowest index both not visited and known to be not removed
++                int keepIdx = removedIndexes.nextClearBit(0);
++                // the running total of kept items
++                int resultLength = 0;
++                // start from the first not-removed cell, and shift left.
++                int removeIdx = removedIndexes.nextSetBit(keepIdx + 1);
++                while (removeIdx >= 0)
++                {
++                    int length = removeIdx - keepIdx;
++                    if (length > 0)
++                    {
++                        copy(keepIdx, resultLength, length);
++                        resultLength += length;
++                    }
++                    keepIdx = removedIndexes.nextClearBit(removeIdx + 1);
++                    if (keepIdx < 0)
++                        keepIdx = size;
++                    removeIdx = removedIndexes.nextSetBit(keepIdx + 1);
++                }
++                // Copy everything after the last deleted column
++                int length = size - keepIdx;
++                if (length > 0)
++                {
++                    copy(keepIdx, resultLength, length);
++                    resultLength += length;
++                }
++
++                for (int i = resultLength; i < size; i++)
++                    cells[i] = null;
++
++                size = sortedSize = resultLength;
++            }
++
++            private void copy(int src, int dst, int len)
++            {
++                // [src, src+len) and [dst, dst+len) might overlap but it's okay because we're going from left to right
++                assert dst <= src : "dst must not be greater than src";
++
++                if (dst < src)
++                    System.arraycopy(cells, src, cells, dst, len);
++            }
++
++            public boolean hasNext()
++            {
++                return iter.hasNext();
++            }
++
++            public Cell next()
++            {
++                idx++;
++                shouldCallNext = false;
++                return iter.next();
++            }
++
++            public void remove()
++            {
++                if (shouldCallNext)
++                    throw new IllegalStateException();
++
++                removedIndexes.set(reversed ? size - idx - 1 : idx);
++                removedAnything = true;
++                shouldCallNext = true;
++            }
++        };
++    }
++
 +    private Comparator<Composite> internalComparator()
      {
 -        return reversed ? getComparator().reverseComparator : getComparator();
 +        return reversed ? getComparator().reverseComparator() : getComparator();
      }
  
 -    public Column getColumn(ByteBuffer name)
 +    private void maybeSortCells()
      {
 -        int pos = binarySearch(name);
 -        return pos >= 0 ? columns.get(pos) : null;
 +        if (!isSorted)
 +            sortCells();
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06cd494c/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
index dc2b5ee,0000000..47f0b85
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
@@@ -1,559 -1,0 +1,564 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db;
 +
 +import java.util.AbstractCollection;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Comparator;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 +
 +import com.google.common.base.Function;
 +import com.google.common.base.Functions;
 +import com.google.common.collect.AbstractIterator;
 +import com.google.common.collect.Iterators;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.db.composites.CellName;
 +import org.apache.cassandra.db.composites.Composite;
 +import org.apache.cassandra.db.filter.ColumnSlice;
 +import org.apache.cassandra.utils.*;
 +import org.apache.cassandra.utils.btree.BTree;
 +import org.apache.cassandra.utils.btree.UpdateFunction;
 +import org.apache.cassandra.utils.concurrent.Locks;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +import org.apache.cassandra.utils.memory.HeapAllocator;
 +import org.apache.cassandra.utils.memory.MemtableAllocator;
 +import org.apache.cassandra.utils.memory.NativePool;
 +
 +import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater;
 +
 +/**
 + * A thread-safe and atomic ISortedColumns implementation.
 + * Operations (in particular addAll) on this implemenation are atomic and
 + * isolated (in the sense of ACID). Typically a addAll is guaranteed that no
 + * other thread can see the state where only parts but not all columns have
 + * been added.
 + * <p/>
 + * WARNING: removing element through getSortedColumns().iterator() is *not* supported
 + */
 +public class AtomicBTreeColumns extends ColumnFamily
 +{
 +    static final long EMPTY_SIZE = ObjectSizes.measure(new AtomicBTreeColumns(CFMetaData.IndexCf, null))
 +            + ObjectSizes.measure(new Holder(null, null));
 +
 +    // Reserved values for wasteTracker field. These values must not be consecutive (see avoidReservedValues)
 +    private static final int TRACKER_NEVER_WASTED = 0;
 +    private static final int TRACKER_PESSIMISTIC_LOCKING = Integer.MAX_VALUE;
 +
 +    // The granularity with which we track wasted allocation/work; we round up
 +    private static final int ALLOCATION_GRANULARITY_BYTES = 1024;
 +    // The number of bytes we have to waste in excess of our acceptable realtime rate of waste (defined below)
 +    private static final long EXCESS_WASTE_BYTES = 10 * 1024 * 1024L;
 +    private static final int EXCESS_WASTE_OFFSET = (int) (EXCESS_WASTE_BYTES / ALLOCATION_GRANULARITY_BYTES);
 +    // Note this is a shift, because dividing a long time and then picking the low 32 bits doesn't give correct rollover behavior
 +    private static final int CLOCK_SHIFT = 17;
 +    // CLOCK_GRANULARITY = 1^9ns >> CLOCK_SHIFT == 132us == (1/7.63)ms
 +
 +    /**
 +     * (clock + allocation) granularity are combined to give us an acceptable (waste) allocation rate that is defined by
 +     * the passage of real time of ALLOCATION_GRANULARITY_BYTES/CLOCK_GRANULARITY, or in this case 7.63Kb/ms, or 7.45Mb/s
 +     *
 +     * in wasteTracker we maintain within EXCESS_WASTE_OFFSET before the current time; whenever we waste bytes
 +     * we increment the current value if it is within this window, and set it to the min of the window plus our waste
 +     * otherwise.
 +     */
 +    private volatile int wasteTracker = TRACKER_NEVER_WASTED;
 +
 +    private static final AtomicIntegerFieldUpdater<AtomicBTreeColumns> wasteTrackerUpdater = AtomicIntegerFieldUpdater.newUpdater(AtomicBTreeColumns.class, "wasteTracker");
 +
 +    private static final Function<Cell, CellName> NAME = new Function<Cell, CellName>()
 +    {
 +        public CellName apply(Cell column)
 +        {
 +            return column.name();
 +        }
 +    };
 +
 +    public static final Factory<AtomicBTreeColumns> factory = new Factory<AtomicBTreeColumns>()
 +    {
 +        public AtomicBTreeColumns create(CFMetaData metadata, boolean insertReversed, int initialCapacity)
 +        {
 +            if (insertReversed)
 +                throw new IllegalArgumentException();
 +            return new AtomicBTreeColumns(metadata);
 +        }
 +    };
 +
 +    private static final DeletionInfo LIVE = DeletionInfo.live();
 +    // This is a small optimization: DeletionInfo is mutable, but we know that we will always copy it in that class,
 +    // so we can safely alias one DeletionInfo.live() reference and avoid some allocations.
 +    private static final Holder EMPTY = new Holder(BTree.empty(), LIVE);
 +
 +    private volatile Holder ref;
 +
 +    private static final AtomicReferenceFieldUpdater<AtomicBTreeColumns, Holder> refUpdater = AtomicReferenceFieldUpdater.newUpdater(AtomicBTreeColumns.class, Holder.class, "ref");
 +
 +    private AtomicBTreeColumns(CFMetaData metadata)
 +    {
 +        this(metadata, EMPTY);
 +    }
 +
 +    private AtomicBTreeColumns(CFMetaData metadata, Holder holder)
 +    {
 +        super(metadata);
 +        this.ref = holder;
 +    }
 +
 +    public Factory getFactory()
 +    {
 +        return factory;
 +    }
 +
 +    public ColumnFamily cloneMe()
 +    {
 +        return new AtomicBTreeColumns(metadata, ref);
 +    }
 +
 +    public DeletionInfo deletionInfo()
 +    {
 +        return ref.deletionInfo;
 +    }
 +
 +    public void delete(DeletionTime delTime)
 +    {
 +        delete(new DeletionInfo(delTime));
 +    }
 +
 +    protected void delete(RangeTombstone tombstone)
 +    {
 +        delete(new DeletionInfo(tombstone, getComparator()));
 +    }
 +
 +    public void delete(DeletionInfo info)
 +    {
 +        if (info.isLive())
 +            return;
 +
 +        // Keeping deletion info for max markedForDeleteAt value
 +        while (true)
 +        {
 +            Holder current = ref;
 +            DeletionInfo curDelInfo = current.deletionInfo;
 +            DeletionInfo newDelInfo = info.mayModify(curDelInfo) ? curDelInfo.copy().add(info) : curDelInfo;
 +            if (refUpdater.compareAndSet(this, current, current.with(newDelInfo)))
 +                break;
 +        }
 +    }
 +
 +    public void setDeletionInfo(DeletionInfo newInfo)
 +    {
 +        ref = ref.with(newInfo);
 +    }
 +
 +    public void purgeTombstones(int gcBefore)
 +    {
 +        while (true)
 +        {
 +            Holder current = ref;
 +            if (!current.deletionInfo.hasPurgeableTombstones(gcBefore))
 +                break;
 +
 +            DeletionInfo purgedInfo = current.deletionInfo.copy();
 +            purgedInfo.purge(gcBefore);
 +            if (refUpdater.compareAndSet(this, current, current.with(purgedInfo)))
 +                break;
 +        }
 +    }
 +
 +    /**
 +     * This is only called by Memtable.resolve, so only AtomicBTreeColumns needs to implement it.
 +     *
 +     * @return the difference in size seen after merging the given columns
 +     */
 +    public Pair<Long, Long> addAllWithSizeDelta(final ColumnFamily cm, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
 +    {
 +        ColumnUpdater updater = new ColumnUpdater(this, cm.metadata, allocator, writeOp, indexer);
 +        DeletionInfo inputDeletionInfoCopy = null;
 +
 +        boolean monitorOwned = false;
 +        try
 +        {
 +            if (usePessimisticLocking())
 +            {
 +                Locks.monitorEnterUnsafe(this);
 +                monitorOwned = true;
 +            }
 +            while (true)
 +            {
 +                Holder current = ref;
 +                updater.ref = current;
 +                updater.reset();
 +
 +                DeletionInfo deletionInfo;
 +                if (cm.deletionInfo().mayModify(current.deletionInfo))
 +                {
 +                    if (inputDeletionInfoCopy == null)
 +                        inputDeletionInfoCopy = cm.deletionInfo().copy(HeapAllocator.instance);
 +
 +                    deletionInfo = current.deletionInfo.copy().add(inputDeletionInfoCopy);
 +                    updater.allocated(deletionInfo.unsharedHeapSize() - current.deletionInfo.unsharedHeapSize());
 +                }
 +                else
 +                {
 +                    deletionInfo = current.deletionInfo;
 +                }
 +
 +                Object[] tree = BTree.update(current.tree, metadata.comparator.columnComparator(Memtable.MEMORY_POOL instanceof NativePool), cm, cm.getColumnCount(), true, updater);
 +
 +                if (tree != null && refUpdater.compareAndSet(this, current, new Holder(tree, deletionInfo)))
 +                {
 +                    indexer.updateRowLevelIndexes();
 +                    updater.finish();
 +                    return Pair.create(updater.dataSize, updater.colUpdateTimeDelta);
 +                }
 +                else if (!monitorOwned)
 +                {
 +                    boolean shouldLock = usePessimisticLocking();
 +                    if (!shouldLock)
 +                    {
 +                        shouldLock = updateWastedAllocationTracker(updater.heapSize);
 +                    }
 +                    if (shouldLock)
 +                    {
 +                        Locks.monitorEnterUnsafe(this);
 +                        monitorOwned = true;
 +                    }
 +                }
 +            }
 +        }
 +        finally
 +        {
 +            if (monitorOwned)
 +                Locks.monitorExitUnsafe(this);
 +        }
 +    }
 +
 +    boolean usePessimisticLocking()
 +    {
 +        return wasteTracker == TRACKER_PESSIMISTIC_LOCKING;
 +    }
 +
 +    /**
 +     * Update the wasted allocation tracker state based on newly wasted allocation information
 +     *
 +     * @param wastedBytes the number of bytes wasted by this thread
 +     * @return true if the caller should now proceed with pessimistic locking because the waste limit has been reached
 +     */
 +    private boolean updateWastedAllocationTracker(long wastedBytes) {
 +        // Early check for huge allocation that exceeds the limit
 +        if (wastedBytes < EXCESS_WASTE_BYTES)
 +        {
 +            // We round up to ensure work < granularity are still accounted for
 +            int wastedAllocation = ((int) (wastedBytes + ALLOCATION_GRANULARITY_BYTES - 1)) / ALLOCATION_GRANULARITY_BYTES;
 +
 +            int oldTrackerValue;
 +            while (TRACKER_PESSIMISTIC_LOCKING != (oldTrackerValue = wasteTracker))
 +            {
 +                // Note this time value has an arbitrary offset, but is a constant rate 32 bit counter (that may wrap)
 +                int time = (int) (System.nanoTime() >>> CLOCK_SHIFT);
 +                int delta = oldTrackerValue - time;
 +                if (oldTrackerValue == TRACKER_NEVER_WASTED || delta >= 0 || delta < -EXCESS_WASTE_OFFSET)
 +                    delta = -EXCESS_WASTE_OFFSET;
 +                delta += wastedAllocation;
 +                if (delta >= 0)
 +                    break;
 +                if (wasteTrackerUpdater.compareAndSet(this, oldTrackerValue, avoidReservedValues(time + delta)))
 +                    return false;
 +            }
 +        }
 +        // We have definitely reached our waste limit so set the state if it isn't already
 +        wasteTrackerUpdater.set(this, TRACKER_PESSIMISTIC_LOCKING);
 +        // And tell the caller to proceed with pessimistic locking
 +        return true;
 +    }
 +
 +    private static int avoidReservedValues(int wasteTracker)
 +    {
 +        if (wasteTracker == TRACKER_NEVER_WASTED || wasteTracker == TRACKER_PESSIMISTIC_LOCKING)
 +            return wasteTracker + 1;
 +        return wasteTracker;
 +    }
 +
 +    // no particular reason not to implement these next methods, we just haven't needed them yet
 +
 +    public void addColumn(Cell column)
 +    {
 +        throw new UnsupportedOperationException();
 +    }
 +
 +    public void maybeAppendColumn(Cell cell, DeletionInfo.InOrderTester tester, int gcBefore)
 +    {
 +        throw new UnsupportedOperationException();
 +    }
 +
 +    public void addAll(ColumnFamily cf)
 +    {
 +        throw new UnsupportedOperationException();
 +    }
 +
 +    public void clear()
 +    {
 +        throw new UnsupportedOperationException();
 +    }
 +
 +    public Cell getColumn(CellName name)
 +    {
 +        return (Cell) BTree.find(ref.tree, asymmetricComparator(), name);
 +    }
 +
 +    private Comparator<Object> asymmetricComparator()
 +    {
 +        return metadata.comparator.asymmetricColumnComparator(Memtable.MEMORY_POOL instanceof NativePool);
 +    }
 +
 +    public Iterable<CellName> getColumnNames()
 +    {
 +        return collection(false, NAME);
 +    }
 +
 +    public Collection<Cell> getSortedColumns()
 +    {
 +        return collection(true, Functions.<Cell>identity());
 +    }
 +
 +    public Collection<Cell> getReverseSortedColumns()
 +    {
 +        return collection(false, Functions.<Cell>identity());
 +    }
 +
 +    private <V> Collection<V> collection(final boolean forwards, final Function<Cell, V> f)
 +    {
 +        final Holder ref = this.ref;
 +        return new AbstractCollection<V>()
 +        {
 +            public Iterator<V> iterator()
 +            {
 +                return Iterators.transform(BTree.<Cell>slice(ref.tree, forwards), f);
 +            }
 +
 +            public int size()
 +            {
 +                return BTree.slice(ref.tree, true).count();
 +            }
 +        };
 +    }
 +
 +    public int getColumnCount()
 +    {
 +        return BTree.slice(ref.tree, true).count();
 +    }
 +
 +    public boolean hasColumns()
 +    {
 +        return !BTree.isEmpty(ref.tree);
 +    }
 +
 +    public Iterator<Cell> iterator(ColumnSlice[] slices)
 +    {
 +        return slices.length == 1
 +             ? slice(ref.tree, asymmetricComparator(), slices[0].start, slices[0].finish, true)
 +             : new SliceIterator(ref.tree, asymmetricComparator(), true, slices);
 +    }
 +
 +    public Iterator<Cell> reverseIterator(ColumnSlice[] slices)
 +    {
 +        return slices.length == 1
 +             ? slice(ref.tree, asymmetricComparator(), slices[0].finish, slices[0].start, false)
 +             : new SliceIterator(ref.tree, asymmetricComparator(), false, slices);
 +    }
 +
 +    public boolean isInsertReversed()
 +    {
 +        return false;
 +    }
 +
++    public BatchRemoveIterator<Cell> batchRemoveIterator()
++    {
++        throw new UnsupportedOperationException();
++    }
++
 +    private static final class Holder
 +    {
 +        final DeletionInfo deletionInfo;
 +        // the btree of columns
 +        final Object[] tree;
 +
 +        Holder(Object[] tree, DeletionInfo deletionInfo)
 +        {
 +            this.tree = tree;
 +            this.deletionInfo = deletionInfo;
 +        }
 +
 +        Holder with(DeletionInfo info)
 +        {
 +            return new Holder(this.tree, info);
 +        }
 +    }
 +
 +    // the function we provide to the btree utilities to perform any column replacements
 +    private static final class ColumnUpdater implements UpdateFunction<Cell>
 +    {
 +        final AtomicBTreeColumns updating;
 +        final CFMetaData metadata;
 +        final MemtableAllocator allocator;
 +        final OpOrder.Group writeOp;
 +        final Updater indexer;
 +        Holder ref;
 +        long dataSize;
 +        long heapSize;
 +        long colUpdateTimeDelta = Long.MAX_VALUE;
 +        final MemtableAllocator.DataReclaimer reclaimer;
 +        List<Cell> inserted; // TODO: replace with walk of aborted BTree
 +
 +        private ColumnUpdater(AtomicBTreeColumns updating, CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
 +        {
 +            this.updating = updating;
 +            this.allocator = allocator;
 +            this.writeOp = writeOp;
 +            this.indexer = indexer;
 +            this.metadata = metadata;
 +            this.reclaimer = allocator.reclaimer();
 +        }
 +
 +        public Cell apply(Cell insert)
 +        {
 +            indexer.insert(insert);
 +            insert = insert.localCopy(metadata, allocator, writeOp);
 +            this.dataSize += insert.cellDataSize();
 +            this.heapSize += insert.unsharedHeapSizeExcludingData();
 +            if (inserted == null)
 +                inserted = new ArrayList<>();
 +            inserted.add(insert);
 +            return insert;
 +        }
 +
 +        public Cell apply(Cell existing, Cell update)
 +        {
 +            Cell reconciled = existing.reconcile(update);
 +            indexer.update(existing, reconciled);
 +            if (existing != reconciled)
 +            {
 +                reconciled = reconciled.localCopy(metadata, allocator, writeOp);
 +                dataSize += reconciled.cellDataSize() - existing.cellDataSize();
 +                heapSize += reconciled.unsharedHeapSizeExcludingData() - existing.unsharedHeapSizeExcludingData();
 +                if (inserted == null)
 +                    inserted = new ArrayList<>();
 +                inserted.add(reconciled);
 +                discard(existing);
 +                //Getting the minimum delta for an update containing multiple columns
 +                colUpdateTimeDelta =  Math.min(Math.abs(existing.timestamp()  - update.timestamp()), colUpdateTimeDelta);
 +            }
 +            return reconciled;
 +        }
 +
 +        protected void reset()
 +        {
 +            this.dataSize = 0;
 +            this.heapSize = 0;
 +            if (inserted != null)
 +            {
 +                for (Cell cell : inserted)
 +                    abort(cell);
 +                inserted.clear();
 +            }
 +            reclaimer.cancel();
 +        }
 +
 +        protected void abort(Cell abort)
 +        {
 +            reclaimer.reclaimImmediately(abort);
 +        }
 +
 +        protected void discard(Cell discard)
 +        {
 +            reclaimer.reclaim(discard);
 +        }
 +
 +        public boolean abortEarly()
 +        {
 +            return updating.ref != ref;
 +        }
 +
 +        public void allocated(long heapSize)
 +        {
 +            this.heapSize += heapSize;
 +        }
 +
 +        protected void finish()
 +        {
 +            allocator.onHeap().allocate(heapSize, writeOp);
 +            reclaimer.commit();
 +        }
 +    }
 +
 +    private static class SliceIterator extends AbstractIterator<Cell>
 +    {
 +        private final Object[] btree;
 +        private final boolean forwards;
 +        private final Comparator<Object> comparator;
 +        private final ColumnSlice[] slices;
 +
 +        private int idx = 0;
 +        private Iterator<Cell> currentSlice;
 +
 +        SliceIterator(Object[] btree, Comparator<Object> comparator, boolean forwards, ColumnSlice[] slices)
 +        {
 +            this.btree = btree;
 +            this.comparator = comparator;
 +            this.slices = slices;
 +            this.forwards = forwards;
 +        }
 +
 +        protected Cell computeNext()
 +        {
 +            while (currentSlice != null || idx < slices.length)
 +            {
 +                if (currentSlice == null)
 +                {
 +                    ColumnSlice slice = slices[idx++];
 +                    if (forwards)
 +                        currentSlice = slice(btree, comparator, slice.start, slice.finish, true);
 +                    else
 +                        currentSlice = slice(btree, comparator, slice.finish, slice.start, false);
 +                }
 +
 +                if (currentSlice.hasNext())
 +                    return currentSlice.next();
 +
 +                currentSlice = null;
 +            }
 +
 +            return endOfData();
 +        }
 +    }
 +
 +    private static Iterator<Cell> slice(Object[] btree, Comparator<Object> comparator, Composite start, Composite finish, boolean forwards)
 +    {
 +        return BTree.slice(btree,
 +                           comparator,
 +                           start.isEmpty() ? null : start,
 +                           true,
 +                           finish.isEmpty() ? null : finish,
 +                           true,
 +                           forwards);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06cd494c/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamily.java
index 483ecb0,19f8c16..f21d161
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@@ -514,6 -532,38 +514,12 @@@ public abstract class ColumnFamily impl
          return ByteBuffer.wrap(out.getData(), 0, out.getLength());
      }
  
+ 
+     /**
+      * @return an iterator where the removes are carried out once everything has been iterated
+      */
 -    public BatchRemoveIterator<Column> batchRemoveIterator()
 -    {
 -        // Default implementation is the ordinary iterator
 -        return new BatchRemoveIterator<Column>()
 -        {
 -            private final Iterator<Column> iter = iterator();
 -
 -            public void commit()
 -            {
 -            }
 -
 -            public boolean hasNext()
 -            {
 -                return iter.hasNext();
 -            }
 -
 -            public Column next()
 -            {
 -                return iter.next();
 -            }
 -
 -            public void remove()
 -            {
 -                iter.remove();
 -            }
 -        };
 -    }
++    public abstract BatchRemoveIterator<Cell> batchRemoveIterator();
+ 
      public abstract static class Factory <T extends ColumnFamily>
      {
          /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06cd494c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 0c95b0e,34d3f1d..3822648
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -1237,14 -951,15 +1237,14 @@@ public class ColumnFamilyStore implemen
       * columns that have been dropped from the schema (for CQL3 tables only).
       * @return the updated ColumnFamily
       */
 -    public static long removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer)
 +    public static ColumnFamily removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer)
      {
-         Iterator<Cell> iter = cf.iterator();
 -        BatchRemoveIterator<Column> iter = cf.batchRemoveIterator();
++        BatchRemoveIterator<Cell> iter = cf.batchRemoveIterator();
          DeletionInfo.InOrderTester tester = cf.inOrderDeletionTester();
          boolean hasDroppedColumns = !cf.metadata.getDroppedColumns().isEmpty();
 -        long removedBytes = 0;
          while (iter.hasNext())
          {
 -            Column c = iter.next();
 +            Cell c = iter.next();
              // remove columns if
              // (a) the column itself is gcable or
              // (b) the column is shadowed by a CF tombstone
@@@ -1253,10 -968,16 +1253,10 @@@
              {
                  iter.remove();
                  indexer.remove(c);
 -                removedBytes += c.dataSize();
              }
          }
- 
+         iter.commit();
 -        return removedBytes;
 -    }
 -
 -    public static long removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore)
 -    {
 -        return removeDeletedColumnsOnly(cf, gcBefore, SecondaryIndexManager.nullUpdater);
 +        return cf;
      }
  
      // returns true if
@@@ -1273,7 -994,7 +1273,7 @@@
          if (cf == null || cf.metadata.getDroppedColumns().isEmpty())
              return;
  
-         Iterator<Cell> iter = cf.iterator();
 -        BatchRemoveIterator<Column> iter = cf.batchRemoveIterator();
++        BatchRemoveIterator<Cell> iter = cf.batchRemoveIterator();
          while (iter.hasNext())
              if (isDroppedColumn(iter.next(), metadata))
                  iter.remove();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06cd494c/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
index 83a58e4,90cd70f..18851d4
--- a/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
+++ b/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
@@@ -25,13 -27,16 +25,16 @@@ import org.junit.Test
  
  import static org.junit.Assert.*;
  
 -import com.google.common.base.Functions;
+ import com.google.common.collect.Sets;
+ 
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.config.Schema;
- import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.db.composites.*;
 +import org.apache.cassandra.db.filter.ColumnSlice;
 +import org.apache.cassandra.db.marshal.Int32Type;
+ import org.apache.cassandra.utils.BatchRemoveIterator;
+ import org.apache.cassandra.utils.ByteBufferUtil;
 -import org.apache.cassandra.db.filter.ColumnSlice;
 -import org.apache.cassandra.utils.HeapAllocator;
  
  public class ArrayBackedSortedColumnsTest extends SchemaLoader
  {
@@@ -265,4 -195,95 +268,98 @@@
          iter.remove();
          assertTrue(!iter.hasNext());
      }
+ 
+     @Test(expected = IllegalStateException.class)
+     public void testBatchRemoveTwice()
+     {
++        CellNameType type = new SimpleDenseCellNameType(Int32Type.instance);
+         ColumnFamily map = ArrayBackedSortedColumns.factory.create(metadata(), false);
 -        map.addColumn(new Column(ByteBufferUtil.bytes(1)), HeapAllocator.instance);
 -        map.addColumn(new Column(ByteBufferUtil.bytes(2)), HeapAllocator.instance);
++        map.addColumn(new BufferCell(type.makeCellName(1)));
++        map.addColumn(new BufferCell(type.makeCellName(2)));
+ 
 -        BatchRemoveIterator<Column> batchIter = map.batchRemoveIterator();
++        BatchRemoveIterator<Cell> batchIter = map.batchRemoveIterator();
+         batchIter.next();
+         batchIter.remove();
+         batchIter.remove();
+     }
+ 
+     @Test(expected = IllegalStateException.class)
+     public void testBatchCommitTwice()
+     {
++        CellNameType type = new SimpleDenseCellNameType(Int32Type.instance);
+         ColumnFamily map = ArrayBackedSortedColumns.factory.create(metadata(), false);
 -        map.addColumn(new Column(ByteBufferUtil.bytes(1)), HeapAllocator.instance);
 -        map.addColumn(new Column(ByteBufferUtil.bytes(2)), HeapAllocator.instance);
++        map.addColumn(new BufferCell(type.makeCellName(1)));
++        map.addColumn(new BufferCell(type.makeCellName(2)));
+ 
 -        BatchRemoveIterator<Column> batchIter = map.batchRemoveIterator();
++        BatchRemoveIterator<Cell> batchIter = map.batchRemoveIterator();
+         batchIter.next();
+         batchIter.remove();
+         batchIter.commit();
+         batchIter.commit();
+     }
+ 
+     @Test
+     public void testBatchRemove()
+     {
+         testBatchRemoveInternal(false);
+         testBatchRemoveInternal(true);
+     }
+ 
+     public void testBatchRemoveInternal(boolean reversed)
+     {
++        CellNameType type = new SimpleDenseCellNameType(Int32Type.instance);
+         ColumnFamily map = ArrayBackedSortedColumns.factory.create(metadata(), reversed);
+         int[] values = new int[]{ 1, 2, 3, 5 };
+ 
+         for (int i = 0; i < values.length; ++i)
 -            map.addColumn(new Column(ByteBufferUtil.bytes(values[reversed ? values.length - 1 - i : i])), HeapAllocator.instance);
++            map.addColumn(new BufferCell(type.makeCellName(values[reversed ? values.length - 1 - i : i])));
+ 
 -        BatchRemoveIterator<Column> batchIter = map.batchRemoveIterator();
++        BatchRemoveIterator<Cell> batchIter = map.batchRemoveIterator();
+         batchIter.next();
+         batchIter.remove();
+         batchIter.next();
+         batchIter.remove();
+ 
 -        assertEquals("1st column before commit", 1, map.iterator().next().name().getInt(0));
++        assertEquals("1st column before commit", 1, map.iterator().next().name().toByteBuffer().getInt(0));
+ 
+         batchIter.commit();
+ 
 -        assertEquals("1st column after commit", 3, map.iterator().next().name().getInt(0));
++        assertEquals("1st column after commit", 3, map.iterator().next().name().toByteBuffer().getInt(0));
+     }
+ 
+     @Test
+     public void testBatchRemoveCopy()
+     {
+         // Test delete some random columns and check the result
++        CellNameType type = new SimpleDenseCellNameType(Int32Type.instance);
+         ColumnFamily map = ArrayBackedSortedColumns.factory.create(metadata(), false);
+         int n = 127;
+         int[] values = new int[n];
 -        for (int i = 0; i < n; i++) values[i] = i;
++        for (int i = 0; i < n; i++)
++            values[i] = i;
+         Set<Integer> toRemove = Sets.newHashSet(3, 12, 13, 15, 58, 103, 112);
+ 
+         for (int value : values)
 -            map.addColumn(new Column(ByteBufferUtil.bytes(value)), HeapAllocator.instance);
++            map.addColumn(new BufferCell(type.makeCellName(value)));
+ 
 -        BatchRemoveIterator<Column> batchIter = map.batchRemoveIterator();
++        BatchRemoveIterator<Cell> batchIter = map.batchRemoveIterator();
+         while (batchIter.hasNext())
 -            if (toRemove.contains(batchIter.next().name().getInt(0)))
++            if (toRemove.contains(batchIter.next().name().toByteBuffer().getInt(0)))
+                 batchIter.remove();
+ 
+         batchIter.commit();
+ 
+         int expected = 0;
 -
+         while (toRemove.contains(expected))
+             expected++;
+ 
 -        for (Column column : map)
++        for (Cell column : map)
+         {
 -            assertEquals(expected, column.name().getInt(0));
++            assertEquals(expected, column.name().toByteBuffer().getInt(0));
+             expected++;
+             while (toRemove.contains(expected))
+                 expected++;
+         }
 -
+         assertEquals(expected, n);
+     }
  }