You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/02/06 02:58:39 UTC

git commit: Replace UnsortedColumns usage with ArrayBackedSortedColumns

Updated Branches:
  refs/heads/trunk fe4247e58 -> 812504713


Replace UnsortedColumns usage with ArrayBackedSortedColumns

patch by Aleksey Yeschenko; reviewed by Sylvain Lebresne for
CASSANDRA-6630


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/81250471
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/81250471
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/81250471

Branch: refs/heads/trunk
Commit: 812504713523c2b8fbff394fbf4448ea30b5e4a3
Parents: fe4247e
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Thu Feb 6 04:57:56 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Thu Feb 6 04:57:56 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cql3/statements/UpdateStatement.java        |  2 +-
 .../cassandra/db/ArrayBackedSortedColumns.java  | 14 ------
 .../apache/cassandra/db/AtomicBTreeColumns.java |  9 +---
 .../org/apache/cassandra/db/ColumnFamily.java   |  6 ++-
 src/java/org/apache/cassandra/db/Mutation.java  |  2 +-
 .../apache/cassandra/service/paxos/Commit.java  |  5 ++-
 .../service/paxos/PrepareResponse.java          | 10 +++--
 .../cassandra/thrift/CassandraServer.java       |  4 +-
 .../apache/cassandra/db/CounterCacheTest.java   |  2 +-
 .../cassandra/db/CounterMutationTest.java       | 46 +++++++++++++++-----
 11 files changed, 56 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/81250471/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7d628b5..a139fdc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -25,6 +25,7 @@
  * CF id is changed to be non-deterministic. Data dir/key cache are created
    uniquely for CF id (CASSANDRA-5202)
  * New counters implementation (CASSANDRA-6504)
+ * Replace UnsortedColumns usage with ArrayBackedSortedColumns (CASSANDRA-6630)
 
 
 2.0.6

http://git-wip-us.apache.org/repos/asf/cassandra/blob/81250471/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index 1102c09..6ed0e33 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -99,7 +99,7 @@ public class UpdateStatement extends ModificationStatement
     public ColumnFamily updateForKey(ByteBuffer key, Composite prefix, UpdateParameters params)
     throws InvalidRequestException
     {
-        ColumnFamily cf = UnsortedColumns.factory.create(cfm);
+        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfm);
         addUpdateForKey(cf, key, prefix, params);
         return cf;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/81250471/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
index 7bcbe25..b81e403 100644
--- a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
@@ -91,16 +91,6 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
         return pos >= 0 ? cells.get(pos) : null;
     }
 
-    /**
-     * AddColumn throws an exception if the cell added does not sort after
-     * the last cell in the map.
-     * The reasoning is that this implementation can get slower if too much
-     * insertions are done in unsorted order and right now we only use it when
-     * *all* insertion (with this method) are done in sorted order. The
-     * assertion throwing is thus a protection against performance regression
-     * without knowing about (we can revisit that decision later if we have
-     * use cases where most insert are in sorted order but a few are not).
-     */
     public void addColumn(Cell cell, AbstractAllocator allocator)
     {
         if (cells.isEmpty())
@@ -109,11 +99,7 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
             return;
         }
 
-        // Fast path if inserting at the tail
         int c = internalComparator().compare(cells.get(getColumnCount() - 1).name(), cell.name());
-        // note that we want an assertion here (see addColumn javadoc), but we also want that if
-        // assertion are disabled, addColumn works correctly with unsorted input
-        assert c <= 0 : "Added cell does not sort as the " + (reversed ? "first" : "last") + " cell";
 
         if (c < 0)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/81250471/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
index fd7d4bc..c1c7b66 100644
--- a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
@@ -234,14 +234,7 @@ public class AtomicBTreeColumns extends ColumnFamily
     public Delta addAllWithSizeDelta(final ColumnFamily cm, AbstractAllocator allocator, Function<Cell, Cell> transformation, Updater indexer, Delta delta)
     {
         boolean transformed = false;
-        Collection<Cell> insert;
-        if (cm instanceof UnsortedColumns)
-        {
-            insert = transform(metadata.comparator.columnComparator(), cm, transformation, true);
-            transformed = true;
-        }
-        else
-            insert = cm.getSortedColumns();
+        Collection<Cell> insert = cm.getSortedColumns();
 
         while (true)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/81250471/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index 9d2856d..2df3fbf 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -491,7 +491,6 @@ public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry
         return builder.build();
     }
 
