You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2016/11/21 14:52:33 UTC
[2/6] cassandra git commit: AnticompactionRequestSerializer
serializedSize is incorrect
AnticompactionRequestSerializer serializedSize is incorrect
patch by jasobrown; reviewed by pcmanus for CASSANDRA-12934
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3fd4c688
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3fd4c688
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3fd4c688
Branch: refs/heads/cassandra-3.X
Commit: 3fd4c68803ddf0d20d23b37d4b936258f8420209
Parents: 59b40b3
Author: Jason Brown <ja...@gmail.com>
Authored: Fri Nov 18 18:13:45 2016 -0800
Committer: Jason Brown <ja...@gmail.com>
Committed: Mon Nov 21 06:37:56 2016 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../repair/messages/AnticompactionRequest.java | 19 ++
.../repair/messages/CleanupMessage.java | 17 ++
.../repair/messages/PrepareMessage.java | 22 +++
.../repair/messages/SnapshotMessage.java | 16 ++
.../cassandra/repair/messages/SyncComplete.java | 19 ++
.../cassandra/repair/messages/SyncRequest.java | 21 +++
.../repair/messages/ValidationComplete.java | 18 ++
.../RepairMessageSerializationsTest.java | 187 +++++++++++++++++++
9 files changed, 320 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3fd4c688/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bcd0b5c..e613d7c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.11
+ * AnticompactionRequestSerializer serializedSize is incorrect (CASSANDRA-12934)
* Prevent reloading of logback.xml from UDF sandbox (CASSANDRA-12535)
Merged from 2.2:
* Avoid blocking gossip during pending range calculation (CASSANDRA-12281)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3fd4c688/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java b/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java
index 3e47374..a29cc87 100644
--- a/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Objects;
import java.util.UUID;
import org.apache.cassandra.dht.Range;
@@ -46,6 +47,23 @@ public class AnticompactionRequest extends RepairMessage
this.successfulRanges = ranges;
}
+ @Override
+ public boolean equals(Object o)
+ {
+ if (!(o instanceof AnticompactionRequest))
+ return false;
+ AnticompactionRequest other = (AnticompactionRequest)o;
+ return messageType == other.messageType &&
+ parentRepairSession.equals(other.parentRepairSession) &&
+ successfulRanges.equals(other.successfulRanges);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(messageType, parentRepairSession, successfulRanges);
+ }
+
public static class AnticompactionRequestSerializer implements MessageSerializer<AnticompactionRequest>
{
public void serialize(AnticompactionRequest message, DataOutputPlus out, int version) throws IOException
@@ -72,6 +90,7 @@ public class AnticompactionRequest extends RepairMessage
public long serializedSize(AnticompactionRequest message, int version)
{
long size = UUIDSerializer.serializer.serializedSize(message.parentRepairSession, version);
+ size += Integer.BYTES; // count of items in successfulRanges
for (Range<Token> r : message.successfulRanges)
size += Range.tokenSerializer.serializedSize(r, version);
return size;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3fd4c688/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java b/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java
index 43a8f02..69d147a 100644
--- a/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.repair.messages;
import java.io.IOException;
+import java.util.Objects;
import java.util.UUID;
import org.apache.cassandra.io.util.DataInputPlus;
@@ -40,6 +41,22 @@ public class CleanupMessage extends RepairMessage
this.parentRepairSession = parentRepairSession;
}
+ @Override
+ public boolean equals(Object o)
+ {
+ if (!(o instanceof CleanupMessage))
+ return false;
+ CleanupMessage other = (CleanupMessage) o;
+ return messageType == other.messageType &&
+ parentRepairSession.equals(other.parentRepairSession);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(messageType, parentRepairSession);
+ }
+
public static class CleanupMessageSerializer implements MessageSerializer<CleanupMessage>
{
public void serialize(CleanupMessage message, DataOutputPlus out, int version) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3fd4c688/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
index 8909f1b..b3efeae 100644
--- a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Objects;
import java.util.UUID;
import org.apache.cassandra.db.TypeSizes;
@@ -54,6 +55,27 @@ public class PrepareMessage extends RepairMessage
this.isGlobal = isGlobal;
}
+ @Override
+ public boolean equals(Object o)
+ {
+ if (!(o instanceof PrepareMessage))
+ return false;
+ PrepareMessage other = (PrepareMessage) o;
+ return messageType == other.messageType &&
+ parentRepairSession.equals(other.parentRepairSession) &&
+ isIncremental == other.isIncremental &&
+ isGlobal == other.isGlobal &&
+ timestamp == other.timestamp &&
+ cfIds.equals(other.cfIds) &&
+ ranges.equals(other.ranges);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(messageType, parentRepairSession, isGlobal, isIncremental, timestamp, cfIds, ranges);
+ }
+
public static class PrepareMessageSerializer implements MessageSerializer<PrepareMessage>
{
public void serialize(PrepareMessage message, DataOutputPlus out, int version) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3fd4c688/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java b/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java
index 1b15126..d4737d3 100644
--- a/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.repair.messages;
import java.io.IOException;
+import java.util.Objects;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
@@ -32,6 +33,21 @@ public class SnapshotMessage extends RepairMessage
super(Type.SNAPSHOT, desc);
}
+ @Override
+ public boolean equals(Object o)
+ {
+ if (!(o instanceof SnapshotMessage))
+ return false;
+ SnapshotMessage other = (SnapshotMessage) o;
+ return messageType == other.messageType;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(messageType);
+ }
+
public static class SnapshotMessageSerializer implements MessageSerializer<SnapshotMessage>
{
public void serialize(SnapshotMessage message, DataOutputPlus out, int version) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3fd4c688/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java b/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
index 35cf5d4..178e710 100644
--- a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
+++ b/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.repair.messages;
import java.io.IOException;
import java.net.InetAddress;
+import java.util.Objects;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.util.DataInputPlus;
@@ -53,6 +54,24 @@ public class SyncComplete extends RepairMessage
this.success = success;
}
+ @Override
+ public boolean equals(Object o)
+ {
+ if (!(o instanceof SyncComplete))
+ return false;
+ SyncComplete other = (SyncComplete)o;
+ return messageType == other.messageType &&
+ desc.equals(other.desc) &&
+ success == other.success &&
+ nodes.equals(other.nodes);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(messageType, desc, success, nodes);
+ }
+
private static class SyncCompleteSerializer implements MessageSerializer<SyncComplete>
{
public void serialize(SyncComplete message, DataOutputPlus out, int version) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3fd4c688/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
index 2c9799e..f79f482 100644
--- a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
@@ -22,6 +22,7 @@ import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Objects;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.dht.AbstractBounds;
@@ -57,6 +58,26 @@ public class SyncRequest extends RepairMessage
this.ranges = ranges;
}
+ @Override
+ public boolean equals(Object o)
+ {
+ if (!(o instanceof SyncRequest))
+ return false;
+ SyncRequest req = (SyncRequest)o;
+ return messageType == req.messageType &&
+ desc.equals(req.desc) &&
+ initiator.equals(req.initiator) &&
+ src.equals(req.src) &&
+ dst.equals(req.dst) &&
+ ranges.equals(ranges);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(messageType, desc, initiator, src, dst, ranges);
+ }
+
public static class SyncRequestSerializer implements MessageSerializer<SyncRequest>
{
public void serialize(SyncRequest message, DataOutputPlus out, int version) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3fd4c688/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java b/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java
index 90be8e5..704bffb 100644
--- a/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java
+++ b/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.repair.messages;
import java.io.IOException;
+import java.util.Objects;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.util.DataInputPlus;
@@ -55,6 +56,23 @@ public class ValidationComplete extends RepairMessage
return trees != null;
}
+ @Override
+ public boolean equals(Object o)
+ {
+ if (!(o instanceof ValidationComplete))
+ return false;
+
+ ValidationComplete other = (ValidationComplete)o;
+ return messageType == other.messageType &&
+ desc.equals(other.desc);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(messageType, desc);
+ }
+
private static class ValidationCompleteSerializer implements MessageSerializer<ValidationComplete>
{
public void serialize(ValidationComplete message, DataOutputPlus out, int version) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3fd4c688/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java
new file mode 100644
index 0000000..5dbed3f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.messages;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBufferFixed;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.NodePair;
+import org.apache.cassandra.repair.RepairJobDesc;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.MerkleTrees;
+
+public class RepairMessageSerializationsTest
+{
+ private static final int PROTOCOL_VERSION = MessagingService.current_version;
+ private static final int GC_BEFORE = 1000000;
+
+ private static IPartitioner originalPartitioner;
+
+ @BeforeClass
+ public static void before()
+ {
+ originalPartitioner = StorageService.instance.setPartitionerUnsafe(Murmur3Partitioner.instance);
+ }
+
+ @AfterClass
+ public static void after()
+ {
+ DatabaseDescriptor.setPartitionerUnsafe(originalPartitioner);
+ }
+
+ @Test
+ public void validationRequestMessage() throws IOException
+ {
+ RepairJobDesc jobDesc = buildRepairJobDesc();
+ ValidationRequest msg = new ValidationRequest(jobDesc, GC_BEFORE);
+ ValidationRequest deserialized = serializeRoundTrip(msg, ValidationRequest.serializer);
+ Assert.assertEquals(jobDesc, deserialized.desc);
+ }
+
+ private RepairJobDesc buildRepairJobDesc()
+ {
+ List<Range<Token>> tokenRanges = buildTokenRanges();
+ return new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "serializationsTestKeyspace", "repairMessages", tokenRanges);
+ }
+
+ private List<Range<Token>> buildTokenRanges()
+ {
+ List<Range<Token>> tokenRanges = new ArrayList<>(4);
+ tokenRanges.add(new Range<>(new LongToken(1000), new LongToken(1001)));
+ tokenRanges.add(new Range<>(new LongToken(2000), new LongToken(2001)));
+ tokenRanges.add(new Range<>(new LongToken(3000), new LongToken(3001)));
+ tokenRanges.add(new Range<>(new LongToken(4000), new LongToken(4001)));
+ return tokenRanges;
+ }
+
+ private <T extends RepairMessage> T serializeRoundTrip(T msg, IVersionedSerializer<T> serializer) throws IOException
+ {
+ long size = serializer.serializedSize(msg, PROTOCOL_VERSION);
+
+ ByteBuffer buf = ByteBuffer.allocate((int)size);
+ DataOutputPlus out = new DataOutputBufferFixed(buf);
+ serializer.serialize(msg, out, PROTOCOL_VERSION);
+ Assert.assertEquals(size, buf.position());
+
+ buf.flip();
+ DataInputPlus in = new DataInputBuffer(buf, false);
+ T deserialized = serializer.deserialize(in, PROTOCOL_VERSION);
+ Assert.assertEquals(msg, deserialized);
+ Assert.assertEquals(msg.hashCode(), deserialized.hashCode());
+ return deserialized;
+ }
+
+ @Test
+ public void validationCompleteMessage_NoMerkleTree() throws IOException
+ {
+ ValidationComplete deserialized = validationCompleteMessage(null);
+ Assert.assertNull(deserialized.trees);
+ }
+
+ @Test
+ public void validationCompleteMessage_WithMerkleTree() throws IOException
+ {
+ MerkleTrees trees = new MerkleTrees(Murmur3Partitioner.instance);
+ trees.addMerkleTree(256, new Range<>(new LongToken(1000), new LongToken(1001)));
+ ValidationComplete deserialized = validationCompleteMessage(trees);
+
+ // a simple check to make sure we got some merkle trees back.
+ Assert.assertEquals(trees.size(), deserialized.trees.size());
+ }
+
+ private ValidationComplete validationCompleteMessage(MerkleTrees trees) throws IOException
+ {
+ RepairJobDesc jobDesc = buildRepairJobDesc();
+ ValidationComplete msg = trees == null ?
+ new ValidationComplete(jobDesc) :
+ new ValidationComplete(jobDesc, trees);
+ ValidationComplete deserialized = serializeRoundTrip(msg, ValidationComplete.serializer);
+ return deserialized;
+ }
+
+ @Test
+ public void syncRequestMessage() throws IOException
+ {
+ InetAddress initiator = InetAddress.getByName("127.0.0.1");
+ InetAddress src = InetAddress.getByName("127.0.0.2");
+ InetAddress dst = InetAddress.getByName("127.0.0.3");
+
+ SyncRequest msg = new SyncRequest(buildRepairJobDesc(), initiator, src, dst, buildTokenRanges());
+ serializeRoundTrip(msg, SyncRequest.serializer);
+ }
+
+ @Test
+ public void syncCompleteMessage() throws IOException
+ {
+ InetAddress src = InetAddress.getByName("127.0.0.2");
+ InetAddress dst = InetAddress.getByName("127.0.0.3");
+ SyncComplete msg = new SyncComplete(buildRepairJobDesc(), new NodePair(src, dst), true);
+ serializeRoundTrip(msg, SyncComplete.serializer);
+ }
+
+ @Test
+ public void antiCompactionRequestMessage() throws IOException
+ {
+ AnticompactionRequest msg = new AnticompactionRequest(UUID.randomUUID(), buildTokenRanges());
+ serializeRoundTrip(msg, AnticompactionRequest.serializer);
+ }
+
+ @Test
+ public void prepareMessage() throws IOException
+ {
+ PrepareMessage msg = new PrepareMessage(UUID.randomUUID(), new ArrayList<UUID>() {{add(UUID.randomUUID());}},
+ buildTokenRanges(), true, 100000L, false);
+ serializeRoundTrip(msg, PrepareMessage.serializer);
+ }
+
+ @Test
+ public void snapshotMessage() throws IOException
+ {
+ SnapshotMessage msg = new SnapshotMessage(buildRepairJobDesc());
+ serializeRoundTrip(msg, SnapshotMessage.serializer);
+ }
+
+ @Test
+ public void cleanupMessage() throws IOException
+ {
+ CleanupMessage msg = new CleanupMessage(UUID.randomUUID());
+ serializeRoundTrip(msg, CleanupMessage.serializer);
+ }
+}