Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2022/10/26 21:13:28 UTC

[GitHub] [cassandra] tjake opened a new pull request, #1954: Cassandra 17998

tjake opened a new pull request, #1954:
URL: https://github.com/apache/cassandra/pull/1954

   
   The [Cassandra Jira](https://issues.apache.org/jira/projects/CASSANDRA/issues/CASSANDRA-17998)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org




[GitHub] [cassandra] josh-mckenzie commented on a diff in pull request #1954: Cassandra 17998

Posted by GitBox <gi...@apache.org>.
josh-mckenzie commented on code in PR #1954:
URL: https://github.com/apache/cassandra/pull/1954#discussion_r1035247463


##########
src/java/org/apache/cassandra/db/Mutation.java:
##########
@@ -64,6 +67,13 @@ public class Mutation implements IMutation, Supplier<Mutation>
 
     private final boolean cdcEnabled;
 
+    // Contains serialized representations of this mutation.
+    // Note: there is no functionality to clear/remove serialized instances, because a mutation must never
+    // be modified (e.g. calling add(PartitionUpdate)) when it's being serialized.
+    private static final int CACHED_SERIALIZATIONS = MessagingService.Version.values().length;
+    private static final int CACHEABLE_MUTATION_SIZE_LIMIT = Integer.getInteger(Config.PROPERTY_PREFIX + "cacheable_mutation_size_limit_bytes", 2 * 1024 * 1024) - 24;

Review Comment:
   What's the -24 here for? Maybe document it?



##########
src/java/org/apache/cassandra/db/Mutation.java:
##########
@@ -393,34 +388,130 @@ public static class MutationSerializer implements IVersionedSerializer<Mutation>
     {
         public void serialize(Mutation mutation, DataOutputPlus out, int version) throws IOException
         {
+            serialization(mutation, version).serialize(PartitionUpdate.serializer, mutation, out, version);
+        }
+
+        /**
+         * Called early during request processing to prevent that {@link #serialization(Mutation)} is
+         * called concurrently.
+         * See {@link org.apache.cassandra.service.StorageProxy#sendToHintedEndpoints(Mutation, WriteEndpoints, WriteHandler, Verb.AckedRequest)}.
+         */
+        @SuppressWarnings("JavadocReference")
+        public void prepareSerializedBuffer(Mutation mutation, int version)
+        {
+            serialization(mutation, version);
+        }
+
+        /**
+         * Retrieve the cached serialization of this mutation, or computed and cache said serialization if it doesn't
+         * exists yet. Note that this method is _not_ synchronized even though it may (and will often) be called
+         * concurrently. Concurrent calls are still safe however, the only risk is that the value is not cached yet,
+         * multiple concurrent calls may compute it multiple times instead of just once. This is ok as in practice
+         * as we make sure this doesn't happen in the hot path by forcing the initial caching in
+         * {@link org.apache.cassandra.service.StorageProxy#sendToHintedEndpoints(Mutation, WriteEndpoints, WriteHandler, Verb.AckedRequest)}
+         * via {@link #prepareSerializedBuffer(Mutation)}, which is the only caller that passes
+         * {@code isPrepare==true}.
+         */
+        @SuppressWarnings("JavadocReference")
+        private Serialization serialization(Mutation mutation, int version)
+        {
+            int versionIndex = MessagingService.getVersionIndex(version);
+            // Retrieves the cached version, or build+cache it if it's not cached already.
+            Serialization serialization = mutation.serializations[versionIndex];
+            if (serialization == null)
+            {
+                // We need to use a capacity-limited DOB here.
+                // If a mutation consists of one PartitionUpdate with one column that exceeds the
+                // "cacheable-mutation-size-limit", a capacity-limited DOB can handle that case and
+                // throw a BufferCapacityExceededException. "Huge" serialized mutations can have a
+                // bad impact to G1 GC, if the cached serialized mutation results in a
+                // "humonguous object" and also frequent re-allocations of the scratch buffer(s).
+                // I.e. large cached mutation objects cause GC pressure.
+                try (DataOutputBuffer dob = DataOutputBuffer.limitedScratchBuffer(CACHEABLE_MUTATION_SIZE_LIMIT))
+                {
+                    if (!serializeInternal(PartitionUpdate.serializer, mutation, dob, version,true))
+                    {
+                        serialization = new NonCacheableSerialization();
+                    }
+                    else
+                    {
+                        serialization = new CachedSerialization(dob.toByteArray());
+                    }
+                }
+                catch (DataOutputBuffer.BufferCapacityExceededException tooBig)

Review Comment:
   What are the implications of us using exception handling basically for control flow in the case of mutations that exceed our cacheable capacity vs. some other less invasive method? Wondering from a performance perspective as this is hot path and exception handling is notoriously Not Fast.
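For illustration only, one less invasive shape would be a buffer that reports overflow through a return value, so the too-large case never constructs an exception (by default, creating a `Throwable` also captures a stack trace). This is a minimal sketch, not the PR's code: `BoundedBuffer`, `tryWrite`, and `serializeForCache` are hypothetical names, and whether this is measurably faster than the exception path would need benchmarking.

```java
import java.io.ByteArrayOutputStream;

public class BoundedBufferSketch
{
    /** Hypothetical stand-in for a capacity-limited scratch buffer. */
    public static final class BoundedBuffer
    {
        private final ByteArrayOutputStream out = new ByteArrayOutputStream();
        private final int limit;
        private boolean limitReached;

        public BoundedBuffer(int limit)
        {
            this.limit = limit;
        }

        /** Appends the bytes if they fit; otherwise records the overflow and writes nothing. */
        public boolean tryWrite(byte[] bytes)
        {
            if (limitReached || out.size() + bytes.length > limit)
            {
                limitReached = true;
                return false;
            }
            out.write(bytes, 0, bytes.length);
            return true;
        }

        public boolean isLimitReached()
        {
            return limitReached;
        }

        public byte[] toByteArray()
        {
            return out.toByteArray();
        }
    }

    /** Returns the serialized bytes, or null when the payload is too large to cache. */
    public static byte[] serializeForCache(byte[][] chunks, int limit)
    {
        BoundedBuffer buffer = new BoundedBuffer(limit);
        for (byte[] chunk : chunks)
        {
            if (!buffer.tryWrite(chunk))
                return null; // caller falls back to the non-cached path
        }
        return buffer.toByteArray();
    }
}
```

The caller treats a `null` result the same way the `catch` block does in the diff above: it falls back to the serialization that is not kept on heap.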



##########
src/java/org/apache/cassandra/db/Mutation.java:
##########
@@ -393,34 +388,130 @@ public static class MutationSerializer implements IVersionedSerializer<Mutation>
     {
         public void serialize(Mutation mutation, DataOutputPlus out, int version) throws IOException
         {
+            serialization(mutation, version).serialize(PartitionUpdate.serializer, mutation, out, version);
+        }
+
+        /**
+         * Called early during request processing to prevent that {@link #serialization(Mutation)} is
+         * called concurrently.
+         * See {@link org.apache.cassandra.service.StorageProxy#sendToHintedEndpoints(Mutation, WriteEndpoints, WriteHandler, Verb.AckedRequest)}.
+         */
+        @SuppressWarnings("JavadocReference")
+        public void prepareSerializedBuffer(Mutation mutation, int version)
+        {
+            serialization(mutation, version);
+        }
+
+        /**
+         * Retrieve the cached serialization of this mutation, or computed and cache said serialization if it doesn't
+         * exists yet. Note that this method is _not_ synchronized even though it may (and will often) be called
+         * concurrently. Concurrent calls are still safe however, the only risk is that the value is not cached yet,
+         * multiple concurrent calls may compute it multiple times instead of just once. This is ok as in practice
+         * as we make sure this doesn't happen in the hot path by forcing the initial caching in
+         * {@link org.apache.cassandra.service.StorageProxy#sendToHintedEndpoints(Mutation, WriteEndpoints, WriteHandler, Verb.AckedRequest)}
+         * via {@link #prepareSerializedBuffer(Mutation)}, which is the only caller that passes
+         * {@code isPrepare==true}.
+         */
+        @SuppressWarnings("JavadocReference")
+        private Serialization serialization(Mutation mutation, int version)
+        {
+            int versionIndex = MessagingService.getVersionIndex(version);
+            // Retrieves the cached version, or build+cache it if it's not cached already.
+            Serialization serialization = mutation.serializations[versionIndex];
+            if (serialization == null)
+            {
+                // We need to use a capacity-limited DOB here.
+                // If a mutation consists of one PartitionUpdate with one column that exceeds the
+                // "cacheable-mutation-size-limit", a capacity-limited DOB can handle that case and
+                // throw a BufferCapacityExceededException. "Huge" serialized mutations can have a
+                // bad impact to G1 GC, if the cached serialized mutation results in a
+                // "humonguous object" and also frequent re-allocations of the scratch buffer(s).
+                // I.e. large cached mutation objects cause GC pressure.
+                try (DataOutputBuffer dob = DataOutputBuffer.limitedScratchBuffer(CACHEABLE_MUTATION_SIZE_LIMIT))
+                {
+                    if (!serializeInternal(PartitionUpdate.serializer, mutation, dob, version,true))
+                    {
+                        serialization = new NonCacheableSerialization();
+                    }
+                    else
+                    {
+                        serialization = new CachedSerialization(dob.toByteArray());
+                    }
+                }
+                catch (DataOutputBuffer.BufferCapacityExceededException tooBig)
+                {
+                    serialization = new NonCacheableSerialization();
+                }
+                catch (IOException e)
+                {
+                    throw new RuntimeException(e);
+                }
+                mutation.serializations[versionIndex] = serialization;
+            }
+            return serialization;
+        }
+
+        static boolean serializeInternal(PartitionUpdate.PartitionUpdateSerializer serializer,
+                                         Mutation mutation,
+                                         DataOutputPlus out,
+                                         int version,
+                                         boolean isPrepare) throws IOException
+        {
+            Map<TableId, PartitionUpdate> modifications = mutation.modifications;
+
             /* serialize the modifications in the mutation */
-            int size = mutation.modifications.size();
+            int size = modifications.size();
             out.writeUnsignedVInt(size);
 
             assert size > 0;
-            for (Map.Entry<TableId, PartitionUpdate> entry : mutation.modifications.entrySet())
-                PartitionUpdate.serializer.serialize(entry.getValue(), out, version);
+            for (PartitionUpdate partitionUpdate : modifications.values())
+            {
+                serializer.serialize(partitionUpdate, out, version);
+                if (isCacheableMutationSizeLimit(out, isPrepare))
+                    return false;
+            }
+            return true;
         }
 
-        public Mutation deserialize(DataInputPlus in, int version, DeserializationHelper.Flag flag) throws IOException
+        private static boolean isCacheableMutationSizeLimit(DataOutputPlus out, boolean isPrepare)

Review Comment:
   Super nit: maybe we rename to `atCacheableMutationSizeLimit`? The name kind of confused me as to what it was doing; kind of assumed it was checking to see if the limit _existed_ not that we'd hit it.



##########
src/java/org/apache/cassandra/db/Mutation.java:
##########
@@ -393,34 +388,130 @@ public static class MutationSerializer implements IVersionedSerializer<Mutation>
     {
         public void serialize(Mutation mutation, DataOutputPlus out, int version) throws IOException
         {
+            serialization(mutation, version).serialize(PartitionUpdate.serializer, mutation, out, version);
+        }
+
+        /**
+         * Called early during request processing to prevent that {@link #serialization(Mutation)} is
+         * called concurrently.
+         * See {@link org.apache.cassandra.service.StorageProxy#sendToHintedEndpoints(Mutation, WriteEndpoints, WriteHandler, Verb.AckedRequest)}.
+         */
+        @SuppressWarnings("JavadocReference")
+        public void prepareSerializedBuffer(Mutation mutation, int version)
+        {
+            serialization(mutation, version);
+        }
+
+        /**
+         * Retrieve the cached serialization of this mutation, or computed and cache said serialization if it doesn't
+         * exists yet. Note that this method is _not_ synchronized even though it may (and will often) be called

Review Comment:
   nit: "exist yet"



##########
src/java/org/apache/cassandra/io/util/TeeDataInputPlus.java:
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.utils.Throwables;
+
+/**
+ * DataInput that also stores the raw inputs into an output buffer
+ * This is useful for storing serialized buffers as they are deserialized

Review Comment:
   I would add something to the class-level javadoc about the API: writes stop being buffered once the limit is hit, and you can check that state with `isLimitReached`.
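To make the behaviour such a javadoc would describe concrete, here is a minimal sketch of a limited tee. It is a simplified analogue, not `TeeDataInputPlus` itself: the `LimitedTee` class, its `buffered()` accessor, and the reading of a negative limit as "no limit" are assumptions made for illustration, and it uses `ByteArrayInputStream` only to avoid checked exceptions.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

public class LimitedTeeSketch
{
    /**
     * Hypothetical tee: every byte read from the source is also copied into a buffer
     * until the limit is hit. After that, reads still succeed but nothing more is
     * buffered, and isLimitReached() returns true.
     */
    public static final class LimitedTee
    {
        private final ByteArrayInputStream source;
        private final ByteArrayOutputStream teeBuffer = new ByteArrayOutputStream();
        private final long limit; // assumption: a negative value means "no limit"
        private boolean limitReached = false;

        public LimitedTee(ByteArrayInputStream source, long limit)
        {
            this.source = source;
            this.limit = limit;
        }

        /** Reads one byte from the source; returns -1 at end of input. */
        public int read()
        {
            int b = source.read();
            if (b < 0 || limitReached)
                return b;
            if (limit >= 0 && teeBuffer.size() >= limit)
                limitReached = true; // stop buffering, keep reading
            else
                teeBuffer.write(b);
            return b;
        }

        public boolean isLimitReached()
        {
            return limitReached;
        }

        public byte[] buffered()
        {
            return teeBuffer.toByteArray();
        }
    }
}
```

A caller would check `isLimitReached()` after deserializing and discard the buffered bytes if it returns true, since they are then only a prefix of the input.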



##########
src/java/org/apache/cassandra/db/Mutation.java:
##########
@@ -430,10 +521,71 @@ public Mutation deserialize(DataInputPlus in, int version) throws IOException
 
         public long serializedSize(Mutation mutation, int version)
         {
-            int size = TypeSizes.sizeofUnsignedVInt(mutation.modifications.size());
-            for (Map.Entry<TableId, PartitionUpdate> entry : mutation.modifications.entrySet())
-                size += PartitionUpdate.serializer.serializedSize(entry.getValue(), version);
+            return serialization(mutation, version).serializedSize(PartitionUpdate.serializer, mutation, version);
+        }
+    }
 
+    /**
+     * There are two implementations of this class. One that keeps the serialized representation on-heap for later
+     * reuse and one that doesn't. Keeping all sized mutations around may lead to "bad" GC pressure (G1 GC) due to humongous objects.

Review Comment:
   Think we should lift this up to the CACHEABLE_MUTATION_SIZE_LIMIT declaration as a javadoc so it's clear at declaration time why this limit exists.



##########
src/java/org/apache/cassandra/db/Mutation.java:
##########
@@ -393,34 +388,130 @@ public static class MutationSerializer implements IVersionedSerializer<Mutation>
     {
         public void serialize(Mutation mutation, DataOutputPlus out, int version) throws IOException
         {
+            serialization(mutation, version).serialize(PartitionUpdate.serializer, mutation, out, version);
+        }
+
+        /**
+         * Called early during request processing to prevent that {@link #serialization(Mutation)} is
+         * called concurrently.
+         * See {@link org.apache.cassandra.service.StorageProxy#sendToHintedEndpoints(Mutation, WriteEndpoints, WriteHandler, Verb.AckedRequest)}.
+         */
+        @SuppressWarnings("JavadocReference")
+        public void prepareSerializedBuffer(Mutation mutation, int version)
+        {
+            serialization(mutation, version);
+        }
+
+        /**
+         * Retrieve the cached serialization of this mutation, or computed and cache said serialization if it doesn't

Review Comment:
   nit: "compute and cache"



##########
src/java/org/apache/cassandra/db/Mutation.java:
##########
@@ -393,34 +388,130 @@ public static class MutationSerializer implements IVersionedSerializer<Mutation>
     {
         public void serialize(Mutation mutation, DataOutputPlus out, int version) throws IOException
         {
+            serialization(mutation, version).serialize(PartitionUpdate.serializer, mutation, out, version);
+        }
+
+        /**
+         * Called early during request processing to prevent that {@link #serialization(Mutation)} is
+         * called concurrently.
+         * See {@link org.apache.cassandra.service.StorageProxy#sendToHintedEndpoints(Mutation, WriteEndpoints, WriteHandler, Verb.AckedRequest)}.

Review Comment:
   This is an invalid javadoc target; doesn't resolve and I can't find it in the codebase.



##########
src/java/org/apache/cassandra/db/Mutation.java:
##########
@@ -393,34 +388,130 @@ public static class MutationSerializer implements IVersionedSerializer<Mutation>
     {
         public void serialize(Mutation mutation, DataOutputPlus out, int version) throws IOException
         {
+            serialization(mutation, version).serialize(PartitionUpdate.serializer, mutation, out, version);
+        }
+
+        /**
+         * Called early during request processing to prevent that {@link #serialization(Mutation)} is
+         * called concurrently.
+         * See {@link org.apache.cassandra.service.StorageProxy#sendToHintedEndpoints(Mutation, WriteEndpoints, WriteHandler, Verb.AckedRequest)}.
+         */
+        @SuppressWarnings("JavadocReference")
+        public void prepareSerializedBuffer(Mutation mutation, int version)
+        {
+            serialization(mutation, version);
+        }
+
+        /**
+         * Retrieve the cached serialization of this mutation, or computed and cache said serialization if it doesn't
+         * exists yet. Note that this method is _not_ synchronized even though it may (and will often) be called
+         * concurrently. Concurrent calls are still safe however, the only risk is that the value is not cached yet,
+         * multiple concurrent calls may compute it multiple times instead of just once. This is ok as in practice
+         * as we make sure this doesn't happen in the hot path by forcing the initial caching in
+         * {@link org.apache.cassandra.service.StorageProxy#sendToHintedEndpoints(Mutation, WriteEndpoints, WriteHandler, Verb.AckedRequest)}
+         * via {@link #prepareSerializedBuffer(Mutation)}, which is the only caller that passes
+         * {@code isPrepare==true}.
+         */
+        @SuppressWarnings("JavadocReference")
+        private Serialization serialization(Mutation mutation, int version)
+        {
+            int versionIndex = MessagingService.getVersionIndex(version);
+            // Retrieves the cached version, or build+cache it if it's not cached already.
+            Serialization serialization = mutation.serializations[versionIndex];
+            if (serialization == null)
+            {
+                // We need to use a capacity-limited DOB here.
+                // If a mutation consists of one PartitionUpdate with one column that exceeds the
+                // "cacheable-mutation-size-limit", a capacity-limited DOB can handle that case and
+                // throw a BufferCapacityExceededException. "Huge" serialized mutations can have a
+                // bad impact to G1 GC, if the cached serialized mutation results in a
+                // "humonguous object" and also frequent re-allocations of the scratch buffer(s).
+                // I.e. large cached mutation objects cause GC pressure.
+                try (DataOutputBuffer dob = DataOutputBuffer.limitedScratchBuffer(CACHEABLE_MUTATION_SIZE_LIMIT))
+                {
+                    if (!serializeInternal(PartitionUpdate.serializer, mutation, dob, version,true))
+                    {
+                        serialization = new NonCacheableSerialization();
+                    }
+                    else
+                    {
+                        serialization = new CachedSerialization(dob.toByteArray());
+                    }
+                }
+                catch (DataOutputBuffer.BufferCapacityExceededException tooBig)
+                {
+                    serialization = new NonCacheableSerialization();
+                }
+                catch (IOException e)
+                {
+                    throw new RuntimeException(e);
+                }
+                mutation.serializations[versionIndex] = serialization;
+            }
+            return serialization;
+        }
+
+        static boolean serializeInternal(PartitionUpdate.PartitionUpdateSerializer serializer,
+                                         Mutation mutation,
+                                         DataOutputPlus out,
+                                         int version,
+                                         boolean isPrepare) throws IOException
+        {
+            Map<TableId, PartitionUpdate> modifications = mutation.modifications;
+
             /* serialize the modifications in the mutation */
-            int size = mutation.modifications.size();
+            int size = modifications.size();
             out.writeUnsignedVInt(size);
 
             assert size > 0;
-            for (Map.Entry<TableId, PartitionUpdate> entry : mutation.modifications.entrySet())
-                PartitionUpdate.serializer.serialize(entry.getValue(), out, version);
+            for (PartitionUpdate partitionUpdate : modifications.values())
+            {
+                serializer.serialize(partitionUpdate, out, version);
+                if (isCacheableMutationSizeLimit(out, isPrepare))
+                    return false;
+            }
+            return true;
         }
 
-        public Mutation deserialize(DataInputPlus in, int version, DeserializationHelper.Flag flag) throws IOException
+        private static boolean isCacheableMutationSizeLimit(DataOutputPlus out, boolean isPrepare)
         {
-            int size = (int)in.readUnsignedVInt();
-            assert size > 0;
-
-            PartitionUpdate update = PartitionUpdate.serializer.deserialize(in, version, flag);
-            if (size == 1)
-                return new Mutation(update);
-
-            ImmutableMap.Builder<TableId, PartitionUpdate> modifications = new ImmutableMap.Builder<>();
-            DecoratedKey dk = update.partitionKey();
+            return isPrepare && out.position() > CACHEABLE_MUTATION_SIZE_LIMIT;

Review Comment:
   Consider a `CassandraRelevantProperties` for this parameter.



##########
src/java/org/apache/cassandra/db/Mutation.java:
##########
@@ -298,29 +308,14 @@ public String toString(boolean shallow)
         }
         return buff.append("])").toString();
     }
-    private int serializedSize30;
-    private int serializedSize3014;
-    private int serializedSize40;
+
+    private int serializedSize;
 
     public int serializedSize(int version)
     {
-        switch (version)
-        {
-            case VERSION_30:
-                if (serializedSize30 == 0)
-                    serializedSize30 = (int) serializer.serializedSize(this, VERSION_30);
-                return serializedSize30;
-            case VERSION_3014:
-                if (serializedSize3014 == 0)
-                    serializedSize3014 = (int) serializer.serializedSize(this, VERSION_3014);
-                return serializedSize3014;
-            case VERSION_40:
-                if (serializedSize40 == 0)
-                    serializedSize40 = (int) serializer.serializedSize(this, VERSION_40);
-                return serializedSize40;
-            default:
-                throw new IllegalStateException("Unknown serialization version: " + version);
-        }
+        if (serializedSize == 0)

Review Comment:
   Looks like we're losing our safety check against a bad version being passed in. If we're confident the version is always valid, we should document this method with that invariant and/or add a Precondition check. Or just restore some variation of the prior switch that throws an `IllegalStateException`.
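One variation that keeps the check could look like the sketch below. It is a self-contained illustration under stated assumptions: `VERSION_A`, `VERSION_B`, `versionIndex`, and the `IntSupplier` parameter are hypothetical stand-ins rather than Cassandra's API, and the size is cached per version here, which may or may not match what the PR intends with its single `serializedSize` field.

```java
import java.util.function.IntSupplier;

public class SerializedSizeCacheSketch
{
    // Hypothetical version constants, for illustration only.
    public static final int VERSION_A = 10;
    public static final int VERSION_B = 11;

    // One cached size per known version; 0 means "not computed yet".
    private final int[] cachedSizes = new int[2];

    /** Maps a version to its cache slot and rejects anything unknown. */
    static int versionIndex(int version)
    {
        switch (version)
        {
            case VERSION_A:
                return 0;
            case VERSION_B:
                return 1;
            default:
                throw new IllegalStateException("Unknown serialization version: " + version);
        }
    }

    /** Returns the cached size for the version, computing it on first use. */
    public int serializedSize(int version, IntSupplier computeSize)
    {
        int index = versionIndex(version); // validation happens before any caching
        if (cachedSizes[index] == 0)
            cachedSizes[index] = computeSize.getAsInt();
        return cachedSizes[index];
    }
}
```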



##########
src/java/org/apache/cassandra/io/util/TeeDataInputPlus.java:
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.utils.Throwables;
+
+/**
+ * DataInput that also stores the raw inputs into an output buffer
+ * This is useful for storing serialized buffers as they are deserialized
+ */
+public class TeeDataInputPlus implements DataInputPlus
+{
+    private final DataInputPlus source;
+    private final DataOutputPlus teeBuffer;
+
+    private final long limit;
+    private boolean limitReached = false;
+
+    public TeeDataInputPlus(DataInputPlus source, DataOutputPlus teeBuffer)
+    {
+        this(source, teeBuffer, -1);
+    }
+
+    public TeeDataInputPlus(DataInputPlus source, DataOutputPlus teeBuffer, long limit)
+    {
+        this.source = source;

Review Comment:
   We may want to assert that neither input is null here. May not; just a thought.
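If we do want the check, a minimal sketch using `java.util.Objects.requireNonNull` is below. The `Tee` class and its `InputStream`/`OutputStream` fields are simplified stand-ins for the real constructor arguments, chosen only so the example compiles on its own.

```java
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Objects;

public class TeeNullCheckSketch
{
    /** Hypothetical stand-in for a tee class that takes a source and a buffer. */
    public static final class Tee
    {
        private final InputStream source;
        private final OutputStream teeBuffer;

        public Tee(InputStream source, OutputStream teeBuffer)
        {
            // Fail fast at construction instead of with an NPE on the first read.
            this.source = Objects.requireNonNull(source, "source");
            this.teeBuffer = Objects.requireNonNull(teeBuffer, "teeBuffer");
        }
    }
}
```

Unlike an `assert`, `requireNonNull` also fires when assertions are disabled, and the message names the offending argument.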



##########
src/java/org/apache/cassandra/db/Mutation.java:
##########
@@ -393,34 +388,130 @@ public static class MutationSerializer implements IVersionedSerializer<Mutation>
     {
         public void serialize(Mutation mutation, DataOutputPlus out, int version) throws IOException
         {
+            serialization(mutation, version).serialize(PartitionUpdate.serializer, mutation, out, version);
+        }
+
+        /**
+         * Called early during request processing to prevent that {@link #serialization(Mutation)} is
+         * called concurrently.
+         * See {@link org.apache.cassandra.service.StorageProxy#sendToHintedEndpoints(Mutation, WriteEndpoints, WriteHandler, Verb.AckedRequest)}.
+         */
+        @SuppressWarnings("JavadocReference")
+        public void prepareSerializedBuffer(Mutation mutation, int version)
+        {
+            serialization(mutation, version);
+        }
+
+        /**
+         * Retrieve the cached serialization of this mutation, or computed and cache said serialization if it doesn't
+         * exists yet. Note that this method is _not_ synchronized even though it may (and will often) be called
+         * concurrently. Concurrent calls are still safe however, the only risk is that the value is not cached yet,
+         * multiple concurrent calls may compute it multiple times instead of just once. This is ok as in practice
+         * as we make sure this doesn't happen in the hot path by forcing the initial caching in
+         * {@link org.apache.cassandra.service.StorageProxy#sendToHintedEndpoints(Mutation, WriteEndpoints, WriteHandler, Verb.AckedRequest)}
+         * via {@link #prepareSerializedBuffer(Mutation)}, which is the only caller that passes
+         * {@code isPrepare==true}.
+         */
+        @SuppressWarnings("JavadocReference")
+        private Serialization serialization(Mutation mutation, int version)
+        {
+            int versionIndex = MessagingService.getVersionIndex(version);
+            // Retrieves the cached version, or build+cache it if it's not cached already.
+            Serialization serialization = mutation.serializations[versionIndex];
+            if (serialization == null)
+            {
+                // We need to use a capacity-limited DOB here.
+                // If a mutation consists of one PartitionUpdate with one column that exceeds the
+                // "cacheable-mutation-size-limit", a capacity-limited DOB can handle that case and
+                // throw a BufferCapacityExceededException. "Huge" serialized mutations can have a
+                // bad impact to G1 GC, if the cached serialized mutation results in a
+                // "humonguous object" and also frequent re-allocations of the scratch buffer(s).
+                // I.e. large cached mutation objects cause GC pressure.
+                try (DataOutputBuffer dob = DataOutputBuffer.limitedScratchBuffer(CACHEABLE_MUTATION_SIZE_LIMIT))
+                {
+                    if (!serializeInternal(PartitionUpdate.serializer, mutation, dob, version,true))
+                    {
+                        serialization = new NonCacheableSerialization();
+                    }
+                    else
+                    {
+                        serialization = new CachedSerialization(dob.toByteArray());
+                    }
+                }
+                catch (DataOutputBuffer.BufferCapacityExceededException tooBig)
+                {
+                    serialization = new NonCacheableSerialization();
+                }
+                catch (IOException e)
+                {
+                    throw new RuntimeException(e);
+                }
+                mutation.serializations[versionIndex] = serialization;

Review Comment:
   I was a little startled to see a `NonCacheableSerialization`... cached here. :) Might be worth renaming it to something like `SizeOnlyCachedSerialization` or something?



##########
src/java/org/apache/cassandra/db/Mutation.java:
##########
@@ -393,34 +388,130 @@ public static class MutationSerializer implements IVersionedSerializer<Mutation>
     {
         public void serialize(Mutation mutation, DataOutputPlus out, int version) throws IOException
         {
+            serialization(mutation, version).serialize(PartitionUpdate.serializer, mutation, out, version);
+        }
+
+        /**
+         * Called early during request processing to prevent that {@link #serialization(Mutation)} is
+         * called concurrently.
+         * See {@link org.apache.cassandra.service.StorageProxy#sendToHintedEndpoints(Mutation, WriteEndpoints, WriteHandler, Verb.AckedRequest)}.
+         */
+        @SuppressWarnings("JavadocReference")
+        public void prepareSerializedBuffer(Mutation mutation, int version)
+        {
+            serialization(mutation, version);
+        }
+
+        /**
+         * Retrieve the cached serialization of this mutation, or computed and cache said serialization if it doesn't
+         * exists yet. Note that this method is _not_ synchronized even though it may (and will often) be called
+         * concurrently. Concurrent calls are still safe however, the only risk is that the value is not cached yet,
+         * multiple concurrent calls may compute it multiple times instead of just once. This is ok as in practice
+         * as we make sure this doesn't happen in the hot path by forcing the initial caching in
+         * {@link org.apache.cassandra.service.StorageProxy#sendToHintedEndpoints(Mutation, WriteEndpoints, WriteHandler, Verb.AckedRequest)}
+         * via {@link #prepareSerializedBuffer(Mutation)}, which is the only caller that passes
+         * {@code isPrepare==true}.
+         */
+        @SuppressWarnings("JavadocReference")
+        private Serialization serialization(Mutation mutation, int version)
+        {
+            int versionIndex = MessagingService.getVersionIndex(version);
+            // Retrieves the cached version, or build+cache it if it's not cached already.
+            Serialization serialization = mutation.serializations[versionIndex];
+            if (serialization == null)
+            {
+                // We need to use a capacity-limited DOB here.
+                // If a mutation consists of one PartitionUpdate with one column that exceeds the
+                // "cacheable-mutation-size-limit", a capacity-limited DOB can handle that case and
+                // throw a BufferCapacityExceededException. "Huge" serialized mutations can have a
+                // bad impact to G1 GC, if the cached serialized mutation results in a
+                // "humonguous object" and also frequent re-allocations of the scratch buffer(s).
+                // I.e. large cached mutation objects cause GC pressure.
+                try (DataOutputBuffer dob = DataOutputBuffer.limitedScratchBuffer(CACHEABLE_MUTATION_SIZE_LIMIT))
+                {
+                    if (!serializeInternal(PartitionUpdate.serializer, mutation, dob, version,true))

Review Comment:
   Assigning the `serialization` variable with a ternary operator here may help tidy this up a bit.
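   
   A minimal sketch of that shape, not the PR's code: the three types below are hypothetical stubs standing in for the real classes, and `fits` stands in for the boolean returned by `serializeInternal(...)`.
   
   ```java
   final class TernarySketch
   {
       // Hypothetical stand-ins for the types used in Mutation.java.
       interface Serialization {}
   
       static final class CachedSerialization implements Serialization
       {
           final byte[] bytes;
           CachedSerialization(byte[] bytes) { this.bytes = bytes; }
       }
   
       static final class NonCacheableSerialization implements Serialization {}
   
       // 'fits' plays the role of the serializeInternal(...) return value.
       static Serialization build(boolean fits, byte[] serialized)
       {
           return fits ? new CachedSerialization(serialized)
                       : new NonCacheableSerialization();
       }
   }
   ```
   
   The `BufferCapacityExceededException` catch block would still need its own assignment, so this only collapses the if/else inside the `try`.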



##########
src/java/org/apache/cassandra/net/MessagingService.java:
##########
@@ -212,6 +215,23 @@ public class MessagingService extends MessagingServiceMBeanImpl
     public static final int current_version = VERSION_40;
     static AcceptVersions accept_messaging = new AcceptVersions(minimum_version, current_version);
     static AcceptVersions accept_streaming = new AcceptVersions(current_version, current_version);
+    static Map<Integer, Integer> versionIndexMap = Arrays.stream(Version.values()).collect(Collectors.toMap(v -> v.value, v -> v.ordinal()));
+
+    /**
+     * This is an optimisation to speed up the translation of the serialization
+     * version to the {@link Version} enum.
+     *
+     * @param version the serialization version
+     * @return a {@link Version}
+     */
+    public static int getVersionIndex(int version)
+    {
+        if (versionIndexMap.containsKey(version))

Review Comment:
   I think we could just `.get` on the map and throw on `value == null`. That saves us the separate lookup and retrieve steps.
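   
   Something along these lines, as a rough standalone sketch. The class name and the version values in the map are made up for illustration; the real map is built from `Version.values()`.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   final class VersionOrdinalSketch
   {
       // Hypothetical version -> ordinal mapping, for illustration only.
       static final Map<Integer, Integer> versionOrdinalMap = new HashMap<>();
       static
       {
           versionOrdinalMap.put(10, 0);
           versionOrdinalMap.put(11, 1);
           versionOrdinalMap.put(12, 2);
       }
   
       static int getVersionOrdinal(int version)
       {
           // Single lookup: get() returns null when the key is absent.
           Integer ordinal = versionOrdinalMap.get(version);
           if (ordinal == null)
               throw new IllegalStateException("Unknown serialization version: " + version);
           return ordinal;
       }
   }
   ```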



##########
src/java/org/apache/cassandra/io/util/TeeDataInputPlus.java:
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.utils.Throwables;
+
+/**
+ * DataInput that also stores the raw inputs into an output buffer
+ * This is useful for storing serialized buffers as they are deserialized
+ */
+public class TeeDataInputPlus implements DataInputPlus
+{
+    private final DataInputPlus source;
+    private final DataOutputPlus teeBuffer;
+
+    private final long limit;

Review Comment:
   Why do we use -1 to indicate no limit here vs. the 0 to mean no limit in `DataOutputBuffer.LimitingScratchBuffer`?



##########
src/java/org/apache/cassandra/db/Mutation.java:
##########
@@ -393,34 +388,130 @@ public static class MutationSerializer implements IVersionedSerializer<Mutation>
     {
         public void serialize(Mutation mutation, DataOutputPlus out, int version) throws IOException
         {
+            serialization(mutation, version).serialize(PartitionUpdate.serializer, mutation, out, version);
+        }
+
+        /**
+         * Called early during request processing to prevent that {@link #serialization(Mutation)} is
+         * called concurrently.
+         * See {@link org.apache.cassandra.service.StorageProxy#sendToHintedEndpoints(Mutation, WriteEndpoints, WriteHandler, Verb.AckedRequest)}.
+         */
+        @SuppressWarnings("JavadocReference")
+        public void prepareSerializedBuffer(Mutation mutation, int version)
+        {
+            serialization(mutation, version);
+        }
+
+        /**
+         * Retrieve the cached serialization of this mutation, or computed and cache said serialization if it doesn't
+         * exists yet. Note that this method is _not_ synchronized even though it may (and will often) be called
+         * concurrently. Concurrent calls are still safe however, the only risk is that the value is not cached yet,
+         * multiple concurrent calls may compute it multiple times instead of just once. This is ok as in practice
+         * as we make sure this doesn't happen in the hot path by forcing the initial caching in
+         * {@link org.apache.cassandra.service.StorageProxy#sendToHintedEndpoints(Mutation, WriteEndpoints, WriteHandler, Verb.AckedRequest)}
+         * via {@link #prepareSerializedBuffer(Mutation)}, which is the only caller that passes
+         * {@code isPrepare==true}.
+         */
+        @SuppressWarnings("JavadocReference")
+        private Serialization serialization(Mutation mutation, int version)
+        {
+            int versionIndex = MessagingService.getVersionIndex(version);
+            // Retrieves the cached version, or build+cache it if it's not cached already.
+            Serialization serialization = mutation.serializations[versionIndex];
+            if (serialization == null)
+            {
+                // We need to use a capacity-limited DOB here.
+                // If a mutation consists of one PartitionUpdate with one column that exceeds the
+                // "cacheable-mutation-size-limit", a capacity-limited DOB can handle that case and
+                // throw a BufferCapacityExceededException. "Huge" serialized mutations can have a
+                // bad impact to G1 GC, if the cached serialized mutation results in a
+                // "humonguous object" and also frequent re-allocations of the scratch buffer(s).
+                // I.e. large cached mutation objects cause GC pressure.
+                try (DataOutputBuffer dob = DataOutputBuffer.limitedScratchBuffer(CACHEABLE_MUTATION_SIZE_LIMIT))
+                {
+                    if (!serializeInternal(PartitionUpdate.serializer, mutation, dob, version,true))
+                    {
+                        serialization = new NonCacheableSerialization();
+                    }
+                    else
+                    {
+                        serialization = new CachedSerialization(dob.toByteArray());
+                    }
+                }
+                catch (DataOutputBuffer.BufferCapacityExceededException tooBig)
+                {
+                    serialization = new NonCacheableSerialization();
+                }
+                catch (IOException e)
+                {
+                    throw new RuntimeException(e);
+                }
+                mutation.serializations[versionIndex] = serialization;
+            }
+            return serialization;
+        }
+
+        static boolean serializeInternal(PartitionUpdate.PartitionUpdateSerializer serializer,

Review Comment:
   Recommend we rename this to `maybeSerializeInternal` to reflect that it might or might not actually complete, and that this is acceptable. We call it both with and without checking the return value, which implies there's some flexibility in that usage.



##########
src/java/org/apache/cassandra/net/MessagingService.java:
##########
@@ -212,6 +215,23 @@ public class MessagingService extends MessagingServiceMBeanImpl
     public static final int current_version = VERSION_40;
     static AcceptVersions accept_messaging = new AcceptVersions(minimum_version, current_version);
     static AcceptVersions accept_streaming = new AcceptVersions(current_version, current_version);
+    static Map<Integer, Integer> versionIndexMap = Arrays.stream(Version.values()).collect(Collectors.toMap(v -> v.value, v -> v.ordinal()));
+
+    /**
+     * This is an optimisation to speed up the translation of the serialization
+     * version to the {@link Version} enum.
+     *
+     * @param version the serialization version
+     * @return a {@link Version}
+     */
+    public static int getVersionIndex(int version)
+    {
+        if (versionIndexMap.containsKey(version))
+            return versionIndexMap.get(version);
+        throw new IllegalStateException("Unkown serialization version: " + version);
+    }
+
+    public final static boolean NON_GRACEFUL_SHUTDOWN = Boolean.getBoolean("cassandra.test.messagingService.nonGracefulShutdown");

Review Comment:
   This looks vestigial.



##########
src/java/org/apache/cassandra/db/Mutation.java:
##########
@@ -393,34 +388,130 @@ public static class MutationSerializer implements IVersionedSerializer<Mutation>
     {
         public void serialize(Mutation mutation, DataOutputPlus out, int version) throws IOException
         {
+            serialization(mutation, version).serialize(PartitionUpdate.serializer, mutation, out, version);
+        }
+
+        /**
+         * Called early during request processing to prevent that {@link #serialization(Mutation)} is
+         * called concurrently.
+         * See {@link org.apache.cassandra.service.StorageProxy#sendToHintedEndpoints(Mutation, WriteEndpoints, WriteHandler, Verb.AckedRequest)}.
+         */
+        @SuppressWarnings("JavadocReference")
+        public void prepareSerializedBuffer(Mutation mutation, int version)
+        {
+            serialization(mutation, version);
+        }
+
+        /**
+         * Retrieve the cached serialization of this mutation, or computed and cache said serialization if it doesn't
+         * exists yet. Note that this method is _not_ synchronized even though it may (and will often) be called
+         * concurrently. Concurrent calls are still safe however, the only risk is that the value is not cached yet,
+         * multiple concurrent calls may compute it multiple times instead of just once. This is ok as in practice
+         * as we make sure this doesn't happen in the hot path by forcing the initial caching in
+         * {@link org.apache.cassandra.service.StorageProxy#sendToHintedEndpoints(Mutation, WriteEndpoints, WriteHandler, Verb.AckedRequest)}
+         * via {@link #prepareSerializedBuffer(Mutation)}, which is the only caller that passes
+         * {@code isPrepare==true}.
+         */
+        @SuppressWarnings("JavadocReference")
+        private Serialization serialization(Mutation mutation, int version)
+        {
+            int versionIndex = MessagingService.getVersionIndex(version);
+            // Retrieves the cached version, or build+cache it if it's not cached already.
+            Serialization serialization = mutation.serializations[versionIndex];
+            if (serialization == null)
+            {
+                // We need to use a capacity-limited DOB here.
+                // If a mutation consists of one PartitionUpdate with one column that exceeds the
+                // "cacheable-mutation-size-limit", a capacity-limited DOB can handle that case and
+                // throw a BufferCapacityExceededException. "Huge" serialized mutations can have a
+                // bad impact to G1 GC, if the cached serialized mutation results in a
+                // "humonguous object" and also frequent re-allocations of the scratch buffer(s).
+                // I.e. large cached mutation objects cause GC pressure.
+                try (DataOutputBuffer dob = DataOutputBuffer.limitedScratchBuffer(CACHEABLE_MUTATION_SIZE_LIMIT))
+                {
+                    if (!serializeInternal(PartitionUpdate.serializer, mutation, dob, version,true))
+                    {
+                        serialization = new NonCacheableSerialization();
+                    }
+                    else
+                    {
+                        serialization = new CachedSerialization(dob.toByteArray());
+                    }
+                }
+                catch (DataOutputBuffer.BufferCapacityExceededException tooBig)
+                {
+                    serialization = new NonCacheableSerialization();
+                }
+                catch (IOException e)
+                {
+                    throw new RuntimeException(e);
+                }
+                mutation.serializations[versionIndex] = serialization;
+            }
+            return serialization;
+        }
+
+        static boolean serializeInternal(PartitionUpdate.PartitionUpdateSerializer serializer,
+                                         Mutation mutation,
+                                         DataOutputPlus out,
+                                         int version,
+                                         boolean isPrepare) throws IOException
+        {
+            Map<TableId, PartitionUpdate> modifications = mutation.modifications;
+
             /* serialize the modifications in the mutation */
-            int size = mutation.modifications.size();
+            int size = modifications.size();
             out.writeUnsignedVInt(size);
 
             assert size > 0;
-            for (Map.Entry<TableId, PartitionUpdate> entry : mutation.modifications.entrySet())
-                PartitionUpdate.serializer.serialize(entry.getValue(), out, version);
+            for (PartitionUpdate partitionUpdate : modifications.values())
+            {
+                serializer.serialize(partitionUpdate, out, version);
+                if (isCacheableMutationSizeLimit(out, isPrepare))
+                    return false;
+            }
+            return true;
         }
 
-        public Mutation deserialize(DataInputPlus in, int version, DeserializationHelper.Flag flag) throws IOException
+        private static boolean isCacheableMutationSizeLimit(DataOutputPlus out, boolean isPrepare)
         {
-            int size = (int)in.readUnsignedVInt();
-            assert size > 0;
-
-            PartitionUpdate update = PartitionUpdate.serializer.deserialize(in, version, flag);
-            if (size == 1)
-                return new Mutation(update);
-
-            ImmutableMap.Builder<TableId, PartitionUpdate> modifications = new ImmutableMap.Builder<>();
-            DecoratedKey dk = update.partitionKey();
+            return isPrepare && out.position() > CACHEABLE_MUTATION_SIZE_LIMIT;
+        }
 
-            modifications.put(update.metadata().id, update);
-            for (int i = 1; i < size; ++i)
+        public Mutation deserialize(DataInputPlus in, int version, DeserializationHelper.Flag flag) throws IOException
+        {
+            Mutation m;
+            try (DataOutputBuffer dob = DataOutputBuffer.limitedScratchBuffer(CACHEABLE_MUTATION_SIZE_LIMIT))
             {
-                update = PartitionUpdate.serializer.deserialize(in, version, flag);
-                modifications.put(update.metadata().id, update);
+                in = new TeeDataInputPlus(in, dob, CACHEABLE_MUTATION_SIZE_LIMIT);
+
+                int size = (int) in.readUnsignedVInt();
+                assert size > 0;
+
+                PartitionUpdate update = PartitionUpdate.serializer.deserialize(in, version, flag);
+                if (size == 1)
+                {
+                    m = new Mutation(update);
+                }
+                else
+                {
+                    ImmutableMap.Builder<TableId, PartitionUpdate> modifications = new ImmutableMap.Builder<>();
+                    DecoratedKey dk = update.partitionKey();
+
+                    modifications.put(update.metadata().id, update);
+                    for (int i = 1; i < size; ++i)
+                    {
+                        update = PartitionUpdate.serializer.deserialize(in, version, flag);
+                        modifications.put(update.metadata().id, update);
+                    }
+                    m = new Mutation(update.metadata().keyspace, dk, modifications.build(), approxTime.now());
+                }
+
+                if (!((TeeDataInputPlus)in).isLimitReached())

Review Comment:
   So higher level meta question: right now there's really no sign that the data in the `DataOutputBuffer` is incomplete except for checking the `isLimitReached` method inside `TeeDataInputPlus`. This leaves us with the case where a buffer could be floating around and accessed in an incomplete state (between write calls above and checking) - how could we better tie this together?
   
   Full disclosure - I can't think of a good answer. I don't think decorating `DataOutputBuffer` with a bool indicating if it's valid or not makes any sense for 99+% of the other use-cases. We could consider wrapping access to the nested `teeBuffer` inside `TeeDataInputPlus` to return null or some other sentinel on limit reached but at that point it's just a worse version of this current API.
   
   Anyway - this might be worth calling out in the class-level JavaDoc for `TeeDataInputPlus`, being clearly prescriptive about the intended use of the class.
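   
   To illustrate the kind of contract such a JavaDoc could spell out, here is a small standalone sketch. `LimitedTeeSketch` is a hypothetical stand-in, not the class from this PR; it only mirrors the `limit <= 0` and strict `<` checks from `maybeWrite`.
   
   ```java
   import java.io.ByteArrayOutputStream;
   
   /**
    * Hypothetical limit-aware tee. Bytes are copied into an internal buffer
    * until a write would reach the limit. After that point the buffer holds
    * only a prefix of the input, so callers must check {@link #isLimitReached()}
    * before treating {@link #buffer()} as a complete copy.
    */
   final class LimitedTeeSketch
   {
       private final ByteArrayOutputStream teeBuffer = new ByteArrayOutputStream();
       private final long limit; // <= 0 means no limit
       private boolean limitReached = false;
   
       LimitedTeeSketch(long limit) { this.limit = limit; }
   
       void accept(byte[] bytes)
       {
           if (!limitReached && (limit <= 0 || teeBuffer.size() + bytes.length < limit))
               teeBuffer.write(bytes, 0, bytes.length);
           else
               limitReached = true;
       }
   
       /** @return true if copying stopped early, i.e. the buffer is incomplete */
       boolean isLimitReached() { return limitReached; }
   
       /** Only a complete copy when {@link #isLimitReached()} returns false. */
       byte[] buffer() { return teeBuffer.toByteArray(); }
   }
   ```
   
   The caller-side pattern is then the same one `deserialize` uses above: read everything first, and only consume the buffer if `isLimitReached()` is false.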



##########
src/java/org/apache/cassandra/io/util/DataOutputBuffer.java:
##########
@@ -61,32 +61,62 @@ private enum AllocationType { DIRECT, ONHEAP }
         @Override
         protected DataOutputBuffer initialValue()
         {
-            return new DataOutputBuffer()
-            {
-                @Override
-                public void close()
-                {
-                    if (buffer != null && buffer.capacity() <= MAX_RECYCLE_BUFFER_SIZE)
-                    {
-                        buffer.clear();
-                    }
-                    else
-                    {
-                        setBuffer(allocate(DEFAULT_INITIAL_BUFFER_SIZE));
-                    }
-                }
-
-                @Override
-                protected ByteBuffer allocate(int size)
-                {
-                    return ALLOCATION_TYPE == AllocationType.DIRECT ?
-                           ByteBuffer.allocateDirect(size) :
-                           ByteBuffer.allocate(size);
-                }
-            };
+            return new LimitingScratchBuffer();
         }
     };
 
+    /**
+     * Returns a {@link DataOutputBuffer} that is limited to a capacity of {@code limit} bytes.
+     * A {@code limit} of 0 means unlimited.
+     */
+    public static DataOutputBuffer limitedScratchBuffer(long limit)
+    {
+        LimitingScratchBuffer dob = (LimitingScratchBuffer) scratchBuffer.get();
+        dob.limit = limit;
+        return dob;
+    }
+
+    public static class BufferCapacityExceededException extends RuntimeException
+    {}
+
+    private static final class LimitingScratchBuffer extends DataOutputBuffer

Review Comment:
   Similar to how size limiting is a first class citizen in `TeeDataInputPlus`, I _think_ we could do the same thing for DataOutputPlus by promoting the `initialSizeBuffer` and `limit` to top level variables and just not use them in the case where the "no limit" constructor is used. That'd give us consistency between the structures of the `DataOutputPlus` and `TeeDataInputPlus` and their treatment of limits and remove one layer of type indirection that exists here, as well as the casting on usage.
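   
   A rough sketch of what first-class limit handling could look like, on a hypothetical `SketchBuffer` rather than the real classes. The `NO_LIMIT` sentinel and the exception name are assumptions for illustration.
   
   ```java
   import java.io.ByteArrayOutputStream;
   
   final class SketchBuffer
   {
       static final long NO_LIMIT = -1; // hypothetical sentinel for "unlimited"
   
       static final class CapacityExceededException extends RuntimeException {}
   
       private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
       private final long limit; // top-level field, unused when NO_LIMIT
   
       // The "no limit" constructor simply delegates with the sentinel.
       SketchBuffer() { this(NO_LIMIT); }
   
       SketchBuffer(long limit) { this.limit = limit; }
   
       void write(byte[] data)
       {
           // Reject any write that would grow the buffer past the limit.
           if (limit != NO_LIMIT && bytes.size() + data.length > limit)
               throw new CapacityExceededException();
           bytes.write(data, 0, data.length);
       }
   
       int position() { return bytes.size(); }
   }
   ```
   
   With the limit on the base type there is no subclass to cast to at the call site.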



##########
src/java/org/apache/cassandra/io/util/TeeDataInputPlus.java:
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.utils.Throwables;
+
+/**
+ * DataInput that also stores the raw inputs into an output buffer
+ * This is useful for storing serialized buffers as they are deserialized
+ */
+public class TeeDataInputPlus implements DataInputPlus
+{
+    private final DataInputPlus source;
+    private final DataOutputPlus teeBuffer;
+
+    private final long limit;
+    private boolean limitReached = false;
+
+    public TeeDataInputPlus(DataInputPlus source, DataOutputPlus teeBuffer)
+    {
+        this(source, teeBuffer, -1);
+    }
+
+    public TeeDataInputPlus(DataInputPlus source, DataOutputPlus teeBuffer, long limit)
+    {
+        this.source = source;
+        this.teeBuffer = teeBuffer;
+        this.limit = limit;
+    }
+
+    private void maybeWrite(int length, Throwables.DiscreteAction<IOException> writeAction) throws IOException
+    {
+        if (!limitReached && (limit <= 0 || teeBuffer.position() + length < limit))
+            writeAction.perform();
+        else
+            limitReached = true;
+    }
+
+    @Override
+    public void readFully(byte[] bytes) throws IOException
+    {
+        source.readFully(bytes);
+        maybeWrite(bytes.length, () -> teeBuffer.write(bytes));
+    }
+
+    @Override
+    public void readFully(byte[] bytes, int offset, int length) throws IOException
+    {
+        source.readFully(bytes, offset, length);
+        maybeWrite(length, () -> teeBuffer.write(bytes, offset, length));
+    }
+
+    @Override
+    public int skipBytes(int n) throws IOException
+    {
+        for (int i = 0; i < n; i++)
+        {
+            try
+            {
+                byte v = source.readByte();
+                maybeWrite(TypeSizes.BYTE_SIZE, () -> teeBuffer.writeByte(v));
+            }
+            catch (EOFException eof)
+            {
+                return i;
+            }
+        }
+        return n;
+    }
+
+    @Override
+    public boolean readBoolean() throws IOException
+    {
+        boolean v = source.readBoolean();
+        maybeWrite(TypeSizes.BOOL_SIZE, () -> teeBuffer.writeBoolean(v));
+        return v;
+    }
+
+    @Override
+    public byte readByte() throws IOException
+    {
+        byte v = source.readByte();
+        maybeWrite(TypeSizes.BYTE_SIZE, () -> teeBuffer.writeByte(v));
+        return v;
+    }
+
+    @Override
+    public int readUnsignedByte() throws IOException
+    {
+        int v = source.readUnsignedByte();
+        maybeWrite(TypeSizes.BYTE_SIZE, () -> teeBuffer.writeByte(v));
+        return v;
+    }
+
+    @Override
+    public short readShort() throws IOException
+    {
+        short v = source.readShort();
+        maybeWrite(TypeSizes.SHORT_SIZE, () -> teeBuffer.writeShort(v));
+        return v;
+    }
+
+    @Override
+    public int readUnsignedShort() throws IOException
+    {
+        int v = source.readUnsignedShort();
+        maybeWrite(TypeSizes.SHORT_SIZE, () -> teeBuffer.writeShort(v));
+        return v;
+    }
+
+    @Override
+    public char readChar() throws IOException
+    {
+        char v = source.readChar();
+        maybeWrite(TypeSizes.BYTE_SIZE, () -> teeBuffer.writeChar(v));
+        return v;
+    }
+
+    @Override
+    public int readInt() throws IOException
+    {
+        int v = source.readInt();
+        maybeWrite(TypeSizes.INT_SIZE, () -> teeBuffer.writeInt(v));
+        return v;
+    }
+
+    @Override
+    public long readLong() throws IOException
+    {
+        long v = source.readLong();
+        maybeWrite(TypeSizes.LONG_SIZE, () -> teeBuffer.writeLong(v));
+        return v;
+    }
+
+    @Override
+    public float readFloat() throws IOException
+    {
+        float v = source.readFloat();
+        maybeWrite(TypeSizes.FLOAT_SIZE, () -> teeBuffer.writeFloat(v));
+        return v;
+    }
+
+    @Override
+    public double readDouble() throws IOException
+    {
+        double v = source.readDouble();
+        maybeWrite(TypeSizes.DOUBLE_SIZE, () -> teeBuffer.writeDouble(v));
+        return v;
+    }
+
+    @Override
+    public String readLine() throws IOException
+    {
+        //This one isn't safe since we know the actual line termination type
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String readUTF() throws IOException
+    {
+        String v = source.readUTF();
+        maybeWrite(TypeSizes.sizeof(v), () -> teeBuffer.writeUTF(v));
+        return v;
+    }
+
+    @Override
+    public long readVInt() throws IOException
+    {
+        long v = source.readVInt();
+        maybeWrite(TypeSizes.sizeofVInt(v), () -> teeBuffer.writeVInt(v));
+        return v;
+    }
+
+    @Override
+    public long readUnsignedVInt() throws IOException
+    {
+        long v = source.readUnsignedVInt();
+        maybeWrite(TypeSizes.sizeofUnsignedVInt(v), () -> teeBuffer.writeUnsignedVInt(v));
+        return v;
+    }
+
+    @Override
+    public void skipBytesFully(int n) throws IOException
+    {
+        source.skipBytesFully(n);
+        maybeWrite(n, () -> {
+            for (int i = 0; i < n; i++)
+                teeBuffer.writeByte(0);
+        });
+    }
+
+    public boolean isLimitReached()

Review Comment:
   JavaDoc this to indicate that a "true" return indicates the data in your `teeBuffer` can't be trusted.



##########
src/java/org/apache/cassandra/net/MessagingService.java:
##########
@@ -212,6 +215,23 @@ public class MessagingService extends MessagingServiceMBeanImpl
     public static final int current_version = VERSION_40;
     static AcceptVersions accept_messaging = new AcceptVersions(minimum_version, current_version);
     static AcceptVersions accept_streaming = new AcceptVersions(current_version, current_version);
+    static Map<Integer, Integer> versionIndexMap = Arrays.stream(Version.values()).collect(Collectors.toMap(v -> v.value, v -> v.ordinal()));
+
+    /**
+     * This is an optimisation to speed up the translation of the serialization
+     * version to the {@link Version} enum.
+     *
+     * @param version the serialization version
+     * @return a {@link Version}
+     */
+    public static int getVersionIndex(int version)

Review Comment:
   Nit: Consider renaming to something very literal like `getVersionOrdinal` -> the current name (index) doesn't really clarify _what_ index it's retrieving for what context / what purpose / etc.
   
   I've spent some time chewing on this; I really don't like it from a meta / aesthetic perspective but I think we're backed into a corner w/the original enum implementation and the rather strange values that we're using for `Version` right now.



##########
test/unit/org/apache/cassandra/utils/TeeDataInputPlusTest.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.util.Arrays;
+
+import org.junit.Test;
+
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.TeeDataInputPlus;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TeeDataInputPlusTest
+{
+    @Test
+    public void testTeeBuffer() throws Exception
+    {
+        DataOutputBuffer out = new DataOutputBuffer();
+        byte[] testData;
+
+        // boolean
+        out.writeBoolean(true);
+        // byte
+        out.writeByte(0x1);
+        // char
+        out.writeChar('a');
+        // short
+        out.writeShort(1);
+        // int
+        out.writeInt(1);
+        // long
+        out.writeLong(1L);
+        // float
+        out.writeFloat(1.0f);
+        // double
+        out.writeDouble(1.0d);
+        // vint
+        out.writeVInt(-1337L);
+        //unsigned vint
+        out.writeUnsignedVInt(1337L);
+        // String
+        out.writeUTF("abc");
+        //Another string to test skip
+        out.writeUTF("garbagetoskipattheend");
+        testData = out.toByteArray();
+
+        int LIMITED_SIZE = 40;
+        DataInputBuffer reader = new DataInputBuffer(testData);
+        DataInputBuffer reader2 = new DataInputBuffer(testData);
+        DataOutputBuffer teeOut = new DataOutputBuffer();
+        DataOutputBuffer limitedTeeOut = new DataOutputBuffer();
+        TeeDataInputPlus tee = new TeeDataInputPlus(reader, teeOut);
+        TeeDataInputPlus limitedTee = new TeeDataInputPlus(reader2, limitedTeeOut, LIMITED_SIZE);
+
+        // boolean = 1byte
+        boolean bool = tee.readBoolean();
+        assertTrue(bool);
+        bool = limitedTee.readBoolean();
+        assertTrue(bool);
+        // byte = 1byte
+        byte b = tee.readByte();
+        assertEquals(b, 0x1);
+        b = limitedTee.readByte();
+        assertEquals(b, 0x1);
+        // char = 2byte
+        char c = tee.readChar();
+        assertEquals('a', c);
+        c = limitedTee.readChar();
+        assertEquals('a', c);
+        // short = 2bytes
+        short s = tee.readShort();
+        assertEquals(1, s);
+        s = limitedTee.readShort();
+        assertEquals(1, s);
+        // int = 4bytes
+        int i = tee.readInt();
+        assertEquals(1, i);
+        i = limitedTee.readInt();
+        assertEquals(1, i);
+        // long = 8bytes
+        long l = tee.readLong();
+        assertEquals(1L, l);
+        l = limitedTee.readLong();
+        assertEquals(1L, l);
+        // float = 4bytes
+        float f = tee.readFloat();
+        assertEquals(1.0f, f, 0);
+        f = limitedTee.readFloat();
+        assertEquals(1.0f, f, 0);
+        // double = 8bytes
+        double d = tee.readDouble();
+        assertEquals(1.0d, d, 0);
+        d = limitedTee.readDouble();
+        assertEquals(1.0d, d, 0);
+        long vint = tee.readVInt();
+        assertEquals(-1337L, vint);
+        vint = limitedTee.readVInt();
+        assertEquals(-1337L, vint);
+        long uvint = tee.readUnsignedVInt();
+        assertEquals(1337L, uvint);
+        uvint = limitedTee.readUnsignedVInt();
+        assertEquals(1337L, uvint);
+        // String("abc") = 2(string size) + 3 = 5 bytes
+        String str = tee.readUTF();
+        assertEquals("abc", str);
+        str = limitedTee.readUTF();
+        assertEquals("abc", str);
+        int skipped = tee.skipBytes(100);
+        assertEquals(23, skipped);
+        skipped = limitedTee.skipBytes(100);
+        assertEquals(23, skipped);
+
+        byte[] teeData = teeOut.toByteArray();
+        assertFalse(tee.isLimitReached());
+        assertTrue(Arrays.equals(testData, teeData));
+
+        byte[] limitedTeeData = limitedTeeOut.toByteArray();
+        assertTrue(limitedTee.isLimitReached());
+        assertTrue(Arrays.equals(Arrays.copyOf(testData, LIMITED_SIZE -1 ), limitedTeeData));

Review Comment:
   ... 2 integers together without a delimiter? I'm not familiar with this - what's this about?





[GitHub] [cassandra] josh-mckenzie commented on pull request #1954: Cassandra 17998

Posted by GitBox <gi...@apache.org>.
josh-mckenzie commented on PR #1954:
URL: https://github.com/apache/cassandra/pull/1954#issuecomment-1331256000

   Some top level questions (about to transfer line items from notes to the diff):
   * Why would caching a mutation at any given size be particularly bad for GC? We're already paying the tax to ser or deser in the first place; how long do we expect these messages to hang around and live -> worst-case is timeout from replicas?
   * Where does our initial value for the default on `cacheable_mutation_size_limit_bytes` come from?
   * If we're doing ThreadLocal on the limitedScratchBuffer, how are we managing the implications of a Mutation `deserialize` call bouncing across threads? We'll end up with a deserialized copy of it living in memory on another thread should something jump to another Stage in the future etc right? Not a big deal as they should be short-lived eden and we'd be deserializing it anyway, just thinking of the tradeoffs between having a central cache of deserialized mutations w/the current "raciness" attributes vs. the threadlocal approach.




[GitHub] [cassandra] josh-mckenzie commented on a diff in pull request #1954: Cassandra 17998

Posted by GitBox <gi...@apache.org>.
josh-mckenzie commented on code in PR #1954:
URL: https://github.com/apache/cassandra/pull/1954#discussion_r1041255144


##########
src/java/org/apache/cassandra/io/util/DataOutputBuffer.java:
##########
@@ -61,32 +61,62 @@ private enum AllocationType { DIRECT, ONHEAP }
         @Override
         protected DataOutputBuffer initialValue()
         {
-            return new DataOutputBuffer()
-            {
-                @Override
-                public void close()
-                {
-                    if (buffer != null && buffer.capacity() <= MAX_RECYCLE_BUFFER_SIZE)
-                    {
-                        buffer.clear();
-                    }
-                    else
-                    {
-                        setBuffer(allocate(DEFAULT_INITIAL_BUFFER_SIZE));
-                    }
-                }
-
-                @Override
-                protected ByteBuffer allocate(int size)
-                {
-                    return ALLOCATION_TYPE == AllocationType.DIRECT ?
-                           ByteBuffer.allocateDirect(size) :
-                           ByteBuffer.allocate(size);
-                }
-            };
+            return new LimitingScratchBuffer();
         }
     };
 
+    /**
+     * Returns a {@link DataOutputBuffer} that is limited to a capacity of {@code limit} bytes.
+     * A {@code limit} of 0 means unlimited.
+     */
+    public static DataOutputBuffer limitedScratchBuffer(long limit)
+    {
+        LimitingScratchBuffer dob = (LimitingScratchBuffer) scratchBuffer.get();

Review Comment:
   A lot clearer now w/the new init + overrides on scratchBuffer init. 👍 





[GitHub] [cassandra] smiklosovic closed pull request #1954: Cassandra 17998

Posted by "smiklosovic (via GitHub)" <gi...@apache.org>.
smiklosovic closed pull request #1954: Cassandra 17998
URL: https://github.com/apache/cassandra/pull/1954




[GitHub] [cassandra] tjake commented on a diff in pull request #1954: Cassandra 17998

Posted by GitBox <gi...@apache.org>.
tjake commented on code in PR #1954:
URL: https://github.com/apache/cassandra/pull/1954#discussion_r1039024376


##########
src/java/org/apache/cassandra/db/Mutation.java:
##########
@@ -393,34 +388,130 @@ public static class MutationSerializer implements IVersionedSerializer<Mutation>
     {
         public void serialize(Mutation mutation, DataOutputPlus out, int version) throws IOException
         {
+            serialization(mutation, version).serialize(PartitionUpdate.serializer, mutation, out, version);
+        }
+
+        /**
+         * Called early during request processing to prevent that {@link #serialization(Mutation)} is
+         * called concurrently.
+         * See {@link org.apache.cassandra.service.StorageProxy#sendToHintedEndpoints(Mutation, WriteEndpoints, WriteHandler, Verb.AckedRequest)}.
+         */
+        @SuppressWarnings("JavadocReference")
+        public void prepareSerializedBuffer(Mutation mutation, int version)
+        {
+            serialization(mutation, version);
+        }
+
+        /**
+         * Retrieve the cached serialization of this mutation, or computed and cache said serialization if it doesn't
+         * exists yet. Note that this method is _not_ synchronized even though it may (and will often) be called
+         * concurrently. Concurrent calls are still safe however, the only risk is that the value is not cached yet,
+         * multiple concurrent calls may compute it multiple times instead of just once. This is ok as in practice
+         * as we make sure this doesn't happen in the hot path by forcing the initial caching in
+         * {@link org.apache.cassandra.service.StorageProxy#sendToHintedEndpoints(Mutation, WriteEndpoints, WriteHandler, Verb.AckedRequest)}
+         * via {@link #prepareSerializedBuffer(Mutation)}, which is the only caller that passes
+         * {@code isPrepare==true}.
+         */
+        @SuppressWarnings("JavadocReference")
+        private Serialization serialization(Mutation mutation, int version)
+        {
+            int versionIndex = MessagingService.getVersionIndex(version);
+            // Retrieves the cached version, or build+cache it if it's not cached already.
+            Serialization serialization = mutation.serializations[versionIndex];
+            if (serialization == null)
+            {
+                // We need to use a capacity-limited DOB here.
+                // If a mutation consists of one PartitionUpdate with one column that exceeds the
+                // "cacheable-mutation-size-limit", a capacity-limited DOB can handle that case and
+                // throw a BufferCapacityExceededException. "Huge" serialized mutations can have a
+                // bad impact to G1 GC, if the cached serialized mutation results in a
+                // "humonguous object" and also frequent re-allocations of the scratch buffer(s).
+                // I.e. large cached mutation objects cause GC pressure.
+                try (DataOutputBuffer dob = DataOutputBuffer.limitedScratchBuffer(CACHEABLE_MUTATION_SIZE_LIMIT))
+                {
+                    if (!serializeInternal(PartitionUpdate.serializer, mutation, dob, version,true))
+                    {
+                        serialization = new NonCacheableSerialization();
+                    }
+                    else
+                    {
+                        serialization = new CachedSerialization(dob.toByteArray());
+                    }
+                }
+                catch (DataOutputBuffer.BufferCapacityExceededException tooBig)

Review Comment:
   Yeah, I'm reworking this, good call out





[GitHub] [cassandra] tjake commented on pull request #1954: Cassandra 17998

Posted by GitBox <gi...@apache.org>.
tjake commented on PR #1954:
URL: https://github.com/apache/cassandra/pull/1954#issuecomment-1336512942

   Thanks for the great review. I reworked the code to avoid the whole limiting buffer/exception throwing, and I think I addressed all the other issues you mentioned.
   
   > Why would caching a mutation at any given size be particularly bad for GC? 
   
   Since you can end up making [Humongous allocations](https://docs.oracle.com/javase/10/gctuning/garbage-first-garbage-collector.htm#JSGCT-GUID-AC383806-7FA7-4698-8B92-4FD092B9F368) 
   
   This threshold is based on the size of the heap but 1MB is a good default since that's the low end and covers 99% of Mutations.
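
For context on the humongous-allocation point: G1 treats an object as humongous when it is at least half a heap region, and by default it derives the region size from the heap size (roughly heap / 2048, a power of two between 1 MB and 32 MB). The following is a rough sketch of that arithmetic, not the JVM's exact ergonomics; the class and method names are hypothetical.

```java
public class HumongousThresholdSketch
{
    private static final long MB = 1024L * 1024L;

    // Approximates G1's default region sizing: aim for about 2048 regions,
    // round down to a power of two, and clamp to [1 MB, 32 MB].
    static long estimateRegionSize(long heapBytes)
    {
        long target = Math.max(heapBytes / 2048, MB);
        long powerOfTwo = Long.highestOneBit(target);
        return Math.min(powerOfTwo, 32 * MB);
    }

    // Objects at least this large are allocated as humongous objects.
    static long humongousThreshold(long heapBytes)
    {
        return estimateRegionSize(heapBytes) / 2;
    }

    public static void main(String[] args)
    {
        long[] heapsInGb = { 2, 8, 31 };
        for (long gb : heapsInGb)
        {
            long heap = gb * 1024 * MB;
            System.out.println(gb + " GB heap -> humongous at about "
                               + humongousThreshold(heap) / 1024 + " KB");
        }
    }
}
```

On this estimate an 8 GB heap yields 4 MB regions and a 2 MB humongous threshold, while small heaps lower the threshold to as little as 512 KB.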
   
   




[GitHub] [cassandra] tjake commented on a diff in pull request #1954: Cassandra 17998

Posted by GitBox <gi...@apache.org>.
tjake commented on code in PR #1954:
URL: https://github.com/apache/cassandra/pull/1954#discussion_r1039025145


##########
src/java/org/apache/cassandra/net/MessagingService.java:
##########
@@ -212,6 +215,23 @@ public class MessagingService extends MessagingServiceMBeanImpl
     public static final int current_version = VERSION_40;
     static AcceptVersions accept_messaging = new AcceptVersions(minimum_version, current_version);
     static AcceptVersions accept_streaming = new AcceptVersions(current_version, current_version);
+    static Map<Integer, Integer> versionIndexMap = Arrays.stream(Version.values()).collect(Collectors.toMap(v -> v.value, v -> v.ordinal()));
+
+    /**
+     * This is an optimisation to speed up the translation of the serialization
+     * version to the {@link Version} enum.
+     *
+     * @param version the serialization version
+     * @return a {@link Version}
+     */
+    public static int getVersionIndex(int version)
+    {
+        if (versionIndexMap.containsKey(version))
+            return versionIndexMap.get(version);
+        throw new IllegalStateException("Unkown serialization version: " + version);
+    }
+
+    public final static boolean NON_GRACEFUL_SHUTDOWN = Boolean.getBoolean("cassandra.test.messagingService.nonGracefulShutdown");

Review Comment:
   oops
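
The lookup in the hunk above maps a wire version number to an array index. A minimal, self-contained sketch of the same idea follows; the `Version` enum and its numeric values are stand-ins invented for illustration, not Cassandra's actual `MessagingService.Version`. It uses a single `get` rather than `containsKey` followed by `get`.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

public class VersionOrdinalSketch
{
    // Hypothetical versions; the numeric values are illustrative only.
    enum Version
    {
        V_A(10), V_B(11), V_C(12);

        final int value;

        Version(int value) { this.value = value; }
    }

    // Built once: wire version number -> enum ordinal.
    private static final Map<Integer, Integer> ORDINALS =
        Arrays.stream(Version.values())
              .collect(Collectors.toMap(v -> v.value, v -> v.ordinal()));

    static int ordinalOf(int version)
    {
        Integer ordinal = ORDINALS.get(version);
        if (ordinal == null)
            throw new IllegalStateException("Unknown serialization version: " + version);
        return ordinal;
    }

    public static void main(String[] args)
    {
        // One slot per known version, similar to a per-version serialization cache.
        byte[][] cache = new byte[Version.values().length][];
        cache[ordinalOf(11)] = new byte[] { 1, 2, 3 };
        System.out.println(cache[1].length);
    }
}
```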





[GitHub] [cassandra] josh-mckenzie commented on a diff in pull request #1954: Cassandra 17998

Posted by GitBox <gi...@apache.org>.
josh-mckenzie commented on code in PR #1954:
URL: https://github.com/apache/cassandra/pull/1954#discussion_r1037568084


##########
src/java/org/apache/cassandra/io/util/DataOutputBuffer.java:
##########
@@ -61,32 +61,62 @@ private enum AllocationType { DIRECT, ONHEAP }
         @Override
         protected DataOutputBuffer initialValue()
         {
-            return new DataOutputBuffer()
-            {
-                @Override
-                public void close()
-                {
-                    if (buffer != null && buffer.capacity() <= MAX_RECYCLE_BUFFER_SIZE)
-                    {
-                        buffer.clear();
-                    }
-                    else
-                    {
-                        setBuffer(allocate(DEFAULT_INITIAL_BUFFER_SIZE));
-                    }
-                }
-
-                @Override
-                protected ByteBuffer allocate(int size)
-                {
-                    return ALLOCATION_TYPE == AllocationType.DIRECT ?
-                           ByteBuffer.allocateDirect(size) :
-                           ByteBuffer.allocate(size);
-                }
-            };
+            return new LimitingScratchBuffer();
         }
     };
 
+    /**
+     * Returns a {@link DataOutputBuffer} that is limited to a capacity of {@code limit} bytes.
+     * A {@code limit} of 0 means unlimited.
+     */
+    public static DataOutputBuffer limitedScratchBuffer(long limit)
+    {
+        LimitingScratchBuffer dob = (LimitingScratchBuffer) scratchBuffer.get();

Review Comment:
   Been chewing on this - given the advice is to close out this DOB quickly to prevent heap pressure, what's the point of using the FastThreadLocal scratchBuffer instead of just a new DOB?
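
To make the trade-off in this question concrete, here is a sketch of the recycling pattern visible in the removed lines of the hunk: one buffer per thread, reused while it stays small and replaced once it has grown past a limit. It uses plain `ThreadLocal` and `ByteBuffer` rather than Netty's `FastThreadLocal` and Cassandra's `DataOutputBuffer`, and the class name and sizes are hypothetical.

```java
import java.nio.ByteBuffer;

public class ScratchBufferSketch
{
    // Hypothetical sizes chosen for illustration.
    static final int INITIAL_SIZE = 128;
    static final int MAX_RECYCLE_SIZE = 1024;

    private static final ThreadLocal<ByteBuffer> SCRATCH =
        ThreadLocal.withInitial(() -> ByteBuffer.allocate(INITIAL_SIZE));

    // Returns this thread's buffer, grown if needed, ready for writing.
    static ByteBuffer acquire(int needed)
    {
        ByteBuffer buffer = SCRATCH.get();
        if (buffer.capacity() < needed)
        {
            buffer = ByteBuffer.allocate(needed);
            SCRATCH.set(buffer);
        }
        buffer.clear();
        return buffer;
    }

    // Keeps small buffers for reuse; drops oversized ones so they can be collected.
    static void release()
    {
        if (SCRATCH.get().capacity() > MAX_RECYCLE_SIZE)
            SCRATCH.set(ByteBuffer.allocate(INITIAL_SIZE));
    }

    public static void main(String[] args)
    {
        ByteBuffer small = acquire(64);
        release();
        System.out.println(small == acquire(64));   // small buffer is kept for reuse

        ByteBuffer large = acquire(4096);
        release();
        System.out.println(large == acquire(64));   // oversized buffer was replaced
    }
}
```

Reuse saves an allocation per call but keeps one buffer alive per thread; a fresh buffer per call is simpler and can die young.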





[GitHub] [cassandra] josh-mckenzie commented on pull request #1954: Cassandra 17998

Posted by GitBox <gi...@apache.org>.
josh-mckenzie commented on PR #1954:
URL: https://github.com/apache/cassandra/pull/1954#issuecomment-1339720224

   Couple spelling nits and it looks like we dropped the new unit test in DataOutputTest when we dropped that unused import:
   ```
       @Test
       public void testDataOutputBufferLimitingScratchBuffer() throws IOException
       {
   ```
   
   w/those fixed we're good to go. 👍 




[GitHub] [cassandra] josh-mckenzie commented on a diff in pull request #1954: Cassandra 17998

Posted by GitBox <gi...@apache.org>.
josh-mckenzie commented on code in PR #1954:
URL: https://github.com/apache/cassandra/pull/1954#discussion_r1041237457


##########
src/java/org/apache/cassandra/db/Mutation.java:
##########
@@ -393,34 +411,115 @@ public static class MutationSerializer implements IVersionedSerializer<Mutation>
     {
         public void serialize(Mutation mutation, DataOutputPlus out, int version) throws IOException
         {
+            serialization(mutation, version).serialize(PartitionUpdate.serializer, mutation, out, version);
+        }
+
+        /**
+         * Called early during request processing to prevent that {@link #serialization(Mutation, int)} is
+         * called concurrently.
+         * See {@link org.apache.cassandra.service.StorageProxy#sendToHintedReplicas(Mutation, ReplicaPlan.ForWrite, AbstractWriteResponseHandler, String, Stage)}
+         */
+        @SuppressWarnings("JavadocReference")
+        public void prepareSerializedBuffer(Mutation mutation, int version)
+        {
+            serialization(mutation, version);
+        }
+
+        /**
+         * Retrieve the cached serialization of this mutation, or compute and cache said serialization if it doesn't
+         * exist yet. Note that this method is _not_ synchronized even though it may (and will often) be called
+         * concurrently. Concurrent calls are still safe however, the only risk is that the value is not cached yet,
+         * multiple concurrent calls may compute it multiple times instead of just once. This is ok as in practice
+         * as we make sure this doesn't happen in the hot path by forcing the initial caching in
+         * {@link org.apache.cassandra.service.StorageProxy#sendToHintedReplicas(Mutation, ReplicaPlan.ForWrite, AbstractWriteResponseHandler, String, Stage)}
+         * via {@link #prepareSerializedBuffer(Mutation)}, which is the only caller that passes
+         * {@code isPrepare==true}.
+         */
+        @SuppressWarnings("JavadocReference")
+        private Serialization serialization(Mutation mutation, int version)
+        {
+            int versionOrdinal = MessagingService.getVersionOrdinal(version);
+            // Retrieves the cached version, or build+cache it if it's not cached already.
+            Serialization serialization = mutation.cachedSerializations[versionOrdinal];
+            if (serialization == null)
+            {
+                serialization = new SizeOnlyCacheableSerialization();
+                long sertializedSize = serialization.serializedSize(PartitionUpdate.serializer, mutation, version);
+
+                // Exessively large mutation objects cause GC pressure and huge allocations when serialized.

Review Comment:
   nit: Excessively





[GitHub] [cassandra] tjake commented on pull request #1954: Cassandra 17998

Posted by GitBox <gi...@apache.org>.
tjake commented on PR #1954:
URL: https://github.com/apache/cassandra/pull/1954#issuecomment-1356848357

   > Couple spelling nits and it looks like we dropped the new unit test in DataOutputTest when we dropped that unused import:
   > 
   > ```
   >     @Test
   >     public void testDataOutputBufferLimitingScratchBuffer() throws IOException
   >     {
   > ```
   > 
   > w/those fixed we're good to go. +1
   
   I removed the LimitingScratchBuffer altogether.  Fixing the typos thx




[GitHub] [cassandra] smiklosovic commented on a diff in pull request #1954: Cassandra 17998

Posted by GitBox <gi...@apache.org>.
smiklosovic commented on code in PR #1954:
URL: https://github.com/apache/cassandra/pull/1954#discussion_r1052288718


##########
src/java/org/apache/cassandra/db/Mutation.java:
##########
@@ -64,6 +66,13 @@ public class Mutation implements IMutation, Supplier<Mutation>
 
     private final boolean cdcEnabled;
 
+    // Contains serialized representations of this mutation.
+    // Note: there is no functionality to clear/remove serialized instances, because a mutation must never
+    // be modified (e.g. calling add(PartitionUpdate)) when it's being serialized.
+    private static final int CACHED_SERIALIZATIONS = MessagingService.Version.values().length;
+    private static final int CACHEABLE_MUTATION_SIZE_LIMIT = Integer.getInteger(Config.PROPERTY_PREFIX + "cacheable_mutation_size_limit_bytes", 2 * 1024 * 1024) - 24;

Review Comment:
   This should be moved to `CassandraRelevantProperties`
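
The suggestion here is to read the system property through a central registry instead of an ad hoc `Integer.getInteger` call. The sketch below shows the general pattern only; the `RelevantProperty` enum is hypothetical and is not the API of Cassandra's `CassandraRelevantProperties`, and the `cassandra.` key prefix is an assumption about what `Config.PROPERTY_PREFIX` expands to.

```java
public class PropertiesSketch
{
    // Hypothetical registry: each constant owns its key and its default value.
    enum RelevantProperty
    {
        CACHEABLE_MUTATION_SIZE_LIMIT("cassandra.cacheable_mutation_size_limit_bytes", "2097152");

        private final String key;
        private final String defaultValue;

        RelevantProperty(String key, String defaultValue)
        {
            this.key = key;
            this.defaultValue = defaultValue;
        }

        String getKey() { return key; }

        // Parses the system property, falling back to the declared default.
        int getInt()
        {
            return Integer.parseInt(System.getProperty(key, defaultValue));
        }
    }

    public static void main(String[] args)
    {
        System.out.println(RelevantProperty.CACHEABLE_MUTATION_SIZE_LIMIT.getInt());
    }
}
```

Keeping the key and default in one place makes every tunable discoverable from a single file.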





[GitHub] [cassandra] josh-mckenzie commented on a diff in pull request #1954: Cassandra 17998

Posted by GitBox <gi...@apache.org>.
josh-mckenzie commented on code in PR #1954:
URL: https://github.com/apache/cassandra/pull/1954#discussion_r1041253027


##########
test/unit/org/apache/cassandra/utils/TeeDataInputPlusTest.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.util.Arrays;
+
+import org.junit.Test;
+
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.TeeDataInputPlus;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TeeDataInputPlusTest
+{
+    @Test
+    public void testTeeBuffer() throws Exception
+    {
+        DataOutputBuffer out = new DataOutputBuffer();
+        byte[] testData;
+
+        // boolean
+        out.writeBoolean(true);
+        // byte
+        out.writeByte(0x1);
+        // char
+        out.writeChar('a');
+        // short
+        out.writeShort(1);
+        // int
+        out.writeInt(1);
+        // long
+        out.writeLong(1L);
+        // float
+        out.writeFloat(1.0f);
+        // double
+        out.writeDouble(1.0d);
+        // vint
+        out.writeVInt(-1337L);
+        //unsigned vint
+        out.writeUnsignedVInt(1337L);
+        // String
+        out.writeUTF("abc");
+        //Another string to test skip
+        out.writeUTF("garbagetoskipattheend");
+        testData = out.toByteArray();
+
+        int LIMITED_SIZE = 40;
+        DataInputBuffer reader = new DataInputBuffer(testData);
+        DataInputBuffer reader2 = new DataInputBuffer(testData);
+        DataOutputBuffer teeOut = new DataOutputBuffer();
+        DataOutputBuffer limitedTeeOut = new DataOutputBuffer();
+        TeeDataInputPlus tee = new TeeDataInputPlus(reader, teeOut);
+        TeeDataInputPlus limitedTee = new TeeDataInputPlus(reader2, limitedTeeOut, LIMITED_SIZE);
+
+        // boolean = 1byte
+        boolean bool = tee.readBoolean();
+        assertTrue(bool);
+        bool = limitedTee.readBoolean();
+        assertTrue(bool);
+        // byte = 1byte
+        byte b = tee.readByte();
+        assertEquals(b, 0x1);
+        b = limitedTee.readByte();
+        assertEquals(b, 0x1);
+        // char = 2byte
+        char c = tee.readChar();
+        assertEquals('a', c);
+        c = limitedTee.readChar();
+        assertEquals('a', c);
+        // short = 2bytes
+        short s = tee.readShort();
+        assertEquals(1, s);
+        s = limitedTee.readShort();
+        assertEquals(1, s);
+        // int = 4bytes
+        int i = tee.readInt();
+        assertEquals(1, i);
+        i = limitedTee.readInt();
+        assertEquals(1, i);
+        // long = 8bytes
+        long l = tee.readLong();
+        assertEquals(1L, l);
+        l = limitedTee.readLong();
+        assertEquals(1L, l);
+        // float = 4bytes
+        float f = tee.readFloat();
+        assertEquals(1.0f, f, 0);
+        f = limitedTee.readFloat();
+        assertEquals(1.0f, f, 0);
+        // double = 8bytes
+        double d = tee.readDouble();
+        assertEquals(1.0d, d, 0);
+        d = limitedTee.readDouble();
+        assertEquals(1.0d, d, 0);
+        long vint = tee.readVInt();
+        assertEquals(-1337L, vint);
+        vint = limitedTee.readVInt();
+        assertEquals(-1337L, vint);
+        long uvint = tee.readUnsignedVInt();
+        assertEquals(1337L, uvint);
+        uvint = limitedTee.readUnsignedVInt();
+        assertEquals(1337L, uvint);
+        // String("abc") = 2(string size) + 3 = 5 bytes
+        String str = tee.readUTF();
+        assertEquals("abc", str);
+        str = limitedTee.readUTF();
+        assertEquals("abc", str);
+        int skipped = tee.skipBytes(100);
+        assertEquals(23, skipped);
+        skipped = limitedTee.skipBytes(100);
+        assertEquals(23, skipped);
+
+        byte[] teeData = teeOut.toByteArray();
+        assertFalse(tee.isLimitReached());
+        assertTrue(Arrays.equals(testData, teeData));
+
+        byte[] limitedTeeData = limitedTeeOut.toByteArray();
+        assertTrue(limitedTee.isLimitReached());
+        assertTrue(Arrays.equals(Arrays.copyOf(testData, LIMITED_SIZE -1 ), limitedTeeData));

Review Comment:
   Oh. 🤦 This is a formatting issue - missing a whitespace between the - and the 1. I was literally reading
   `[int] [-int]` instead of `[int] - [int]`.
   
   So this should be
   ```
   assertTrue(Arrays.equals(Arrays.copyOf(testData, LIMITED_SIZE - 1), limitedTeeData));
   ```
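
For readers unfamiliar with what this test exercises: a "tee" input forwards every byte it reads to a second sink, optionally stopping the copy once a limit is hit. The sketch below illustrates the idea with `java.io` only. `TeeInputSketch` is a hypothetical class, it does not reproduce the exact limit semantics of `TeeDataInputPlus` (the test above expects `LIMITED_SIZE - 1` bytes), and skipped bytes are not copied here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class TeeInputSketch extends FilterInputStream
{
    private final OutputStream copy;
    private final long limit;      // maximum bytes to copy; 0 means unlimited
    private long copied;
    private boolean limitReached;

    public TeeInputSketch(InputStream in, OutputStream copy, long limit)
    {
        super(in);
        this.copy = copy;
        this.limit = limit;
    }

    @Override
    public int read() throws IOException
    {
        int b = super.read();
        if (b >= 0)
            tee(b);
        return b;
    }

    @Override
    public int read(byte[] buf, int off, int len) throws IOException
    {
        int n = super.read(buf, off, len);
        for (int i = 0; i < n; i++)
            tee(buf[off + i]);
        return n;
    }

    // Copies one byte to the sink unless the limit has already been used up.
    private void tee(int b) throws IOException
    {
        if (limit > 0 && copied >= limit)
        {
            limitReached = true;
            return;
        }
        copy.write(b);
        copied++;
    }

    public boolean isLimitReached() { return limitReached; }

    public static void main(String[] args) throws IOException
    {
        byte[] data = { 0, 0, 0, 1, 0, 0, 0, 2 };   // two big-endian ints
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        TeeInputSketch tee = new TeeInputSketch(new ByteArrayInputStream(data), sink, 6);
        DataInputStream reader = new DataInputStream(tee);

        System.out.println(reader.readInt() + " " + reader.readInt());
        System.out.println(sink.size() + " bytes copied, limit reached: " + tee.isLimitReached());
    }
}
```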





[GitHub] [cassandra] josh-mckenzie commented on a diff in pull request #1954: Cassandra 17998

Posted by GitBox <gi...@apache.org>.
josh-mckenzie commented on code in PR #1954:
URL: https://github.com/apache/cassandra/pull/1954#discussion_r1041248090


##########
src/java/org/apache/cassandra/db/Mutation.java:
##########
@@ -393,34 +411,115 @@ public static class MutationSerializer implements IVersionedSerializer<Mutation>
     {
         public void serialize(Mutation mutation, DataOutputPlus out, int version) throws IOException
         {
+            serialization(mutation, version).serialize(PartitionUpdate.serializer, mutation, out, version);
+        }
+
+        /**
+         * Called early during request processing to prevent that {@link #serialization(Mutation, int)} is
+         * called concurrently.
+         * See {@link org.apache.cassandra.service.StorageProxy#sendToHintedReplicas(Mutation, ReplicaPlan.ForWrite, AbstractWriteResponseHandler, String, Stage)}
+         */
+        @SuppressWarnings("JavadocReference")
+        public void prepareSerializedBuffer(Mutation mutation, int version)
+        {
+            serialization(mutation, version);
+        }
+
+        /**
+         * Retrieve the cached serialization of this mutation, or compute and cache said serialization if it doesn't
+         * exist yet. Note that this method is _not_ synchronized even though it may (and will often) be called
+         * concurrently. Concurrent calls are still safe however, the only risk is that the value is not cached yet,
+         * multiple concurrent calls may compute it multiple times instead of just once. This is ok as in practice
+         * as we make sure this doesn't happen in the hot path by forcing the initial caching in
+         * {@link org.apache.cassandra.service.StorageProxy#sendToHintedReplicas(Mutation, ReplicaPlan.ForWrite, AbstractWriteResponseHandler, String, Stage)}
+         * via {@link #prepareSerializedBuffer(Mutation)}, which is the only caller that passes
+         * {@code isPrepare==true}.
+         */
+        @SuppressWarnings("JavadocReference")
+        private Serialization serialization(Mutation mutation, int version)
+        {
+            int versionOrdinal = MessagingService.getVersionOrdinal(version);
+            // Retrieves the cached version, or build+cache it if it's not cached already.
+            Serialization serialization = mutation.cachedSerializations[versionOrdinal];
+            if (serialization == null)
+            {
+                serialization = new SizeOnlyCacheableSerialization();
+                long sertializedSize = serialization.serializedSize(PartitionUpdate.serializer, mutation, version);

Review Comment:
   nit: spelling. Should be "serializedSize"
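
The reworked hunk above appears to compute the serialized size first, presumably so it can decide whether the bytes are worth caching before allocating anything. A minimal sketch of that decision follows, using an array of longs as a stand-in for a `Mutation`; every name and the 32-byte limit are hypothetical. The cache field is `volatile` so the bytes are visible across threads, and concurrent callers may still serialize more than once.

```java
import java.nio.ByteBuffer;

public class SizeGatedCacheSketch
{
    // Hypothetical limit; the diff reads its limit from a system property.
    static final int CACHE_LIMIT_BYTES = 32;

    private final long[] values;
    private volatile byte[] cached;
    int serializeCalls;             // instrumentation for the example only

    SizeGatedCacheSketch(long... values)
    {
        this.values = values;
    }

    // Cheap to compute: no buffer is allocated just to learn the size.
    int serializedSize()
    {
        return values.length * Long.BYTES;
    }

    private byte[] serialize()
    {
        serializeCalls++;
        ByteBuffer out = ByteBuffer.allocate(serializedSize());
        for (long v : values)
            out.putLong(v);
        return out.array();
    }

    // Returns the serialized form, retaining it only when it is small enough.
    byte[] serialized()
    {
        byte[] result = cached;
        if (result != null)
            return result;
        if (serializedSize() > CACHE_LIMIT_BYTES)
            return serialize();     // too large: serialize each time, never retain
        result = serialize();
        cached = result;
        return result;
    }

    public static void main(String[] args)
    {
        SizeGatedCacheSketch small = new SizeGatedCacheSketch(1, 2);          // 16 bytes
        small.serialized();
        small.serialized();
        SizeGatedCacheSketch large = new SizeGatedCacheSketch(1, 2, 3, 4, 5); // 40 bytes
        large.serialized();
        large.serialized();
        System.out.println(small.serializeCalls + " vs " + large.serializeCalls);
    }
}
```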





[GitHub] [cassandra] tjake commented on a diff in pull request #1954: Cassandra 17998

Posted by GitBox <gi...@apache.org>.
tjake commented on code in PR #1954:
URL: https://github.com/apache/cassandra/pull/1954#discussion_r1039025640


##########
test/unit/org/apache/cassandra/utils/TeeDataInputPlusTest.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.util.Arrays;
+
+import org.junit.Test;
+
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.TeeDataInputPlus;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TeeDataInputPlusTest
+{
+    @Test
+    public void testTeeBuffer() throws Exception
+    {
+        DataOutputBuffer out = new DataOutputBuffer();
+        byte[] testData;
+
+        // boolean
+        out.writeBoolean(true);
+        // byte
+        out.writeByte(0x1);
+        // char
+        out.writeChar('a');
+        // short
+        out.writeShort(1);
+        // int
+        out.writeInt(1);
+        // long
+        out.writeLong(1L);
+        // float
+        out.writeFloat(1.0f);
+        // double
+        out.writeDouble(1.0d);
+        // vint
+        out.writeVInt(-1337L);
+        //unsigned vint
+        out.writeUnsignedVInt(1337L);
+        // String
+        out.writeUTF("abc");
+        //Another string to test skip
+        out.writeUTF("garbagetoskipattheend");
+        testData = out.toByteArray();
+
+        int LIMITED_SIZE = 40;
+        DataInputBuffer reader = new DataInputBuffer(testData);
+        DataInputBuffer reader2 = new DataInputBuffer(testData);
+        DataOutputBuffer teeOut = new DataOutputBuffer();
+        DataOutputBuffer limitedTeeOut = new DataOutputBuffer();
+        TeeDataInputPlus tee = new TeeDataInputPlus(reader, teeOut);
+        TeeDataInputPlus limitedTee = new TeeDataInputPlus(reader2, limitedTeeOut, LIMITED_SIZE);
+
+        // boolean = 1byte
+        boolean bool = tee.readBoolean();
+        assertTrue(bool);
+        bool = limitedTee.readBoolean();
+        assertTrue(bool);
+        // byte = 1byte
+        byte b = tee.readByte();
+        assertEquals(b, 0x1);
+        b = limitedTee.readByte();
+        assertEquals(b, 0x1);
+        // char = 2byte
+        char c = tee.readChar();
+        assertEquals('a', c);
+        c = limitedTee.readChar();
+        assertEquals('a', c);
+        // short = 2bytes
+        short s = tee.readShort();
+        assertEquals(1, s);
+        s = limitedTee.readShort();
+        assertEquals(1, s);
+        // int = 4bytes
+        int i = tee.readInt();
+        assertEquals(1, i);
+        i = limitedTee.readInt();
+        assertEquals(1, i);
+        // long = 8bytes
+        long l = tee.readLong();
+        assertEquals(1L, l);
+        l = limitedTee.readLong();
+        assertEquals(1L, l);
+        // float = 4bytes
+        float f = tee.readFloat();
+        assertEquals(1.0f, f, 0);
+        f = limitedTee.readFloat();
+        assertEquals(1.0f, f, 0);
+        // double = 8bytes
+        double d = tee.readDouble();
+        assertEquals(1.0d, d, 0);
+        d = limitedTee.readDouble();
+        assertEquals(1.0d, d, 0);
+        long vint = tee.readVInt();
+        assertEquals(-1337L, vint);
+        vint = limitedTee.readVInt();
+        assertEquals(-1337L, vint);
+        long uvint = tee.readUnsignedVInt();
+        assertEquals(1337L, uvint);
+        uvint = limitedTee.readUnsignedVInt();
+        assertEquals(1337L, uvint);
+        // String("abc") = 2(string size) + 3 = 5 bytes
+        String str = tee.readUTF();
+        assertEquals("abc", str);
+        str = limitedTee.readUTF();
+        assertEquals("abc", str);
+        int skipped = tee.skipBytes(100);
+        assertEquals(23, skipped);
+        skipped = limitedTee.skipBytes(100);
+        assertEquals(23, skipped);
+
+        byte[] teeData = teeOut.toByteArray();
+        assertFalse(tee.isLimitReached());
+        assertTrue(Arrays.equals(testData, teeData));
+
+        byte[] limitedTeeData = limitedTeeOut.toByteArray();
+        assertTrue(limitedTee.isLimitReached());
+        assertTrue(Arrays.equals(Arrays.copyOf(testData, LIMITED_SIZE -1 ), limitedTeeData));

Review Comment:
   I'm not sure I follow. Do you mean the `LIMITED_SIZE - 1`? Since the limit is `LIMITED_SIZE`, the buffer stopped copying at `LIMITED_SIZE - 1` bytes.
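
For readers following the `LIMITED_SIZE - 1` discussion, here is a minimal sketch, using only `java.io`, of how a size-limited tee can end up one byte short of its limit. It assumes the copy is guarded by a strict `copied + 1 < limit` check; that rule is an assumption for illustration and is not confirmed by this thread. `LimitedTee` and `copiedBytes` are hypothetical names, not Cassandra's `TeeDataInputPlus`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;

public class LimitedTeeSketch
{
    // Hypothetical stand-in for a size-limited tee: reads from a source and
    // mirrors each byte into a copy until the limit would be hit.
    static final class LimitedTee
    {
        private final DataInputStream source;
        private final ByteArrayOutputStream copy = new ByteArrayOutputStream();
        private final int limit;
        private boolean limitReached;

        LimitedTee(byte[] data, int limit)
        {
            this.source = new DataInputStream(new ByteArrayInputStream(data));
            this.limit = limit;
        }

        int readByte() throws IOException
        {
            int b = source.readUnsignedByte();
            // Assumed rule: copy only while the copy stays strictly below the limit.
            if (!limitReached && copy.size() + 1 < limit)
                copy.write(b);
            else
                limitReached = true;
            return b;
        }

        boolean isLimitReached()
        {
            return limitReached;
        }

        int copiedSize()
        {
            return copy.size();
        }
    }

    // Reads totalBytes through the tee and reports how many bytes were copied.
    static int copiedBytes(int totalBytes, int limit) throws IOException
    {
        LimitedTee tee = new LimitedTee(new byte[totalBytes], limit);
        for (int i = 0; i < totalBytes; i++)
            tee.readByte();
        return tee.copiedSize();
    }

    public static void main(String[] args) throws IOException
    {
        // Input longer than the limit: the copy stops one byte short of it.
        System.out.println(copiedBytes(62, 40)); // prints 39
        // Input shorter than the limit: everything is copied.
        System.out.println(copiedBytes(30, 40)); // prints 30
    }
}
```

Under that strict comparison, a limit of 40 yields a 39-byte copy, which matches the `Arrays.copyOf(testData, LIMITED_SIZE - 1)` expectation in the test. With a `<=` comparison the copy would instead hold exactly 40 bytes.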


