You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2016/07/18 15:43:56 UTC

[41/50] cassandra git commit: Wait until the message is being send to decide which serializer must be used

Wait until the message is being send to decide which serializer must be used

patch by Benjamin Lerer; reviewed by Tyler Hobbs for CASSANDRA-11393


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fbd287ad
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fbd287ad
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fbd287ad

Branch: refs/heads/cassandra-3.8
Commit: fbd287ad2ed09190dd9c6e152b82215e81020847
Parents: e99ee19
Author: Benjamin Lerer <b....@gmail.com>
Authored: Thu Jul 14 11:33:08 2016 +0200
Committer: Benjamin Lerer <b....@gmail.com>
Committed: Thu Jul 14 11:33:08 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/config/DatabaseDescriptor.java    |  1 +
 .../cassandra/db/PartitionRangeReadCommand.java |  7 +--
 .../org/apache/cassandra/db/ReadCommand.java    | 65 ++++++++++----------
 .../org/apache/cassandra/db/ReadResponse.java   | 43 +++++--------
 .../db/SinglePartitionReadCommand.java          |  2 +-
 .../io/ForwardingVersionedSerializer.java       | 57 +++++++++++++++++
 .../apache/cassandra/net/MessagingService.java  |  6 +-
 8 files changed, 113 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbd287ad/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 70210a8..3829046 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.9
+ * Wait until the message is being send to decide which serializer must be used (CASSANDRA-11393)
  * Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147)
  * Fix upgrading sparse tables that are incorrectly marked as dense (CASSANDRA-11315)
  * Fix reverse queries ignoring range tombstones (CASSANDRA-11733)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbd287ad/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 100bcf4..b71ebf6 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1124,6 +1124,7 @@ public class DatabaseDescriptor
             case READ:
                 return getReadRpcTimeout();
             case RANGE_SLICE:
+            case PAGED_RANGE:
                 return getRangeRpcTimeout();
             case TRUNCATE:
                 return getTruncateRpcTimeout();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbd287ad/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 842ad5f..99e24c8 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -253,12 +253,9 @@ public class PartitionRangeReadCommand extends ReadCommand
 
     public MessageOut<ReadCommand> createMessage(int version)
     {
-        if (version >= MessagingService.VERSION_30)
-            return new MessageOut<>(MessagingService.Verb.RANGE_SLICE, this, serializer);
-
         return dataRange().isPaging()
-             ? new MessageOut<>(MessagingService.Verb.PAGED_RANGE, this, legacyPagedRangeCommandSerializer)
-             : new MessageOut<>(MessagingService.Verb.RANGE_SLICE, this, legacyRangeSliceCommandSerializer);
+             ? new MessageOut<>(MessagingService.Verb.PAGED_RANGE, this, pagedRangeSerializer)
+             : new MessageOut<>(MessagingService.Verb.RANGE_SLICE, this, rangeSliceSerializer);
     }
 
     protected void appendCQLWhereClause(StringBuilder sb)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbd287ad/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index c792a5a..36969f8 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.index.IndexNotAvailableException;
+import org.apache.cassandra.io.ForwardingVersionedSerializer;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -58,9 +59,39 @@ public abstract class ReadCommand implements ReadQuery
     protected static final Logger logger = LoggerFactory.getLogger(ReadCommand.class);
 
     public static final IVersionedSerializer<ReadCommand> serializer = new Serializer();
+
+    // For READ verb: will either dispatch on 'serializer' for 3.0 or 'legacyReadCommandSerializer' for earlier version.
+    // Can be removed (and replaced by 'serializer') once we drop pre-3.0 backward compatibility.
+    public static final IVersionedSerializer<ReadCommand> readSerializer = new ForwardingVersionedSerializer<ReadCommand>()
+    {
+        protected IVersionedSerializer<ReadCommand> delegate(int version)
+        {
+            return version < MessagingService.VERSION_30
+                    ? legacyReadCommandSerializer : serializer;
+        }
+    };
+
     // For RANGE_SLICE verb: will either dispatch on 'serializer' for 3.0 or 'legacyRangeSliceCommandSerializer' for earlier version.
     // Can be removed (and replaced by 'serializer') once we drop pre-3.0 backward compatibility.
