You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2018/07/16 16:57:04 UTC

[3/6] cassandra git commit: Fix corrupted static collection deletions in 3.0 -> 2.{1, 2} messages

Fix corrupted static collection deletions in 3.0 -> 2.{1,2} messages

patch by Benedict; reviewed by Aleksey Yeschenko for CASSANDRA-14568


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d52c7b8c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d52c7b8c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d52c7b8c

Branch: refs/heads/trunk
Commit: d52c7b8c595cc0d06fc3607bf16e3f595f016bb6
Parents: 6ce887e
Author: Benedict Elliott Smith <be...@apple.com>
Authored: Mon Jul 16 17:37:24 2018 +0100
Committer: Benedict Elliott Smith <be...@apple.com>
Committed: Mon Jul 16 17:40:09 2018 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/LegacyLayout.java   |  15 ++-
 .../cassandra/db/marshal/CompositeType.java     |  13 ++-
 .../db/partitions/PartitionUpdate.java          |  40 +++++--
 .../apache/cassandra/db/LegacyLayoutTest.java   | 112 ++++++++++++++++++-
 5 files changed, 155 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d52c7b8c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6f6ae20..47e5cd0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.17
+ * Fix corrupted static collection deletions in 3.0 -> 2.{1,2} messages (CASSANDRA-14568)
  * Fix potential IndexOutOfBoundsException with counters (CASSANDRA-14167)
  * Restore resumable hints delivery, backport CASSANDRA-11960 (CASSANDRA-14419)
  * Always close RT markers returned by ReadCommand#executeLocally() (CASSANDRA-14515)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d52c7b8c/src/java/org/apache/cassandra/db/LegacyLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java
index 912d591..e0f66e3 100644
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@ -197,6 +197,7 @@ public abstract class LegacyLayout
 
         int clusteringSize = metadata.comparator.size();
 
+        boolean isStatic = metadata.isCompound() && CompositeType.isStaticName(bound);
         List<ByteBuffer> components = CompositeType.splitName(bound);
         byte eoc = CompositeType.lastEOC(bound);
 
@@ -206,8 +207,12 @@ public abstract class LegacyLayout
         assert components.size() <= clusteringSize || (!metadata.isCompactTable() && components.size() == clusteringSize + 1);
 
         ColumnDefinition collectionName = null;
-        if (components.size() > clusteringSize)
-            collectionName = metadata.getColumnDefinition(components.remove(clusteringSize));
+        if (components.size() > (isStatic ? 0 : clusteringSize))
+        {
+            // pop the collection name from the back of the list of clusterings
+            ByteBuffer collectionNameBytes = components.remove(isStatic ? 0 : clusteringSize);
+            collectionName = metadata.getColumnDefinition(collectionNameBytes);
+        }
 
         boolean isInclusive;
         if (isStart)
@@ -231,7 +236,7 @@ public abstract class LegacyLayout
 
         Slice.Bound.Kind boundKind = Slice.Bound.boundKind(isStart, isInclusive);
         Slice.Bound sb = Slice.Bound.create(boundKind, components.toArray(new ByteBuffer[components.size()]));
-        return new LegacyBound(sb, metadata.isCompound() && CompositeType.isStaticName(bound), collectionName);
+        return new LegacyBound(sb, isStatic, collectionName);
     }
 
     public static ByteBuffer encodeBound(CFMetaData metadata, Slice.Bound bound, boolean isStart)
@@ -2386,8 +2391,8 @@ public abstract class LegacyLayout
                 LegacyBound start = starts[i];
                 LegacyBound end = ends[i];
 
-                CompositeType.Builder startBuilder = type.builder();
-                CompositeType.Builder endBuilder = type.builder();
+                CompositeType.Builder startBuilder = type.builder(start.isStatic);
+                CompositeType.Builder endBuilder = type.builder(end.isStatic);
                 for (int j = 0; j < start.bound.clustering().size(); j++)
                 {
                     startBuilder.add(start.bound.get(j));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d52c7b8c/src/java/org/apache/cassandra/db/marshal/CompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
index 52d6d39..d4ddfc0 100644
--- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
@@ -339,6 +339,11 @@ public class CompositeType extends AbstractCompositeType
         return new Builder(this);
     }
 
