You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/01/22 00:18:06 UTC

cassandra git commit: Add batch remove iterator to ABSC

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 ae380da9e -> cc5fb19e5


Add batch remove iterator to ABSC

patch by Jimmy Mårdell; reviewed by Richard Low for CASSANDRA-8414


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cc5fb19e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cc5fb19e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cc5fb19e

Branch: refs/heads/cassandra-2.0
Commit: cc5fb19e5c864110b798375f43ec1597904d03ab
Parents: ae380da
Author: Jimmy Mårdell <ya...@spotify.com>
Authored: Thu Jan 22 02:16:10 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Thu Jan 22 02:16:10 2015 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/db/ArrayBackedSortedColumns.java  | 89 ++++++++++++++++++-
 .../org/apache/cassandra/db/ColumnFamily.java   | 32 +++++++
 .../apache/cassandra/db/ColumnFamilyStore.java  |  6 +-
 .../cassandra/utils/BatchRemoveIterator.java    | 32 +++++++
 .../db/ArrayBackedSortedColumnsTest.java        | 93 ++++++++++++++++++++
 6 files changed, 249 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc5fb19e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6604783..0d08cce 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 2.0.13:
  * Round up time deltas lower than 1ms in BulkLoader (CASSANDRA-8645)
+ * Add batch remove iterator to ABSC (CASSANDRA-8414)
 
 
 2.0.12:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc5fb19e/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
index 389e0f8..8d553be 100644
--- a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.filter.ColumnSlice;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.utils.Allocator;
+import org.apache.cassandra.utils.BatchRemoveIterator;
 
 /**
  * A ColumnFamily backed by an ArrayList.
@@ -54,14 +55,14 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
     {
         super(metadata);
         this.reversed = reversed;
-        this.columns = new ArrayList<Column>();
+        this.columns = new ArrayList<>();
     }
 
     private ArrayBackedSortedColumns(Collection<Column> columns, CFMetaData metadata, boolean reversed)
     {
         super(metadata);
         this.reversed = reversed;
-        this.columns = new ArrayList<Column>(columns);
+        this.columns = new ArrayList<>(columns);
     }
 
     public ColumnFamily.Factory getFactory()
@@ -292,6 +293,90 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
         return new SlicesIterator(columns, getComparator(), slices, !reversed);
     }
 
+    /**
+     * Returns an iterator over the same columns as {@link #iterator()} whose {@code remove()} calls
+     * only mark positions in a bitmap. The backing list is compacted once, in a single
+     * left-to-right pass, when {@code commit()} is called.
+     *
+     * {@code remove()} throws IllegalStateException when {@code next()} has not been called since
+     * the previous {@code remove()}, or once {@code commit()} has run. Calling {@code commit()}
+     * a second time throws IllegalStateException as well.
+     */
+    @Override
+    public BatchRemoveIterator<Column> batchRemoveIterator()
+    {
+        return new BatchRemoveIterator<Column>()
+        {
+            private final Iterator<Column> iter = iterator();
+            // one bit per position in the backing list; a set bit marks a column to drop on commit()
+            private final BitSet removedIndexes = new BitSet(columns.size());
+            // number of successful next() calls minus one, i.e. the iteration position of the last returned column
+            private int idx = -1;
+            private boolean shouldCallNext = true;
+            private boolean isCommitted = false;
+            private boolean removedAnything = false;
+
+            public void commit()
+            {
+                if (isCommitted)
+                    throw new IllegalStateException();
+                isCommitted = true;
+
+                if (!removedAnything)
+                    return;
+
+                int size = columns.size();
+                // the running total of kept items, which is also where the next kept run is written
+                int resultLength = 0;
+                // walk the runs of kept columns from left to right, shifting each one down
+                int keepIdx = removedIndexes.nextClearBit(0);
+                while (keepIdx < size)
+                {
+                    // nextSetBit returns -1 when nothing further is removed: the run then extends to the end
+                    int removeIdx = removedIndexes.nextSetBit(keepIdx + 1);
+                    int runEnd = removeIdx < 0 ? size : removeIdx;
+                    int length = runEnd - keepIdx;
+
+                    copy(keepIdx, resultLength, length);
+                    resultLength += length;
+
+                    // nextClearBit never returns a negative value; it yields size when no kept column is left
+                    keepIdx = removedIndexes.nextClearBit(runEnd);
+                }
+
+                // everything past the compacted prefix is stale
+                columns.subList(resultLength, size).clear();
+            }
+
+            private void copy(int src, int dst, int len)
+            {
+                // [src, src+len) and [dst, dst+len) might overlap, but it's okay because we're going from left to right
+                assert dst <= src : "dst must not be greater than src";
+
+                if (dst < src)
+                    Collections.copy(columns.subList(dst, dst + len), columns.subList(src, src + len));
+            }
+
+            public boolean hasNext()
+            {
+                return iter.hasNext();
+            }
+
+            public Column next()
+            {
+                // advance the underlying iterator first, so that a failed next() leaves our state untouched
+                Column column = iter.next();
+                idx++;
+                shouldCallNext = false;
+                return column;
+            }
+
+            public void remove()
+            {
+                // removing after commit() would only set a bit that is never applied, so reject it
+                if (isCommitted || shouldCallNext)
+                    throw new IllegalStateException();
+
+                // idx counts in iteration order while the bitmap is indexed by list position;
+                // presumably iterator() walks the backing list back to front when reversed
+                removedIndexes.set(reversed ? columns.size() - idx - 1 : idx);
+                removedAnything = true;
+                shouldCallNext = true;
+            }
+        };
+    }
+
     private static class SlicesIterator extends AbstractIterator<Column>
     {
         private final List<Column> list;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc5fb19e/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index 7edf825..19f8c16 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -532,6 +532,38 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
         return ByteBuffer.wrap(out.getData(), 0, out.getLength());
     }
 