-    public static final IVersionedSerializer<ReadCommand> rangeSliceSerializer = new RangeSliceSerializer();
+    public static final IVersionedSerializer<ReadCommand> rangeSliceSerializer = new ForwardingVersionedSerializer<ReadCommand>()
+    {
+        protected IVersionedSerializer<ReadCommand> delegate(int version)
+        {
+            return version < MessagingService.VERSION_30
+                    ? legacyRangeSliceCommandSerializer : serializer;
+        }
+    };
+
+    // For PAGED_RANGE verb: will either dispatch on 'serializer' for 3.0 or 'legacyPagedRangeCommandSerializer' for earlier version.
+    // Can be removed (and replaced by 'serializer') once we drop pre-3.0 backward compatibility.
+    public static final IVersionedSerializer<ReadCommand> pagedRangeSerializer = new ForwardingVersionedSerializer<ReadCommand>()
+    {
+        protected IVersionedSerializer<ReadCommand> delegate(int version)
+        {
+            return version < MessagingService.VERSION_30
+                    ? legacyPagedRangeCommandSerializer : serializer;
+        }
+    };
 
     public static final IVersionedSerializer<ReadCommand> legacyRangeSliceCommandSerializer = new LegacyRangeSliceCommandSerializer();
     public static final IVersionedSerializer<ReadCommand> legacyPagedRangeCommandSerializer = new LegacyPagedRangeCommandSerializer();