+    public Builder builder(boolean isStatic)
+    {
+        return new Builder(this, isStatic);
+    }
+
     public static ByteBuffer build(ByteBuffer... buffers)
     {
         return build(false, buffers);
@@ -376,12 +381,12 @@ public class CompositeType extends AbstractCompositeType
 
         public Builder(CompositeType composite)
         {
-            this(composite, new ArrayList<ByteBuffer>(composite.types.size()), new byte[composite.types.size()], false);
+            this(composite, false);
         }
 
-        public static Builder staticBuilder(CompositeType composite)
+        public Builder(CompositeType composite, boolean isStatic)
         {
-            return new Builder(composite, new ArrayList<ByteBuffer>(composite.types.size()), new byte[composite.types.size()], true);
+            this(composite, new ArrayList<>(composite.types.size()), new byte[composite.types.size()], isStatic);
         }
 
         private Builder(CompositeType composite, List<ByteBuffer> components, byte[] endOfComponents, boolean isStatic)
@@ -398,7 +403,7 @@ public class CompositeType extends AbstractCompositeType
 
         private Builder(Builder b)
         {
-            this(b.composite, new ArrayList<ByteBuffer>(b.components), Arrays.copyOf(b.endOfComponents, b.endOfComponents.length), b.isStatic);
+            this(b.composite, new ArrayList<>(b.components), Arrays.copyOf(b.endOfComponents, b.endOfComponents.length), b.isStatic);
             this.serializedSize = b.serializedSize;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d52c7b8c/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index 93b3568..7363c24 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -157,23 +157,39 @@ public class PartitionUpdate extends AbstractBTreePartition
      *
      * @param metadata the metadata for the created update.
      * @param key the partition key for the partition to update.
-     * @param row the row for the update.
+     * @param row the row for the update (may be null).
+     * @param row the static row for the update (may be null).
      *
      * @return the newly created partition update containing only {@code row}.
      */
-    public static PartitionUpdate singleRowUpdate(CFMetaData metadata, DecoratedKey key, Row row)
+    public static PartitionUpdate singleRowUpdate(CFMetaData metadata, DecoratedKey key, Row row, Row staticRow)
     {
         MutableDeletionInfo deletionInfo = MutableDeletionInfo.live();
-        if (row.isStatic())
-        {
-            Holder holder = new Holder(new PartitionColumns(Columns.from(row.columns()), Columns.NONE), BTree.empty(), deletionInfo, row, EncodingStats.NO_STATS);
-            return new PartitionUpdate(metadata, key, holder, deletionInfo, false);
-        }
-        else
-        {
-            Holder holder = new Holder(new PartitionColumns(Columns.NONE, Columns.from(row.columns())), BTree.singleton(row), deletionInfo, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS);
-            return new PartitionUpdate(metadata, key, holder, deletionInfo, false);
-        }
+        Holder holder = new Holder(
+            new PartitionColumns(
+                staticRow == null ? Columns.NONE : Columns.from(staticRow.columns()),
+                row == null ? Columns.NONE : Columns.from(row.columns())
+            ),
+            row == null ? BTree.empty() : BTree.singleton(row),
+            deletionInfo,
+            staticRow == null ? Rows.EMPTY_STATIC_ROW : staticRow,
+            EncodingStats.NO_STATS
+        );
+        return new PartitionUpdate(metadata, key, holder, deletionInfo, false);
+    }
+
+    /**
+     * Creates an immutable partition update that contains a single row update.
+     *
+     * @param metadata the metadata for the created update.
+     * @param key the partition key for the partition to update.
+     * @param row the row for the update (may be static).
+     *
+     * @return the newly created partition update containing only {@code row}.
+     */
+    public static PartitionUpdate singleRowUpdate(CFMetaData metadata, DecoratedKey key, Row row)
+    {
+        return singleRowUpdate(metadata, key, row.isStatic() ? null : row, row.isStatic() ? row : null);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d52c7b8c/test/unit/org/apache/cassandra/db/LegacyLayoutTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/LegacyLayoutTest.java b/test/unit/org/apache/cassandra/db/LegacyLayoutTest.java
index 715d7c9..44d8a70 100644
--- a/test/unit/org/apache/cassandra/db/LegacyLayoutTest.java
+++ b/test/unit/org/apache/cassandra/db/LegacyLayoutTest.java
@@ -18,11 +18,28 @@
 
 package org.apache.cassandra.db;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.rows.BufferCell;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIteratorSerializer;
+import org.apache.cassandra.db.transform.FilteredRows;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.serializers.Int32Serializer;
+import org.apache.cassandra.serializers.UTF8Serializer;
+import org.apache.cassandra.utils.FBUtilities;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
@@ -45,6 +62,16 @@ import static org.junit.Assert.*;
 
 public class LegacyLayoutTest
 {
+    static final String KEYSPACE = "Keyspace1";
+
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+        SchemaLoader.loadSchema();
+        SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1));
+    }
+
     @Test
     public void testFromUnfilteredRowIterator() throws Throwable
     {
@@ -113,11 +140,6 @@ public class LegacyLayoutTest
     @Test
     public void testRTBetweenColumns() throws Throwable
     {
-        String KEYSPACE = "Keyspace1";
-        DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
-
-        SchemaLoader.loadSchema();
-        SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1));
         QueryProcessor.executeInternal(String.format("CREATE TABLE \"%s\".legacy_ka_repeated_rt (k1 int, c1 int, c2 int, val1 text, val2 text, val3 text, primary key (k1, c1, c2))", KEYSPACE));
 
         Keyspace keyspace = Keyspace.open(KEYSPACE);
@@ -151,4 +173,84 @@ public class LegacyLayoutTest
         }
 
     }
