You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/01/29 22:48:53 UTC
cassandra git commit: Token serialization should accept partitioner
explicitly
Repository: cassandra
Updated Branches:
refs/heads/trunk 6145d50f5 -> 806facc8c
Token serialization should accept partitioner explicitly
patch by Branimir Lambov; reviewed by Aleksey Yeschenko for
CASSANDRA-8268
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/806facc8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/806facc8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/806facc8
Branch: refs/heads/trunk
Commit: 806facc8ca87a8d1f6fa14056c68ac43dc5bde5c
Parents: 6145d50
Author: Branimir Lambov <br...@datastax.com>
Authored: Fri Jan 30 00:46:44 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Jan 30 00:48:12 2015 +0300
----------------------------------------------------------------------
CHANGES.txt | 3 +-
.../apache/cassandra/db/PagedRangeCommand.java | 3 +-
.../apache/cassandra/db/RangeSliceCommand.java | 3 +-
.../org/apache/cassandra/db/RowPosition.java | 16 ++---
.../apache/cassandra/dht/AbstractBounds.java | 33 +++++------
.../dht/IPartitionerDependentSerializer.java | 61 ++++++++++++++++++++
src/java/org/apache/cassandra/dht/Token.java | 19 +++---
.../apache/cassandra/net/MessagingService.java | 14 +++++
.../apache/cassandra/repair/RepairJobDesc.java | 3 +-
.../repair/messages/AnticompactionRequest.java | 10 +++-
.../repair/messages/PrepareMessage.java | 8 ++-
.../repair/messages/RepairMessage.java | 2 +-
.../cassandra/repair/messages/SyncRequest.java | 6 +-
.../cassandra/streaming/StreamRequest.java | 14 +++--
.../org/apache/cassandra/utils/MerkleTree.java | 43 +++++++-------
15 files changed, 163 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0cd0b4d..a85a6e7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0
+ * Avoid accessing partitioner through StorageProxy (CASSANDRA-8244, 8268)
* Upgrade Metrics library and remove depricated metrics (CASSANDRA-5657)
* Serializing Row cache alternative, fully off heap (CASSANDRA-7438)
* Duplicate rows returned when in clause has repeated values (CASSANDRA-6707)
@@ -137,8 +138,6 @@
* Force config client mode in CQLSSTableWriter (CASSANDRA-8281)
* Fix sstableupgrade throws exception (CASSANDRA-8688)
Merged from 2.0:
-=======
-2.0.13:
* Fix SSTableSimpleUnsortedWriter ConcurrentModificationException (CASSANDRA-8619)
* Round up time deltas lower than 1ms in BulkLoader (CASSANDRA-8645)
* Add batch remove iterator to ABSC (CASSANDRA-8414, 8666)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/db/PagedRangeCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PagedRangeCommand.java b/src/java/org/apache/cassandra/db/PagedRangeCommand.java
index 614f0f7..ebedecf 100644
--- a/src/java/org/apache/cassandra/db/PagedRangeCommand.java
+++ b/src/java/org/apache/cassandra/db/PagedRangeCommand.java
@@ -129,6 +129,7 @@ public class PagedRangeCommand extends AbstractRangeCommand
out.writeUTF(cmd.columnFamily);
out.writeLong(cmd.timestamp);
+ MessagingService.validatePartitioner(cmd.keyRange);
AbstractBounds.serializer.serialize(cmd.keyRange, out, version);
CFMetaData metadata = Schema.instance.getCFMetaData(cmd.keyspace, cmd.columnFamily);
@@ -158,7 +159,7 @@ public class PagedRangeCommand extends AbstractRangeCommand
String columnFamily = in.readUTF();
long timestamp = in.readLong();
- AbstractBounds<RowPosition> keyRange = AbstractBounds.serializer.deserialize(in, version).toRowBounds();
+ AbstractBounds<RowPosition> keyRange = AbstractBounds.serializer.deserialize(in, MessagingService.globalPartitioner(), version).toRowBounds();
CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/db/RangeSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
index 4d2955b..6009524 100644
--- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java
+++ b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
@@ -172,6 +172,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
expr.writeTo(out);
}
}
+ MessagingService.validatePartitioner(sliceCommand.keyRange);
AbstractBounds.serializer.serialize(sliceCommand.keyRange, out, version);
out.writeInt(sliceCommand.maxResults);
out.writeBoolean(sliceCommand.countCQL3Rows);
@@ -195,7 +196,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
{
rowFilter.add(IndexExpression.readFrom(in));
}
- AbstractBounds<RowPosition> range = AbstractBounds.serializer.deserialize(in, version).toRowBounds();
+ AbstractBounds<RowPosition> range = AbstractBounds.serializer.deserialize(in, MessagingService.globalPartitioner(), version).toRowBounds();
int maxResults = in.readInt();
boolean countCQL3Rows = in.readBoolean();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/db/RowPosition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowPosition.java b/src/java/org/apache/cassandra/db/RowPosition.java
index 3bcd627..3fa0465 100644
--- a/src/java/org/apache/cassandra/db/RowPosition.java
+++ b/src/java/org/apache/cassandra/db/RowPosition.java
@@ -56,7 +56,7 @@ public interface RowPosition extends RingPosition<RowPosition>
public Kind kind();
public boolean isMinimum();
- public static class RowPositionSerializer implements ISerializer<RowPosition>
+ public static class RowPositionSerializer implements IPartitionerDependentSerializer<RowPosition>
{
/*
* We need to be able to serialize both Token.KeyBound and
@@ -69,17 +69,17 @@ public interface RowPosition extends RingPosition<RowPosition>
* token is recreated on the other side). In the other cases, we then
* serialize the token.
*/
- public void serialize(RowPosition pos, DataOutputPlus out) throws IOException
+ public void serialize(RowPosition pos, DataOutputPlus out, int version) throws IOException
{
Kind kind = pos.kind();
out.writeByte(kind.ordinal());
if (kind == Kind.ROW_KEY)
ByteBufferUtil.writeWithShortLength(((DecoratedKey)pos).getKey(), out);
else
- Token.serializer.serialize(pos.getToken(), out);
+ Token.serializer.serialize(pos.getToken(), out, version);
}
- public RowPosition deserialize(DataInput in) throws IOException
+ public RowPosition deserialize(DataInput in, IPartitioner p, int version) throws IOException
{
Kind kind = Kind.fromOrdinal(in.readByte());
if (kind == Kind.ROW_KEY)
@@ -89,23 +89,23 @@ public interface RowPosition extends RingPosition<RowPosition>
}
else
{
- Token t = Token.serializer.deserialize(in);
+ Token t = Token.serializer.deserialize(in, p, version);
return kind == Kind.MIN_BOUND ? t.minKeyBound() : t.maxKeyBound();
}
}
- public long serializedSize(RowPosition pos, TypeSizes typeSizes)
+ public long serializedSize(RowPosition pos, int version)
{
Kind kind = pos.kind();
int size = 1; // 1 byte for enum
if (kind == Kind.ROW_KEY)
{
int keySize = ((DecoratedKey)pos).getKey().remaining();
- size += typeSizes.sizeof((short) keySize) + keySize;
+ size += TypeSizes.NATIVE.sizeof((short) keySize) + keySize;
}
else
{
- size += Token.serializer.serializedSize(pos.getToken(), typeSizes);
+ size += Token.serializer.serializedSize(pos.getToken(), version);
}
return size;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/dht/AbstractBounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/AbstractBounds.java b/src/java/org/apache/cassandra/dht/AbstractBounds.java
index c7a3505..f045acf 100644
--- a/src/java/org/apache/cassandra/dht/AbstractBounds.java
+++ b/src/java/org/apache/cassandra/dht/AbstractBounds.java
@@ -20,13 +20,12 @@ package org.apache.cassandra.dht;
import java.io.DataInput;
import java.io.IOException;
import java.io.Serializable;
-import java.util.*;
+import java.util.List;
import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.Pair;
@@ -122,7 +121,7 @@ public abstract class AbstractBounds<T extends RingPosition<T>> implements Seria
public abstract AbstractBounds<T> withNewRight(T newRight);
- public static class AbstractBoundsSerializer implements IVersionedSerializer<AbstractBounds<?>>
+ public static class AbstractBoundsSerializer implements IPartitionerDependentSerializer<AbstractBounds<?>>
{
public void serialize(AbstractBounds<?> range, DataOutputPlus out, int version) throws IOException
{
@@ -133,13 +132,13 @@ public abstract class AbstractBounds<T extends RingPosition<T>> implements Seria
out.writeInt(kindInt(range));
if (range.left instanceof Token)
{
- Token.serializer.serialize((Token) range.left, out);
- Token.serializer.serialize((Token) range.right, out);
+ Token.serializer.serialize((Token) range.left, out, version);
+ Token.serializer.serialize((Token) range.right, out, version);
}
else
{
- RowPosition.serializer.serialize((RowPosition) range.left, out);
- RowPosition.serializer.serialize((RowPosition) range.right, out);
+ RowPosition.serializer.serialize((RowPosition) range.left, out, version);
+ RowPosition.serializer.serialize((RowPosition) range.right, out, version);
}
}
@@ -151,7 +150,7 @@ public abstract class AbstractBounds<T extends RingPosition<T>> implements Seria
return kind;
}
- public AbstractBounds<?> deserialize(DataInput in, int version) throws IOException
+ public AbstractBounds<?> deserialize(DataInput in, IPartitioner p, int version) throws IOException
{
int kind = in.readInt();
boolean isToken = kind >= 0;
@@ -161,13 +160,13 @@ public abstract class AbstractBounds<T extends RingPosition<T>> implements Seria
RingPosition<?> left, right;
if (isToken)
{
- left = Token.serializer.deserialize(in);
- right = Token.serializer.deserialize(in);
+ left = Token.serializer.deserialize(in, p, version);
+ right = Token.serializer.deserialize(in, p, version);
}
else
{
- left = RowPosition.serializer.deserialize(in);
- right = RowPosition.serializer.deserialize(in);
+ left = RowPosition.serializer.deserialize(in, p, version);
+ right = RowPosition.serializer.deserialize(in, p, version);
}
if (kind == Type.RANGE.ordinal())
@@ -180,13 +179,13 @@ public abstract class AbstractBounds<T extends RingPosition<T>> implements Seria
int size = TypeSizes.NATIVE.sizeof(kindInt(ab));
if (ab.left instanceof Token)
{
- size += Token.serializer.serializedSize((Token) ab.left, TypeSizes.NATIVE);
- size += Token.serializer.serializedSize((Token) ab.right, TypeSizes.NATIVE);
+ size += Token.serializer.serializedSize((Token) ab.left, version);
+ size += Token.serializer.serializedSize((Token) ab.right, version);
}
else
{
- size += RowPosition.serializer.serializedSize((RowPosition) ab.left, TypeSizes.NATIVE);
- size += RowPosition.serializer.serializedSize((RowPosition) ab.right, TypeSizes.NATIVE);
+ size += RowPosition.serializer.serializedSize((RowPosition) ab.left, version);
+ size += RowPosition.serializer.serializedSize((RowPosition) ab.right, version);
}
return size;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/dht/IPartitionerDependentSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/IPartitionerDependentSerializer.java b/src/java/org/apache/cassandra/dht/IPartitionerDependentSerializer.java
new file mode 100644
index 0000000..3a9a768
--- /dev/null
+++ b/src/java/org/apache/cassandra/dht/IPartitionerDependentSerializer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.dht;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+/**
+ * Versioned serializer where the serialization depends on partitioner.
+ *
+ * On serialization the partitioner is given by the entity being serialized. To deserialize the partitioner used must
+ * be known to the calling method.
+ */
+public interface IPartitionerDependentSerializer<T>
+{
+ /**
+ * Serialize the specified type into the specified DataOutputStream instance.
+ *
+ * @param t type that needs to be serialized
+ * @param out DataOutput into which serialization needs to happen.
+ * @param version protocol version
+ * @throws java.io.IOException if serialization fails
+ */
+ public void serialize(T t, DataOutputPlus out, int version) throws IOException;
+
+ /**
+ * Deserialize into the specified DataInputStream instance.
+ * @param in DataInput from which deserialization needs to happen.
+ * @param p Partitioner that will be used to construct tokens. Needs to match the partitioner that was used to
+ * serialize the token.
+ * @param version protocol version
+ * @return the type that was deserialized
+ * @throws IOException if deserialization fails
+ */
+ public T deserialize(DataInput in, IPartitioner p, int version) throws IOException;
+
+ /**
+ * Calculate serialized size of object without actually serializing.
+ * @param t object to calculate serialized size
+ * @param version protocol version
+ * @return serialized size of object t
+ */
+ public long serializedSize(T t, int version);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/dht/Token.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Token.java b/src/java/org/apache/cassandra/dht/Token.java
index 719fd46..76918a7 100644
--- a/src/java/org/apache/cassandra/dht/Token.java
+++ b/src/java/org/apache/cassandra/dht/Token.java
@@ -22,12 +22,10 @@ import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.db.RowPosition;
-import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
public abstract class Token implements RingPosition<Token>, Serializable
@@ -46,27 +44,26 @@ public abstract class Token implements RingPosition<Token>, Serializable
public abstract void validate(String token) throws ConfigurationException;
}
- public static class TokenSerializer implements ISerializer<Token>
+ public static class TokenSerializer implements IPartitionerDependentSerializer<Token>
{
- public void serialize(Token token, DataOutputPlus out) throws IOException
+ public void serialize(Token token, DataOutputPlus out, int version) throws IOException
{
- IPartitioner p = StorageService.getPartitioner();
+ IPartitioner p = token.getPartitioner();
ByteBuffer b = p.getTokenFactory().toByteArray(token);
ByteBufferUtil.writeWithLength(b, out);
}
- public Token deserialize(DataInput in) throws IOException
+ public Token deserialize(DataInput in, IPartitioner p, int version) throws IOException
{
- IPartitioner p = StorageService.getPartitioner();
int size = in.readInt();
byte[] bytes = new byte[size];
in.readFully(bytes);
return p.getTokenFactory().fromByteArray(ByteBuffer.wrap(bytes));
}
- public long serializedSize(Token object, TypeSizes typeSizes)
+ public long serializedSize(Token object, int version)
{
- IPartitioner p = StorageService.getPartitioner();
+ IPartitioner p = object.getPartitioner();
ByteBuffer b = p.getTokenFactory().toByteArray(object);
return TypeSizes.NATIVE.sizeof(b.remaining()) + b.remaining();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index b33cf81..c333b04 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -35,6 +35,7 @@ import com.google.common.base.Function;
import com.google.common.collect.Lists;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +46,9 @@ import org.apache.cassandra.concurrent.TracingAwareExecutorService;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.BootStrapper;
+import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.gms.EchoMessage;
import org.apache.cassandra.gms.GossipDigestAck;
@@ -1039,4 +1042,15 @@ public final class MessagingService implements MessagingServiceMBean
}
return result;
}
+
+ public static IPartitioner globalPartitioner()
+ {
+ return DatabaseDescriptor.getPartitioner();
+ }
+
+ public static void validatePartitioner(AbstractBounds<?> bounds)
+ {
+ if (globalPartitioner() != bounds.left.getPartitioner())
+ throw new AssertionError();
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/repair/RepairJobDesc.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJobDesc.java b/src/java/org/apache/cassandra/repair/RepairJobDesc.java
index 5ce5969..c4a713d 100644
--- a/src/java/org/apache/cassandra/repair/RepairJobDesc.java
+++ b/src/java/org/apache/cassandra/repair/RepairJobDesc.java
@@ -100,6 +100,7 @@ public class RepairJobDesc
UUIDSerializer.serializer.serialize(desc.sessionId, out, version);
out.writeUTF(desc.keyspace);
out.writeUTF(desc.columnFamily);
+ MessagingService.validatePartitioner(desc.range);
AbstractBounds.serializer.serialize(desc.range, out, version);
}
@@ -114,7 +115,7 @@ public class RepairJobDesc
UUID sessionId = UUIDSerializer.serializer.deserialize(in, version);
String keyspace = in.readUTF();
String columnFamily = in.readUTF();
- Range<Token> range = (Range<Token>)AbstractBounds.serializer.deserialize(in, version);
+ Range<Token> range = (Range<Token>)AbstractBounds.serializer.deserialize(in, MessagingService.globalPartitioner(), version).toTokenBounds();
return new RepairJobDesc(parentSessionId, sessionId, keyspace, columnFamily, range);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java b/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java
index 239ab0e..455e5fb 100644
--- a/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java
@@ -27,6 +27,7 @@ import java.util.UUID;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.UUIDSerializer;
public class AnticompactionRequest extends RepairMessage
@@ -51,8 +52,11 @@ public class AnticompactionRequest extends RepairMessage
{
UUIDSerializer.serializer.serialize(message.parentRepairSession, out, version);
out.writeInt(message.successfulRanges.size());
- for (Range r : message.successfulRanges)
+ for (Range<Token> r : message.successfulRanges)
+ {
+ MessagingService.validatePartitioner(r);
Range.serializer.serialize(r, out, version);
+ }
}
public AnticompactionRequest deserialize(DataInput in, int version) throws IOException
@@ -61,14 +65,14 @@ public class AnticompactionRequest extends RepairMessage
int rangeCount = in.readInt();
List<Range<Token>> ranges = new ArrayList<>(rangeCount);
for (int i = 0; i < rangeCount; i++)
- ranges.add((Range<Token>) Range.serializer.deserialize(in, version).toTokenBounds());
+ ranges.add((Range<Token>) Range.serializer.deserialize(in, MessagingService.globalPartitioner(), version).toTokenBounds());
return new AnticompactionRequest(parentRepairSession, ranges);
}
public long serializedSize(AnticompactionRequest message, int version)
{
long size = UUIDSerializer.serializer.serializedSize(message.parentRepairSession, version);
- for (Range r : message.successfulRanges)
+ for (Range<Token> r : message.successfulRanges)
size += Range.serializer.serializedSize(r, version);
return size;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
index 035ccc4..d63bf70 100644
--- a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.UUIDSerializer;
@@ -58,8 +59,11 @@ public class PrepareMessage extends RepairMessage
UUIDSerializer.serializer.serialize(cfId, out, version);
UUIDSerializer.serializer.serialize(message.parentRepairSession, out, version);
out.writeInt(message.ranges.size());
- for (Range r : message.ranges)
+ for (Range<Token> r : message.ranges)
+ {
+ MessagingService.validatePartitioner(r);
Range.serializer.serialize(r, out, version);
+ }
out.writeBoolean(message.isIncremental);
}
@@ -73,7 +77,7 @@ public class PrepareMessage extends RepairMessage
int rangeCount = in.readInt();
List<Range<Token>> ranges = new ArrayList<>(rangeCount);
for (int i = 0; i < rangeCount; i++)
- ranges.add((Range<Token>) Range.serializer.deserialize(in, version).toTokenBounds());
+ ranges.add((Range<Token>) Range.serializer.deserialize(in, MessagingService.globalPartitioner(), version).toTokenBounds());
boolean isIncremental = in.readBoolean();
return new PrepareMessage(parentRepairSession, cfIds, ranges, isIncremental);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
index d500928..6af3bb3 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
@@ -81,7 +81,7 @@ public abstract class RepairMessage
return new MessageOut<>(MessagingService.Verb.REPAIR_MESSAGE, this, RepairMessage.serializer);
}
- public static class RepairMessageSerializer implements IVersionedSerializer<RepairMessage>
+ public static class RepairMessageSerializer implements MessageSerializer<RepairMessage>
{
public void serialize(RepairMessage message, DataOutputPlus out, int version) throws IOException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
index c4d0ab6..077132a 100644
--- a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.net.CompactEndpointSerializationHelper;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.RepairJobDesc;
/**
@@ -66,7 +67,10 @@ public class SyncRequest extends RepairMessage
CompactEndpointSerializationHelper.serialize(message.dst, out);
out.writeInt(message.ranges.size());
for (Range<Token> range : message.ranges)
+ {
+ MessagingService.validatePartitioner(range);
AbstractBounds.serializer.serialize(range, out, version);
+ }
}
public SyncRequest deserialize(DataInput in, int version) throws IOException
@@ -78,7 +82,7 @@ public class SyncRequest extends RepairMessage
int rangesCount = in.readInt();
List<Range<Token>> ranges = new ArrayList<>(rangesCount);
for (int i = 0; i < rangesCount; ++i)
- ranges.add((Range<Token>) AbstractBounds.serializer.deserialize(in, version).toTokenBounds());
+ ranges.add((Range<Token>) AbstractBounds.serializer.deserialize(in, MessagingService.globalPartitioner(), version).toTokenBounds());
return new SyncRequest(desc, owner, src, dst, ranges);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/streaming/StreamRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamRequest.java b/src/java/org/apache/cassandra/streaming/StreamRequest.java
index 9c5b974..0fe40cf 100644
--- a/src/java/org/apache/cassandra/streaming/StreamRequest.java
+++ b/src/java/org/apache/cassandra/streaming/StreamRequest.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
public class StreamRequest
{
@@ -55,8 +56,9 @@ public class StreamRequest
out.writeInt(request.ranges.size());
for (Range<Token> range : request.ranges)
{
- Token.serializer.serialize(range.left, out);
- Token.serializer.serialize(range.right, out);
+ MessagingService.validatePartitioner(range);
+ Token.serializer.serialize(range.left, out, version);
+ Token.serializer.serialize(range.right, out, version);
}
out.writeInt(request.columnFamilies.size());
for (String cf : request.columnFamilies)
@@ -71,8 +73,8 @@ public class StreamRequest
List<Range<Token>> ranges = new ArrayList<>(rangeCount);
for (int i = 0; i < rangeCount; i++)
{
- Token left = Token.serializer.deserialize(in);
- Token right = Token.serializer.deserialize(in);
+ Token left = Token.serializer.deserialize(in, MessagingService.globalPartitioner(), version);
+ Token right = Token.serializer.deserialize(in, MessagingService.globalPartitioner(), version);
ranges.add(new Range<>(left, right));
}
int cfCount = in.readInt();
@@ -89,8 +91,8 @@ public class StreamRequest
size += TypeSizes.NATIVE.sizeof(request.ranges.size());
for (Range<Token> range : request.ranges)
{
- size += Token.serializer.serializedSize(range.left, TypeSizes.NATIVE);
- size += Token.serializer.serializedSize(range.right, TypeSizes.NATIVE);
+ size += Token.serializer.serializedSize(range.left, version);
+ size += Token.serializer.serializedSize(range.right, version);
}
size += TypeSizes.NATIVE.sizeof(request.columnFamilies.size());
for (String cf : request.columnFamilies)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/utils/MerkleTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/MerkleTree.java b/src/java/org/apache/cassandra/utils/MerkleTree.java
index 394b12a..4fec62d 100644
--- a/src/java/org/apache/cassandra/utils/MerkleTree.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTree.java
@@ -28,6 +28,7 @@ import com.google.common.collect.PeekingIterator;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.IPartitionerDependentSerializer;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
@@ -85,8 +86,8 @@ public class MerkleTree implements Serializable
out.writeLong(mt.size);
out.writeUTF(mt.partitioner.getClass().getCanonicalName());
// full range
- Token.serializer.serialize(mt.fullRange.left, out);
- Token.serializer.serialize(mt.fullRange.right, out);
+ Token.serializer.serialize(mt.fullRange.left, out, version);
+ Token.serializer.serialize(mt.fullRange.right, out, version);
Hashable.serializer.serialize(mt.root, out, version);
}
@@ -106,13 +107,13 @@ public class MerkleTree implements Serializable
}
// full range
- Token left = Token.serializer.deserialize(in);
- Token right = Token.serializer.deserialize(in);
+ Token left = Token.serializer.deserialize(in, partitioner, version);
+ Token right = Token.serializer.deserialize(in, partitioner, version);
Range<Token> fullRange = new Range<>(left, right);
MerkleTree mt = new MerkleTree(partitioner, fullRange, hashdepth, maxsize);
mt.size = size;
- mt.root = Hashable.serializer.deserialize(in, version);
+ mt.root = Hashable.serializer.deserialize(in, partitioner, version);
return mt;
}
@@ -124,8 +125,8 @@ public class MerkleTree implements Serializable
+ TypeSizes.NATIVE.sizeof(mt.partitioner.getClass().getCanonicalName());
// full range
- size += Token.serializer.serializedSize(mt.fullRange.left, TypeSizes.NATIVE);
- size += Token.serializer.serializedSize(mt.fullRange.right, TypeSizes.NATIVE);
+ size += Token.serializer.serializedSize(mt.fullRange.left, version);
+ size += Token.serializer.serializedSize(mt.fullRange.right, version);
size += Hashable.serializer.serializedSize(mt.root, version);
return size;
@@ -811,7 +812,7 @@ public class MerkleTree implements Serializable
return buff.toString();
}
- private static class InnerSerializer implements IVersionedSerializer<Inner>
+ private static class InnerSerializer implements IPartitionerDependentSerializer<Inner>
{
public void serialize(Inner inner, DataOutputPlus out, int version) throws IOException
{
@@ -822,20 +823,20 @@ public class MerkleTree implements Serializable
out.writeInt(inner.hash.length);
out.write(inner.hash);
}
- Token.serializer.serialize(inner.token, out);
+ Token.serializer.serialize(inner.token, out, version);
Hashable.serializer.serialize(inner.lchild, out, version);
Hashable.serializer.serialize(inner.rchild, out, version);
}
- public Inner deserialize(DataInput in, int version) throws IOException
+ public Inner deserialize(DataInput in, IPartitioner p, int version) throws IOException
{
int hashLen = in.readInt();
byte[] hash = hashLen >= 0 ? new byte[hashLen] : null;
if (hash != null)
in.readFully(hash);
- Token token = Token.serializer.deserialize(in);
- Hashable lchild = Hashable.serializer.deserialize(in, version);
- Hashable rchild = Hashable.serializer.deserialize(in, version);
+ Token token = Token.serializer.deserialize(in, p, version);
+ Hashable lchild = Hashable.serializer.deserialize(in, p, version);
+ Hashable rchild = Hashable.serializer.deserialize(in, p, version);
return new Inner(token, lchild, rchild);
}
@@ -845,7 +846,7 @@ public class MerkleTree implements Serializable
? TypeSizes.NATIVE.sizeof(-1)
: TypeSizes.NATIVE.sizeof(inner.hash().length) + inner.hash().length;
- size += Token.serializer.serializedSize(inner.token, TypeSizes.NATIVE)
+ size += Token.serializer.serializedSize(inner.token, version)
+ Hashable.serializer.serializedSize(inner.lchild, version)
+ Hashable.serializer.serializedSize(inner.rchild, version);
return size;
@@ -892,7 +893,7 @@ public class MerkleTree implements Serializable
return "#<Leaf " + Hashable.toString(hash()) + ">";
}
- private static class LeafSerializer implements IVersionedSerializer<Leaf>
+ private static class LeafSerializer implements IPartitionerDependentSerializer<Leaf>
{
public void serialize(Leaf leaf, DataOutputPlus out, int version) throws IOException
{
@@ -907,7 +908,7 @@ public class MerkleTree implements Serializable
}
}
- public Leaf deserialize(DataInput in, int version) throws IOException
+ public Leaf deserialize(DataInput in, IPartitioner p, int version) throws IOException
{
int hashLen = in.readInt();
byte[] hash = hashLen < 0 ? null : new byte[hashLen];
@@ -955,7 +956,7 @@ public class MerkleTree implements Serializable
static abstract class Hashable implements Serializable
{
private static final long serialVersionUID = 1L;
- private static final IVersionedSerializer<Hashable> serializer = new HashableSerializer();
+ private static final IPartitionerDependentSerializer<Hashable> serializer = new HashableSerializer();
protected byte[] hash;
protected long sizeOfRange;
@@ -1033,7 +1034,7 @@ public class MerkleTree implements Serializable
return "[" + Hex.bytesToHex(hash) + "]";
}
- private static class HashableSerializer implements IVersionedSerializer<Hashable>
+ private static class HashableSerializer implements IPartitionerDependentSerializer<Hashable>
{
public void serialize(Hashable h, DataOutputPlus out, int version) throws IOException
{
@@ -1051,13 +1052,13 @@ public class MerkleTree implements Serializable
throw new IOException("Unexpected Hashable: " + h.getClass().getCanonicalName());
}
- public Hashable deserialize(DataInput in, int version) throws IOException
+ public Hashable deserialize(DataInput in, IPartitioner p, int version) throws IOException
{
byte ident = in.readByte();
if (Inner.IDENT == ident)
- return Inner.serializer.deserialize(in, version);
+ return Inner.serializer.deserialize(in, p, version);
else if (Leaf.IDENT == ident)
- return Leaf.serializer.deserialize(in, version);
+ return Leaf.serializer.deserialize(in, p, version);
else
throw new IOException("Unexpected Hashable: " + ident);
}