You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/01/22 00:18:06 UTC
cassandra git commit: Add batch remove iterator to ABSC
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.0 ae380da9e -> cc5fb19e5
Add batch remove iterator to ABSC
patch by Jimmy Mårdell; reviewed by Richard Low for CASSANDRA-8414
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cc5fb19e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cc5fb19e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cc5fb19e
Branch: refs/heads/cassandra-2.0
Commit: cc5fb19e5c864110b798375f43ec1597904d03ab
Parents: ae380da
Author: Jimmy Mårdell <ya...@spotify.com>
Authored: Thu Jan 22 02:16:10 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Thu Jan 22 02:16:10 2015 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/db/ArrayBackedSortedColumns.java | 89 ++++++++++++++++++-
.../org/apache/cassandra/db/ColumnFamily.java | 32 +++++++
.../apache/cassandra/db/ColumnFamilyStore.java | 6 +-
.../cassandra/utils/BatchRemoveIterator.java | 32 +++++++
.../db/ArrayBackedSortedColumnsTest.java | 93 ++++++++++++++++++++
6 files changed, 249 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc5fb19e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6604783..0d08cce 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
2.0.13:
* Round up time deltas lower than 1ms in BulkLoader (CASSANDRA-8645)
+ * Add batch remove iterator to ABSC (CASSANDRA-8414)
2.0.12:
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc5fb19e/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
index 389e0f8..8d553be 100644
--- a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.filter.ColumnSlice;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.utils.Allocator;
+import org.apache.cassandra.utils.BatchRemoveIterator;
/**
* A ColumnFamily backed by an ArrayList.
@@ -54,14 +55,14 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
{
super(metadata);
this.reversed = reversed;
- this.columns = new ArrayList<Column>();
+ this.columns = new ArrayList<>();
}
private ArrayBackedSortedColumns(Collection<Column> columns, CFMetaData metadata, boolean reversed)
{
super(metadata);
this.reversed = reversed;
- this.columns = new ArrayList<Column>(columns);
+ this.columns = new ArrayList<>(columns);
}
public ColumnFamily.Factory getFactory()
@@ -292,6 +293,90 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
return new SlicesIterator(columns, getComparator(), slices, !reversed);
}
+ @Override
+ public BatchRemoveIterator<Column> batchRemoveIterator()
+ {
+ return new BatchRemoveIterator<Column>()
+ {
+ private Iterator<Column> iter = iterator();
+ private BitSet removedIndexes = new BitSet(columns.size());
+ private int idx = -1;
+ private boolean shouldCallNext = true;
+ private boolean isCommitted = false;
+ private boolean removedAnything = false;
+
+ public void commit()
+ {
+ if (isCommitted)
+ throw new IllegalStateException();
+ isCommitted = true;
+
+ if (!removedAnything)
+ return;
+
+ // the lowest index both not visited and known to be not removed
+ int keepIdx = removedIndexes.nextClearBit(0);
+ // the running total of kept items
+ int resultLength = 0;
+ // start from the first not-removed cell, and shift left.
+ int removeIdx = removedIndexes.nextSetBit(keepIdx + 1);
+ while (removeIdx >= 0)
+ {
+ int length = removeIdx - keepIdx;
+ if (length > 0)
+ {
+ copy(keepIdx, resultLength, length);
+ resultLength += length;
+ }
+ keepIdx = removedIndexes.nextClearBit(removeIdx + 1);
+ if (keepIdx < 0)
+ keepIdx = columns.size();
+ removeIdx = removedIndexes.nextSetBit(keepIdx + 1);
+ }
+ // Copy everything after the last deleted column
+ int length = columns.size() - keepIdx;
+ if (length > 0)
+ {
+ copy(keepIdx, resultLength, length);
+ resultLength += length;
+ }
+
+ columns.subList(resultLength, columns.size()).clear();
+ }
+
+ private void copy(int src, int dst, int len)
+ {
+ // [src, src+len) and [dst, dst+len) might overlap, but it's okay because we're going from left to right
+ assert dst <= src : "dst must not be greater than src";
+
+ if (dst < src)
+ Collections.copy(columns.subList(dst, dst + len), columns.subList(src, src + len));
+ }
+
+ public boolean hasNext()
+ {
+ return iter.hasNext();
+ }
+
+ public Column next()
+ {
+ idx++;
+ shouldCallNext = false;
+ return iter.next();
+ }
+
+ public void remove()
+ {
+ if (shouldCallNext)
+ throw new IllegalStateException();
+
+ removedIndexes.set(reversed ? columns.size() - idx - 1 : idx);
+ removedAnything = true;
+ shouldCallNext = true;
+ }
+ };
+ }
+
private static class SlicesIterator extends AbstractIterator<Column>
{
private final List<Column> list;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc5fb19e/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index 7edf825..19f8c16 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -532,6 +532,38 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
return ByteBuffer.wrap(out.getData(), 0, out.getLength());
}
+
+ /**
+ * @return an iterator whose removals have all taken effect once commit() has been called; subclasses may defer them until then
+ */
+ public BatchRemoveIterator<Column> batchRemoveIterator()
+ {
+ // Default implementation wraps the ordinary iterator: remove() takes effect immediately and commit() does nothing
+ return new BatchRemoveIterator<Column>()
+ {
+ private final Iterator<Column> iter = iterator();
+
+ public void commit() // NOTE(review): never throws, so a repeated commit() is not rejected as the BatchRemoveIterator contract describes
+ {
+ }
+
+ public boolean hasNext()
+ {
+ return iter.hasNext();
+ }
+
+ public Column next()
+ {
+ return iter.next();
+ }
+
+ public void remove() // delegates to the underlying iterator; NOTE(review): still allowed after commit() — confirm this is acceptable
+ {
+ iter.remove();
+ }
+ };
+ }
+
public abstract static class Factory <T extends ColumnFamily>
{
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc5fb19e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index e936473..34d3f1d 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -953,7 +953,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
*/
public static long removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer)
{
- Iterator<Column> iter = cf.iterator();
+ BatchRemoveIterator<Column> iter = cf.batchRemoveIterator();
DeletionInfo.InOrderTester tester = cf.inOrderDeletionTester();
boolean hasDroppedColumns = !cf.metadata.getDroppedColumns().isEmpty();
long removedBytes = 0;
@@ -971,6 +971,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
removedBytes += c.dataSize();
}
}
+ iter.commit();
return removedBytes;
}
@@ -993,10 +994,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
if (cf == null || cf.metadata.getDroppedColumns().isEmpty())
return;
- Iterator<Column> iter = cf.iterator();
+ BatchRemoveIterator<Column> iter = cf.batchRemoveIterator();
while (iter.hasNext())
if (isDroppedColumn(iter.next(), metadata))
iter.remove();
+ iter.commit();
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc5fb19e/src/java/org/apache/cassandra/utils/BatchRemoveIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BatchRemoveIterator.java b/src/java/org/apache/cassandra/utils/BatchRemoveIterator.java
new file mode 100644
index 0000000..4377426
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/BatchRemoveIterator.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.Iterator;
+
+/**
+ * Iterator that can remove many items efficiently by deferring removals until {@link #commit()}; call commit() once done removing.
+ */
+public interface BatchRemoveIterator<T> extends Iterator<T>
+{
+ /**
+ * Applies the remove operations recorded by this batch iterator. After this, no more
+ * removals can be made: any further call to remove() or commit() will throw IllegalStateException.
+ */
+ void commit();
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc5fb19e/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java b/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
index 06e2e75..90cd70f 100644
--- a/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
+++ b/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
@@ -28,10 +28,12 @@ import org.junit.Test;
import static org.junit.Assert.*;
import com.google.common.base.Functions;
+import com.google.common.collect.Sets;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.utils.BatchRemoveIterator;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.db.filter.ColumnSlice;
import org.apache.cassandra.utils.HeapAllocator;
@@ -193,4 +195,95 @@ public class ArrayBackedSortedColumnsTest extends SchemaLoader
iter.remove();
assertTrue(!iter.hasNext());
}
+
+ @Test(expected = IllegalStateException.class)
+ public void testBatchRemoveTwice()
+ {
+ ColumnFamily map = ArrayBackedSortedColumns.factory.create(metadata(), false);
+ map.addColumn(new Column(ByteBufferUtil.bytes(1)), HeapAllocator.instance);
+ map.addColumn(new Column(ByteBufferUtil.bytes(2)), HeapAllocator.instance);
+
+ BatchRemoveIterator<Column> batchIter = map.batchRemoveIterator();
+ batchIter.next();
+ batchIter.remove();
+ batchIter.remove(); // a second remove() without an intervening next() must throw
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testBatchCommitTwice()
+ {
+ ColumnFamily map = ArrayBackedSortedColumns.factory.create(metadata(), false);
+ map.addColumn(new Column(ByteBufferUtil.bytes(1)), HeapAllocator.instance);
+ map.addColumn(new Column(ByteBufferUtil.bytes(2)), HeapAllocator.instance);
+
+ BatchRemoveIterator<Column> batchIter = map.batchRemoveIterator();
+ batchIter.next();
+ batchIter.remove();
+ batchIter.commit();
+ batchIter.commit(); // a second commit() must throw
+ }
+
+ @Test
+ public void testBatchRemove()
+ {
+ testBatchRemoveInternal(false); // map created with reversed = false
+ testBatchRemoveInternal(true); // map created with reversed = true
+ }
+
+ public void testBatchRemoveInternal(boolean reversed) // helper called by testBatchRemove; not annotated with @Test
+ {
+ ColumnFamily map = ArrayBackedSortedColumns.factory.create(metadata(), reversed);
+ int[] values = new int[]{ 1, 2, 3, 5 };
+
+ for (int i = 0; i < values.length; ++i) // a reversed map receives the values in descending order
+ map.addColumn(new Column(ByteBufferUtil.bytes(values[reversed ? values.length - 1 - i : i])), HeapAllocator.instance);
+
+ BatchRemoveIterator<Column> batchIter = map.batchRemoveIterator();
+ batchIter.next();
+ batchIter.remove();
+ batchIter.next();
+ batchIter.remove();
+
+ assertEquals("1st column before commit", 1, map.iterator().next().name().getInt(0)); // removals not yet visible
+
+ batchIter.commit();
+
+ assertEquals("1st column after commit", 3, map.iterator().next().name().getInt(0)); // the two removed columns are gone
+ }
+
+ @Test
+ public void testBatchRemoveCopy()
+ {
+ // Remove a fixed subset of columns (including an adjacent run) and check that exactly the others remain, in order
+ ColumnFamily map = ArrayBackedSortedColumns.factory.create(metadata(), false);
+ int n = 127;
+ int[] values = new int[n];
+ for (int i = 0; i < n; i++) values[i] = i;
+ Set<Integer> toRemove = Sets.newHashSet(3, 12, 13, 15, 58, 103, 112);
+
+ for (int value : values)
+ map.addColumn(new Column(ByteBufferUtil.bytes(value)), HeapAllocator.instance);
+
+ BatchRemoveIterator<Column> batchIter = map.batchRemoveIterator();
+ while (batchIter.hasNext())
+ if (toRemove.contains(batchIter.next().name().getInt(0)))
+ batchIter.remove();
+
+ batchIter.commit();
+
+ int expected = 0; // next value that should have survived the removal
+
+ while (toRemove.contains(expected))
+ expected++;
+
+ for (Column column : map)
+ {
+ assertEquals(expected, column.name().getInt(0));
+ expected++;
+ while (toRemove.contains(expected))
+ expected++;
+ }
+
+ assertEquals(n, expected); // every value in [0, n) was either removed or seen, so the scan must end at n
+ }
}