+
+
+    private static UnfilteredRowIterator roundTripVia21(UnfilteredRowIterator partition) throws IOException
+    {
+        try (DataOutputBuffer out = new DataOutputBuffer())
+        {
+            LegacyLayout.serializeAsLegacyPartition(null, partition, out, MessagingService.VERSION_21);
+            try (DataInputBuffer in = new DataInputBuffer(out.buffer(), false))
+            {
+                return LegacyLayout.deserializeLegacyPartition(in, MessagingService.VERSION_21, SerializationHelper.Flag.LOCAL, partition.partitionKey().getKey());
+            }
+        }
+    }
+
+
+    @Test
+    public void testStaticRangeTombstoneRoundTripUnexpectedDeletion() throws Throwable
+    {
+        // this variant of the bug deletes a row with the same clustering key value as the name of the static collection
+        QueryProcessor.executeInternal(String.format("CREATE TABLE \"%s\".legacy_static_rt_rt_1 (pk int, ck1 text, ck2 text, v int, s set<text> static, primary key (pk, ck1, ck2))", KEYSPACE));
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        CFMetaData table = keyspace.getColumnFamilyStore("legacy_static_rt_rt_1").metadata;
+        ColumnDefinition v = table.getColumnDefinition(new ColumnIdentifier("v", false));
+        ColumnDefinition bug = table.getColumnDefinition(new ColumnIdentifier("s", false));
+
+        Row.Builder builder;
+        builder = BTreeRow.unsortedBuilder(0);
+        builder.newRow(Clustering.STATIC_CLUSTERING);
+        builder.addComplexDeletion(bug, new DeletionTime(1L, 1));
+        Row staticRow = builder.build();
+
+        builder = BTreeRow.unsortedBuilder(0);
+        builder.newRow(new Clustering(UTF8Serializer.instance.serialize("s"), UTF8Serializer.instance.serialize("anything")));
+        builder.addCell(new BufferCell(v, 1L, Cell.NO_TTL, Cell.NO_DELETION_TIME, Int32Serializer.instance.serialize(1), null));
+        Row row = builder.build();
+
+        DecoratedKey pk = table.decorateKey(ByteBufferUtil.bytes(1));
+        PartitionUpdate upd = PartitionUpdate.singleRowUpdate(table, pk, row, staticRow);
+
+        try (RowIterator before = FilteredRows.filter(upd.unfilteredIterator(), FBUtilities.nowInSeconds());
+             RowIterator after = FilteredRows.filter(roundTripVia21(upd.unfilteredIterator()), FBUtilities.nowInSeconds()))
+        {
+            while (before.hasNext() || after.hasNext())
+                assertEquals(before.hasNext() ? before.next() : null, after.hasNext() ? after.next() : null);
+        }
+    }
+
+    @Test
+    public void testStaticRangeTombstoneRoundTripCorruptRead() throws Throwable
+    {
+        // this variant of the bug corrupts the byte stream of the partition, so that a sequential read starting before
+        // this partition will fail with a CorruptSSTableException, and possible yield junk results
+        QueryProcessor.executeInternal(String.format("CREATE TABLE \"%s\".legacy_static_rt_rt_2 (pk int, ck int, nameWithLengthGreaterThan4 set<int> static, primary key (pk, ck))", KEYSPACE));
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        CFMetaData table = keyspace.getColumnFamilyStore("legacy_static_rt_rt_2").metadata;
+
+        ColumnDefinition bug = table.getColumnDefinition(new ColumnIdentifier("nameWithLengthGreaterThan4", false));
+
+        Row.Builder builder = BTreeRow.unsortedBuilder(0);
+        builder.newRow(Clustering.STATIC_CLUSTERING);
+        builder.addComplexDeletion(bug, new DeletionTime(1L, 1));
+        Row row = builder.build();
+
+        DecoratedKey pk = table.decorateKey(ByteBufferUtil.bytes(1));
+        PartitionUpdate upd = PartitionUpdate.singleRowUpdate(table, pk, row);
+
+        UnfilteredRowIterator afterRoundTripVia32 = roundTripVia21(upd.unfilteredIterator());
+        try (DataOutputBuffer out = new DataOutputBuffer())
+        {
+            // we only encounter a corruption/serialization error after writing this to a 3.0 format and reading it back
+            UnfilteredRowIteratorSerializer.serializer.serialize(afterRoundTripVia32, ColumnFilter.all(table), out, MessagingService.current_version);
+            try (DataInputBuffer in = new DataInputBuffer(out.buffer(), false);
+                 UnfilteredRowIterator afterSerialization = UnfilteredRowIteratorSerializer.serializer.deserialize(in, MessagingService.current_version, table, ColumnFilter.all(table), SerializationHelper.Flag.LOCAL))
+            {
+                while (afterSerialization.hasNext())
+                    afterSerialization.next();
+            }
+        }
+    }
+
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org