-    // Note: the returned ColumnFamily will be an UnsortedColumns.
     public static ColumnFamily fromBytes(ByteBuffer bytes)
     {
         if (bytes == null)
@@ -499,7 +498,10 @@ public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry
 
         try
         {
-            return serializer.deserialize(new DataInputStream(ByteBufferUtil.inputStream(bytes)), UnsortedColumns.factory, ColumnSerializer.Flag.LOCAL, MessagingService.current_version);
+            return serializer.deserialize(new DataInputStream(ByteBufferUtil.inputStream(bytes)),
+                                                              ArrayBackedSortedColumns.factory,
+                                                              ColumnSerializer.Flag.LOCAL,
+                                                              MessagingService.current_version);
         }
         catch (IOException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/81250471/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index 31d9503..ef9b02d 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -310,7 +310,7 @@ public class Mutation implements IMutation
 
         private ColumnFamily deserializeOneCf(DataInput in, int version, ColumnSerializer.Flag flag) throws IOException
         {
-            ColumnFamily cf = ColumnFamily.serializer.deserialize(in, UnsortedColumns.factory, flag, version);
+            ColumnFamily cf = ColumnFamily.serializer.deserialize(in, ArrayBackedSortedColumns.factory, flag, version);
             // We don't allow Mutation with null column family, so we should never get null back.
             assert cf != null;
             return cf;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/81250471/src/java/org/apache/cassandra/service/paxos/Commit.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/Commit.java b/src/java/org/apache/cassandra/service/paxos/Commit.java
index 23f35db..aae9b72 100644
--- a/src/java/org/apache/cassandra/service/paxos/Commit.java
+++ b/src/java/org/apache/cassandra/service/paxos/Commit.java
@@ -139,7 +139,10 @@ public class Commit
         {
             return new Commit(ByteBufferUtil.readWithShortLength(in),
                               UUIDSerializer.serializer.deserialize(in, version),
-                              ColumnFamily.serializer.deserialize(in, UnsortedColumns.factory, ColumnSerializer.Flag.LOCAL, version));
+                              ColumnFamily.serializer.deserialize(in,
+                                                                  ArrayBackedSortedColumns.factory,
+                                                                  ColumnSerializer.Flag.LOCAL,
+                                                                  version));
         }
 
         public long serializedSize(Commit commit, int version)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/81250471/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java b/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
index d2bd835..14e0bc7 100644
--- a/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
@@ -26,9 +26,9 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.cassandra.db.ArrayBackedSortedColumns;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ColumnSerializer;
-import org.apache.cassandra.db.UnsortedColumns;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.UUIDSerializer;
@@ -82,10 +82,14 @@ public class PrepareResponse
             return new PrepareResponse(success,
                                        new Commit(key,
                                                   UUIDSerializer.serializer.deserialize(in, version),
-                                                  ColumnFamily.serializer.deserialize(in, UnsortedColumns.factory, ColumnSerializer.Flag.LOCAL, version)),
+                                                  ColumnFamily.serializer.deserialize(in,
+                                                                                      ArrayBackedSortedColumns.factory,
+                                                                                      ColumnSerializer.Flag.LOCAL, version)),
                                        new Commit(key,
                                                   UUIDSerializer.serializer.deserialize(in, version),
-                                                  ColumnFamily.serializer.deserialize(in, UnsortedColumns.factory, ColumnSerializer.Flag.LOCAL, version)));
+                                                  ColumnFamily.serializer.deserialize(in,
+                                                                                      ArrayBackedSortedColumns.factory,
+                                                                                      ColumnSerializer.Flag.LOCAL, version)));
         }
 
         public long serializedSize(PrepareResponse response, int version)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/81250471/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index fe9dc3f..44fb22e 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -754,7 +754,7 @@ public class CassandraServer implements Cassandra.Iface
                 ThriftValidation.validateColumnData(metadata, null, column);
 
             CFMetaData cfm = Schema.instance.getCFMetaData(cState.getKeyspace(), column_family);
