You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2016/11/21 14:52:33 UTC

[2/6] cassandra git commit: AnticompactionRequestSerializer serializedSize is incorrect

AnticompactionRequestSerializer serializedSize is incorrect

patch by jasobrown; reviewed by pcmanus for CASSANDRA-12934


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3fd4c688
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3fd4c688
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3fd4c688

Branch: refs/heads/cassandra-3.X
Commit: 3fd4c68803ddf0d20d23b37d4b936258f8420209
Parents: 59b40b3
Author: Jason Brown <ja...@gmail.com>
Authored: Fri Nov 18 18:13:45 2016 -0800
Committer: Jason Brown <ja...@gmail.com>
Committed: Mon Nov 21 06:37:56 2016 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../repair/messages/AnticompactionRequest.java  |  19 ++
 .../repair/messages/CleanupMessage.java         |  17 ++
 .../repair/messages/PrepareMessage.java         |  22 +++
 .../repair/messages/SnapshotMessage.java        |  16 ++
 .../cassandra/repair/messages/SyncComplete.java |  19 ++
 .../cassandra/repair/messages/SyncRequest.java  |  21 +++
 .../repair/messages/ValidationComplete.java     |  18 ++
 .../RepairMessageSerializationsTest.java        | 187 +++++++++++++++++++
 9 files changed, 320 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3fd4c688/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bcd0b5c..e613d7c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.11
+ * AnticompactionRequestSerializer serializedSize is incorrect (CASSANDRA-12934)
  * Prevent reloading of logback.xml from UDF sandbox (CASSANDRA-12535)
 Merged from 2.2:
  * Avoid blocking gossip during pending range calculation (CASSANDRA-12281)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3fd4c688/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java b/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java
index 3e47374..a29cc87 100644
--- a/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Objects;
 import java.util.UUID;
 
 import org.apache.cassandra.dht.Range;
@@ -46,6 +47,23 @@ public class AnticompactionRequest extends RepairMessage
         this.successfulRanges = ranges;
     }
 
