You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2014/06/16 22:12:49 UTC

[1/2] git commit: Fix memtable copy bug causing range tombstones to be missed

Repository: cassandra
Updated Branches:
  refs/heads/trunk 45afe1d93 -> 5bd5e25fb


Fix memtable copy bug causing range tombstones to be missed

patch by tjake; reviewed by benedict for CASSANDRA-7371


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a14a01c9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a14a01c9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a14a01c9

Branch: refs/heads/trunk
Commit: a14a01c924e4db8e084ba313149d35149dc53825
Parents: a9da3fd
Author: Jake Luciani <ja...@apache.org>
Authored: Mon Jun 16 16:10:06 2014 -0400
Committer: Jake Luciani <ja...@apache.org>
Committed: Mon Jun 16 16:10:06 2014 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +-
 .../cql3/statements/BatchStatement.java         |  17 ++-
 .../apache/cassandra/db/AtomicBTreeColumns.java |   8 +-
 .../org/apache/cassandra/db/DeletionInfo.java   |  12 ++
 .../apache/cassandra/db/RangeTombstoneList.java |  21 +++
 test/conf/cassandra.yaml                        |   2 +
 .../org/apache/cassandra/cql3/DeleteTest.java   | 148 +++++++++++++++++++
 7 files changed, 201 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a14a01c9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 35c02f9..fd7c62b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -25,7 +25,7 @@ Merged from 1.2:
  * Check internal addresses for seeds (CASSANDRA-6523)
  * Fix potential / by 0 in HHOM page size calculation (CASSANDRA-7354)
  * Use LOCAL_ONE for non-superuser auth queries (CASSANDRA-7328)
-
+ * Fix RangeTombstone copy bug (CASSANDRA-7371)
 
 2.1.0-rc1
  * Revert flush directory (CASSANDRA-6357)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a14a01c9/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index e513aef..3cec81b 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -23,6 +23,7 @@ import java.util.*;
 import com.google.common.base.Function;
 import com.google.common.collect.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.transport.Frame;
 import org.github.jamm.MemoryMeter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -122,7 +123,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
         return statements;
     }
 
-    private Collection<? extends IMutation> getMutations(BatchQueryOptions options, boolean local, long now)
+    private Collection<? extends IMutation> getMutations(BatchQueryOptions options, boolean local, long now, Frame sourceFrame)
     throws RequestExecutionException, RequestValidationException
     {
         Map<String, Map<ByteBuffer, IMutation>> mutations = new HashMap<>();
@@ -131,7 +132,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
             ModificationStatement statement = statements.get(i);
             QueryOptions statementOptions = options.forStatement(i);
             long timestamp = attrs.getTimestamp(now, statementOptions);
-            addStatementMutations(statement, statementOptions, local, timestamp, mutations);
+            addStatementMutations(statement, statementOptions, local, timestamp, mutations, sourceFrame);
         }
         return unzipMutations(mutations);
     }
@@ -152,7 +153,8 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
                                        QueryOptions options,
                                        boolean local,
                                        long now,