-            UnsortedColumns cfUpdates = UnsortedColumns.factory.create(cfm);
+            ColumnFamily cfUpdates = ArrayBackedSortedColumns.factory.create(cfm);
             for (Column column : updates)
                 cfUpdates.addColumn(cfm.comparator.cellFromByteBuffer(column.name), column.value, column.timestamp);
 
@@ -765,7 +765,7 @@ public class CassandraServer implements Cassandra.Iface
             }
             else
             {
-                cfExpected = TreeMapBackedSortedColumns.factory.create(cfm);
+                cfExpected = ArrayBackedSortedColumns.factory.create(cfm);
                 for (Column column : expected)
                     cfExpected.addColumn(cfm.comparator.cellFromByteBuffer(column.name), column.value, column.timestamp);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/81250471/test/unit/org/apache/cassandra/db/CounterCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CounterCacheTest.java b/test/unit/org/apache/cassandra/db/CounterCacheTest.java
index 78e7c80..a015a43 100644
--- a/test/unit/org/apache/cassandra/db/CounterCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/CounterCacheTest.java
@@ -74,7 +74,7 @@ public class CounterCacheTest extends SchemaLoader
         ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF);
         CacheService.instance.invalidateCounterCache();
 
-        ColumnFamily cells = UnsortedColumns.factory.create(cfs.metadata);
+        ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
         cells.addColumn(new CounterUpdateCell(cellname(1), 1L, FBUtilities.timestampMicros()));
         cells.addColumn(new CounterUpdateCell(cellname(2), 2L, FBUtilities.timestampMicros()));
         new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/81250471/test/unit/org/apache/cassandra/db/CounterMutationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CounterMutationTest.java b/test/unit/org/apache/cassandra/db/CounterMutationTest.java
index 3676ef9..431531c 100644
--- a/test/unit/org/apache/cassandra/db/CounterMutationTest.java
+++ b/test/unit/org/apache/cassandra/db/CounterMutationTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.db;
 
+import java.nio.ByteBuffer;
+
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
@@ -45,21 +47,21 @@ public class CounterMutationTest extends SchemaLoader
         cfs.truncateBlocking();
 
         // Do the initial update (+1)
-        ColumnFamily cells = UnsortedColumns.factory.create(cfs.metadata);
+        ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
         cells.addCounter(cellname(1), 1L);
         new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
         ColumnFamily current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis()));
         assertEquals(1L, CounterContext.instance().total(current.getColumn(cellname(1)).value()));
 
         // Make another increment (+2)
-        cells = UnsortedColumns.factory.create(cfs.metadata);
+        cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
         cells.addCounter(cellname(1), 2L);
         new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
         current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis()));
         assertEquals(3L, CounterContext.instance().total(current.getColumn(cellname(1)).value()));
 
         // Decrement to 0 (-3)
-        cells = UnsortedColumns.factory.create(cfs.metadata);
+        cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
         cells.addCounter(cellname(1), -3L);
         new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
         current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis()));
@@ -74,7 +76,7 @@ public class CounterMutationTest extends SchemaLoader
         cfs.truncateBlocking();
 
         // Do the initial update (+1, -1)
-        ColumnFamily cells = UnsortedColumns.factory.create(cfs.metadata);
+        ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
         cells.addCounter(cellname(1), 1L);
         cells.addCounter(cellname(2), -1L);
         new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
@@ -83,7 +85,7 @@ public class CounterMutationTest extends SchemaLoader
         assertEquals(-1L, CounterContext.instance().total(current.getColumn(cellname(2)).value()));
 
         // Make another increment (+2, -2)
-        cells = UnsortedColumns.factory.create(cfs.metadata);
+        cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
         cells.addCounter(cellname(1), 2L);
         cells.addCounter(cellname(2), -2L);
         new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
