You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2018/07/16 16:57:03 UTC
[2/6] cassandra git commit: Fix corrupted static collection deletions
in 3.0 -> 2.{1, 2} messages
Fix corrupted static collection deletions in 3.0 -> 2.{1,2} messages
patch by Benedict; reviewed by Aleksey Yeschenko for CASSANDRA-14568
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d52c7b8c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d52c7b8c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d52c7b8c
Branch: refs/heads/cassandra-3.11
Commit: d52c7b8c595cc0d06fc3607bf16e3f595f016bb6
Parents: 6ce887e
Author: Benedict Elliott Smith <be...@apple.com>
Authored: Mon Jul 16 17:37:24 2018 +0100
Committer: Benedict Elliott Smith <be...@apple.com>
Committed: Mon Jul 16 17:40:09 2018 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/LegacyLayout.java | 15 ++-
.../cassandra/db/marshal/CompositeType.java | 13 ++-
.../db/partitions/PartitionUpdate.java | 40 +++++--
.../apache/cassandra/db/LegacyLayoutTest.java | 112 ++++++++++++++++++-
5 files changed, 155 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d52c7b8c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6f6ae20..47e5cd0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.17
+ * Fix corrupted static collection deletions in 3.0 -> 2.{1,2} messages (CASSANDRA-14568)
* Fix potential IndexOutOfBoundsException with counters (CASSANDRA-14167)
* Restore resumable hints delivery, backport CASSANDRA-11960 (CASSANDRA-14419)
* Always close RT markers returned by ReadCommand#executeLocally() (CASSANDRA-14515)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d52c7b8c/src/java/org/apache/cassandra/db/LegacyLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java
index 912d591..e0f66e3 100644
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@ -197,6 +197,7 @@ public abstract class LegacyLayout
int clusteringSize = metadata.comparator.size();
+ boolean isStatic = metadata.isCompound() && CompositeType.isStaticName(bound);
List<ByteBuffer> components = CompositeType.splitName(bound);
byte eoc = CompositeType.lastEOC(bound);
@@ -206,8 +207,12 @@ public abstract class LegacyLayout
assert components.size() <= clusteringSize || (!metadata.isCompactTable() && components.size() == clusteringSize + 1);
ColumnDefinition collectionName = null;
- if (components.size() > clusteringSize)
- collectionName = metadata.getColumnDefinition(components.remove(clusteringSize));
+ if (components.size() > (isStatic ? 0 : clusteringSize))
+ {
+ // pop the collection name from the back of the list of clusterings
+ ByteBuffer collectionNameBytes = components.remove(isStatic ? 0 : clusteringSize);
+ collectionName = metadata.getColumnDefinition(collectionNameBytes);
+ }
boolean isInclusive;
if (isStart)
@@ -231,7 +236,7 @@ public abstract class LegacyLayout
Slice.Bound.Kind boundKind = Slice.Bound.boundKind(isStart, isInclusive);
Slice.Bound sb = Slice.Bound.create(boundKind, components.toArray(new ByteBuffer[components.size()]));
- return new LegacyBound(sb, metadata.isCompound() && CompositeType.isStaticName(bound), collectionName);
+ return new LegacyBound(sb, isStatic, collectionName);
}
public static ByteBuffer encodeBound(CFMetaData metadata, Slice.Bound bound, boolean isStart)
@@ -2386,8 +2391,8 @@ public abstract class LegacyLayout
LegacyBound start = starts[i];
LegacyBound end = ends[i];
- CompositeType.Builder startBuilder = type.builder();
- CompositeType.Builder endBuilder = type.builder();
+ CompositeType.Builder startBuilder = type.builder(start.isStatic);
+ CompositeType.Builder endBuilder = type.builder(end.isStatic);
for (int j = 0; j < start.bound.clustering().size(); j++)
{
startBuilder.add(start.bound.get(j));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d52c7b8c/src/java/org/apache/cassandra/db/marshal/CompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
index 52d6d39..d4ddfc0 100644
--- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
@@ -339,6 +339,11 @@ public class CompositeType extends AbstractCompositeType
return new Builder(this);
}
+ public Builder builder(boolean isStatic)
+ {
+ return new Builder(this, isStatic);
+ }
+
public static ByteBuffer build(ByteBuffer... buffers)
{
return build(false, buffers);
@@ -376,12 +381,12 @@ public class CompositeType extends AbstractCompositeType
public Builder(CompositeType composite)
{
- this(composite, new ArrayList<ByteBuffer>(composite.types.size()), new byte[composite.types.size()], false);
+ this(composite, false);
}
- public static Builder staticBuilder(CompositeType composite)
+ public Builder(CompositeType composite, boolean isStatic)
{
- return new Builder(composite, new ArrayList<ByteBuffer>(composite.types.size()), new byte[composite.types.size()], true);
+ this(composite, new ArrayList<>(composite.types.size()), new byte[composite.types.size()], isStatic);
}
private Builder(CompositeType composite, List<ByteBuffer> components, byte[] endOfComponents, boolean isStatic)
@@ -398,7 +403,7 @@ public class CompositeType extends AbstractCompositeType
private Builder(Builder b)
{
- this(b.composite, new ArrayList<ByteBuffer>(b.components), Arrays.copyOf(b.endOfComponents, b.endOfComponents.length), b.isStatic);
+ this(b.composite, new ArrayList<>(b.components), Arrays.copyOf(b.endOfComponents, b.endOfComponents.length), b.isStatic);
this.serializedSize = b.serializedSize;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d52c7b8c/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index 93b3568..7363c24 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -157,23 +157,39 @@ public class PartitionUpdate extends AbstractBTreePartition
*
* @param metadata the metadata for the created update.
* @param key the partition key for the partition to update.
- * @param row the row for the update.
+ * @param row the row for the update (may be null).
+ * @param row the static row for the update (may be null).
*
* @return the newly created partition update containing only {@code row}.
*/
- public static PartitionUpdate singleRowUpdate(CFMetaData metadata, DecoratedKey key, Row row)
+ public static PartitionUpdate singleRowUpdate(CFMetaData metadata, DecoratedKey key, Row row, Row staticRow)
{
MutableDeletionInfo deletionInfo = MutableDeletionInfo.live();
- if (row.isStatic())
- {
- Holder holder = new Holder(new PartitionColumns(Columns.from(row.columns()), Columns.NONE), BTree.empty(), deletionInfo, row, EncodingStats.NO_STATS);
- return new PartitionUpdate(metadata, key, holder, deletionInfo, false);
- }
- else
- {
- Holder holder = new Holder(new PartitionColumns(Columns.NONE, Columns.from(row.columns())), BTree.singleton(row), deletionInfo, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS);
- return new PartitionUpdate(metadata, key, holder, deletionInfo, false);
- }
+ Holder holder = new Holder(
+ new PartitionColumns(
+ staticRow == null ? Columns.NONE : Columns.from(staticRow.columns()),
+ row == null ? Columns.NONE : Columns.from(row.columns())
+ ),
+ row == null ? BTree.empty() : BTree.singleton(row),
+ deletionInfo,
+ staticRow == null ? Rows.EMPTY_STATIC_ROW : staticRow,
+ EncodingStats.NO_STATS
+ );
+ return new PartitionUpdate(metadata, key, holder, deletionInfo, false);
+ }
+
+ /**
+ * Creates an immutable partition update that contains a single row update.
+ *
+ * @param metadata the metadata for the created update.
+ * @param key the partition key for the partition to update.
+ * @param row the row for the update (may be static).
+ *
+ * @return the newly created partition update containing only {@code row}.
+ */
+ public static PartitionUpdate singleRowUpdate(CFMetaData metadata, DecoratedKey key, Row row)
+ {
+ return singleRowUpdate(metadata, key, row.isStatic() ? null : row, row.isStatic() ? row : null);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d52c7b8c/test/unit/org/apache/cassandra/db/LegacyLayoutTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/LegacyLayoutTest.java b/test/unit/org/apache/cassandra/db/LegacyLayoutTest.java
index 715d7c9..44d8a70 100644
--- a/test/unit/org/apache/cassandra/db/LegacyLayoutTest.java
+++ b/test/unit/org/apache/cassandra/db/LegacyLayoutTest.java
@@ -18,11 +18,28 @@
package org.apache.cassandra.db;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.rows.BufferCell;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIteratorSerializer;
+import org.apache.cassandra.db.transform.FilteredRows;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.serializers.Int32Serializer;
+import org.apache.cassandra.serializers.UTF8Serializer;
+import org.apache.cassandra.utils.FBUtilities;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
@@ -45,6 +62,16 @@ import static org.junit.Assert.*;
public class LegacyLayoutTest
{
+ static final String KEYSPACE = "Keyspace1";
+
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+ SchemaLoader.loadSchema();
+ SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1));
+ }
+
@Test
public void testFromUnfilteredRowIterator() throws Throwable
{
@@ -113,11 +140,6 @@ public class LegacyLayoutTest
@Test
public void testRTBetweenColumns() throws Throwable
{
- String KEYSPACE = "Keyspace1";
- DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
-
- SchemaLoader.loadSchema();
- SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1));
QueryProcessor.executeInternal(String.format("CREATE TABLE \"%s\".legacy_ka_repeated_rt (k1 int, c1 int, c2 int, val1 text, val2 text, val3 text, primary key (k1, c1, c2))", KEYSPACE));
Keyspace keyspace = Keyspace.open(KEYSPACE);
@@ -151,4 +173,84 @@ public class LegacyLayoutTest
}
}
+
+
+ private static UnfilteredRowIterator roundTripVia21(UnfilteredRowIterator partition) throws IOException
+ {
+ try (DataOutputBuffer out = new DataOutputBuffer())
+ {
+ LegacyLayout.serializeAsLegacyPartition(null, partition, out, MessagingService.VERSION_21);
+ try (DataInputBuffer in = new DataInputBuffer(out.buffer(), false))
+ {
+ return LegacyLayout.deserializeLegacyPartition(in, MessagingService.VERSION_21, SerializationHelper.Flag.LOCAL, partition.partitionKey().getKey());
+ }
+ }
+ }
+
+
+ @Test
+ public void testStaticRangeTombstoneRoundTripUnexpectedDeletion() throws Throwable
+ {
+ // this variant of the bug deletes a row with the same clustering key value as the name of the static collection
+ QueryProcessor.executeInternal(String.format("CREATE TABLE \"%s\".legacy_static_rt_rt_1 (pk int, ck1 text, ck2 text, v int, s set<text> static, primary key (pk, ck1, ck2))", KEYSPACE));
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ CFMetaData table = keyspace.getColumnFamilyStore("legacy_static_rt_rt_1").metadata;
+ ColumnDefinition v = table.getColumnDefinition(new ColumnIdentifier("v", false));
+ ColumnDefinition bug = table.getColumnDefinition(new ColumnIdentifier("s", false));
+
+ Row.Builder builder;
+ builder = BTreeRow.unsortedBuilder(0);
+ builder.newRow(Clustering.STATIC_CLUSTERING);
+ builder.addComplexDeletion(bug, new DeletionTime(1L, 1));
+ Row staticRow = builder.build();
+
+ builder = BTreeRow.unsortedBuilder(0);
+ builder.newRow(new Clustering(UTF8Serializer.instance.serialize("s"), UTF8Serializer.instance.serialize("anything")));
+ builder.addCell(new BufferCell(v, 1L, Cell.NO_TTL, Cell.NO_DELETION_TIME, Int32Serializer.instance.serialize(1), null));
+ Row row = builder.build();
+
+ DecoratedKey pk = table.decorateKey(ByteBufferUtil.bytes(1));
+ PartitionUpdate upd = PartitionUpdate.singleRowUpdate(table, pk, row, staticRow);
+
+ try (RowIterator before = FilteredRows.filter(upd.unfilteredIterator(), FBUtilities.nowInSeconds());
+ RowIterator after = FilteredRows.filter(roundTripVia21(upd.unfilteredIterator()), FBUtilities.nowInSeconds()))
+ {
+ while (before.hasNext() || after.hasNext())
+ assertEquals(before.hasNext() ? before.next() : null, after.hasNext() ? after.next() : null);
+ }
+ }
+
+ @Test
+ public void testStaticRangeTombstoneRoundTripCorruptRead() throws Throwable
+ {
+ // this variant of the bug corrupts the byte stream of the partition, so that a sequential read starting before
+ // this partition will fail with a CorruptSSTableException, and possible yield junk results
+ QueryProcessor.executeInternal(String.format("CREATE TABLE \"%s\".legacy_static_rt_rt_2 (pk int, ck int, nameWithLengthGreaterThan4 set<int> static, primary key (pk, ck))", KEYSPACE));
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ CFMetaData table = keyspace.getColumnFamilyStore("legacy_static_rt_rt_2").metadata;
+
+ ColumnDefinition bug = table.getColumnDefinition(new ColumnIdentifier("nameWithLengthGreaterThan4", false));
+
+ Row.Builder builder = BTreeRow.unsortedBuilder(0);
+ builder.newRow(Clustering.STATIC_CLUSTERING);
+ builder.addComplexDeletion(bug, new DeletionTime(1L, 1));
+ Row row = builder.build();
+
+ DecoratedKey pk = table.decorateKey(ByteBufferUtil.bytes(1));
+ PartitionUpdate upd = PartitionUpdate.singleRowUpdate(table, pk, row);
+
+ UnfilteredRowIterator afterRoundTripVia32 = roundTripVia21(upd.unfilteredIterator());
+ try (DataOutputBuffer out = new DataOutputBuffer())
+ {
+ // we only encounter a corruption/serialization error after writing this to a 3.0 format and reading it back
+ UnfilteredRowIteratorSerializer.serializer.serialize(afterRoundTripVia32, ColumnFilter.all(table), out, MessagingService.current_version);
+ try (DataInputBuffer in = new DataInputBuffer(out.buffer(), false);
+ UnfilteredRowIterator afterSerialization = UnfilteredRowIteratorSerializer.serializer.deserialize(in, MessagingService.current_version, table, ColumnFilter.all(table), SerializationHelper.Flag.LOCAL))
+ {
+ while (afterSerialization.hasNext())
+ afterSerialization.next();
+ }
+ }
+ }
+
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org