You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/09/01 10:53:21 UTC
[1/2] cassandra git commit: Avoid digest mismatches on upgrade to 3.0
Repository: cassandra
Updated Branches:
refs/heads/trunk 8302ef7a8 -> 4e57b8200
Avoid digest mismatches on upgrade to 3.0
patch by slebresne; reviewed by blambov for CASSANDRA-9554
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/782a1c3a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/782a1c3a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/782a1c3a
Branch: refs/heads/trunk
Commit: 782a1c3aecffb8674338fbd9ceeb56a357ecc3f2
Parents: 627e939
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Jul 23 23:42:54 2015 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Sep 1 10:52:57 2015 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/cql3/UpdateParameters.java | 9 +-
.../org/apache/cassandra/db/DeletionTime.java | 4 +-
.../org/apache/cassandra/db/LegacyLayout.java | 72 +++++++-
.../cassandra/db/PartitionRangeReadCommand.java | 13 +-
.../org/apache/cassandra/db/ReadCommand.java | 76 ++++++--
.../org/apache/cassandra/db/ReadResponse.java | 28 +--
.../db/SinglePartitionNamesCommand.java | 9 +-
.../db/SinglePartitionReadCommand.java | 13 +-
.../db/SinglePartitionSliceCommand.java | 7 +-
.../UnfilteredPartitionIterators.java | 11 +-
.../apache/cassandra/db/rows/AbstractCell.java | 1 -
.../db/rows/UnfilteredRowIterators.java | 20 +-
.../org/apache/cassandra/repair/Validator.java | 2 +-
.../cassandra/service/AbstractReadExecutor.java | 9 +
.../cassandra/service/DigestResolver.java | 2 +-
.../cassandra/thrift/CassandraServer.java | 5 +-
.../cassandra/cache/CacheProviderTest.java | 5 +-
.../org/apache/cassandra/db/PartitionTest.java | 82 +++++----
.../rows/DigestBackwardCompatibilityTest.java | 182 +++++++++++++++++++
20 files changed, 452 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 08b7df0..390255c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.0-beta2
+ * Avoid digest mismatch on upgrade to 3.0 (CASSANDRA-9554)
* Fix Materialized View builder when adding multiple MVs (CASSANDRA-10156)
* Choose better poolingOptions for protocol v4 in cassandra-stress (CASSANDRA-10182)
* Fix LWW bug affecting Materialized Views (CASSANDRA-10197)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
index 1cdb64d..045b1e1 100644
--- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -146,7 +146,14 @@ public class UpdateParameters
public void addRowDeletion()
{
- builder.addRowDeletion(deletionTime);
+ // For compact tables, at the exclusion of the static row (of static compact tables), each row ever has a single column,
+ // the "compact" one. As such, deleting the row or deleting that single cell is equivalent. We favor the later however
+ // because that makes it easier when translating back to the old format layout (for thrift and pre-3.0 backward
+ // compatibility) as we don't have to special case for the row deletion. This is also in line with what we use to do pre-3.0.
+ if (metadata.isCompactTable() && builder.clustering() != Clustering.STATIC_CLUSTERING)
+ addTombstone(metadata.compactValueColumn());
+ else
+ builder.addRowDeletion(deletionTime);
}
public void addTombstone(ColumnDefinition column) throws InvalidRequestException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/src/java/org/apache/cassandra/db/DeletionTime.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java
index 3e9ca80..343a6c2 100644
--- a/src/java/org/apache/cassandra/db/DeletionTime.java
+++ b/src/java/org/apache/cassandra/db/DeletionTime.java
@@ -83,8 +83,10 @@ public class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory
public void digest(MessageDigest digest)
{
+ // localDeletionTime is basically a metadata of the deletion time that tells us when it's ok to purge it.
+ // It's thus intrinsically a local information and shouldn't be part of the digest (which exists for
+ // cross-nodes comparisons).
FBUtilities.updateWithLong(digest, markedForDeleteAt());
- FBUtilities.updateWithInt(digest, localDeletionTime());
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/src/java/org/apache/cassandra/db/LegacyLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java
index 628ac75..7b03e46 100644
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@ -21,6 +21,7 @@ import java.io.DataInput;
import java.io.IOException;
import java.io.IOError;
import java.nio.ByteBuffer;
+import java.security.MessageDigest;
import java.util.*;
import org.apache.cassandra.utils.AbstractIterator;
@@ -919,7 +920,6 @@ public abstract class LegacyLayout
};
}
-
public static LegacyAtom readLegacyAtom(CFMetaData metadata, DataInputPlus in, boolean readAllAsDynamic) throws IOException
{
while (true)
@@ -1187,8 +1187,40 @@ public abstract class LegacyLayout
this.rangeTombstones = rangeTombstones;
this.cells = cells;
}
- }
+ public void digest(CFMetaData metadata, MessageDigest digest)
+ {
+ for (LegacyCell cell : cells)
+ {
+ digest.update(cell.name.encode(metadata).duplicate());
+
+ if (cell.isCounter())
+ CounterContext.instance().updateDigest(digest, cell.value);
+ else
+ digest.update(cell.value.duplicate());
+
+ FBUtilities.updateWithLong(digest, cell.timestamp);
+ FBUtilities.updateWithByte(digest, cell.serializationFlags());
+
+ if (cell.isExpiring())
+ FBUtilities.updateWithInt(digest, cell.ttl);
+
+ if (cell.isCounter())
+ {
+ // Counters used to have the timestampOfLastDelete field, which we stopped using long ago and has been hard-coded
+ // to Long.MIN_VALUE but was still taken into account in 2.2 counter digests (to maintain backward compatibility
+ // in the first place).
+ FBUtilities.updateWithLong(digest, Long.MIN_VALUE);
+ }
+ }
+
+ if (partitionDeletion.markedForDeleteAt() != Long.MIN_VALUE)
+ digest.update(ByteBufferUtil.bytes(partitionDeletion.markedForDeleteAt()));
+
+ if (!rangeTombstones.isEmpty())
+ rangeTombstones.updateDigest(digest);
+ }
+ }
public static class LegacyCellName
{
@@ -1285,6 +1317,12 @@ public abstract class LegacyLayout
*/
public static class LegacyCell implements LegacyAtom
{
+ private final static int DELETION_MASK = 0x01;
+ private final static int EXPIRATION_MASK = 0x02;
+ private final static int COUNTER_MASK = 0x04;
+ private final static int COUNTER_UPDATE_MASK = 0x08;
+ private final static int RANGE_TOMBSTONE_MASK = 0x10;
+
public enum Kind { REGULAR, EXPIRING, DELETED, COUNTER }
public final Kind kind;
@@ -1337,6 +1375,17 @@ public abstract class LegacyLayout
return new LegacyCell(Kind.COUNTER, name, value, FBUtilities.timestampMicros(), Cell.NO_DELETION_TIME, Cell.NO_TTL);
}
+ public byte serializationFlags()
+ {
+ if (isExpiring())
+ return EXPIRATION_MASK;
+ if (isTombstone())
+ return DELETION_MASK;
+ if (isCounter())
+ return COUNTER_MASK;
+ return 0;
+ }
+
public ClusteringPrefix clustering()
{
return name.clustering;
@@ -1973,6 +2022,25 @@ public abstract class LegacyLayout
delTimes[i] = delTime;
}
+ public void updateDigest(MessageDigest digest)
+ {
+ ByteBuffer longBuffer = ByteBuffer.allocate(8);
+ for (int i = 0; i < size; i++)
+ {
+ for (int j = 0; j < starts[i].bound.size(); j++)
+ digest.update(starts[i].bound.get(j).duplicate());
+ if (starts[i].collectionName != null)
+ digest.update(starts[i].collectionName.name.bytes.duplicate());
+ for (int j = 0; j < ends[i].bound.size(); j++)
+ digest.update(ends[i].bound.get(j).duplicate());
+ if (ends[i].collectionName != null)
+ digest.update(ends[i].collectionName.name.bytes.duplicate());
+
+ longBuffer.putLong(0, markedAts[i]);
+ digest.update(longBuffer.array(), 0, 8);
+ }
+ }
+
public void serialize(DataOutputPlus out, CFMetaData metadata) throws IOException
{
out.writeInt(size);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 2ba45e7..da62557 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -57,6 +57,7 @@ public class PartitionRangeReadCommand extends ReadCommand
private int oldestUnrepairedTombstone = Integer.MAX_VALUE;
public PartitionRangeReadCommand(boolean isDigest,
+ int digestVersion,
boolean isForThrift,
CFMetaData metadata,
int nowInSec,
@@ -65,7 +66,7 @@ public class PartitionRangeReadCommand extends ReadCommand
DataLimits limits,
DataRange dataRange)
{
- super(Kind.PARTITION_RANGE, isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits);
+ super(Kind.PARTITION_RANGE, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits);
this.dataRange = dataRange;
}
@@ -76,7 +77,7 @@ public class PartitionRangeReadCommand extends ReadCommand
DataLimits limits,
DataRange dataRange)
{
- this(false, false, metadata, nowInSec, columnFilter, rowFilter, limits, dataRange);
+ this(false, 0, false, metadata, nowInSec, columnFilter, rowFilter, limits, dataRange);
}
/**
@@ -114,12 +115,12 @@ public class PartitionRangeReadCommand extends ReadCommand
public PartitionRangeReadCommand forSubRange(AbstractBounds<PartitionPosition> range)
{
- return new PartitionRangeReadCommand(isDigestQuery(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange().forSubRange(range));
+ return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange().forSubRange(range));
}
public PartitionRangeReadCommand copy()
{
- return new PartitionRangeReadCommand(isDigestQuery(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange());
+ return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange());
}
public PartitionRangeReadCommand withUpdatedLimit(DataLimits newLimits)
@@ -302,11 +303,11 @@ public class PartitionRangeReadCommand extends ReadCommand
private static class Deserializer extends SelectionDeserializer
{
- public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits)
+ public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits)
throws IOException
{
DataRange range = DataRange.serializer.deserialize(in, version, metadata);
- return new PartitionRangeReadCommand(isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, range);
+ return new PartitionRangeReadCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, range);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 1d5d477..0bc8cea 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -71,11 +71,13 @@ public abstract class ReadCommand implements ReadQuery
private final DataLimits limits;
private boolean isDigestQuery;
+ // if a digest query, the version for which the digest is expected. Ignored if not a digest.
+ private int digestVersion;
private final boolean isForThrift;
protected static abstract class SelectionDeserializer
{
- public abstract ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits) throws IOException;
+ public abstract ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits) throws IOException;
}
protected enum Kind
@@ -93,6 +95,7 @@ public abstract class ReadCommand implements ReadQuery
protected ReadCommand(Kind kind,
boolean isDigestQuery,
+ int digestVersion,
boolean isForThrift,
CFMetaData metadata,
int nowInSec,
@@ -102,6 +105,7 @@ public abstract class ReadCommand implements ReadQuery
{
this.kind = kind;
this.isDigestQuery = isDigestQuery;
+ this.digestVersion = digestVersion;
this.isForThrift = isForThrift;
this.metadata = metadata;
this.nowInSec = nowInSec;
@@ -192,6 +196,17 @@ public abstract class ReadCommand implements ReadQuery
}
/**
+ * If the query is a digest one, the requested digest version.
+ *
+ * @return the requested digest version if the query is a digest. Otherwise, this can return
+ * anything.
+ */
+ public int digestVersion()
+ {
+ return digestVersion;
+ }
+
+ /**
* Sets whether this command should be a digest one or not.
*
* @param isDigestQuery whether the command should be set as a digest one or not.
@@ -204,6 +219,22 @@ public abstract class ReadCommand implements ReadQuery
}
/**
+ * Sets the digest version, for when digest for that command is requested.
+ * <p>
+ * Note that we allow setting this independently of setting the command as a digest query as
+ * this allows us to use the command as a carrier of the digest version even if we only call
+ * setIsDigestQuery on some copy of it.
+ *
+ * @param digestVersion the version for the digest is this command is used for digest query..
+ * @return this read command.
+ */
+ public ReadCommand setDigestVersion(int digestVersion)
+ {
+ this.digestVersion = digestVersion;
+ return this;
+ }
+
+ /**
* Whether this query is for thrift or not.
*
* @return whether this query is for thrift.
@@ -252,7 +283,7 @@ public abstract class ReadCommand implements ReadQuery
public ReadResponse createResponse(UnfilteredPartitionIterator iterator, ColumnFilter selection)
{
return isDigestQuery()
- ? ReadResponse.createDigestResponse(iterator)
+ ? ReadResponse.createDigestResponse(iterator, digestVersion)
: ReadResponse.createDataResponse(iterator, selection);
}
@@ -481,6 +512,8 @@ public abstract class ReadCommand implements ReadQuery
out.writeByte(command.kind.ordinal());
out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift()));
+ if (command.isDigestQuery())
+ out.writeVInt(command.digestVersion());
CFMetaData.serializer.serialize(command.metadata(), out, version);
out.writeInt(command.nowInSec());
ColumnFilter.serializer.serialize(command.columnFilter(), out, version);
@@ -499,13 +532,14 @@ public abstract class ReadCommand implements ReadQuery
int flags = in.readByte();
boolean isDigest = isDigest(flags);
boolean isForThrift = isForThrift(flags);
+ int digestVersion = isDigest ? (int)in.readVInt() : 0;
CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
int nowInSec = in.readInt();
ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, metadata);
RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata);
DataLimits limits = DataLimits.serializer.deserialize(in, version);
- return kind.selectionDeserializer.deserialize(in, version, isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits);
+ return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits);
}
public long serializedSize(ReadCommand command, int version)
@@ -514,6 +548,7 @@ public abstract class ReadCommand implements ReadQuery
assert version >= MessagingService.VERSION_30;
return 2 // kind + flags
+ + (command.isDigestQuery() ? TypeSizes.sizeofVInt(command.digestVersion()) : 0)
+ CFMetaData.serializer.serializedSize(command.metadata(), version)
+ TypeSizes.sizeof(command.nowInSec())
+ ColumnFilter.serializer.serializedSize(command.columnFilter(), version)
@@ -704,7 +739,7 @@ public abstract class ReadCommand implements ReadQuery
else
limits = DataLimits.cqlLimits(maxResults);
- return new PartitionRangeReadCommand(false, true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter));
+ return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter));
}
static void serializeRowFilter(DataOutputPlus out, RowFilter rowFilter) throws IOException
@@ -814,7 +849,7 @@ public abstract class ReadCommand implements ReadQuery
ClusteringIndexSliceFilter sliceFilter = LegacyReadCommandSerializer.convertNamesFilterToSliceFilter(filter, metadata);
DataRange newRange = new DataRange(command.dataRange().keyRange(), sliceFilter);
return new PartitionRangeReadCommand(
- command.isDigestQuery(), command.isForThrift(), metadata, command.nowInSec(),
+ command.isDigestQuery(), command.digestVersion(), command.isForThrift(), metadata, command.nowInSec(),
command.columnFilter(), command.rowFilter(), command.limits(), newRange);
}
@@ -963,10 +998,18 @@ public abstract class ReadCommand implements ReadQuery
limits = limits.forPaging(maxResults);
- // pre-3.0 nodes normally expect pages to include the last cell from the previous page, but they handle it
- // missing without any problems, so we can safely always set "inclusive" to false in the data range
- DataRange dataRange = new DataRange(keyRange, filter).forPaging(keyRange, metadata.comparator, startClustering, false);
- return new PartitionRangeReadCommand(false, true, metadata, nowInSec, selection, rowFilter, limits, dataRange);
+ // The pagedRangeCommand is used in pre-3.0 for both the first page and the following ones. On the first page, the startBound will be
+ // the start of the overall slice and will not be a proper Clustering. So detect that case and just return a non-paging DataRange, which
+ // is what 3.0 does.
+ DataRange dataRange = new DataRange(keyRange, filter);
+ Slices slices = filter.requestedSlices();
+ if (startBound != LegacyLayout.LegacyBound.BOTTOM && !startBound.bound.equals(slices.get(0).start()))
+ {
+ // pre-3.0 nodes normally expect pages to include the last cell from the previous page, but they handle it
+ // missing without any problems, so we can safely always set "inclusive" to false in the data range
+ dataRange = dataRange.forPaging(keyRange, metadata.comparator, startBound.getAsClustering(metadata), false);
+ }
+ return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, dataRange);
}
public long serializedSize(ReadCommand command, int version)
@@ -1065,9 +1108,9 @@ public abstract class ReadCommand implements ReadQuery
switch (msgType)
{
case GET_BY_NAMES:
- return deserializeNamesCommand(in, isDigest, metadata, dk, nowInSeconds);
+ return deserializeNamesCommand(in, isDigest, metadata, dk, nowInSeconds, version);
case GET_SLICES:
- return deserializeSliceCommand(in, isDigest, metadata, dk, nowInSeconds);
+ return deserializeSliceCommand(in, isDigest, metadata, dk, nowInSeconds, version);
default:
throw new AssertionError();
}
@@ -1101,7 +1144,6 @@ public abstract class ReadCommand implements ReadQuery
serializeNamesFilter(command, command.clusteringIndexFilter(), out);
}
-
private static void serializeNamesFilter(ReadCommand command, ClusteringIndexNamesFilter filter, DataOutputPlus out) throws IOException
{
PartitionColumns columns = command.columnFilter().fetchedColumns();
@@ -1157,13 +1199,13 @@ public abstract class ReadCommand implements ReadQuery
return size + TypeSizes.sizeof(true); // countCql3Rows
}
- private SinglePartitionNamesCommand deserializeNamesCommand(DataInputPlus in, boolean isDigest, CFMetaData metadata, DecoratedKey key, int nowInSeconds) throws IOException
+ private SinglePartitionNamesCommand deserializeNamesCommand(DataInputPlus in, boolean isDigest, CFMetaData metadata, DecoratedKey key, int nowInSeconds, int version) throws IOException
{
Pair<ColumnFilter, ClusteringIndexNamesFilter> selectionAndFilter = deserializeNamesSelectionAndFilter(in, metadata);
// messages from old nodes will expect the thrift format, so always use 'true' for isForThrift
return new SinglePartitionNamesCommand(
- isDigest, true, metadata, nowInSeconds, selectionAndFilter.left, RowFilter.NONE, DataLimits.NONE,
+ isDigest, version, true, metadata, nowInSeconds, selectionAndFilter.left, RowFilter.NONE, DataLimits.NONE,
key, selectionAndFilter.right);
}
@@ -1243,7 +1285,7 @@ public abstract class ReadCommand implements ReadQuery
out.writeInt(compositesToGroup);
}
- private SinglePartitionSliceCommand deserializeSliceCommand(DataInputPlus in, boolean isDigest, CFMetaData metadata, DecoratedKey key, int nowInSeconds) throws IOException
+ private SinglePartitionSliceCommand deserializeSliceCommand(DataInputPlus in, boolean isDigest, CFMetaData metadata, DecoratedKey key, int nowInSeconds, int version) throws IOException
{
ClusteringIndexSliceFilter filter = deserializeSlicePartitionFilter(in, metadata);
int count = in.readInt();
@@ -1266,7 +1308,7 @@ public abstract class ReadCommand implements ReadQuery
limits = DataLimits.cqlLimits(count);
// messages from old nodes will expect the thrift format, so always use 'true' for isForThrift
- return new SinglePartitionSliceCommand(isDigest, true, metadata, nowInSeconds, columnFilter, RowFilter.NONE, limits, key, filter);
+ return new SinglePartitionSliceCommand(isDigest, version, true, metadata, nowInSeconds, columnFilter, RowFilter.NONE, limits, key, filter);
}
private long serializedSliceCommandSize(SinglePartitionSliceCommand command)
@@ -1423,7 +1465,7 @@ public abstract class ReadCommand implements ReadQuery
ClusteringIndexNamesFilter filter = ((SinglePartitionNamesCommand) command).clusteringIndexFilter();
ClusteringIndexSliceFilter sliceFilter = convertNamesFilterToSliceFilter(filter, metadata);
return new SinglePartitionSliceCommand(
- command.isDigestQuery(), command.isForThrift(), metadata, command.nowInSec(),
+ command.isDigestQuery(), command.digestVersion(), command.isForThrift(), metadata, command.nowInSec(),
command.columnFilter(), command.rowFilter(), command.limits(), command.partitionKey(), sliceFilter);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index 21f6106..547e7f4 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -70,20 +70,20 @@ public abstract class ReadResponse
return new RemoteDataResponse(LocalDataResponse.build(data, selection));
}
- public static ReadResponse createDigestResponse(UnfilteredPartitionIterator data)
+ public static ReadResponse createDigestResponse(UnfilteredPartitionIterator data, int version)
{
- return new DigestResponse(makeDigest(data));
+ return new DigestResponse(makeDigest(data, version));
}
public abstract UnfilteredPartitionIterator makeIterator(CFMetaData metadata, ReadCommand command);
public abstract ByteBuffer digest(CFMetaData metadata, ReadCommand command);
- public abstract boolean isDigestQuery();
+ public abstract boolean isDigestResponse();
- protected static ByteBuffer makeDigest(UnfilteredPartitionIterator iterator)
+ protected static ByteBuffer makeDigest(UnfilteredPartitionIterator iterator, int version)
{
MessageDigest digest = FBUtilities.threadLocalMD5Digest();
- UnfilteredPartitionIterators.digest(iterator, digest);
+ UnfilteredPartitionIterators.digest(iterator, digest, version);
return ByteBuffer.wrap(digest.digest());
}
@@ -105,10 +105,14 @@ public abstract class ReadResponse
public ByteBuffer digest(CFMetaData metadata, ReadCommand command)
{
+ // We assume that the digest is in the proper version, which bug excluded should be true since this is called with
+ // ReadCommand.digestVersion() as argument and that's also what we use to produce the digest in the first place.
+ // Validating it's the proper digest in this method would require sending back the digest version along with the
+ // digest which would waste bandwith for little gain.
return digest;
}
- public boolean isDigestQuery()
+ public boolean isDigestResponse()
{
return true;
}
@@ -201,11 +205,11 @@ public abstract class ReadResponse
{
try (UnfilteredPartitionIterator iterator = makeIterator(metadata, command))
{
- return makeDigest(iterator);
+ return makeDigest(iterator, command.digestVersion());
}
}
- public boolean isDigestQuery()
+ public boolean isDigestResponse()
{
return false;
}
@@ -268,11 +272,11 @@ public abstract class ReadResponse
{
try (UnfilteredPartitionIterator iterator = makeIterator(metadata, command))
{
- return makeDigest(iterator);
+ return makeDigest(iterator, command.digestVersion());
}
}
- public boolean isDigestQuery()
+ public boolean isDigestResponse()
{
return false;
}
@@ -284,7 +288,6 @@ public abstract class ReadResponse
{
boolean isDigest = response instanceof DigestResponse;
ByteBuffer digest = isDigest ? ((DigestResponse)response).digest : ByteBufferUtil.EMPTY_BYTE_BUFFER;
-
if (version < MessagingService.VERSION_30)
{
out.writeInt(digest.remaining());
@@ -310,9 +313,6 @@ public abstract class ReadResponse
ByteBufferUtil.writeWithVIntLength(digest, out);
if (!isDigest)
{
- // Note that we can only get there if version == 3.0, which is the current_version. When we'll change the
- // version, we'll have to deserialize/re-serialize the data to be in the proper version.
- assert version == MessagingService.VERSION_30;
ByteBuffer data = ((DataResponse)response).data;
ByteBufferUtil.writeWithVIntLength(data, out);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
index f40da5b..cee3fc4 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
@@ -46,6 +46,7 @@ public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<Clus
{
private int oldestUnrepairedDeletionTime = Integer.MAX_VALUE;
protected SinglePartitionNamesCommand(boolean isDigest,
+ int digestVersion,
boolean isForThrift,
CFMetaData metadata,
int nowInSec,
@@ -55,7 +56,7 @@ public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<Clus
DecoratedKey partitionKey,
ClusteringIndexNamesFilter clusteringIndexFilter)
{
- super(isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
+ super(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
}
public SinglePartitionNamesCommand(CFMetaData metadata,
@@ -66,7 +67,7 @@ public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<Clus
DecoratedKey partitionKey,
ClusteringIndexNamesFilter clusteringIndexFilter)
{
- this(false, false, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
+ this(false, 0, false, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
}
public SinglePartitionNamesCommand(CFMetaData metadata,
@@ -77,12 +78,12 @@ public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<Clus
ByteBuffer key,
ClusteringIndexNamesFilter clusteringIndexFilter)
{
- this(false, false, metadata, nowInSec, columnFilter, rowFilter, limits, metadata.decorateKey(key), clusteringIndexFilter);
+ this(metadata, nowInSec, columnFilter, rowFilter, limits, metadata.decorateKey(key), clusteringIndexFilter);
}
public SinglePartitionNamesCommand copy()
{
- return new SinglePartitionNamesCommand(isDigestQuery(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter());
+ return new SinglePartitionNamesCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter());
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index ca135f8..7b62f5a 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -50,6 +50,7 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
private final F clusteringIndexFilter;
protected SinglePartitionReadCommand(boolean isDigest,
+ int digestVersion,
boolean isForThrift,
CFMetaData metadata,
int nowInSec,
@@ -59,7 +60,7 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
DecoratedKey partitionKey,
F clusteringIndexFilter)
{
- super(Kind.SINGLE_PARTITION, isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits);
+ super(Kind.SINGLE_PARTITION, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits);
assert partitionKey.getPartitioner() == metadata.partitioner;
this.partitionKey = partitionKey;
this.clusteringIndexFilter = clusteringIndexFilter;
@@ -113,10 +114,10 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
ClusteringIndexFilter clusteringIndexFilter)
{
if (clusteringIndexFilter instanceof ClusteringIndexSliceFilter)
- return new SinglePartitionSliceCommand(false, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, (ClusteringIndexSliceFilter) clusteringIndexFilter);
+ return new SinglePartitionSliceCommand(false, 0, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, (ClusteringIndexSliceFilter) clusteringIndexFilter);
assert clusteringIndexFilter instanceof ClusteringIndexNamesFilter;
- return new SinglePartitionNamesCommand(false, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, (ClusteringIndexNamesFilter) clusteringIndexFilter);
+ return new SinglePartitionNamesCommand(false, 0, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, (ClusteringIndexNamesFilter) clusteringIndexFilter);
}
/**
@@ -506,15 +507,15 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
private static class Deserializer extends SelectionDeserializer
{
- public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits)
+ public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits)
throws IOException
{
DecoratedKey key = metadata.decorateKey(metadata.getKeyValidator().readValue(in));
ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata);
if (filter instanceof ClusteringIndexNamesFilter)
- return new SinglePartitionNamesCommand(isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, (ClusteringIndexNamesFilter)filter);
+ return new SinglePartitionNamesCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, (ClusteringIndexNamesFilter)filter);
else
- return new SinglePartitionSliceCommand(isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, (ClusteringIndexSliceFilter)filter);
+ return new SinglePartitionSliceCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, (ClusteringIndexSliceFilter)filter);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
index 2dbf7b1..27aab62 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
@@ -44,6 +44,7 @@ public class SinglePartitionSliceCommand extends SinglePartitionReadCommand<Clus
private int oldestUnrepairedTombstone = Integer.MAX_VALUE;
public SinglePartitionSliceCommand(boolean isDigest,
+ int digestVersion,
boolean isForThrift,
CFMetaData metadata,
int nowInSec,
@@ -53,7 +54,7 @@ public class SinglePartitionSliceCommand extends SinglePartitionReadCommand<Clus
DecoratedKey partitionKey,
ClusteringIndexSliceFilter clusteringIndexFilter)
{
- super(isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
+ super(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
}
public SinglePartitionSliceCommand(CFMetaData metadata,
@@ -64,7 +65,7 @@ public class SinglePartitionSliceCommand extends SinglePartitionReadCommand<Clus
DecoratedKey partitionKey,
ClusteringIndexSliceFilter clusteringIndexFilter)
{
- this(false, false, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
+ this(false, 0, false, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
}
/**
@@ -118,7 +119,7 @@ public class SinglePartitionSliceCommand extends SinglePartitionReadCommand<Clus
public SinglePartitionSliceCommand copy()
{
- return new SinglePartitionSliceCommand(isDigestQuery(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter());
+ return new SinglePartitionSliceCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter());
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index f7ee5ee..900b17a 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -319,7 +319,14 @@ public abstract class UnfilteredPartitionIterators
};
}
- public static void digest(UnfilteredPartitionIterator iterator, MessageDigest digest)
+ /**
+ * Digests the partitions of the provided iterator.
+ *
+ * @param iterator the iterator to digest.
+ * @param digest the {@code MessageDigest} to use for the digest.
+ * @param version the messaging protocol version to use when producing the digest.
+ */
+ public static void digest(UnfilteredPartitionIterator iterator, MessageDigest digest, int version)
{
try (UnfilteredPartitionIterator iter = iterator)
{
@@ -327,7 +334,7 @@ public abstract class UnfilteredPartitionIterators
{
try (UnfilteredRowIterator partition = iter.next())
{
- UnfilteredRowIterators.digest(partition, digest);
+ UnfilteredRowIterators.digest(partition, digest, version);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/src/java/org/apache/cassandra/db/rows/AbstractCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractCell.java b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
index f53322a..e804b7a 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractCell.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
@@ -44,7 +44,6 @@ public abstract class AbstractCell extends Cell
{
digest.update(value().duplicate());
FBUtilities.updateWithLong(digest, timestamp());
- FBUtilities.updateWithInt(digest, localDeletionTime());
FBUtilities.updateWithInt(digest, ttl());
FBUtilities.updateWithBoolean(digest, isCounterCell());
if (path() != null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index 48e00f9..477eac9 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.IMergeIterator;
import org.apache.cassandra.utils.MergeIterator;
@@ -157,12 +158,21 @@ public abstract class UnfilteredRowIterators
};
}
- public static void digest(UnfilteredRowIterator iterator, MessageDigest digest)
+ /**
+ * Digests the partition represented by the provided iterator.
+ *
+ * @param iterator the iterator to digest.
+ * @param digest the {@code MessageDigest} to use for the digest.
+ * @param version the messaging protocol version to use when producing the digest.
+ */
+ public static void digest(UnfilteredRowIterator iterator, MessageDigest digest, int version)
{
- // TODO: we're not computing digest the same way that old nodes. This
- // means we'll have digest mismatches during upgrade. We should pass the messaging version of
- // the node this is for (which might mean computing the digest last, and won't work
- // for schema (where we announce the version through gossip to everyone))
+ if (version < MessagingService.VERSION_30)
+ {
+ LegacyLayout.fromUnfilteredRowIterator(iterator).digest(iterator.metadata(), digest);
+ return;
+ }
+
digest.update(iterator.partitionKey().getKey().duplicate());
iterator.partitionLevelDeletion().digest(digest);
iterator.columns().digest(digest);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java
index 7d6c787..d206305 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -209,7 +209,7 @@ public class Validator implements Runnable
validated++;
// MerkleTree uses XOR internally, so we want lots of output bits here
CountingDigest digest = new CountingDigest(FBUtilities.newMessageDigest("SHA-256"));
- UnfilteredRowIterators.digest(partition, digest);
+ UnfilteredRowIterators.digest(partition, digest, MessagingService.current_version);
// only return new hash for merkle tree in case digest was updated - see CASSANDRA-8979
return digest.count > 0
? new MerkleTree.RowHash(partition.partitionKey().getToken(), digest.digest(), digest.count)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 16a3e6e..41d7bc6 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -69,6 +69,15 @@ public abstract class AbstractReadExecutor
this.targetReplicas = targetReplicas;
this.handler = new ReadCallback(new DigestResolver(keyspace, command, consistencyLevel, targetReplicas.size()), consistencyLevel, command, targetReplicas);
this.traceState = Tracing.instance.get();
+
+ // Set the digest version (in case we request some digests). We use the smallest version amongst all our target replicas, since newer nodes
+ // know how to produce older digests but the reverse is not true.
+ // TODO: this is only needed when talking to pre-3.0 nodes. If we preserve the digest format moving forward, we can get rid of this once
+ // we stop being compatible with pre-3.0 nodes.
+ int digestVersion = MessagingService.current_version;
+ for (InetAddress replica : targetReplicas)
+ digestVersion = Math.min(digestVersion, MessagingService.instance().getVersion(replica));
+ command.setDigestVersion(digestVersion);
}
protected void makeDataRequests(Iterable<InetAddress> endpoints)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/src/java/org/apache/cassandra/service/DigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DigestResolver.java b/src/java/org/apache/cassandra/service/DigestResolver.java
index db8adf3..572df6f 100644
--- a/src/java/org/apache/cassandra/service/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/DigestResolver.java
@@ -38,7 +38,7 @@ public class DigestResolver extends ResponseResolver
public void preprocess(MessageIn<ReadResponse> message)
{
super.preprocess(message);
- if (dataResponse == null && !message.payload.isDigestQuery())
+ if (dataResponse == null && !message.payload.isDigestResponse())
dataResponse = message.payload;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index fc1b638..038384e 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -1513,6 +1513,7 @@ public class CassandraServer implements Cassandra.Iface
ClusteringIndexFilter filter = toInternalFilter(metadata, column_parent, predicate);
DataLimits limits = getLimits(range.count, metadata.isSuper() && !column_parent.isSetSuper_column(), predicate);
PartitionRangeReadCommand cmd = new PartitionRangeReadCommand(false,
+ 0,
true,
metadata,
nowInSec,
@@ -1605,6 +1606,7 @@ public class CassandraServer implements Cassandra.Iface
? new Clustering(start_column)
: LegacyLayout.decodeCellName(metadata, start_column).clustering;
PartitionRangeReadCommand cmd = new PartitionRangeReadCommand(false,
+ 0,
true,
metadata,
nowInSec,
@@ -1695,6 +1697,7 @@ public class CassandraServer implements Cassandra.Iface
ClusteringIndexFilter filter = toInternalFilter(metadata, column_parent, column_predicate);
DataLimits limits = getLimits(index_clause.count, metadata.isSuper() && !column_parent.isSetSuper_column(), column_predicate);
PartitionRangeReadCommand cmd = new PartitionRangeReadCommand(false,
+ 0,
true,
metadata,
nowInSec,
@@ -2511,7 +2514,7 @@ public class CassandraServer implements Cassandra.Iface
// We want to know if the partition exists, so just fetch a single cell.
ClusteringIndexSliceFilter filter = new ClusteringIndexSliceFilter(Slices.ALL, false);
DataLimits limits = DataLimits.thriftLimits(1, 1);
- return new SinglePartitionSliceCommand(false, true, metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, limits, key, filter);
+ return new SinglePartitionSliceCommand(false, 0, true, metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, limits, key, filter);
}
// Gather the clustering for the expected values and query those.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
index 5030029..21a41c4 100644
--- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
+++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.utils.FBUtilities;
@@ -96,8 +97,8 @@ public class CacheProviderTest
{
MessageDigest d1 = MessageDigest.getInstance("MD5");
MessageDigest d2 = MessageDigest.getInstance("MD5");
- UnfilteredRowIterators.digest(((CachedBTreePartition) one).unfilteredIterator(), d1);
- UnfilteredRowIterators.digest(((CachedBTreePartition) two).unfilteredIterator(), d2);
+ UnfilteredRowIterators.digest(((CachedBTreePartition) one).unfilteredIterator(), d1, MessagingService.current_version);
+ UnfilteredRowIterators.digest(((CachedBTreePartition) two).unfilteredIterator(), d2, MessagingService.current_version);
assertTrue(MessageDigest.isEqual(d1.digest(), d2.digest()));
}
catch (NoSuchAlgorithmException e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/test/unit/org/apache/cassandra/db/PartitionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/PartitionTest.java b/test/unit/org/apache/cassandra/db/PartitionTest.java
index f0a63a8..623ff0e 100644
--- a/test/unit/org/apache/cassandra/db/PartitionTest.java
+++ b/test/unit/org/apache/cassandra/db/PartitionTest.java
@@ -47,7 +47,6 @@ import static org.junit.Assert.assertFalse;
public class PartitionTest
{
- static int version = MessagingService.current_version;
private static final String KEYSPACE1 = "Keyspace1";
private static final String CF_STANDARD1 = "Standard1";
private static final String CF_TENCOL = "TenColumns";
@@ -117,39 +116,58 @@ public class PartitionTest
@Test
public void testDigest() throws NoSuchAlgorithmException
{
+ testDigest(MessagingService.current_version);
+ }
+
+ @Test
+ public void testLegacyDigest() throws NoSuchAlgorithmException
+ {
+ testDigest(MessagingService.VERSION_22);
+ }
+
+ public void testDigest(int version) throws NoSuchAlgorithmException
+ {
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_TENCOL);
- RowUpdateBuilder builder = new RowUpdateBuilder(cfs.metadata, 5, "key1").clustering("c").add("val", "val1");
- for (int i = 0; i < 10; i++)
- builder.add("val" + i, "val" + i);
- builder.build().applyUnsafe();
- new RowUpdateBuilder(cfs.metadata, 5, "key2").clustering("c").add("val", "val2").build().applyUnsafe();
-
- ImmutableBTreePartition p1 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key1").build());
- ImmutableBTreePartition p2 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build());
-
- MessageDigest digest1 = MessageDigest.getInstance("MD5");
- MessageDigest digest2 = MessageDigest.getInstance("MD5");
- UnfilteredRowIterators.digest(p1.unfilteredIterator(), digest1);
- UnfilteredRowIterators.digest(p2.unfilteredIterator(), digest2);
- assertFalse(Arrays.equals(digest1.digest(), digest2.digest()));
-
- p1 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build());
- p2 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build());
- digest1 = MessageDigest.getInstance("MD5");
- digest2 = MessageDigest.getInstance("MD5");
- UnfilteredRowIterators.digest(p1.unfilteredIterator(), digest1);
- UnfilteredRowIterators.digest(p2.unfilteredIterator(), digest2);
- assertTrue(Arrays.equals(digest1.digest(), digest2.digest()));
-
- p1 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build());
- RowUpdateBuilder.deleteRow(cfs.metadata, 6, "key2", "c").applyUnsafe();
- p2 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build());
- digest1 = MessageDigest.getInstance("MD5");
- digest2 = MessageDigest.getInstance("MD5");
- UnfilteredRowIterators.digest(p1.unfilteredIterator(), digest1);
- UnfilteredRowIterators.digest(p2.unfilteredIterator(), digest2);
- assertFalse(Arrays.equals(digest1.digest(), digest2.digest()));
+ try
+ {
+ RowUpdateBuilder builder = new RowUpdateBuilder(cfs.metadata, 5, "key1").clustering("c").add("val", "val1");
+ for (int i = 0; i < 10; i++)
+ builder.add("val" + i, "val" + i);
+ builder.build().applyUnsafe();
+
+ new RowUpdateBuilder(cfs.metadata, 5, "key2").clustering("c").add("val", "val2").build().applyUnsafe();
+
+ ImmutableBTreePartition p1 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key1").build());
+ ImmutableBTreePartition p2 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build());
+
+ MessageDigest digest1 = MessageDigest.getInstance("MD5");
+ MessageDigest digest2 = MessageDigest.getInstance("MD5");
+ UnfilteredRowIterators.digest(p1.unfilteredIterator(), digest1, version);
+ UnfilteredRowIterators.digest(p2.unfilteredIterator(), digest2, version);
+ assertFalse(Arrays.equals(digest1.digest(), digest2.digest()));
+
+ p1 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build());
+ p2 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build());
+ digest1 = MessageDigest.getInstance("MD5");
+ digest2 = MessageDigest.getInstance("MD5");
+ UnfilteredRowIterators.digest(p1.unfilteredIterator(), digest1, version);
+ UnfilteredRowIterators.digest(p2.unfilteredIterator(), digest2, version);
+ assertTrue(Arrays.equals(digest1.digest(), digest2.digest()));
+
+ p1 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build());
+ RowUpdateBuilder.deleteRow(cfs.metadata, 6, "key2", "c").applyUnsafe();
+ p2 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build());
+ digest1 = MessageDigest.getInstance("MD5");
+ digest2 = MessageDigest.getInstance("MD5");
+ UnfilteredRowIterators.digest(p1.unfilteredIterator(), digest1, version);
+ UnfilteredRowIterators.digest(p2.unfilteredIterator(), digest2, version);
+ assertFalse(Arrays.equals(digest1.digest(), digest2.digest()));
+ }
+ finally
+ {
+ cfs.truncateBlocking();
+ }
}
@Test
http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/test/unit/org/apache/cassandra/db/rows/DigestBackwardCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/rows/DigestBackwardCompatibilityTest.java b/test/unit/org/apache/cassandra/db/rows/DigestBackwardCompatibilityTest.java
new file mode 100644
index 0000000..5503cfb
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/rows/DigestBackwardCompatibilityTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.security.MessageDigest;
+
+import org.junit.Test;
+
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.CounterId;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test that digests for pre-3.0 versions are properly computed (they match the values computed on pre-3.0 nodes).
+ *
+ * The concrete 'hard-coded' digests this file tests against have been generated on a 2.2 node using basically
+ * the same test file but with 2 modifications:
+ * 1. readAndDigest is modified to work on 2.2 (the actual modification is in the method as a comment)
+ * 2. the assertions are replaced by simple println() calls printing the generated digest.
+ *
+ * Note that we only compare against 2.2 since digests should be the same across all versions before 3.0 (anything else
+ * would be a bug in those versions).
+ */
+public class DigestBackwardCompatibilityTest extends CQLTester
+{
+ private ByteBuffer readAndDigest(String partitionKey)
+ {
+ /*
+ * In 2.2, this must be replaced by:
+ * ColumnFamily partition = getCurrentColumnFamilyStore().getColumnFamily(QueryFilter.getIdentityFilter(Util.dk(partitionKey), currentTable(), System.currentTimeMillis()));
+ * return ColumnFamily.digest(partition);
+ */
+
+ ImmutableBTreePartition partition = Util.getOnlyPartitionUnfiltered(Util.cmd(getCurrentColumnFamilyStore(), partitionKey).build());
+ MessageDigest digest = FBUtilities.threadLocalMD5Digest();
+ UnfilteredRowIterators.digest(partition.unfilteredIterator(), digest, MessagingService.VERSION_22);
+ return ByteBuffer.wrap(digest.digest());
+ }
+
+ private void assertDigest(String expected, ByteBuffer actual)
+ {
+ String toTest = ByteBufferUtil.bytesToHex(actual);
+ assertEquals(String.format("[digest from 2.2] %s != %s [digest from 3.0]", expected, toTest), expected, toTest);
+ }
+
+ @Test
+ public void testCQLTable() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k text, t int, v1 text, v2 int, PRIMARY KEY (k, t))");
+
+ String key = "someKey";
+ int N = 10;
+
+ for (int i = 0; i < 10; i++)
+ execute("INSERT INTO %s(k, t, v1, v2) VALUES (?, ?, ?, ?) USING TIMESTAMP ? AND TTL ?", key, i, "v" + i, i, 1L, 200);
+
+ // ColumnFamily(table_0 [0::false:0@1!200,0:v1:false:2@1!200,0:v2:false:4@1!200,1::false:0@1!200,1:v1:false:2@1!200,1:v2:false:4@1!200,2::false:0@1!200,2:v1:false:2@1!200,2:v2:false:4@1!200,3::false:0@1!200,3:v1:false:2@1!200,3:v2:false:4@1!200,4::false:0@1!200,4:v1:false:2@1!200,4:v2:false:4@1!200,5::false:0@1!200,5:v1:false:2@1!200,5:v2:false:4@1!200,6::false:0@1!200,6:v1:false:2@1!200,6:v2:false:4@1!200,7::false:0@1!200,7:v1:false:2@1!200,7:v2:false:4@1!200,8::false:0@1!200,8:v1:false:2@1!200,8:v2:false:4@1!200,9::false:0@1!200,9:v1:false:2@1!200,9:v2:false:4@1!200,])
+ assertDigest("aa608035cf6574a97061b5c166b64939", readAndDigest(key));
+
+ // This is a cell deletion
+ execute("DELETE v1 FROM %s USING TIMESTAMP ? WHERE k = ? AND t = ?", 2L, key, 2);
+
+ // This is a range tombstone
+ execute("DELETE FROM %s USING TIMESTAMP ? WHERE k = ? AND t = ?", 3L, key, 4);
+
+ // This is a partition level deletion (but we use an older tombstone so it doesn't get rid of everything and keeps the test interesting)
+ execute("DELETE FROM %s USING TIMESTAMP ? WHERE k = ?", 0L, key);
+
+ // ColumnFamily(table_0 -{deletedAt=0, localDeletion=1441012270, ranges=[4:_-4:!, deletedAt=3, localDeletion=1441012270]}- [0::false:0@1!200,0:v1:false:2@1!200,0:v2:false:4@1!200,1::false:0@1!200,1:v1:false:2@1!200,1:v2:false:4@1!200,2::false:0@1!200,2:v1:true:4@2,2:v2:false:4@1!200,3::false:0@1!200,3:v1:false:2@1!200,3:v2:false:4@1!200,5::false:0@1!200,5:v1:false:2@1!200,5:v2:false:4@1!200,6::false:0@1!200,6:v1:false:2@1!200,6:v2:false:4@1!200,7::false:0@1!200,7:v1:false:2@1!200,7:v2:false:4@1!200,8::false:0@1!200,8:v1:false:2@1!200,8:v2:false:4@1!200,9::false:0@1!200,9:v1:false:2@1!200,9:v2:false:4@1!200,])
+ assertDigest("b5f38d2dc7b917d221f98ab1641f82bf", readAndDigest(key));
+ }
+
+ @Test
+ public void testCompactTable() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k text, t int, v text, PRIMARY KEY (k, t)) WITH COMPACT STORAGE");
+
+ String key = "someKey";
+ int N = 10;
+
+ for (int i = 0; i < 10; i++)
+ execute("INSERT INTO %s(k, t, v) VALUES (?, ?, ?) USING TIMESTAMP ? AND TTL ?", key, i, "v" + i, 1L, 200);
+
+ assertDigest("44785ddd7c62c73287b448b6063645e5", readAndDigest(key));
+
+ // This is a cell deletion
+ execute("DELETE FROM %s USING TIMESTAMP ? WHERE k = ? AND t = ?", 2L, key, 2);
+
+ // This is a partition level deletion (but we use an older tombstone so it doesn't get rid of everything and keeps the test interesting)
+ execute("DELETE FROM %s USING TIMESTAMP ? WHERE k = ?", 0L, key);
+
+ assertDigest("55d9bd6335276395d83b18eb17f9abe7", readAndDigest(key));
+ }
+
+ @Test
+ public void testStaticCompactTable() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k text PRIMARY KEY, v1 text, v2 int) WITH COMPACT STORAGE");
+
+ String key = "someKey";
+ execute("INSERT INTO %s(k, v1, v2) VALUES (?, ?, ?) USING TIMESTAMP ?", key, "v", 0, 1L);
+
+ assertDigest("d2080f9f57d6edf92da1fdaaa76573d3", readAndDigest(key));
+ }
+
+ @Test
+ public void testTableWithCollection() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k text PRIMARY KEY, m map<text, text>)");
+
+ String key = "someKey";
+
+ execute("INSERT INTO %s(k, m) VALUES (?, { 'foo' : 'value1', 'bar' : 'value2' }) USING TIMESTAMP ?", key, 1L);
+
+ // ColumnFamily(table_2 -{deletedAt=-9223372036854775808, localDeletion=2147483647, ranges=[m:_-m:!, deletedAt=0, localDeletion=1441012271]}- [:false:0@1,m:626172:false:6@1,m:666f6f:false:6@1,])
+ assertDigest("708f3fc8bc8149cc3513eef300bf0182", readAndDigest(key));
+
+ // This is a collection range tombstone
+ execute("DELETE m FROM %s USING TIMESTAMP ? WHERE k = ?", 2L, key);
+
+ // ColumnFamily(table_2 -{deletedAt=-9223372036854775808, localDeletion=2147483647, ranges=[m:_-m:!, deletedAt=2, localDeletion=1441012271]}- [:false:0@1,])
+ assertDigest("f39937fc3ed96956ef507e81717fa5cd", readAndDigest(key));
+ }
+
+ @Test
+ public void testCounterTable() throws Throwable
+ {
+ /*
+ * We can't use CQL to insert counters as both the timestamp and counter ID are automatically assigned and unpredictable.
+ * So we need to build it ourselves in a way that is totally equivalent between 2.2 and 3.0 which makes the test a little
+ * bit less readable. In any case, the code to generate the equivalent mutation on 2.2 is:
+ * ColumnFamily cf = ArrayBackedSortedColumns.factory.create(getCurrentColumnFamilyStore().metadata);
+ * ByteBuffer value = CounterContext.instance().createGlobal(CounterId.fromInt(1), 1L, 42L);
+ * cf.addColumn(new BufferCounterCell(CellNames.simpleSparse(new ColumnIdentifier("c", true)) , value, 0L, Long.MIN_VALUE));
+ * new Mutation(KEYSPACE, ByteBufferUtil.bytes(key), cf).applyUnsafe();
+ *
+ * Also note that we use COMPACT STORAGE only because it has no bearing on the test and was slightly easier in 2.2 to create
+ * the mutation.
+ */
+
+ createTable("CREATE TABLE %s (k text PRIMARY KEY, c counter) WITH COMPACT STORAGE");
+
+ String key = "someKey";
+
+ CFMetaData metadata = getCurrentColumnFamilyStore().metadata;
+ ColumnDefinition column = metadata.getColumnDefinition(ByteBufferUtil.bytes("c"));
+ ByteBuffer value = CounterContext.instance().createGlobal(CounterId.fromInt(1), 1L, 42L);
+ Row row = BTreeRow.singleCellRow(Clustering.STATIC_CLUSTERING, BufferCell.live(metadata, column, 0L, value));
+
+ new Mutation(PartitionUpdate.singleRowUpdate(metadata, Util.dk(key), row)).applyUnsafe();
+
+ assertDigest("3a5f7b48c320538b4cd2f829e05c6db3", readAndDigest(key));
+ }
+}
[2/2] cassandra git commit: Merge branch 'cassandra-3.0' into trunk
Posted by sl...@apache.org.
Merge branch 'cassandra-3.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4e57b820
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4e57b820
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4e57b820
Branch: refs/heads/trunk
Commit: 4e57b8200e0dbcaa73ebd55ef413ab63b4612cc8
Parents: 8302ef7 782a1c3
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue Sep 1 10:53:12 2015 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Sep 1 10:53:12 2015 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/cql3/UpdateParameters.java | 9 +-
.../org/apache/cassandra/db/DeletionTime.java | 4 +-
.../org/apache/cassandra/db/LegacyLayout.java | 72 +++++++-
.../cassandra/db/PartitionRangeReadCommand.java | 13 +-
.../org/apache/cassandra/db/ReadCommand.java | 76 ++++++--
.../org/apache/cassandra/db/ReadResponse.java | 28 +--
.../db/SinglePartitionNamesCommand.java | 9 +-
.../db/SinglePartitionReadCommand.java | 13 +-
.../db/SinglePartitionSliceCommand.java | 7 +-
.../UnfilteredPartitionIterators.java | 11 +-
.../apache/cassandra/db/rows/AbstractCell.java | 1 -
.../db/rows/UnfilteredRowIterators.java | 20 +-
.../org/apache/cassandra/repair/Validator.java | 2 +-
.../cassandra/service/AbstractReadExecutor.java | 9 +
.../cassandra/service/DigestResolver.java | 2 +-
.../cassandra/thrift/CassandraServer.java | 5 +-
.../cassandra/cache/CacheProviderTest.java | 5 +-
.../org/apache/cassandra/db/PartitionTest.java | 82 +++++----
.../rows/DigestBackwardCompatibilityTest.java | 182 +++++++++++++++++++
20 files changed, 452 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e57b820/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 08d6411,390255c..a55a5af
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,8 -1,5 +1,9 @@@
+3.2
+ * Add transparent data encryption core classes (CASSANDRA-9945)
+
+
3.0.0-beta2
+ * Avoid digest mismatch on upgrade to 3.0 (CASSANDRA-9554)
* Fix Materialized View builder when adding multiple MVs (CASSANDRA-10156)
* Choose better poolingOptions for protocol v4 in cassandra-stress (CASSANDRA-10182)
* Fix LWW bug affecting Materialized Views (CASSANDRA-10197)