@@ -91,7 +93,7 @@ public class CounterMutationTest extends SchemaLoader
         assertEquals(3L, CounterContext.instance().total(current.getColumn(cellname(1)).value()));
 
         // Decrement to 0 (-3, +3)
-        cells = UnsortedColumns.factory.create(cfs.metadata);
+        cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
         cells.addCounter(cellname(1), -3L);
         cells.addCounter(cellname(2), 3L);
         new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
@@ -114,11 +116,11 @@ public class CounterMutationTest extends SchemaLoader
         cfs2.truncateBlocking();
 
         // Do the update (+1, -1), (+2, -2)
-        ColumnFamily cells1 = UnsortedColumns.factory.create(cfs1.metadata);
+        ColumnFamily cells1 = ArrayBackedSortedColumns.factory.create(cfs1.metadata);
         cells1.addCounter(cellname(1), 1L);
         cells1.addCounter(cellname(2), -1L);
 
-        ColumnFamily cells2 = UnsortedColumns.factory.create(cfs2.metadata);
+        ColumnFamily cells2 = ArrayBackedSortedColumns.factory.create(cfs2.metadata);
         cells2.addCounter(cellname(1), 2L);
         cells2.addCounter(cellname(2), -2L);
 
@@ -151,7 +153,7 @@ public class CounterMutationTest extends SchemaLoader
         cfs.truncateBlocking();
 
         // Do the initial update (+1, -1)
-        ColumnFamily cells = UnsortedColumns.factory.create(cfs.metadata);
+        ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
         cells.addCounter(cellname(1), 1L);
         cells.addCounter(cellname(2), 1L);
         new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
@@ -160,7 +162,7 @@ public class CounterMutationTest extends SchemaLoader
         assertEquals(1L, CounterContext.instance().total(current.getColumn(cellname(2)).value()));
 
         // Remove the first counter, increment the second counter
-        cells = UnsortedColumns.factory.create(cfs.metadata);
+        cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
         cells.addTombstone(cellname(1), (int) System.currentTimeMillis() / 1000, FBUtilities.timestampMicros());
         cells.addCounter(cellname(2), 1L);
         new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
@@ -169,7 +171,7 @@ public class CounterMutationTest extends SchemaLoader
         assertEquals(2L, CounterContext.instance().total(current.getColumn(cellname(2)).value()));
 
         // Increment the first counter, make sure it's still shadowed by the tombstone
-        cells = UnsortedColumns.factory.create(cfs.metadata);
+        cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
         cells.addCounter(cellname(1), 1L);
         new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
         current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis()));
@@ -184,7 +186,7 @@ public class CounterMutationTest extends SchemaLoader
         assertNull(current.getColumn(cellname(2)));
 
         // Increment both counters, ensure that both stay dead
-        cells = UnsortedColumns.factory.create(cfs.metadata);
+        cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
         cells.addCounter(cellname(1), 1L);
         cells.addCounter(cellname(2), 1L);
         new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
@@ -192,4 +194,24 @@ public class CounterMutationTest extends SchemaLoader
         assertNull(current.getColumn(cellname(1)));
         assertNull(current.getColumn(cellname(2)));
     }
+
+    @Test
+    public void testDuplicateCells() throws WriteTimeoutException
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF1);
+        cfs.truncateBlocking();
+
+        ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+        cells.addCounter(cellname(1), 1L);
+        cells.addCounter(cellname(1), 2L);
+        cells.addCounter(cellname(1), 3L);
+        cells.addCounter(cellname(1), 4L);
+        new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
+
+        ColumnFamily current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis()));
+        ByteBuffer context = current.getColumn(cellname(1)).value();
+        assertEquals(10L, CounterContext.instance().total(context));
+        assertEquals(ClockAndCount.create(1L, 10L), CounterContext.instance().getLocalClockAndCount(context));
+        assertEquals(ClockAndCount.create(1L, 10L), cfs.getCachedCounter(bytes(1), cellname(1)));
+    }
 }