+    @Override
+    public boolean equals(Object o)
+    {
+        if (!(o instanceof AnticompactionRequest))
+            return false;
+        AnticompactionRequest other = (AnticompactionRequest)o;
+        return messageType == other.messageType &&
+               parentRepairSession.equals(other.parentRepairSession) &&
+               successfulRanges.equals(other.successfulRanges);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(messageType, parentRepairSession, successfulRanges);
+    }
+
     public static class AnticompactionRequestSerializer implements MessageSerializer<AnticompactionRequest>
     {
         public void serialize(AnticompactionRequest message, DataOutputPlus out, int version) throws IOException
@@ -72,6 +90,7 @@ public class AnticompactionRequest extends RepairMessage
         public long serializedSize(AnticompactionRequest message, int version)
         {
             long size = UUIDSerializer.serializer.serializedSize(message.parentRepairSession, version);
+            size += Integer.BYTES; // count of items in successfulRanges
             for (Range<Token> r : message.successfulRanges)
                 size += Range.tokenSerializer.serializedSize(r, version);
             return size;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3fd4c688/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java b/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java
index 43a8f02..69d147a 100644
--- a/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.repair.messages;
 
 import java.io.IOException;
+import java.util.Objects;
 import java.util.UUID;
 
 import org.apache.cassandra.io.util.DataInputPlus;
@@ -40,6 +41,22 @@ public class CleanupMessage extends RepairMessage
         this.parentRepairSession = parentRepairSession;
     }
 
+    @Override
+    public boolean equals(Object o)
+    {
+        if (!(o instanceof CleanupMessage))
+            return false;
+        CleanupMessage other = (CleanupMessage) o;
+        return messageType == other.messageType &&
+               parentRepairSession.equals(other.parentRepairSession);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(messageType, parentRepairSession);
+    }
+
     public static class CleanupMessageSerializer implements MessageSerializer<CleanupMessage>
     {
         public void serialize(CleanupMessage message, DataOutputPlus out, int version) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3fd4c688/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
index 8909f1b..b3efeae 100644
--- a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Objects;
 import java.util.UUID;
 
 import org.apache.cassandra.db.TypeSizes;
@@ -54,6 +55,27 @@ public class PrepareMessage extends RepairMessage
         this.isGlobal = isGlobal;
     }
 
+    @Override
+    public boolean equals(Object o)
+    {
+        if (!(o instanceof PrepareMessage))
+            return false;
+        PrepareMessage other = (PrepareMessage) o;
+        return messageType == other.messageType &&
+               parentRepairSession.equals(other.parentRepairSession) &&
+               isIncremental == other.isIncremental &&
+               isGlobal == other.isGlobal &&
+               timestamp == other.timestamp &&
+               cfIds.equals(other.cfIds) &&
+               ranges.equals(other.ranges);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(messageType, parentRepairSession, isGlobal, isIncremental, timestamp, cfIds, ranges);
+    }
+
     public static class PrepareMessageSerializer implements MessageSerializer<PrepareMessage>
     {
         public void serialize(PrepareMessage message, DataOutputPlus out, int version) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3fd4c688/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java b/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java
index 1b15126..d4737d3 100644
--- a/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.repair.messages;
 
 import java.io.IOException;
+import java.util.Objects;
 
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -32,6 +33,21 @@ public class SnapshotMessage extends RepairMessage
         super(Type.SNAPSHOT, desc);
     }
 
+    @Override
+    public boolean equals(Object o) // equal only to another SnapshotMessage carrying an equal job description
+    {
+        if (!(o instanceof SnapshotMessage))
+            return false;
+        SnapshotMessage other = (SnapshotMessage) o;
+        return messageType == other.messageType && Objects.equals(desc, other.desc); // type alone cannot distinguish two snapshot requests
+    }
+
+    @Override
+    public int hashCode() // hashes the same fields equals() compares
+    {
+        return Objects.hash(messageType, desc);
+    }
+
     public static class SnapshotMessageSerializer implements MessageSerializer<SnapshotMessage>
     {
         public void serialize(SnapshotMessage message, DataOutputPlus out, int version) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3fd4c688/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java b/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
index 35cf5d4..178e710 100644
--- a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
+++ b/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.repair.messages;
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.util.Objects;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.util.DataInputPlus;
@@ -53,6 +54,24 @@ public class SyncComplete extends RepairMessage
         this.success = success;
     }
 
+    @Override
+    public boolean equals(Object o)
+    {
+        if (!(o instanceof SyncComplete))
+            return false;
+        SyncComplete other = (SyncComplete)o;
+        return messageType == other.messageType &&
+               desc.equals(other.desc) &&
+               success == other.success &&
+               nodes.equals(other.nodes);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(messageType, desc, success, nodes);
+    }
+
     private static class SyncCompleteSerializer implements MessageSerializer<SyncComplete>
     {
         public void serialize(SyncComplete message, DataOutputPlus out, int version) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3fd4c688/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
index 2c9799e..f79f482 100644
--- a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
@@ -22,6 +22,7 @@ import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Objects;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.dht.AbstractBounds;
@@ -57,6 +58,26 @@ public class SyncRequest extends RepairMessage
         this.ranges = ranges;
     }
 
+    @Override
+    public boolean equals(Object o) // field-by-field equality; false for null or any non-SyncRequest
+    {
+        if (!(o instanceof SyncRequest))
+            return false;
+        SyncRequest req = (SyncRequest)o;
+        return messageType == req.messageType &&
+               desc.equals(req.desc) &&
+               initiator.equals(req.initiator) &&
+               src.equals(req.src) &&
+               dst.equals(req.dst) &&
+               ranges.equals(req.ranges); // compare against the other request's ranges, not our own
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(messageType, desc, initiator, src, dst, ranges);
+    }
+
     public static class SyncRequestSerializer implements MessageSerializer<SyncRequest>
     {
         public void serialize(SyncRequest message, DataOutputPlus out, int version) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3fd4c688/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java b/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java
index 90be8e5..704bffb 100644
--- a/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java
+++ b/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.repair.messages;
 
 import java.io.IOException;
+import java.util.Objects;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.util.DataInputPlus;
@@ -55,6 +56,23 @@ public class ValidationComplete extends RepairMessage
         return trees != null;
     }
 
+    @Override
+    public boolean equals(Object o) // equality is based on message type and job description only
+    {
+        if (!(o instanceof ValidationComplete))
+            return false;
+
+        ValidationComplete other = (ValidationComplete)o;
+        return messageType == other.messageType &&
+               desc.equals(other.desc); // NOTE(review): trees is not compared here; presumably intentional, confirm
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(messageType, desc);
+    }
+
     private static class ValidationCompleteSerializer implements MessageSerializer<ValidationComplete>
     {
         public void serialize(ValidationComplete message, DataOutputPlus out, int version) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3fd4c688/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java
new file mode 100644
index 0000000..5dbed3f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.messages;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBufferFixed;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.NodePair;
+import org.apache.cassandra.repair.RepairJobDesc;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.MerkleTrees;
+
+public class RepairMessageSerializationsTest
+{
+    private static final int PROTOCOL_VERSION = MessagingService.current_version;
+    private static final int GC_BEFORE = 1000000;
+
+    private static IPartitioner originalPartitioner;
+
+    @BeforeClass
+    public static void before()
+    {
+        originalPartitioner = StorageService.instance.setPartitionerUnsafe(Murmur3Partitioner.instance);
+    }
+
+    @AfterClass
+    public static void after()
+    {
+        DatabaseDescriptor.setPartitionerUnsafe(originalPartitioner);
+    }
+
+    @Test
+    public void validationRequestMessage() throws IOException
+    {
+        RepairJobDesc jobDesc = buildRepairJobDesc();
+        ValidationRequest msg = new ValidationRequest(jobDesc, GC_BEFORE);
+        ValidationRequest deserialized = serializeRoundTrip(msg, ValidationRequest.serializer);
+        Assert.assertEquals(jobDesc, deserialized.desc);
+    }
+
+    private RepairJobDesc buildRepairJobDesc()
+    {
+        List<Range<Token>> tokenRanges = buildTokenRanges();
+        return new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "serializationsTestKeyspace", "repairMessages", tokenRanges);
+    }
+
+    private List<Range<Token>> buildTokenRanges()
+    {
+        List<Range<Token>> tokenRanges = new ArrayList<>(4);
+        tokenRanges.add(new Range<>(new LongToken(1000), new LongToken(1001)));
+        tokenRanges.add(new Range<>(new LongToken(2000), new LongToken(2001)));
+        tokenRanges.add(new Range<>(new LongToken(3000), new LongToken(3001)));
+        tokenRanges.add(new Range<>(new LongToken(4000), new LongToken(4001)));
+        return tokenRanges;
+    }
+
+    private <T extends RepairMessage> T serializeRoundTrip(T msg, IVersionedSerializer<T> serializer) throws IOException
+    {
+        long size = serializer.serializedSize(msg, PROTOCOL_VERSION);
+
+        ByteBuffer buf = ByteBuffer.allocate((int)size);
+        DataOutputPlus out = new DataOutputBufferFixed(buf);
+        serializer.serialize(msg, out, PROTOCOL_VERSION);
+        Assert.assertEquals(size, buf.position());
+
+        buf.flip();
+        DataInputPlus in = new DataInputBuffer(buf, false);
+        T deserialized = serializer.deserialize(in, PROTOCOL_VERSION);
+        Assert.assertEquals(msg, deserialized);
+        Assert.assertEquals(msg.hashCode(), deserialized.hashCode());
+        return deserialized;
+    }
+
+    @Test
+    public void validationCompleteMessage_NoMerkleTree() throws IOException
+    {
+        ValidationComplete deserialized = validationCompleteMessage(null);
+        Assert.assertNull(deserialized.trees);
+    }
+
+    @Test
+    public void validationCompleteMessage_WithMerkleTree() throws IOException
+    {
+        MerkleTrees trees = new MerkleTrees(Murmur3Partitioner.instance);
+        trees.addMerkleTree(256, new Range<>(new LongToken(1000), new LongToken(1001)));
+        ValidationComplete deserialized = validationCompleteMessage(trees);
+
+        // a simple check to make sure we got some merkle trees back.
+        Assert.assertEquals(trees.size(), deserialized.trees.size());
+    }
+
+    private ValidationComplete validationCompleteMessage(MerkleTrees trees) throws IOException
+    {
+        RepairJobDesc jobDesc = buildRepairJobDesc();
+        ValidationComplete msg = trees == null ?
+                                 new ValidationComplete(jobDesc) :
+                                 new ValidationComplete(jobDesc, trees);
+        ValidationComplete deserialized = serializeRoundTrip(msg, ValidationComplete.serializer);
+        return deserialized;
+    }
+
+    @Test
+    public void syncRequestMessage() throws IOException
+    {
+        InetAddress initiator = InetAddress.getByName("127.0.0.1");
+        InetAddress src = InetAddress.getByName("127.0.0.2");
+        InetAddress dst = InetAddress.getByName("127.0.0.3");
+
+        SyncRequest msg = new SyncRequest(buildRepairJobDesc(), initiator, src, dst, buildTokenRanges());
+        serializeRoundTrip(msg, SyncRequest.serializer);
+    }
+
+    @Test
+    public void syncCompleteMessage() throws IOException
+    {
+        InetAddress src = InetAddress.getByName("127.0.0.2");
+        InetAddress dst = InetAddress.getByName("127.0.0.3");
+        SyncComplete msg = new SyncComplete(buildRepairJobDesc(), new NodePair(src, dst), true);
+        serializeRoundTrip(msg, SyncComplete.serializer);
+    }
+
+    @Test
+    public void antiCompactionRequestMessage() throws IOException
+    {
+        AnticompactionRequest msg = new AnticompactionRequest(UUID.randomUUID(), buildTokenRanges());
+        serializeRoundTrip(msg, AnticompactionRequest.serializer);
+    }
+
+    @Test
+    public void prepareMessage() throws IOException
+    {
+        PrepareMessage msg = new PrepareMessage(UUID.randomUUID(), new ArrayList<UUID>() {{add(UUID.randomUUID());}},
+                                                buildTokenRanges(), true, 100000L, false);
+        serializeRoundTrip(msg, PrepareMessage.serializer);
+    }
+
+    @Test
+    public void snapshotMessage() throws IOException
+    {
+        SnapshotMessage msg = new SnapshotMessage(buildRepairJobDesc());
+        serializeRoundTrip(msg, SnapshotMessage.serializer);
+    }
+
+    @Test
+    public void cleanupMessage() throws IOException
+    {
+        CleanupMessage msg = new CleanupMessage(UUID.randomUUID());
+        serializeRoundTrip(msg, CleanupMessage.serializer);
+    }
+}