+
+    /**
+     * Returns an iterator that supports removing many columns and finishing with {@code commit()}.
+     * This default implementation simply wraps {@link #iterator()}: each {@code remove()} is
+     * forwarded to it straight away, and {@code commit()} only closes the batch.
+     *
+     * As described by {@link BatchRemoveIterator#commit()}, calling {@code remove()} or
+     * {@code commit()} after the batch has been committed throws IllegalStateException.
+     *
+     * @return a batch remove iterator over the columns of this column family
+     */
+    public BatchRemoveIterator<Column> batchRemoveIterator()
+    {
+        // Default implementation is the ordinary iterator
+        return new BatchRemoveIterator<Column>()
+        {
+            private final Iterator<Column> iter = iterator();
+            private boolean isCommitted = false;
+
+            public void commit()
+            {
+                // nothing is pending here; only enforce the commit-once contract
+                if (isCommitted)
+                    throw new IllegalStateException();
+                isCommitted = true;
+            }
+
+            public boolean hasNext()
+            {
+                return iter.hasNext();
+            }
+
+            public Column next()
+            {
+                return iter.next();
+            }
+
+            public void remove()
+            {
+                if (isCommitted)
+                    throw new IllegalStateException();
+                iter.remove();
+            }
+        };
+    }
+
     public abstract static class Factory <T extends ColumnFamily>
     {
         /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc5fb19e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index e936473..34d3f1d 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -953,7 +953,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      */
     public static long removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer)
     {
-        Iterator<Column> iter = cf.iterator();
+        BatchRemoveIterator<Column> iter = cf.batchRemoveIterator();
         DeletionInfo.InOrderTester tester = cf.inOrderDeletionTester();
         boolean hasDroppedColumns = !cf.metadata.getDroppedColumns().isEmpty();
         long removedBytes = 0;
@@ -971,6 +971,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 removedBytes += c.dataSize();
             }
         }
+        iter.commit();
         return removedBytes;
     }
 
@@ -993,10 +994,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         if (cf == null || cf.metadata.getDroppedColumns().isEmpty())
             return;
 
-        Iterator<Column> iter = cf.iterator();
+        BatchRemoveIterator<Column> iter = cf.batchRemoveIterator();
         while (iter.hasNext())
             if (isDroppedColumn(iter.next(), metadata))
                 iter.remove();
+        iter.commit();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc5fb19e/src/java/org/apache/cassandra/utils/BatchRemoveIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BatchRemoveIterator.java b/src/java/org/apache/cassandra/utils/BatchRemoveIterator.java
new file mode 100644
index 0000000..4377426
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/BatchRemoveIterator.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.Iterator;
+
+/**
+ * An {@link Iterator} that lets many items be removed more efficiently: the removals
+ * requested through {@code remove()} form a batch that is finished by {@link #commit()}.
+ */
+public interface BatchRemoveIterator<T> extends Iterator<T>
+{
+    /**
+     * Commits the remove operations of this batch. The batch is closed afterwards:
+     * no more deletes can be made, and every later call to remove() or commit()
+     * throws IllegalStateException.
+     */
+    void commit();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc5fb19e/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java b/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
index 06e2e75..90cd70f 100644
--- a/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
+++ b/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
@@ -28,10 +28,12 @@ import org.junit.Test;
 import static org.junit.Assert.*;
 
 import com.google.common.base.Functions;
+import com.google.common.collect.Sets;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.utils.BatchRemoveIterator;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.db.filter.ColumnSlice;
 import org.apache.cassandra.utils.HeapAllocator;
@@ -193,4 +195,95 @@ public class ArrayBackedSortedColumnsTest extends SchemaLoader
         iter.remove();
         assertTrue(!iter.hasNext());
     }
