You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/01/22 01:22:48 UTC
[1/3] cassandra git commit: Add batch remove iterator to ABSC
Repository: cassandra
Updated Branches:
refs/heads/trunk 39108df7c -> 2b4cd105e
Add batch remove iterator to ABSC
patch by Jimmy Mårdell; reviewed by Richard Low for CASSANDRA-8414
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cc5fb19e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cc5fb19e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cc5fb19e
Branch: refs/heads/trunk
Commit: cc5fb19e5c864110b798375f43ec1597904d03ab
Parents: ae380da
Author: Jimmy Mårdell <ya...@spotify.com>
Authored: Thu Jan 22 02:16:10 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Thu Jan 22 02:16:10 2015 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/db/ArrayBackedSortedColumns.java | 89 ++++++++++++++++++-
.../org/apache/cassandra/db/ColumnFamily.java | 32 +++++++
.../apache/cassandra/db/ColumnFamilyStore.java | 6 +-
.../cassandra/utils/BatchRemoveIterator.java | 32 +++++++
.../db/ArrayBackedSortedColumnsTest.java | 93 ++++++++++++++++++++
6 files changed, 249 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc5fb19e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6604783..0d08cce 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
2.0.13:
* Round up time deltas lower than 1ms in BulkLoader (CASSANDRA-8645)
+ * Add batch remove iterator to ABSC (CASSANDRA-8414)
2.0.12:
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc5fb19e/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
index 389e0f8..8d553be 100644
--- a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.filter.ColumnSlice;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.utils.Allocator;
+import org.apache.cassandra.utils.BatchRemoveIterator;
/**
* A ColumnFamily backed by an ArrayList.
@@ -54,14 +55,14 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
{
super(metadata);
this.reversed = reversed;
- this.columns = new ArrayList<Column>();
+ this.columns = new ArrayList<>();
}
private ArrayBackedSortedColumns(Collection<Column> columns, CFMetaData metadata, boolean reversed)
{
super(metadata);
this.reversed = reversed;
- this.columns = new ArrayList<Column>(columns);
+ this.columns = new ArrayList<>(columns);
}
public ColumnFamily.Factory getFactory()
@@ -292,6 +293,90 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
return new SlicesIterator(columns, getComparator(), slices, !reversed);
}
+ @Override
+ public BatchRemoveIterator<Column> batchRemoveIterator()
+ {
+ return new BatchRemoveIterator<Column>()
+ {
+ private Iterator<Column> iter = iterator();
+ private BitSet removedIndexes = new BitSet(columns.size());
+ private int idx = -1;
+ private boolean shouldCallNext = true;
+ private boolean isCommitted = false;
+ private boolean removedAnything = false;
+
+ public void commit()
+ {
+ if (isCommitted)
+ throw new IllegalStateException();
+ isCommitted = true;
+
+ if (!removedAnything)
+ return;
+
+ // the lowest index both not visited and known to be not removed
+ int keepIdx = removedIndexes.nextClearBit(0);
+ // the running total of kept items
+ int resultLength = 0;
+ // start from the first not-removed cell, and shift left.
+ int removeIdx = removedIndexes.nextSetBit(keepIdx + 1);
+ while (removeIdx >= 0)
+ {
+ int length = removeIdx - keepIdx;
+ if (length > 0)
+ {
+ copy(keepIdx, resultLength, length);
+ resultLength += length;
+ }
+ keepIdx = removedIndexes.nextClearBit(removeIdx + 1);
+ if (keepIdx < 0)
+ keepIdx = columns.size();
+ removeIdx = removedIndexes.nextSetBit(keepIdx + 1);
+ }
+ // Copy everything after the last deleted column
+ int length = columns.size() - keepIdx;
+ if (length > 0)
+ {
+ copy(keepIdx, resultLength, length);
+ resultLength += length;
+ }
+
+ columns.subList(resultLength, columns.size()).clear();
+ }
+
+ private void copy(int src, int dst, int len)
+ {
+ // [src, src+len) and [dst, dst+len) might overlap, but it's okay because we're going from left to right
+ assert dst <= src : "dst must not be greater than src";
+
+ if (dst < src)
+ Collections.copy(columns.subList(dst, dst + len), columns.subList(src, src + len));
+ }
+
+ public boolean hasNext()
+ {
+ return iter.hasNext();
+ }
+
+ public Column next()
+ {
+ idx++;
+ shouldCallNext = false;
+ return iter.next();
+ }
+
+ public void remove()
+ {
+ if (shouldCallNext)
+ throw new IllegalStateException();
+
+ removedIndexes.set(reversed ? columns.size() - idx - 1 : idx);
+ removedAnything = true;
+ shouldCallNext = true;
+ }
+ };
+ }
+
private static class SlicesIterator extends AbstractIterator<Column>
{
private final List<Column> list;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc5fb19e/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index 7edf825..19f8c16 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -532,6 +532,38 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
return ByteBuffer.wrap(out.getData(), 0, out.getLength());
}
+
+ /**
+ * @return an iterator where the removes are carried out once everything has been iterated
+ */
+ public BatchRemoveIterator<Column> batchRemoveIterator()
+ {
+ // Default implementation is the ordinary iterator
+ return new BatchRemoveIterator<Column>()
+ {
+ private final Iterator<Column> iter = iterator();
+
+ public void commit()
+ {
+ }
+
+ public boolean hasNext()
+ {
+ return iter.hasNext();
+ }
+
+ public Column next()
+ {
+ return iter.next();
+ }
+
+ public void remove()
+ {
+ iter.remove();
+ }
+ };
+ }
+
public abstract static class Factory <T extends ColumnFamily>
{
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc5fb19e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index e936473..34d3f1d 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -953,7 +953,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
*/
public static long removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer)
{
- Iterator<Column> iter = cf.iterator();
+ BatchRemoveIterator<Column> iter = cf.batchRemoveIterator();
DeletionInfo.InOrderTester tester = cf.inOrderDeletionTester();
boolean hasDroppedColumns = !cf.metadata.getDroppedColumns().isEmpty();
long removedBytes = 0;
@@ -971,6 +971,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
removedBytes += c.dataSize();
}
}
+ iter.commit();
return removedBytes;
}
@@ -993,10 +994,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
if (cf == null || cf.metadata.getDroppedColumns().isEmpty())
return;
- Iterator<Column> iter = cf.iterator();
+ BatchRemoveIterator<Column> iter = cf.batchRemoveIterator();
while (iter.hasNext())
if (isDroppedColumn(iter.next(), metadata))
iter.remove();
+ iter.commit();
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc5fb19e/src/java/org/apache/cassandra/utils/BatchRemoveIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BatchRemoveIterator.java b/src/java/org/apache/cassandra/utils/BatchRemoveIterator.java
new file mode 100644
index 0000000..4377426
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/BatchRemoveIterator.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.Iterator;
+
+/**
+ * Iterator that allows us to more efficiently remove many items
+ */
+public interface BatchRemoveIterator<T> extends Iterator<T>
+{
+ /**
+ * Commits the remove operations in this batch iterator. After this no more
+ * deletes can be made. Any further calls to remove() or commit() will throw IllegalStateException.
+ */
+ void commit();
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc5fb19e/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java b/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
index 06e2e75..90cd70f 100644
--- a/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
+++ b/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
@@ -28,10 +28,12 @@ import org.junit.Test;
import static org.junit.Assert.*;
import com.google.common.base.Functions;
+import com.google.common.collect.Sets;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.utils.BatchRemoveIterator;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.db.filter.ColumnSlice;
import org.apache.cassandra.utils.HeapAllocator;
@@ -193,4 +195,95 @@ public class ArrayBackedSortedColumnsTest extends SchemaLoader
iter.remove();
assertTrue(!iter.hasNext());
}
+
+ @Test(expected = IllegalStateException.class)
+ public void testBatchRemoveTwice()
+ {
+ ColumnFamily map = ArrayBackedSortedColumns.factory.create(metadata(), false);
+ map.addColumn(new Column(ByteBufferUtil.bytes(1)), HeapAllocator.instance);
+ map.addColumn(new Column(ByteBufferUtil.bytes(2)), HeapAllocator.instance);
+
+ BatchRemoveIterator<Column> batchIter = map.batchRemoveIterator();
+ batchIter.next();
+ batchIter.remove();
+ batchIter.remove();
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testBatchCommitTwice()
+ {
+ ColumnFamily map = ArrayBackedSortedColumns.factory.create(metadata(), false);
+ map.addColumn(new Column(ByteBufferUtil.bytes(1)), HeapAllocator.instance);
+ map.addColumn(new Column(ByteBufferUtil.bytes(2)), HeapAllocator.instance);
+
+ BatchRemoveIterator<Column> batchIter = map.batchRemoveIterator();
+ batchIter.next();
+ batchIter.remove();
+ batchIter.commit();
+ batchIter.commit();
+ }
+
+ @Test
+ public void testBatchRemove()
+ {
+ testBatchRemoveInternal(false);
+ testBatchRemoveInternal(true);
+ }
+
+ public void testBatchRemoveInternal(boolean reversed)
+ {
+ ColumnFamily map = ArrayBackedSortedColumns.factory.create(metadata(), reversed);
+ int[] values = new int[]{ 1, 2, 3, 5 };
+
+ for (int i = 0; i < values.length; ++i)
+ map.addColumn(new Column(ByteBufferUtil.bytes(values[reversed ? values.length - 1 - i : i])), HeapAllocator.instance);
+
+ BatchRemoveIterator<Column> batchIter = map.batchRemoveIterator();
+ batchIter.next();
+ batchIter.remove();
+ batchIter.next();
+ batchIter.remove();
+
+ assertEquals("1st column before commit", 1, map.iterator().next().name().getInt(0));
+
+ batchIter.commit();
+
+ assertEquals("1st column after commit", 3, map.iterator().next().name().getInt(0));
+ }
+
+ @Test
+ public void testBatchRemoveCopy()
+ {
+ // Test delete some random columns and check the result
+ ColumnFamily map = ArrayBackedSortedColumns.factory.create(metadata(), false);
+ int n = 127;
+ int[] values = new int[n];
+ for (int i = 0; i < n; i++) values[i] = i;
+ Set<Integer> toRemove = Sets.newHashSet(3, 12, 13, 15, 58, 103, 112);
+
+ for (int value : values)
+ map.addColumn(new Column(ByteBufferUtil.bytes(value)), HeapAllocator.instance);
+
+ BatchRemoveIterator<Column> batchIter = map.batchRemoveIterator();
+ while (batchIter.hasNext())
+ if (toRemove.contains(batchIter.next().name().getInt(0)))
+ batchIter.remove();
+
+ batchIter.commit();
+
+ int expected = 0;
+
+ while (toRemove.contains(expected))
+ expected++;
+
+ for (Column column : map)
+ {
+ assertEquals(expected, column.name().getInt(0));
+ expected++;
+ while (toRemove.contains(expected))
+ expected++;
+ }
+
+ assertEquals(expected, n);
+ }
}
[3/3] cassandra git commit: Merge branch 'cassandra-2.1' into trunk
Posted by al...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Conflicts:
src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2b4cd105
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2b4cd105
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2b4cd105
Branch: refs/heads/trunk
Commit: 2b4cd105e8854543e1962888a2cb2fe9fccc4d17
Parents: 39108df 06cd494
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Thu Jan 22 03:22:34 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Thu Jan 22 03:22:34 2015 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/db/ArrayBackedSortedColumns.java | 96 +++++++++++++++--
.../apache/cassandra/db/AtomicBTreeColumns.java | 5 +
.../org/apache/cassandra/db/ColumnFamily.java | 6 ++
.../apache/cassandra/db/ColumnFamilyStore.java | 7 +-
.../cassandra/utils/BatchRemoveIterator.java | 32 ++++++
.../db/ArrayBackedSortedColumnsTest.java | 103 ++++++++++++++++++-
7 files changed, 238 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b4cd105/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b4cd105/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
index e5a731c,64752e3..9574d09
--- a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
@@@ -32,8 -27,8 +27,9 @@@ import org.apache.cassandra.config.CFMe
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.composites.Composite;
import org.apache.cassandra.db.filter.ColumnSlice;
+ import org.apache.cassandra.utils.BatchRemoveIterator;
import org.apache.cassandra.utils.memory.AbstractAllocator;
+import org.apache.cassandra.utils.SearchIterator;
/**
* A ColumnFamily backed by an array.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b4cd105/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b4cd105/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b4cd105/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b4cd105/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
index 46fe812,18851d4..0fdabe9
--- a/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
+++ b/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
@@@ -27,33 -25,19 +27,36 @@@ import org.junit.Test
import static org.junit.Assert.*;
+ import com.google.common.collect.Sets;
+
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.config.Schema;
- import org.apache.cassandra.exceptions.ConfigurationException;
- import org.apache.cassandra.locator.SimpleStrategy;
- import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.db.composites.*;
import org.apache.cassandra.db.filter.ColumnSlice;
import org.apache.cassandra.db.marshal.Int32Type;
-import org.apache.cassandra.utils.BatchRemoveIterator;
++import org.apache.cassandra.exceptions.ConfigurationException;
++import org.apache.cassandra.locator.SimpleStrategy;
+ import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.SearchIterator;
++import org.apache.cassandra.utils.BatchRemoveIterator;
-public class ArrayBackedSortedColumnsTest extends SchemaLoader
+public class ArrayBackedSortedColumnsTest
{
+ private static final String KEYSPACE1 = "ArrayBackedSortedColumnsTest";
+ private static final String CF_STANDARD1 = "Standard1";
+
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE1,
+ SimpleStrategy.class,
+ KSMetaData.optsWithRF(1),
+ SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1));
+ }
+
@Test
public void testAdd()
{
[2/3] cassandra git commit: Merge branch 'cassandra-2.0' into cassandra-2.1
Posted by al...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1
Conflicts:
CHANGES.txt
src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
src/java/org/apache/cassandra/db/ColumnFamilyStore.java
test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/06cd494c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/06cd494c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/06cd494c
Branch: refs/heads/trunk
Commit: 06cd494c1496ec96886ed41ff3207847631986c9
Parents: 0c2eaa9 cc5fb19
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Thu Jan 22 03:17:55 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Thu Jan 22 03:17:55 2015 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/db/ArrayBackedSortedColumns.java | 96 +++++++++++++++++--
.../apache/cassandra/db/AtomicBTreeColumns.java | 5 +
.../org/apache/cassandra/db/ColumnFamily.java | 6 ++
.../apache/cassandra/db/ColumnFamilyStore.java | 7 +-
.../cassandra/utils/BatchRemoveIterator.java | 32 +++++++
.../db/ArrayBackedSortedColumnsTest.java | 99 +++++++++++++++++++-
7 files changed, 236 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/06cd494c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 9cd8189,0d08cce..a94ca04
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,70 -1,9 +1,71 @@@
-2.0.13:
- * Round up time deltas lower than 1ms in BulkLoader (CASSANDRA-8645)
+2.1.3
+ * Duplicate rows returned when in clause has repeated values (CASSANDRA-6707)
+ * Add tooling to detect hot partitions (CASSANDRA-7974)
+ * Fix cassandra-stress user-mode truncation of partition generation (CASSANDRA-8608)
+ * Only stream from unrepaired sstables during inc repair (CASSANDRA-8267)
+ * Don't allow starting multiple inc repairs on the same sstables (CASSANDRA-8316)
+ * Invalidate prepared BATCH statements when related tables
+ or keyspaces are dropped (CASSANDRA-8652)
+ * Fix missing results in secondary index queries on collections
+ with ALLOW FILTERING (CASSANDRA-8421)
+ * Expose EstimatedHistogram metrics for range slices (CASSANDRA-8627)
+ * (cqlsh) Escape cqlshrc passwords properly (CASSANDRA-8618)
+ * Fix NPE when passing wrong argument in ALTER TABLE statement (CASSANDRA-8355)
+ * Pig: Refactor and deprecate CqlStorage (CASSANDRA-8599)
+ * Don't reuse the same cleanup strategy for all sstables (CASSANDRA-8537)
+ * Fix case-sensitivity of index name on CREATE and DROP INDEX
+ statements (CASSANDRA-8365)
+ * Better detection/logging for corruption in compressed sstables (CASSANDRA-8192)
+ * Use the correct repairedAt value when closing writer (CASSANDRA-8570)
+ * (cqlsh) Handle a schema mismatch being detected on startup (CASSANDRA-8512)
+ * Properly calculate expected write size during compaction (CASSANDRA-8532)
+ * Invalidate affected prepared statements when a table's columns
+ are altered (CASSANDRA-7910)
+ * Stress - user defined writes should populate sequentially (CASSANDRA-8524)
+ * Fix regression in SSTableRewriter causing some rows to become unreadable
+ during compaction (CASSANDRA-8429)
+ * Run major compactions for repaired/unrepaired in parallel (CASSANDRA-8510)
+ * (cqlsh) Fix compression options in DESCRIBE TABLE output when compression
+ is disabled (CASSANDRA-8288)
+ * (cqlsh) Fix DESCRIBE output after keyspaces are altered (CASSANDRA-7623)
+ * Make sure we set lastCompactedKey correctly (CASSANDRA-8463)
+ * (cqlsh) Fix output of CONSISTENCY command (CASSANDRA-8507)
+ * (cqlsh) Fixed the handling of LIST statements (CASSANDRA-8370)
+ * Make sstablescrub check leveled manifest again (CASSANDRA-8432)
+ * Check first/last keys in sstable when giving out positions (CASSANDRA-8458)
+ * Disable mmap on Windows (CASSANDRA-6993)
+ * Add missing ConsistencyLevels to cassandra-stress (CASSANDRA-8253)
+ * Add auth support to cassandra-stress (CASSANDRA-7985)
+ * Fix ArrayIndexOutOfBoundsException when generating error message
+ for some CQL syntax errors (CASSANDRA-8455)
+ * Scale memtable slab allocation logarithmically (CASSANDRA-7882)
+ * cassandra-stress simultaneous inserts over same seed (CASSANDRA-7964)
+ * Reduce cassandra-stress sampling memory requirements (CASSANDRA-7926)
+ * Ensure memtable flush cannot expire commit log entries from its future (CASSANDRA-8383)
+ * Make read "defrag" async to reclaim memtables (CASSANDRA-8459)
+ * Remove tmplink files for offline compactions (CASSANDRA-8321)
+ * Reduce maxHintsInProgress (CASSANDRA-8415)
+ * BTree updates may call provided update function twice (CASSANDRA-8018)
+ * Release sstable references after anticompaction (CASSANDRA-8386)
+ * Handle abort() in SSTableRewriter properly (CASSANDRA-8320)
+ * Fix high size calculations for prepared statements (CASSANDRA-8231)
+ * Centralize shared executors (CASSANDRA-8055)
+ * Fix filtering for CONTAINS (KEY) relations on frozen collection
+ clustering columns when the query is restricted to a single
+ partition (CASSANDRA-8203)
+ * Do more aggressive entire-sstable TTL expiry checks (CASSANDRA-8243)
+ * Add more log info if readMeter is null (CASSANDRA-8238)
+ * add check of the system wall clock time at startup (CASSANDRA-8305)
+ * Support for frozen collections (CASSANDRA-7859)
+ * Fix overflow on histogram computation (CASSANDRA-8028)
+ * Have paxos reuse the timestamp generation of normal queries (CASSANDRA-7801)
+ * Fix incremental repair not removing parent session on remote (CASSANDRA-8291)
+ * Improve JBOD disk utilization (CASSANDRA-7386)
+ * Log failed host when preparing incremental repair (CASSANDRA-8228)
+ * Force config client mode in CQLSSTableWriter (CASSANDRA-8281)
+Merged from 2.0:
+ * Add batch remove iterator to ABSC (CASSANDRA-8414)
-
-
-2.0.12:
+ * Round up time deltas lower than 1ms in BulkLoader (CASSANDRA-8645)
* Use more efficient slice size for querying internal secondary
index tables (CASSANDRA-8550)
* Fix potentially returning deleted rows with range tombstone (CASSANDRA-8558)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/06cd494c/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
index b5ed8d2,8d553be..64752e3
--- a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
@@@ -17,42 -17,31 +17,38 @@@
*/
package org.apache.cassandra.db;
- import java.util.AbstractCollection;
- import java.util.Arrays;
- import java.util.Collection;
- import java.util.Comparator;
- import java.util.Iterator;
-import java.nio.ByteBuffer;
+ import java.util.*;
import com.google.common.base.Function;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
- import net.nicoulaj.compilecommand.annotations.Inline;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.Composite;
import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.utils.Allocator;
+ import org.apache.cassandra.utils.BatchRemoveIterator;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
/**
- * A ColumnFamily backed by an ArrayList.
+ * A ColumnFamily backed by an array.
* This implementation is not synchronized and should only be used when
* thread-safety is not required. This implementation makes sense when the
- * main operations performed are iterating over the map and adding columns
+ * main operations performed are iterating over the cells and adding cells
* (especially if insertion is in sorted order).
*/
-public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
+public class ArrayBackedSortedColumns extends ColumnFamily
{
+ private static final Cell[] EMPTY_ARRAY = new Cell[0];
+ private static final int MINIMAL_CAPACITY = 10;
+
private final boolean reversed;
- private final ArrayList<Column> columns;
+
+ private DeletionInfo deletionInfo;
+ private Cell[] cells;
+ private int size;
+ private int sortedSize;
+ private volatile boolean isSorted;
public static final ColumnFamily.Factory<ArrayBackedSortedColumns> factory = new Factory<ArrayBackedSortedColumns>()
{
@@@ -114,15 -80,15 +110,103 @@@
return reversed;
}
- private Comparator<ByteBuffer> internalComparator()
++ public BatchRemoveIterator<Cell> batchRemoveIterator()
++ {
++ maybeSortCells();
++
++ return new BatchRemoveIterator<Cell>()
++ {
++ private final Iterator<Cell> iter = iterator();
++ private BitSet removedIndexes = new BitSet(size);
++ private int idx = -1;
++ private boolean shouldCallNext = false;
++ private boolean isCommitted = false;
++ private boolean removedAnything = false;
++
++ public void commit()
++ {
++ if (isCommitted)
++ throw new IllegalStateException();
++ isCommitted = true;
++
++ if (!removedAnything)
++ return;
++
++ // the lowest index both not visited and known to be not removed
++ int keepIdx = removedIndexes.nextClearBit(0);
++ // the running total of kept items
++ int resultLength = 0;
++ // start from the first not-removed cell, and shift left.
++ int removeIdx = removedIndexes.nextSetBit(keepIdx + 1);
++ while (removeIdx >= 0)
++ {
++ int length = removeIdx - keepIdx;
++ if (length > 0)
++ {
++ copy(keepIdx, resultLength, length);
++ resultLength += length;
++ }
++ keepIdx = removedIndexes.nextClearBit(removeIdx + 1);
++ if (keepIdx < 0)
++ keepIdx = size;
++ removeIdx = removedIndexes.nextSetBit(keepIdx + 1);
++ }
++ // Copy everything after the last deleted column
++ int length = size - keepIdx;
++ if (length > 0)
++ {
++ copy(keepIdx, resultLength, length);
++ resultLength += length;
++ }
++
++ for (int i = resultLength; i < size; i++)
++ cells[i] = null;
++
++ size = sortedSize = resultLength;
++ }
++
++ private void copy(int src, int dst, int len)
++ {
++ // [src, src+len) and [dst, dst+len) might overlap but it's okay because we're going from left to right
++ assert dst <= src : "dst must not be greater than src";
++
++ if (dst < src)
++ System.arraycopy(cells, src, cells, dst, len);
++ }
++
++ public boolean hasNext()
++ {
++ return iter.hasNext();
++ }
++
++ public Cell next()
++ {
++ idx++;
++ shouldCallNext = false;
++ return iter.next();
++ }
++
++ public void remove()
++ {
++ if (shouldCallNext)
++ throw new IllegalStateException();
++
++ removedIndexes.set(reversed ? size - idx - 1 : idx);
++ removedAnything = true;
++ shouldCallNext = true;
++ }
++ };
++ }
++
+ private Comparator<Composite> internalComparator()
{
- return reversed ? getComparator().reverseComparator : getComparator();
+ return reversed ? getComparator().reverseComparator() : getComparator();
}
- public Column getColumn(ByteBuffer name)
+ private void maybeSortCells()
{
- int pos = binarySearch(name);
- return pos >= 0 ? columns.get(pos) : null;
+ if (!isSorted)
+ sortCells();
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/06cd494c/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
index dc2b5ee,0000000..47f0b85
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
@@@ -1,559 -1,0 +1,564 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.util.AbstractCollection;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.btree.BTree;
+import org.apache.cassandra.utils.btree.UpdateFunction;
+import org.apache.cassandra.utils.concurrent.Locks;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.HeapAllocator;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
+import org.apache.cassandra.utils.memory.NativePool;
+
+import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater;
+
+/**
+ * A thread-safe and atomic ISortedColumns implementation.
+ * Operations (in particular addAll) on this implementation are atomic and
+ * isolated (in the sense of ACID). Typically an addAll is guaranteed that no
+ * other thread can see the state where only parts but not all columns have
+ * been added.
+ * <p/>
+ * WARNING: removing elements through getSortedColumns().iterator() is *not* supported
+ */
+public class AtomicBTreeColumns extends ColumnFamily
+{
+ static final long EMPTY_SIZE = ObjectSizes.measure(new AtomicBTreeColumns(CFMetaData.IndexCf, null))
+ + ObjectSizes.measure(new Holder(null, null));
+
+ // Reserved values for wasteTracker field. These values must not be consecutive (see avoidReservedValues)
+ private static final int TRACKER_NEVER_WASTED = 0;
+ private static final int TRACKER_PESSIMISTIC_LOCKING = Integer.MAX_VALUE;
+
+ // The granularity with which we track wasted allocation/work; we round up
+ private static final int ALLOCATION_GRANULARITY_BYTES = 1024;
+ // The number of bytes we have to waste in excess of our acceptable realtime rate of waste (defined below)
+ private static final long EXCESS_WASTE_BYTES = 10 * 1024 * 1024L;
+ private static final int EXCESS_WASTE_OFFSET = (int) (EXCESS_WASTE_BYTES / ALLOCATION_GRANULARITY_BYTES);
+ // Note this is a shift, because dividing a long time and then picking the low 32 bits doesn't give correct rollover behavior
+ private static final int CLOCK_SHIFT = 17;
+ // CLOCK_GRANULARITY = 1ns << CLOCK_SHIFT == 2^17ns ~= 131us == (1/7.63)ms
+
+ /**
+ * (clock + allocation) granularity are combined to give us an acceptable (waste) allocation rate that is defined by
+ * the passage of real time of ALLOCATION_GRANULARITY_BYTES/CLOCK_GRANULARITY, or in this case 7.63Kb/ms, or 7.45Mb/s
+ *
+ * in wasteTracker we maintain a value within EXCESS_WASTE_OFFSET before the current time; whenever we waste bytes
+ * we increment the current value if it is within this window, and set it to the min of the window plus our waste
+ * otherwise.
+ */
+ private volatile int wasteTracker = TRACKER_NEVER_WASTED;
+
+ private static final AtomicIntegerFieldUpdater<AtomicBTreeColumns> wasteTrackerUpdater = AtomicIntegerFieldUpdater.newUpdater(AtomicBTreeColumns.class, "wasteTracker");
+
+ private static final Function<Cell, CellName> NAME = new Function<Cell, CellName>()
+ {
+ public CellName apply(Cell column)
+ {
+ return column.name();
+ }
+ };
+
+ public static final Factory<AtomicBTreeColumns> factory = new Factory<AtomicBTreeColumns>()
+ {
+ public AtomicBTreeColumns create(CFMetaData metadata, boolean insertReversed, int initialCapacity)
+ {
+ if (insertReversed)
+ throw new IllegalArgumentException();
+ return new AtomicBTreeColumns(metadata);
+ }
+ };
+
+ private static final DeletionInfo LIVE = DeletionInfo.live();
+ // This is a small optimization: DeletionInfo is mutable, but we know that we will always copy it in that class,
+ // so we can safely alias one DeletionInfo.live() reference and avoid some allocations.
+ private static final Holder EMPTY = new Holder(BTree.empty(), LIVE);
+
+ private volatile Holder ref;
+
+ private static final AtomicReferenceFieldUpdater<AtomicBTreeColumns, Holder> refUpdater = AtomicReferenceFieldUpdater.newUpdater(AtomicBTreeColumns.class, Holder.class, "ref");
+
+ private AtomicBTreeColumns(CFMetaData metadata)
+ {
+ this(metadata, EMPTY);
+ }
+
+ private AtomicBTreeColumns(CFMetaData metadata, Holder holder)
+ {
+ super(metadata);
+ this.ref = holder;
+ }
+
+ public Factory getFactory()
+ {
+ return factory;
+ }
+
+ public ColumnFamily cloneMe()
+ {
+ return new AtomicBTreeColumns(metadata, ref);
+ }
+
+ public DeletionInfo deletionInfo()
+ {
+ return ref.deletionInfo;
+ }
+
+ public void delete(DeletionTime delTime)
+ {
+ delete(new DeletionInfo(delTime));
+ }
+
+ protected void delete(RangeTombstone tombstone)
+ {
+ delete(new DeletionInfo(tombstone, getComparator()));
+ }
+
+ public void delete(DeletionInfo info)
+ {
+ if (info.isLive())
+ return;
+
+ // Keeping deletion info for max markedForDeleteAt value
+ while (true)
+ {
+ Holder current = ref;
+ DeletionInfo curDelInfo = current.deletionInfo;
+ DeletionInfo newDelInfo = info.mayModify(curDelInfo) ? curDelInfo.copy().add(info) : curDelInfo;
+ if (refUpdater.compareAndSet(this, current, current.with(newDelInfo)))
+ break;
+ }
+ }
+
+ public void setDeletionInfo(DeletionInfo newInfo)
+ {
+ ref = ref.with(newInfo);
+ }
+
+ public void purgeTombstones(int gcBefore)
+ {
+ while (true)
+ {
+ Holder current = ref;
+ if (!current.deletionInfo.hasPurgeableTombstones(gcBefore))
+ break;
+
+ DeletionInfo purgedInfo = current.deletionInfo.copy();
+ purgedInfo.purge(gcBefore);
+ if (refUpdater.compareAndSet(this, current, current.with(purgedInfo)))
+ break;
+ }
+ }
+
+ /**
+ * This is only called by Memtable.resolve, so only AtomicBTreeColumns needs to implement it.
+ *
+ * @return the difference in size seen after merging the given columns
+ */
+ public Pair<Long, Long> addAllWithSizeDelta(final ColumnFamily cm, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
+ {
+ ColumnUpdater updater = new ColumnUpdater(this, cm.metadata, allocator, writeOp, indexer);
+ DeletionInfo inputDeletionInfoCopy = null;
+
+ boolean monitorOwned = false;
+ try
+ {
+ if (usePessimisticLocking())
+ {
+ Locks.monitorEnterUnsafe(this);
+ monitorOwned = true;
+ }
+ while (true)
+ {
+ Holder current = ref;
+ updater.ref = current;
+ updater.reset();
+
+ DeletionInfo deletionInfo;
+ if (cm.deletionInfo().mayModify(current.deletionInfo))
+ {
+ if (inputDeletionInfoCopy == null)
+ inputDeletionInfoCopy = cm.deletionInfo().copy(HeapAllocator.instance);
+
+ deletionInfo = current.deletionInfo.copy().add(inputDeletionInfoCopy);
+ updater.allocated(deletionInfo.unsharedHeapSize() - current.deletionInfo.unsharedHeapSize());
+ }
+ else
+ {
+ deletionInfo = current.deletionInfo;
+ }
+
+ Object[] tree = BTree.update(current.tree, metadata.comparator.columnComparator(Memtable.MEMORY_POOL instanceof NativePool), cm, cm.getColumnCount(), true, updater);
+
+ if (tree != null && refUpdater.compareAndSet(this, current, new Holder(tree, deletionInfo)))
+ {
+ indexer.updateRowLevelIndexes();
+ updater.finish();
+ return Pair.create(updater.dataSize, updater.colUpdateTimeDelta);
+ }
+ else if (!monitorOwned)
+ {
+ boolean shouldLock = usePessimisticLocking();
+ if (!shouldLock)
+ {
+ shouldLock = updateWastedAllocationTracker(updater.heapSize);
+ }
+ if (shouldLock)
+ {
+ Locks.monitorEnterUnsafe(this);
+ monitorOwned = true;
+ }
+ }
+ }
+ }
+ finally
+ {
+ if (monitorOwned)
+ Locks.monitorExitUnsafe(this);
+ }
+ }
+
+ boolean usePessimisticLocking()
+ {
+ return wasteTracker == TRACKER_PESSIMISTIC_LOCKING;
+ }
+
+ /**
+ * Update the wasted allocation tracker state based on newly wasted allocation information
+ *
+ * @param wastedBytes the number of bytes wasted by this thread
+ * @return true if the caller should now proceed with pessimistic locking because the waste limit has been reached
+ */
+ private boolean updateWastedAllocationTracker(long wastedBytes) {
+ // Early check for huge allocation that exceeds the limit
+ if (wastedBytes < EXCESS_WASTE_BYTES)
+ {
+ // We round up to ensure work < granularity is still accounted for
+ int wastedAllocation = ((int) (wastedBytes + ALLOCATION_GRANULARITY_BYTES - 1)) / ALLOCATION_GRANULARITY_BYTES;
+
+ int oldTrackerValue;
+ while (TRACKER_PESSIMISTIC_LOCKING != (oldTrackerValue = wasteTracker))
+ {
+ // Note this time value has an arbitrary offset, but is a constant rate 32 bit counter (that may wrap)
+ int time = (int) (System.nanoTime() >>> CLOCK_SHIFT);
+ int delta = oldTrackerValue - time;
+ if (oldTrackerValue == TRACKER_NEVER_WASTED || delta >= 0 || delta < -EXCESS_WASTE_OFFSET)
+ delta = -EXCESS_WASTE_OFFSET;
+ delta += wastedAllocation;
+ if (delta >= 0)
+ break;
+ if (wasteTrackerUpdater.compareAndSet(this, oldTrackerValue, avoidReservedValues(time + delta)))
+ return false;
+ }
+ }
+ // We have definitely reached our waste limit so set the state if it isn't already
+ wasteTrackerUpdater.set(this, TRACKER_PESSIMISTIC_LOCKING);
+ // And tell the caller to proceed with pessimistic locking
+ return true;
+ }
+
+ private static int avoidReservedValues(int wasteTracker)
+ {
+ if (wasteTracker == TRACKER_NEVER_WASTED || wasteTracker == TRACKER_PESSIMISTIC_LOCKING)
+ return wasteTracker + 1;
+ return wasteTracker;
+ }
+
+ // no particular reason not to implement these next methods, we just haven't needed them yet
+
+ public void addColumn(Cell column)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void maybeAppendColumn(Cell cell, DeletionInfo.InOrderTester tester, int gcBefore)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void addAll(ColumnFamily cf)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void clear()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public Cell getColumn(CellName name)
+ {
+ return (Cell) BTree.find(ref.tree, asymmetricComparator(), name);
+ }
+
+ private Comparator<Object> asymmetricComparator()
+ {
+ return metadata.comparator.asymmetricColumnComparator(Memtable.MEMORY_POOL instanceof NativePool);
+ }
+
+ public Iterable<CellName> getColumnNames()
+ {
+ return collection(false, NAME);
+ }
+
+ public Collection<Cell> getSortedColumns()
+ {
+ return collection(true, Functions.<Cell>identity());
+ }
+
+ public Collection<Cell> getReverseSortedColumns()
+ {
+ return collection(false, Functions.<Cell>identity());
+ }
+
+ private <V> Collection<V> collection(final boolean forwards, final Function<Cell, V> f)
+ {
+ final Holder ref = this.ref;
+ return new AbstractCollection<V>()
+ {
+ public Iterator<V> iterator()
+ {
+ return Iterators.transform(BTree.<Cell>slice(ref.tree, forwards), f);
+ }
+
+ public int size()
+ {
+ return BTree.slice(ref.tree, true).count();
+ }
+ };
+ }
+
+ public int getColumnCount()
+ {
+ return BTree.slice(ref.tree, true).count();
+ }
+
+ public boolean hasColumns()
+ {
+ return !BTree.isEmpty(ref.tree);
+ }
+
+ public Iterator<Cell> iterator(ColumnSlice[] slices)
+ {
+ return slices.length == 1
+ ? slice(ref.tree, asymmetricComparator(), slices[0].start, slices[0].finish, true)
+ : new SliceIterator(ref.tree, asymmetricComparator(), true, slices);
+ }
+
+ public Iterator<Cell> reverseIterator(ColumnSlice[] slices)
+ {
+ return slices.length == 1
+ ? slice(ref.tree, asymmetricComparator(), slices[0].finish, slices[0].start, false)
+ : new SliceIterator(ref.tree, asymmetricComparator(), false, slices);
+ }
+
+ public boolean isInsertReversed()
+ {
+ return false;
+ }
+
++ public BatchRemoveIterator<Cell> batchRemoveIterator()
++ {
++ throw new UnsupportedOperationException();
++ }
++
+ private static final class Holder
+ {
+ final DeletionInfo deletionInfo;
+ // the btree of columns
+ final Object[] tree;
+
+ Holder(Object[] tree, DeletionInfo deletionInfo)
+ {
+ this.tree = tree;
+ this.deletionInfo = deletionInfo;
+ }
+
+ Holder with(DeletionInfo info)
+ {
+ return new Holder(this.tree, info);
+ }
+ }
+
+ // the function we provide to the btree utilities to perform any column replacements
+ private static final class ColumnUpdater implements UpdateFunction<Cell>
+ {
+ final AtomicBTreeColumns updating;
+ final CFMetaData metadata;
+ final MemtableAllocator allocator;
+ final OpOrder.Group writeOp;
+ final Updater indexer;
+ Holder ref;
+ long dataSize;
+ long heapSize;
+ long colUpdateTimeDelta = Long.MAX_VALUE;
+ final MemtableAllocator.DataReclaimer reclaimer;
+ List<Cell> inserted; // TODO: replace with walk of aborted BTree
+
+ private ColumnUpdater(AtomicBTreeColumns updating, CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
+ {
+ this.updating = updating;
+ this.allocator = allocator;
+ this.writeOp = writeOp;
+ this.indexer = indexer;
+ this.metadata = metadata;
+ this.reclaimer = allocator.reclaimer();
+ }
+
+ public Cell apply(Cell insert)
+ {
+ indexer.insert(insert);
+ insert = insert.localCopy(metadata, allocator, writeOp);
+ this.dataSize += insert.cellDataSize();
+ this.heapSize += insert.unsharedHeapSizeExcludingData();
+ if (inserted == null)
+ inserted = new ArrayList<>();
+ inserted.add(insert);
+ return insert;
+ }
+
+ public Cell apply(Cell existing, Cell update)
+ {
+ Cell reconciled = existing.reconcile(update);
+ indexer.update(existing, reconciled);
+ if (existing != reconciled)
+ {
+ reconciled = reconciled.localCopy(metadata, allocator, writeOp);
+ dataSize += reconciled.cellDataSize() - existing.cellDataSize();
+ heapSize += reconciled.unsharedHeapSizeExcludingData() - existing.unsharedHeapSizeExcludingData();
+ if (inserted == null)
+ inserted = new ArrayList<>();
+ inserted.add(reconciled);
+ discard(existing);
+ //Getting the minimum delta for an update containing multiple columns
+ colUpdateTimeDelta = Math.min(Math.abs(existing.timestamp() - update.timestamp()), colUpdateTimeDelta);
+ }
+ return reconciled;
+ }
+
+ protected void reset()
+ {
+ this.dataSize = 0;
+ this.heapSize = 0;
+ if (inserted != null)
+ {
+ for (Cell cell : inserted)
+ abort(cell);
+ inserted.clear();
+ }
+ reclaimer.cancel();
+ }
+
+ protected void abort(Cell abort)
+ {
+ reclaimer.reclaimImmediately(abort);
+ }
+
+ protected void discard(Cell discard)
+ {
+ reclaimer.reclaim(discard);
+ }
+
+ public boolean abortEarly()
+ {
+ return updating.ref != ref;
+ }
+
+ public void allocated(long heapSize)
+ {
+ this.heapSize += heapSize;
+ }
+
+ protected void finish()
+ {
+ allocator.onHeap().allocate(heapSize, writeOp);
+ reclaimer.commit();
+ }
+ }
+
+ private static class SliceIterator extends AbstractIterator<Cell>
+ {
+ private final Object[] btree;
+ private final boolean forwards;
+ private final Comparator<Object> comparator;
+ private final ColumnSlice[] slices;
+
+ private int idx = 0;
+ private Iterator<Cell> currentSlice;
+
+ SliceIterator(Object[] btree, Comparator<Object> comparator, boolean forwards, ColumnSlice[] slices)
+ {
+ this.btree = btree;
+ this.comparator = comparator;
+ this.slices = slices;
+ this.forwards = forwards;
+ }
+
+ protected Cell computeNext()
+ {
+ while (currentSlice != null || idx < slices.length)
+ {
+ if (currentSlice == null)
+ {
+ ColumnSlice slice = slices[idx++];
+ if (forwards)
+ currentSlice = slice(btree, comparator, slice.start, slice.finish, true);
+ else
+ currentSlice = slice(btree, comparator, slice.finish, slice.start, false);
+ }
+
+ if (currentSlice.hasNext())
+ return currentSlice.next();
+
+ currentSlice = null;
+ }
+
+ return endOfData();
+ }
+ }
+
+ private static Iterator<Cell> slice(Object[] btree, Comparator<Object> comparator, Composite start, Composite finish, boolean forwards)
+ {
+ return BTree.slice(btree,
+ comparator,
+ start.isEmpty() ? null : start,
+ true,
+ finish.isEmpty() ? null : finish,
+ true,
+ forwards);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/06cd494c/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamily.java
index 483ecb0,19f8c16..f21d161
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@@ -514,6 -532,38 +514,12 @@@ public abstract class ColumnFamily impl
return ByteBuffer.wrap(out.getData(), 0, out.getLength());
}
+
+ /**
+ * @return an iterator where the removes are carried out once everything has been iterated
+ */
- public BatchRemoveIterator<Column> batchRemoveIterator()
- {
- // Default implementation is the ordinary iterator
- return new BatchRemoveIterator<Column>()
- {
- private final Iterator<Column> iter = iterator();
-
- public void commit()
- {
- }
-
- public boolean hasNext()
- {
- return iter.hasNext();
- }
-
- public Column next()
- {
- return iter.next();
- }
-
- public void remove()
- {
- iter.remove();
- }
- };
- }
++ public abstract BatchRemoveIterator<Cell> batchRemoveIterator();
+
public abstract static class Factory <T extends ColumnFamily>
{
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/06cd494c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 0c95b0e,34d3f1d..3822648
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -1237,14 -951,15 +1237,14 @@@ public class ColumnFamilyStore implemen
* columns that have been dropped from the schema (for CQL3 tables only).
* @return the updated ColumnFamily
*/
- public static long removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer)
+ public static ColumnFamily removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer)
{
- Iterator<Cell> iter = cf.iterator();
- BatchRemoveIterator<Column> iter = cf.batchRemoveIterator();
++ BatchRemoveIterator<Cell> iter = cf.batchRemoveIterator();
DeletionInfo.InOrderTester tester = cf.inOrderDeletionTester();
boolean hasDroppedColumns = !cf.metadata.getDroppedColumns().isEmpty();
- long removedBytes = 0;
while (iter.hasNext())
{
- Column c = iter.next();
+ Cell c = iter.next();
// remove columns if
// (a) the column itself is gcable or
// (b) the column is shadowed by a CF tombstone
@@@ -1253,10 -968,16 +1253,10 @@@
{
iter.remove();
indexer.remove(c);
- removedBytes += c.dataSize();
}
}
-
+ iter.commit();
- return removedBytes;
- }
-
- public static long removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore)
- {
- return removeDeletedColumnsOnly(cf, gcBefore, SecondaryIndexManager.nullUpdater);
+ return cf;
}
// returns true if
@@@ -1273,7 -994,7 +1273,7 @@@
if (cf == null || cf.metadata.getDroppedColumns().isEmpty())
return;
- Iterator<Cell> iter = cf.iterator();
- BatchRemoveIterator<Column> iter = cf.batchRemoveIterator();
++ BatchRemoveIterator<Cell> iter = cf.batchRemoveIterator();
while (iter.hasNext())
if (isDroppedColumn(iter.next(), metadata))
iter.remove();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/06cd494c/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
index 83a58e4,90cd70f..18851d4
--- a/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
+++ b/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
@@@ -25,13 -27,16 +25,16 @@@ import org.junit.Test
import static org.junit.Assert.*;
-import com.google.common.base.Functions;
+ import com.google.common.collect.Sets;
+
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
- import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.db.composites.*;
+import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.db.marshal.Int32Type;
+ import org.apache.cassandra.utils.BatchRemoveIterator;
+ import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.utils.HeapAllocator;
public class ArrayBackedSortedColumnsTest extends SchemaLoader
{
@@@ -265,4 -195,95 +268,98 @@@
iter.remove();
assertTrue(!iter.hasNext());
}
+
+ @Test(expected = IllegalStateException.class)
+ public void testBatchRemoveTwice()
+ {
++ CellNameType type = new SimpleDenseCellNameType(Int32Type.instance);
+ ColumnFamily map = ArrayBackedSortedColumns.factory.create(metadata(), false);
- map.addColumn(new Column(ByteBufferUtil.bytes(1)), HeapAllocator.instance);
- map.addColumn(new Column(ByteBufferUtil.bytes(2)), HeapAllocator.instance);
++ map.addColumn(new BufferCell(type.makeCellName(1)));
++ map.addColumn(new BufferCell(type.makeCellName(2)));
+
- BatchRemoveIterator<Column> batchIter = map.batchRemoveIterator();
++ BatchRemoveIterator<Cell> batchIter = map.batchRemoveIterator();
+ batchIter.next();
+ batchIter.remove();
+ batchIter.remove();
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testBatchCommitTwice()
+ {
++ CellNameType type = new SimpleDenseCellNameType(Int32Type.instance);
+ ColumnFamily map = ArrayBackedSortedColumns.factory.create(metadata(), false);
- map.addColumn(new Column(ByteBufferUtil.bytes(1)), HeapAllocator.instance);
- map.addColumn(new Column(ByteBufferUtil.bytes(2)), HeapAllocator.instance);
++ map.addColumn(new BufferCell(type.makeCellName(1)));
++ map.addColumn(new BufferCell(type.makeCellName(2)));
+
- BatchRemoveIterator<Column> batchIter = map.batchRemoveIterator();
++ BatchRemoveIterator<Cell> batchIter = map.batchRemoveIterator();
+ batchIter.next();
+ batchIter.remove();
+ batchIter.commit();
+ batchIter.commit();
+ }
+
+ @Test
+ public void testBatchRemove()
+ {
+ testBatchRemoveInternal(false);
+ testBatchRemoveInternal(true);
+ }
+
+ public void testBatchRemoveInternal(boolean reversed)
+ {
++ CellNameType type = new SimpleDenseCellNameType(Int32Type.instance);
+ ColumnFamily map = ArrayBackedSortedColumns.factory.create(metadata(), reversed);
+ int[] values = new int[]{ 1, 2, 3, 5 };
+
+ for (int i = 0; i < values.length; ++i)
- map.addColumn(new Column(ByteBufferUtil.bytes(values[reversed ? values.length - 1 - i : i])), HeapAllocator.instance);
++ map.addColumn(new BufferCell(type.makeCellName(values[reversed ? values.length - 1 - i : i])));
+
- BatchRemoveIterator<Column> batchIter = map.batchRemoveIterator();
++ BatchRemoveIterator<Cell> batchIter = map.batchRemoveIterator();
+ batchIter.next();
+ batchIter.remove();
+ batchIter.next();
+ batchIter.remove();
+
- assertEquals("1st column before commit", 1, map.iterator().next().name().getInt(0));
++ assertEquals("1st column before commit", 1, map.iterator().next().name().toByteBuffer().getInt(0));
+
+ batchIter.commit();
+
- assertEquals("1st column after commit", 3, map.iterator().next().name().getInt(0));
++ assertEquals("1st column after commit", 3, map.iterator().next().name().toByteBuffer().getInt(0));
+ }
+
+ @Test
+ public void testBatchRemoveCopy()
+ {
+ // Test delete some random columns and check the result
++ CellNameType type = new SimpleDenseCellNameType(Int32Type.instance);
+ ColumnFamily map = ArrayBackedSortedColumns.factory.create(metadata(), false);
+ int n = 127;
+ int[] values = new int[n];
- for (int i = 0; i < n; i++) values[i] = i;
++ for (int i = 0; i < n; i++)
++ values[i] = i;
+ Set<Integer> toRemove = Sets.newHashSet(3, 12, 13, 15, 58, 103, 112);
+
+ for (int value : values)
- map.addColumn(new Column(ByteBufferUtil.bytes(value)), HeapAllocator.instance);
++ map.addColumn(new BufferCell(type.makeCellName(value)));
+
- BatchRemoveIterator<Column> batchIter = map.batchRemoveIterator();
++ BatchRemoveIterator<Cell> batchIter = map.batchRemoveIterator();
+ while (batchIter.hasNext())
- if (toRemove.contains(batchIter.next().name().getInt(0)))
++ if (toRemove.contains(batchIter.next().name().toByteBuffer().getInt(0)))
+ batchIter.remove();
+
+ batchIter.commit();
+
+ int expected = 0;
-
+ while (toRemove.contains(expected))
+ expected++;
+
- for (Column column : map)
++ for (Cell column : map)
+ {
- assertEquals(expected, column.name().getInt(0));
++ assertEquals(expected, column.name().toByteBuffer().getInt(0));
+ expected++;
+ while (toRemove.contains(expected))
+ expected++;
+ }
-
+ assertEquals(expected, n);
+ }
}