@@ -567,7 +598,6 @@ public abstract class ReadCommand implements ReadQuery
 
         public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
         {
-            // for serialization, createLegacyMessage() should cause legacyReadCommandSerializer to be used directly
             assert version >= MessagingService.VERSION_30;
 
             out.writeByte(command.kind.ordinal());
@@ -587,8 +617,7 @@ public abstract class ReadCommand implements ReadQuery
 
         public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
         {
-            if (version < MessagingService.VERSION_30)
-                return legacyReadCommandSerializer.deserialize(in, version);
+            assert version >= MessagingService.VERSION_30;
 
             Kind kind = Kind.values()[in.readByte()];
             int flags = in.readByte();
@@ -628,7 +657,6 @@ public abstract class ReadCommand implements ReadQuery
 
         public long serializedSize(ReadCommand command, int version)
         {
-            // for serialization, createLegacyMessage() should cause legacyReadCommandSerializer to be used directly
             assert version >= MessagingService.VERSION_30;
 
             return 2 // kind + flags
@@ -643,33 +671,6 @@ public abstract class ReadCommand implements ReadQuery
         }
     }
 
-    // Dispatch to either Serializer or LegacyRangeSliceCommandSerializer. Only useful as long as we maintain pre-3.0
-    // compatibility
-    private static class RangeSliceSerializer implements IVersionedSerializer<ReadCommand>
-    {
-        public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
-        {
-            if (version < MessagingService.VERSION_30)
-                legacyRangeSliceCommandSerializer.serialize(command, out, version);
-            else
-                serializer.serialize(command, out, version);
-        }
-
-        public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
-        {
-            return version < MessagingService.VERSION_30
-                 ? legacyRangeSliceCommandSerializer.deserialize(in, version)
-                 : serializer.deserialize(in, version);
-        }
-
-        public long serializedSize(ReadCommand command, int version)
-        {
-            return version < MessagingService.VERSION_30
-                 ? legacyRangeSliceCommandSerializer.serializedSize(command, version)
-                 : serializer.serializedSize(command, version);
-        }
-    }
-
     private enum LegacyType
     {
         GET_BY_NAMES((byte)1),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbd287ad/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index 8bd1be6..12a200f 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.dht.*;
+import org.apache.cassandra.io.ForwardingVersionedSerializer;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataInputPlus;
@@ -46,11 +47,20 @@ public abstract class ReadResponse
 {
     // Serializer for single partition read response
     public static final IVersionedSerializer<ReadResponse> serializer = new Serializer();
-    // Serializer for partition range read response (this actually delegate to 'serializer' in 3.0 and to
-    // 'legacyRangeSliceReplySerializer' in older version.
-    public static final IVersionedSerializer<ReadResponse> rangeSliceSerializer = new RangeSliceSerializer();
     // Serializer for the pre-3.0 rang slice responses.
     public static final IVersionedSerializer<ReadResponse> legacyRangeSliceReplySerializer = new LegacyRangeSliceReplySerializer();
+    // Serializer for partition range read response (this actually delegate to 'serializer' in 3.0 and to
+    // 'legacyRangeSliceReplySerializer' in older version.
+    public static final IVersionedSerializer<ReadResponse> rangeSliceSerializer = new ForwardingVersionedSerializer<ReadResponse>()
+    {
+        @Override
+        protected IVersionedSerializer<ReadResponse> delegate(int version)
+        {
+            return version < MessagingService.VERSION_30
+                    ? legacyRangeSliceReplySerializer
+                    : serializer;
+        }
+    };
 
     // This is used only when serializing data responses and we can't it easily in other cases. So this can be null, which is slighly
     // hacky, but as this hack doesn't escape this class, and it's easy enough to validate that it's not null when we need, it's "good enough".
@@ -411,31 +421,6 @@ public abstract class ReadResponse
         }
     }
 
-    private static class RangeSliceSerializer implements IVersionedSerializer<ReadResponse>
-    {
-        public void serialize(ReadResponse response, DataOutputPlus out, int version) throws IOException
-        {
-            if (version < MessagingService.VERSION_30)
-                legacyRangeSliceReplySerializer.serialize(response, out, version);
-            else
-                serializer.serialize(response, out, version);
-        }
-
-        public ReadResponse deserialize(DataInputPlus in, int version) throws IOException
-        {
-            return version < MessagingService.VERSION_30
-                 ? legacyRangeSliceReplySerializer.deserialize(in, version)
-                 : serializer.deserialize(in, version);
-        }
-
-        public long serializedSize(ReadResponse response, int version)
-        {
-            return version < MessagingService.VERSION_30
-                 ? legacyRangeSliceReplySerializer.serializedSize(response, version)
-                 : serializer.serializedSize(response, version);
-        }
-    }
-
     private static class LegacyRangeSliceReplySerializer implements IVersionedSerializer<ReadResponse>
     {
         public void serialize(ReadResponse response, DataOutputPlus out, int version) throws IOException
@@ -477,6 +462,8 @@ public abstract class ReadResponse
 
         public ReadResponse deserialize(DataInputPlus in, int version) throws IOException
         {
+            assert version < MessagingService.VERSION_30;
+
             int partitionCount = in.readInt();
             ArrayList<ImmutableBTreePartition> partitions = new ArrayList<>(partitionCount);
             for (int i = 0; i < partitionCount; i++)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbd287ad/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 6784770..73eb9bd 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -894,7 +894,7 @@ public class SinglePartitionReadCommand extends ReadCommand
 
     public MessageOut<ReadCommand> createMessage(int version)
     {
-        return new MessageOut<>(MessagingService.Verb.READ, this, version < MessagingService.VERSION_30 ? legacyReadCommandSerializer : serializer);
+        return new MessageOut<>(MessagingService.Verb.READ, this, readSerializer);
     }
 
     protected void appendCQLWhereClause(StringBuilder sb)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbd287ad/src/java/org/apache/cassandra/io/ForwardingVersionedSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/ForwardingVersionedSerializer.java b/src/java/org/apache/cassandra/io/ForwardingVersionedSerializer.java
new file mode 100644
index 0000000..64f91d7
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/ForwardingVersionedSerializer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+/**
+ * A serializer which forwards all its method calls to another serializer. Subclasses should override one or more
+ * methods to modify the behavior of the backing serializer as desired per the decorator pattern.
+ */
+public abstract class ForwardingVersionedSerializer<T> implements IVersionedSerializer<T>
+{
+    protected ForwardingVersionedSerializer()
+    {
+    }
+
+    /**
+     * Returns the backing delegate instance that methods are forwarded to.
+     *
+     * @param version the server version
+     * @return the backing delegate instance that methods are forwarded to.
+     */
+    protected abstract IVersionedSerializer<T> delegate(int version);
+
+    public void serialize(T t, DataOutputPlus out, int version) throws IOException
+    {
+        delegate(version).serialize(t, out, version);
+    }
+
+    public T deserialize(DataInputPlus in, int version) throws IOException
+    {
+        return delegate(version).deserialize(in, version);
+    }
+
+    public long serializedSize(T t, int version)
+    {
+        return delegate(version).serializedSize(t, version);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbd287ad/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index fac46eb..d01419f 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -215,9 +215,9 @@ public final class MessagingService implements MessagingServiceMBean
 
         put(Verb.MUTATION, Mutation.serializer);
         put(Verb.READ_REPAIR, Mutation.serializer);
-        put(Verb.READ, ReadCommand.serializer);
+        put(Verb.READ, ReadCommand.readSerializer);
         put(Verb.RANGE_SLICE, ReadCommand.rangeSliceSerializer);
-        put(Verb.PAGED_RANGE, ReadCommand.legacyPagedRangeCommandSerializer);
+        put(Verb.PAGED_RANGE, ReadCommand.pagedRangeSerializer);
         put(Verb.BOOTSTRAP_TOKEN, BootStrapper.StringSerializer.instance);
         put(Verb.REPAIR_MESSAGE, RepairMessage.serializer);
         put(Verb.GOSSIP_DIGEST_ACK, GossipDigestAck.serializer);
@@ -247,7 +247,7 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.READ_REPAIR, WriteResponse.serializer);
         put(Verb.COUNTER_MUTATION, WriteResponse.serializer);
         put(Verb.RANGE_SLICE, ReadResponse.rangeSliceSerializer);
-        put(Verb.PAGED_RANGE, ReadResponse.legacyRangeSliceReplySerializer);
+        put(Verb.PAGED_RANGE, ReadResponse.rangeSliceSerializer);
         put(Verb.READ, ReadResponse.serializer);
         put(Verb.TRUNCATE, TruncateResponse.serializer);
         put(Verb.SNAPSHOT, null);