+
+    /**
+     * Two remove() calls with no next() in between: the second one is expected to throw.
+     */
+    @Test(expected = IllegalStateException.class)
+    public void testBatchRemoveTwice()
+    {
+        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata(), false);
+        for (int name = 1; name <= 2; name++)
+            cf.addColumn(new Column(ByteBufferUtil.bytes(name)), HeapAllocator.instance);
+
+        BatchRemoveIterator<Column> iter = cf.batchRemoveIterator();
+        iter.next();
+        iter.remove();
+        // no next() since the previous remove()
+        iter.remove();
+    }
+
+    /**
+     * Committing the same batch iterator twice: the second commit() is expected to throw.
+     */
+    @Test(expected = IllegalStateException.class)
+    public void testBatchCommitTwice()
+    {
+        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata(), false);
+        for (int name = 1; name <= 2; name++)
+            cf.addColumn(new Column(ByteBufferUtil.bytes(name)), HeapAllocator.instance);
+
+        BatchRemoveIterator<Column> iter = cf.batchRemoveIterator();
+        iter.next();
+        iter.remove();
+        iter.commit();
+        // the batch is already closed
+        iter.commit();
+    }
+
+    /**
+     * Runs the batch remove scenario for a forward and then for a reversed column family.
+     */
+    @Test
+    public void testBatchRemove()
+    {
+        for (boolean reversed : new boolean[]{ false, true })
+            testBatchRemoveInternal(reversed);
+    }
+
+    /**
+     * Removes the first two columns returned by a batch iterator over { 1, 2, 3, 5 } and checks
+     * that the column family is unchanged until commit() and starts at 3 afterwards.
+     *
+     * @param reversed whether the column family is created reversed; the columns are then added
+     *                 in descending order of value
+     */
+    public void testBatchRemoveInternal(boolean reversed)
+    {
+        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata(), reversed);
+        int[] names = new int[]{ 1, 2, 3, 5 };
+
+        for (int i = 0; i < names.length; ++i)
+        {
+            int pos = reversed ? names.length - 1 - i : i;
+            cf.addColumn(new Column(ByteBufferUtil.bytes(names[pos])), HeapAllocator.instance);
+        }
+
+        BatchRemoveIterator<Column> iter = cf.batchRemoveIterator();
+        for (int removed = 0; removed < 2; removed++)
+        {
+            iter.next();
+            iter.remove();
+        }
+
+        // nothing has been applied yet
+        int firstBeforeCommit = cf.iterator().next().name().getInt(0);
+        assertEquals("1st column before commit", 1, firstBeforeCommit);
+
+        iter.commit();
+
+        int firstAfterCommit = cf.iterator().next().name().getInt(0);
+        assertEquals("1st column after commit", 3, firstAfterCommit);
+    }
+
+    /**
+     * Adds the columns 0..126, batch-removes a scattered subset of them and checks that iterating
+     * the column family afterwards yields exactly the remaining values, in ascending order.
+     */
+    @Test
+    public void testBatchRemoveCopy()
+    {
+        // Test delete some random columns and check the result
+        ColumnFamily map = ArrayBackedSortedColumns.factory.create(metadata(), false);
+        int n = 127;
+        Set<Integer> toRemove = Sets.newHashSet(3, 12, 13, 15, 58, 103, 112);
+
+        for (int value = 0; value < n; value++)
+            map.addColumn(new Column(ByteBufferUtil.bytes(value)), HeapAllocator.instance);
+
+        BatchRemoveIterator<Column> batchIter = map.batchRemoveIterator();
+        while (batchIter.hasNext())
+            if (toRemove.contains(batchIter.next().name().getInt(0)))
+                batchIter.remove();
+
+        batchIter.commit();
+
+        // the next value that should still be present
+        int expected = 0;
+
+        while (toRemove.contains(expected))
+            expected++;
+
+        for (Column column : map)
+        {
+            assertEquals(expected, column.name().getInt(0));
+            expected++;
+            while (toRemove.contains(expected))
+                expected++;
+        }
+
+        // all survivors were seen, so the cursor must have run past the last value;
+        // n is the expected value here and the cursor is the actual one
+        assertEquals(n, expected);
+    }
 }