-                                       Map<String, Map<ByteBuffer, IMutation>> mutations)
+                                       Map<String, Map<ByteBuffer, IMutation>> mutations,
+                                       Frame sourceFrame)
     throws RequestExecutionException, RequestValidationException
     {
         String ksName = statement.keyspace();
@@ -177,6 +179,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
             if (mutation == null)
             {
                 mut = new Mutation(ksName, key);
+                mut.setSourceFrame(sourceFrame);
                 mutation = type == Type.COUNTER ? new CounterMutation(mut, options.getConsistency()) : mut;
                 ksMap.put(key, mutation);
             }
@@ -219,10 +222,10 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
 
     public ResultMessage execute(QueryState queryState, BatchQueryOptions options) throws RequestExecutionException, RequestValidationException
     {
-        return execute(options, false, options.getTimestamp(queryState));
+        return execute(queryState, options, false, options.getTimestamp(queryState));
     }
 
-    public ResultMessage execute(BatchQueryOptions options, boolean local, long now)
+    private ResultMessage execute(QueryState queryState, BatchQueryOptions options, boolean local, long now)
     throws RequestExecutionException, RequestValidationException
     {
         if (options.getConsistency() == null)
@@ -233,7 +236,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
         if (hasConditions)
             return executeWithConditions(options, now);
 
-        executeWithoutConditions(getMutations(options, local, now), options.getConsistency());
+        executeWithoutConditions(getMutations(options, local, now, queryState.getSourceFrame()), options.getConsistency());
         return new ResultMessage.Void();
     }
 
@@ -309,7 +312,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
     public ResultMessage executeInternal(QueryState queryState, QueryOptions options) throws RequestValidationException, RequestExecutionException
     {
         assert !hasConditions;
-        for (IMutation mutation : getMutations(BatchQueryOptions.withoutPerStatementVariables(options), true, queryState.getTimestamp()))
+        for (IMutation mutation : getMutations(BatchQueryOptions.withoutPerStatementVariables(options), true, queryState.getTimestamp(), queryState.getSourceFrame()))
         {
             // We don't use counters internally.
             assert mutation instanceof Mutation;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a14a01c9/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
index 27eb46d..0e38784 100644
--- a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.btree.BTree;
 import org.apache.cassandra.utils.btree.UpdateFunction;
 import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.HeapAllocator;
 import org.apache.cassandra.utils.memory.MemtableAllocator;
 
 import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater;
@@ -163,6 +164,8 @@ public class AtomicBTreeColumns extends ColumnFamily
     public long addAllWithSizeDelta(final ColumnFamily cm, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
     {
         ColumnUpdater updater = new ColumnUpdater(this, cm.metadata, allocator, writeOp, indexer);
+        DeletionInfo inputDeletionInfoCopy = null;
+
         while (true)
         {
             Holder current = ref;
@@ -172,7 +175,10 @@ public class AtomicBTreeColumns extends ColumnFamily
             DeletionInfo deletionInfo;
             if (cm.deletionInfo().mayModify(current.deletionInfo))
             {
-                deletionInfo = current.deletionInfo.copy().add(cm.deletionInfo());
+                if (inputDeletionInfoCopy == null)
+                    inputDeletionInfoCopy = cm.deletionInfo().copy(HeapAllocator.instance);
+
+                deletionInfo = current.deletionInfo.copy().add(inputDeletionInfoCopy);
                 updater.allocated(deletionInfo.unsharedHeapSize() - current.deletionInfo.unsharedHeapSize());
             }
             else

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a14a01c9/src/java/org/apache/cassandra/db/DeletionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java
index a167b85..b8988ec 100644
--- a/src/java/org/apache/cassandra/db/DeletionInfo.java
+++ b/src/java/org/apache/cassandra/db/DeletionInfo.java
@@ -33,6 +33,8 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
 
 /**
  * A combination of a top-level (or row) tombstone and range tombstones describing the deletions
@@ -102,6 +104,16 @@ public class DeletionInfo implements IMeasurableMemory
         return new DeletionInfo(topLevel, ranges == null ? null : ranges.copy());
     }
 
+    public DeletionInfo copy(AbstractAllocator allocator)
+    {
+
+        RangeTombstoneList rangesCopy = null;
+        if (ranges != null)
+             rangesCopy = ranges.copy(allocator);
+
+        return new DeletionInfo(topLevel, rangesCopy);
+    }
+
     /**
      * Returns whether this DeletionInfo is live, that is deletes no columns.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a14a01c9/src/java/org/apache/cassandra/db/RangeTombstoneList.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstoneList.java b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
index b06c520..757a1d0 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstoneList.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
@@ -37,6 +37,8 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+import org.apache.cassandra.utils.memory.HeapPool;
 
 /**
  * Data structure holding the range tombstones of a ColumnFamily.
@@ -114,6 +116,25 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
                                       boundaryHeapSize, size);
     }
 
+    public RangeTombstoneList copy(AbstractAllocator allocator)
+    {
+        RangeTombstoneList copy =  new RangeTombstoneList(comparator,
+                                      new Composite[size],
+                                      new Composite[size],
+                                      Arrays.copyOf(markedAts, size),
+                                      Arrays.copyOf(delTimes, size),
+                                      boundaryHeapSize, size);
+
+
+        for (int i = 0; i < size; i++)
+        {
+            copy.starts[i] = starts[i].copy(null, allocator);
+            copy.ends[i] = ends[i].copy(null, allocator);
+        }
+
+        return copy;
+    }
+
     public void add(RangeTombstone tombstone)
     {
         add(tombstone.min, tombstone.max, tombstone.data.markedForDeleteAt, tombstone.data.localDeletionTime);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a14a01c9/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index b766a64..72a357d 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -11,6 +11,8 @@ partitioner: org.apache.cassandra.dht.ByteOrderedPartitioner
 listen_address: 127.0.0.1
 storage_port: 7010
 rpc_port: 9170
+start_native_transport: true
+native_transport_port: 9042
 column_index_size_in_kb: 4
 commitlog_directory: build/test/cassandra/commitlog
 saved_caches_directory: build/test/cassandra/saved_caches

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a14a01c9/test/unit/org/apache/cassandra/cql3/DeleteTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/DeleteTest.java b/test/unit/org/apache/cassandra/cql3/DeleteTest.java
new file mode 100644
index 0000000..3395dcc
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/DeleteTest.java
@@ -0,0 +1,148 @@
+package org.apache.cassandra.cql3;
+
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.service.EmbeddedCassandraService;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class DeleteTest extends SchemaLoader
+{
+
+    private static EmbeddedCassandraService cassandra;
+
+    private static Cluster cluster;
+    private static Session session;
+    private static PreparedStatement pstmtI;
+    private static PreparedStatement pstmtU;
+    private static PreparedStatement pstmtD;
+    private static PreparedStatement pstmt1;
+    private static PreparedStatement pstmt2;
+    private static PreparedStatement pstmt3;
+    private static PreparedStatement pstmt4;
+    private static PreparedStatement pstmt5;
+
+    @BeforeClass()
+    public static void setup() throws ConfigurationException, IOException
+    {
+
+        Schema.instance.clear(); // Schema are now written on disk and will be reloaded
+        cassandra = new EmbeddedCassandraService();
+        cassandra.start();
+
+        cluster = Cluster.builder().addContactPoint("127.0.0.1").withPort(DatabaseDescriptor.getNativeTransportPort()).build();
+        session = cluster.connect();
+
+        session.execute("drop keyspace if exists junit;");
+        session.execute("create keyspace junit WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 2 };");
+        session.execute("CREATE TABLE junit.tpc_base (\n" +
+                "  id int ,\n" +
+                "  cid int ,\n" +
+                "  val text ,\n" +
+                "  PRIMARY KEY ( ( id ), cid )\n" +
+                ");");
+        session.execute("CREATE TABLE junit.tpc_inherit_a (\n" +
+                "  id int ,\n" +
+                "  cid int ,\n" +
+                "  inh_a text ,\n" +
+                "  val text ,\n" +
+                "  PRIMARY KEY ( ( id ), cid )\n" +
+                ");");
+        session.execute("CREATE TABLE junit.tpc_inherit_b (\n" +
+                "  id int ,\n" +
+                "  cid int ,\n" +
+                "  inh_b text ,\n" +
+                "  val text ,\n" +
+                "  PRIMARY KEY ( ( id ), cid )\n" +
+                ");");
+        session.execute("CREATE TABLE junit.tpc_inherit_b2 (\n" +
+                "  id int ,\n" +
+                "  cid int ,\n" +
+                "  inh_b text ,\n" +
+                "  inh_b2 text ,\n" +
+                "  val text ,\n" +
+                "  PRIMARY KEY ( ( id ), cid )\n" +
+                ");");
+        session.execute("CREATE TABLE junit.tpc_inherit_c (\n" +
+                "  id int ,\n" +
+                "  cid int ,\n" +
+                "  inh_c text ,\n" +
+                "  val text ,\n" +
+                "  PRIMARY KEY ( ( id ), cid )\n" +
+                ");");
+        try {
+            Thread.sleep(2000L);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+
+        pstmtI = session.prepare("insert into junit.tpc_inherit_b ( id, cid, inh_b, val) values (?, ?, ?, ?)");
+        pstmtU = session.prepare("update junit.tpc_inherit_b set inh_b=?, val=? where id=? and cid=?");
+        pstmtD = session.prepare("delete from junit.tpc_inherit_b where id=? and cid=?");
+        pstmt1 = session.prepare("select id, cid, val from junit.tpc_base where id=? and cid=?");
+        pstmt2 = session.prepare("select id, cid, inh_a, val from junit.tpc_inherit_a where id=? and cid=?");
+        pstmt3 = session.prepare("select id, cid, inh_b, val from junit.tpc_inherit_b where id=? and cid=?");
+        pstmt4 = session.prepare("select id, cid, inh_b, inh_b2, val from junit.tpc_inherit_b2 where id=? and cid=?");
+        pstmt5 = session.prepare("select id, cid, inh_c, val from junit.tpc_inherit_c where id=? and cid=?");
+    }
+
+
+
+    @Test
+    public void lostDeletesTest()
+    {
+
+        for (int i = 0; i < 500; i++)
+        {
+            session.execute(pstmtI.bind(1, 1, "inhB", "valB"));
+
+            ResultSetFuture[] futures = load();
+
+            Assert.assertTrue(futures[0].getUninterruptibly().isExhausted());
+            Assert.assertTrue(futures[1].getUninterruptibly().isExhausted());
+            Assert.assertNotNull(futures[2].getUninterruptibly().one());
+            Assert.assertTrue(futures[3].getUninterruptibly().isExhausted());
+            Assert.assertTrue(futures[4].getUninterruptibly().isExhausted());
+
+            session.execute(pstmtU.bind("inhBu", "valBu", 1, 1));
+
+            futures = load();
+
+            Assert.assertTrue(futures[0].getUninterruptibly().isExhausted());
+            Assert.assertTrue(futures[1].getUninterruptibly().isExhausted());
+            Assert.assertNotNull(futures[2].getUninterruptibly().one());
+            Assert.assertTrue(futures[3].getUninterruptibly().isExhausted());
+            Assert.assertTrue(futures[4].getUninterruptibly().isExhausted());
+
+            session.execute(pstmtD.bind(1, 1));
+
+            futures = load();
+
+            Assert.assertTrue(futures[0].getUninterruptibly().isExhausted());
+            Assert.assertTrue(futures[1].getUninterruptibly().isExhausted());
+            Assert.assertTrue(futures[2].getUninterruptibly().isExhausted());
+            Assert.assertTrue(futures[3].getUninterruptibly().isExhausted());
+            Assert.assertTrue(futures[4].getUninterruptibly().isExhausted());
+        }
+    }
+
+    private ResultSetFuture[] load() {
+        return new ResultSetFuture[]{
+                session.executeAsync(pstmt1.bind(1, 1)),
+                session.executeAsync(pstmt2.bind(1, 1)),
+                session.executeAsync(pstmt3.bind(1, 1)),
+                session.executeAsync(pstmt4.bind(1, 1)),
+                session.executeAsync(pstmt5.bind(1, 1))
+        };
+    }
+}


[2/2] git commit: Merge branch 'cassandra-2.1' into trunk

Posted by ja...@apache.org.
Merge branch 'cassandra-2.1' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5bd5e25f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5bd5e25f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5bd5e25f

Branch: refs/heads/trunk
Commit: 5bd5e25fbbb490acd047adf495f75ae1ca49cf0f
Parents: 45afe1d a14a01c
Author: Jake Luciani <ja...@apache.org>
Authored: Mon Jun 16 16:12:27 2014 -0400
Committer: Jake Luciani <ja...@apache.org>
Committed: Mon Jun 16 16:12:27 2014 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +-
 .../cql3/statements/BatchStatement.java         |  17 ++-
 .../apache/cassandra/db/AtomicBTreeColumns.java |   8 +-
 .../org/apache/cassandra/db/DeletionInfo.java   |  12 ++
 .../apache/cassandra/db/RangeTombstoneList.java |  21 +++
 test/conf/cassandra.yaml                        |   2 +
 .../org/apache/cassandra/cql3/DeleteTest.java   | 148 +++++++++++++++++++
 7 files changed, 201 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5bd5e25f/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5bd5e25f/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
index 4eb4940,0e38784..58734e8
--- a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
@@@ -35,11 -35,10 +35,12 @@@ import org.apache.cassandra.db.composit
  import org.apache.cassandra.db.composites.Composite;
  import org.apache.cassandra.db.filter.ColumnSlice;
  import org.apache.cassandra.utils.ObjectSizes;
 +import org.apache.cassandra.utils.SearchIterator;
  import org.apache.cassandra.utils.btree.BTree;
 +import org.apache.cassandra.utils.btree.BTreeSearchIterator;
  import org.apache.cassandra.utils.btree.UpdateFunction;
  import org.apache.cassandra.utils.concurrent.OpOrder;
+ import org.apache.cassandra.utils.memory.HeapAllocator;
  import org.apache.cassandra.utils.memory.MemtableAllocator;
  
  import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater;