You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/02/06 02:58:39 UTC
git commit: Replace UnsortedColumns usage with ArrayBackedSortedColumns
Updated Branches:
refs/heads/trunk fe4247e58 -> 812504713
Replace UnsortedColumns usage with ArrayBackedSortedColumns
patch by Aleksey Yeschenko; reviewed by Sylvain Lebresne for CASSANDRA-6630
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/81250471
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/81250471
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/81250471
Branch: refs/heads/trunk
Commit: 812504713523c2b8fbff394fbf4448ea30b5e4a3
Parents: fe4247e
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Thu Feb 6 04:57:56 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Thu Feb 6 04:57:56 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cql3/statements/UpdateStatement.java | 2 +-
.../cassandra/db/ArrayBackedSortedColumns.java | 14 ------
.../apache/cassandra/db/AtomicBTreeColumns.java | 9 +---
.../org/apache/cassandra/db/ColumnFamily.java | 6 ++-
src/java/org/apache/cassandra/db/Mutation.java | 2 +-
.../apache/cassandra/service/paxos/Commit.java | 5 ++-
.../service/paxos/PrepareResponse.java | 10 +++--
.../cassandra/thrift/CassandraServer.java | 4 +-
.../apache/cassandra/db/CounterCacheTest.java | 2 +-
.../cassandra/db/CounterMutationTest.java | 46 +++++++++++++++-----
11 files changed, 56 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/81250471/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7d628b5..a139fdc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -25,6 +25,7 @@
* CF id is changed to be non-deterministic. Data dir/key cache are created
uniquely for CF id (CASSANDRA-5202)
* New counters implementation (CASSANDRA-6504)
+ * Replace UnsortedColumns usage with ArrayBackedSortedColumns (CASSANDRA-6630)
2.0.6
http://git-wip-us.apache.org/repos/asf/cassandra/blob/81250471/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index 1102c09..6ed0e33 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -99,7 +99,7 @@ public class UpdateStatement extends ModificationStatement
public ColumnFamily updateForKey(ByteBuffer key, Composite prefix, UpdateParameters params)
throws InvalidRequestException
{
- ColumnFamily cf = UnsortedColumns.factory.create(cfm);
+ ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfm);
addUpdateForKey(cf, key, prefix, params);
return cf;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/81250471/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
index 7bcbe25..b81e403 100644
--- a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
@@ -91,16 +91,6 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
return pos >= 0 ? cells.get(pos) : null;
}
- /**
- * AddColumn throws an exception if the cell added does not sort after
- * the last cell in the map.
- * The reasoning is that this implementation can get slower if too much
- * insertions are done in unsorted order and right now we only use it when
- * *all* insertion (with this method) are done in sorted order. The
- * assertion throwing is thus a protection against performance regression
- * without knowing about (we can revisit that decision later if we have
- * use cases where most insert are in sorted order but a few are not).
- */
public void addColumn(Cell cell, AbstractAllocator allocator)
{
if (cells.isEmpty())
@@ -109,11 +99,7 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
return;
}
- // Fast path if inserting at the tail
int c = internalComparator().compare(cells.get(getColumnCount() - 1).name(), cell.name());
- // note that we want an assertion here (see addColumn javadoc), but we also want that if
- // assertion are disabled, addColumn works correctly with unsorted input
- assert c <= 0 : "Added cell does not sort as the " + (reversed ? "first" : "last") + " cell";
if (c < 0)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/81250471/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
index fd7d4bc..c1c7b66 100644
--- a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
@@ -234,14 +234,7 @@ public class AtomicBTreeColumns extends ColumnFamily
public Delta addAllWithSizeDelta(final ColumnFamily cm, AbstractAllocator allocator, Function<Cell, Cell> transformation, Updater indexer, Delta delta)
{
boolean transformed = false;
- Collection<Cell> insert;
- if (cm instanceof UnsortedColumns)
- {
- insert = transform(metadata.comparator.columnComparator(), cm, transformation, true);
- transformed = true;
- }
- else
- insert = cm.getSortedColumns();
+ Collection<Cell> insert = cm.getSortedColumns();
while (true)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/81250471/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index 9d2856d..2df3fbf 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -491,7 +491,6 @@ public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry
return builder.build();
}
- // Note: the returned ColumnFamily will be an UnsortedColumns.
public static ColumnFamily fromBytes(ByteBuffer bytes)
{
if (bytes == null)
@@ -499,7 +498,10 @@ public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry
try
{
- return serializer.deserialize(new DataInputStream(ByteBufferUtil.inputStream(bytes)), UnsortedColumns.factory, ColumnSerializer.Flag.LOCAL, MessagingService.current_version);
+ return serializer.deserialize(new DataInputStream(ByteBufferUtil.inputStream(bytes)),
+ ArrayBackedSortedColumns.factory,
+ ColumnSerializer.Flag.LOCAL,
+ MessagingService.current_version);
}
catch (IOException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/81250471/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index 31d9503..ef9b02d 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -310,7 +310,7 @@ public class Mutation implements IMutation
private ColumnFamily deserializeOneCf(DataInput in, int version, ColumnSerializer.Flag flag) throws IOException
{
- ColumnFamily cf = ColumnFamily.serializer.deserialize(in, UnsortedColumns.factory, flag, version);
+ ColumnFamily cf = ColumnFamily.serializer.deserialize(in, ArrayBackedSortedColumns.factory, flag, version);
// We don't allow Mutation with null column family, so we should never get null back.
assert cf != null;
return cf;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/81250471/src/java/org/apache/cassandra/service/paxos/Commit.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/Commit.java b/src/java/org/apache/cassandra/service/paxos/Commit.java
index 23f35db..aae9b72 100644
--- a/src/java/org/apache/cassandra/service/paxos/Commit.java
+++ b/src/java/org/apache/cassandra/service/paxos/Commit.java
@@ -139,7 +139,10 @@ public class Commit
{
return new Commit(ByteBufferUtil.readWithShortLength(in),
UUIDSerializer.serializer.deserialize(in, version),
- ColumnFamily.serializer.deserialize(in, UnsortedColumns.factory, ColumnSerializer.Flag.LOCAL, version));
+ ColumnFamily.serializer.deserialize(in,
+ ArrayBackedSortedColumns.factory,
+ ColumnSerializer.Flag.LOCAL,
+ version));
}
public long serializedSize(Commit commit, int version)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/81250471/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java b/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
index d2bd835..14e0bc7 100644
--- a/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
@@ -26,9 +26,9 @@ import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.cassandra.db.ArrayBackedSortedColumns;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnSerializer;
-import org.apache.cassandra.db.UnsortedColumns;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.UUIDSerializer;
@@ -82,10 +82,14 @@ public class PrepareResponse
return new PrepareResponse(success,
new Commit(key,
UUIDSerializer.serializer.deserialize(in, version),
- ColumnFamily.serializer.deserialize(in, UnsortedColumns.factory, ColumnSerializer.Flag.LOCAL, version)),
+ ColumnFamily.serializer.deserialize(in,
+ ArrayBackedSortedColumns.factory,
+ ColumnSerializer.Flag.LOCAL, version)),
new Commit(key,
UUIDSerializer.serializer.deserialize(in, version),
- ColumnFamily.serializer.deserialize(in, UnsortedColumns.factory, ColumnSerializer.Flag.LOCAL, version)));
+ ColumnFamily.serializer.deserialize(in,
+ ArrayBackedSortedColumns.factory,
+ ColumnSerializer.Flag.LOCAL, version)));
}
public long serializedSize(PrepareResponse response, int version)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/81250471/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index fe9dc3f..44fb22e 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -754,7 +754,7 @@ public class CassandraServer implements Cassandra.Iface
ThriftValidation.validateColumnData(metadata, null, column);
CFMetaData cfm = Schema.instance.getCFMetaData(cState.getKeyspace(), column_family);
- UnsortedColumns cfUpdates = UnsortedColumns.factory.create(cfm);
+ ColumnFamily cfUpdates = ArrayBackedSortedColumns.factory.create(cfm);
for (Column column : updates)
cfUpdates.addColumn(cfm.comparator.cellFromByteBuffer(column.name), column.value, column.timestamp);
@@ -765,7 +765,7 @@ public class CassandraServer implements Cassandra.Iface
}
else
{
- cfExpected = TreeMapBackedSortedColumns.factory.create(cfm);
+ cfExpected = ArrayBackedSortedColumns.factory.create(cfm);
for (Column column : expected)
cfExpected.addColumn(cfm.comparator.cellFromByteBuffer(column.name), column.value, column.timestamp);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/81250471/test/unit/org/apache/cassandra/db/CounterCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CounterCacheTest.java b/test/unit/org/apache/cassandra/db/CounterCacheTest.java
index 78e7c80..a015a43 100644
--- a/test/unit/org/apache/cassandra/db/CounterCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/CounterCacheTest.java
@@ -74,7 +74,7 @@ public class CounterCacheTest extends SchemaLoader
ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF);
CacheService.instance.invalidateCounterCache();
- ColumnFamily cells = UnsortedColumns.factory.create(cfs.metadata);
+ ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
cells.addColumn(new CounterUpdateCell(cellname(1), 1L, FBUtilities.timestampMicros()));
cells.addColumn(new CounterUpdateCell(cellname(2), 2L, FBUtilities.timestampMicros()));
new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/81250471/test/unit/org/apache/cassandra/db/CounterMutationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CounterMutationTest.java b/test/unit/org/apache/cassandra/db/CounterMutationTest.java
index 3676ef9..431531c 100644
--- a/test/unit/org/apache/cassandra/db/CounterMutationTest.java
+++ b/test/unit/org/apache/cassandra/db/CounterMutationTest.java
@@ -17,6 +17,8 @@
*/
package org.apache.cassandra.db;
+import java.nio.ByteBuffer;
+
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
@@ -45,21 +47,21 @@ public class CounterMutationTest extends SchemaLoader
cfs.truncateBlocking();
// Do the initial update (+1)
- ColumnFamily cells = UnsortedColumns.factory.create(cfs.metadata);
+ ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
cells.addCounter(cellname(1), 1L);
new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
ColumnFamily current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis()));
assertEquals(1L, CounterContext.instance().total(current.getColumn(cellname(1)).value()));
// Make another increment (+2)
- cells = UnsortedColumns.factory.create(cfs.metadata);
+ cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
cells.addCounter(cellname(1), 2L);
new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis()));
assertEquals(3L, CounterContext.instance().total(current.getColumn(cellname(1)).value()));
// Decrement to 0 (-3)
- cells = UnsortedColumns.factory.create(cfs.metadata);
+ cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
cells.addCounter(cellname(1), -3L);
new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis()));
@@ -74,7 +76,7 @@ public class CounterMutationTest extends SchemaLoader
cfs.truncateBlocking();
// Do the initial update (+1, -1)
- ColumnFamily cells = UnsortedColumns.factory.create(cfs.metadata);
+ ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
cells.addCounter(cellname(1), 1L);
cells.addCounter(cellname(2), -1L);
new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
@@ -83,7 +85,7 @@ public class CounterMutationTest extends SchemaLoader
assertEquals(-1L, CounterContext.instance().total(current.getColumn(cellname(2)).value()));
// Make another increment (+2, -2)
- cells = UnsortedColumns.factory.create(cfs.metadata);
+ cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
cells.addCounter(cellname(1), 2L);
cells.addCounter(cellname(2), -2L);
new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
@@ -91,7 +93,7 @@ public class CounterMutationTest extends SchemaLoader
assertEquals(3L, CounterContext.instance().total(current.getColumn(cellname(1)).value()));
// Decrement to 0 (-3, +3)
- cells = UnsortedColumns.factory.create(cfs.metadata);
+ cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
cells.addCounter(cellname(1), -3L);
cells.addCounter(cellname(2), 3L);
new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
@@ -114,11 +116,11 @@ public class CounterMutationTest extends SchemaLoader
cfs2.truncateBlocking();
// Do the update (+1, -1), (+2, -2)
- ColumnFamily cells1 = UnsortedColumns.factory.create(cfs1.metadata);
+ ColumnFamily cells1 = ArrayBackedSortedColumns.factory.create(cfs1.metadata);
cells1.addCounter(cellname(1), 1L);
cells1.addCounter(cellname(2), -1L);
- ColumnFamily cells2 = UnsortedColumns.factory.create(cfs2.metadata);
+ ColumnFamily cells2 = ArrayBackedSortedColumns.factory.create(cfs2.metadata);
cells2.addCounter(cellname(1), 2L);
cells2.addCounter(cellname(2), -2L);
@@ -151,7 +153,7 @@ public class CounterMutationTest extends SchemaLoader
cfs.truncateBlocking();
// Do the initial update (+1, -1)
- ColumnFamily cells = UnsortedColumns.factory.create(cfs.metadata);
+ ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
cells.addCounter(cellname(1), 1L);
cells.addCounter(cellname(2), 1L);
new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
@@ -160,7 +162,7 @@ public class CounterMutationTest extends SchemaLoader
assertEquals(1L, CounterContext.instance().total(current.getColumn(cellname(2)).value()));
// Remove the first counter, increment the second counter
- cells = UnsortedColumns.factory.create(cfs.metadata);
+ cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
cells.addTombstone(cellname(1), (int) System.currentTimeMillis() / 1000, FBUtilities.timestampMicros());
cells.addCounter(cellname(2), 1L);
new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
@@ -169,7 +171,7 @@ public class CounterMutationTest extends SchemaLoader
assertEquals(2L, CounterContext.instance().total(current.getColumn(cellname(2)).value()));
// Increment the first counter, make sure it's still shadowed by the tombstone
- cells = UnsortedColumns.factory.create(cfs.metadata);
+ cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
cells.addCounter(cellname(1), 1L);
new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis()));
@@ -184,7 +186,7 @@ public class CounterMutationTest extends SchemaLoader
assertNull(current.getColumn(cellname(2)));
// Increment both counters, ensure that both stay dead
- cells = UnsortedColumns.factory.create(cfs.metadata);
+ cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
cells.addCounter(cellname(1), 1L);
cells.addCounter(cellname(2), 1L);
new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
@@ -192,4 +194,24 @@ public class CounterMutationTest extends SchemaLoader
assertNull(current.getColumn(cellname(1)));
assertNull(current.getColumn(cellname(2)));
}
+
+ @Test
+ public void testDuplicateCells() throws WriteTimeoutException
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF1);
+ cfs.truncateBlocking();
+
+ ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+ cells.addCounter(cellname(1), 1L);
+ cells.addCounter(cellname(1), 2L);
+ cells.addCounter(cellname(1), 3L);
+ cells.addCounter(cellname(1), 4L);
+ new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
+
+ ColumnFamily current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis()));
+ ByteBuffer context = current.getColumn(cellname(1)).value();
+ assertEquals(10L, CounterContext.instance().total(context));
+ assertEquals(ClockAndCount.create(1L, 10L), CounterContext.instance().getLocalClockAndCount(context));
+ assertEquals(ClockAndCount.create(1L, 10L), cfs.getCachedCounter(bytes(1), cellname(1)));
+ }
}