You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jw...@apache.org on 2020/05/04 18:18:46 UTC

[cassandra] branch trunk updated: Improve logging when mutation passed to commit log is too large

This is an automated email from the ASF dual-hosted git repository.

jwest pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d3dadcd  Improve logging when mutation passed to commit log is too large
d3dadcd is described below

commit d3dadcd6f3bbde471e972f8332eb62de0f2d4aae
Author: nvharikrishna <n....@gmail.com>
AuthorDate: Wed Jan 8 22:06:06 2020 +0530

    Improve logging when mutation passed to commit log is too large
    
    Patch by Venkata Harikrishna Nukala; reviewed by Jordan West for CASSANDRA-14781
---
 CHANGES.txt                                        |  1 +
 src/java/org/apache/cassandra/batchlog/Batch.java  |  4 +-
 .../org/apache/cassandra/db/CounterMutation.java   | 39 +++++++++-
 src/java/org/apache/cassandra/db/IMutation.java    | 13 ++++
 src/java/org/apache/cassandra/db/Mutation.java     | 36 +++++++++
 .../db/MutationExceededMaxSizeException.java       | 89 ++++++++++++++++++++++
 .../apache/cassandra/db/commitlog/CommitLog.java   | 14 +---
 .../cassandra/db/virtual/VirtualMutation.java      |  5 ++
 src/java/org/apache/cassandra/hints/Hint.java      |  2 +-
 .../apache/cassandra/schema/MigrationManager.java  |  2 +-
 .../service/reads/repair/BlockingReadRepairs.java  | 61 +++++++--------
 .../cassandra/test/microbench/MutationBench.java   |  2 +-
 .../db/MutationExceededMaxSizeExceptionTest.java   | 46 +++++++++++
 .../cassandra/db/commitlog/CommitLogTest.java      | 55 +++++++++++--
 14 files changed, 316 insertions(+), 53 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 89c8d7d..8c4cf35 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha5
+ * Improve logging when mutation passed to commit log is too large (CASSANDRA-14781)
  * replace LZ4FastDecompressor with LZ4SafeDecompressor (CASSANDRA-15560)
  * Fix buffer pool NPE with concurrent release due to in-progress tiny pool eviction (CASSANDRA-15726)
  * Avoid race condition when completing stream sessions (CASSANDRA-15666)
diff --git a/src/java/org/apache/cassandra/batchlog/Batch.java b/src/java/org/apache/cassandra/batchlog/Batch.java
index e91e3ca..fb6c5d5 100644
--- a/src/java/org/apache/cassandra/batchlog/Batch.java
+++ b/src/java/org/apache/cassandra/batchlog/Batch.java
@@ -90,7 +90,7 @@ public final class Batch
             size += sizeofUnsignedVInt(batch.decodedMutations.size());
             for (Mutation mutation : batch.decodedMutations)
             {
-                int mutationSize = (int) Mutation.serializer.serializedSize(mutation, version);
+                int mutationSize = mutation.serializedSize(version);
                 size += sizeofUnsignedVInt(mutationSize);
                 size += mutationSize;
             }
@@ -108,7 +108,7 @@ public final class Batch
             out.writeUnsignedVInt(batch.decodedMutations.size());
             for (Mutation mutation : batch.decodedMutations)
             {
-                out.writeUnsignedVInt(Mutation.serializer.serializedSize(mutation, version));
+                out.writeUnsignedVInt(mutation.serializedSize(version));
                 Mutation.serializer.serialize(mutation, out, version);
             }
         }
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index bb10a6a..722ad73 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -45,6 +45,9 @@ import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.btree.BTreeSet;
 
 import static java.util.concurrent.TimeUnit.*;
+import static org.apache.cassandra.net.MessagingService.VERSION_30;
+import static org.apache.cassandra.net.MessagingService.VERSION_3014;
+import static org.apache.cassandra.net.MessagingService.VERSION_40;
 
 public class CounterMutation implements IMutation
 {
@@ -76,6 +79,15 @@ public class CounterMutation implements IMutation
         return mutation.getPartitionUpdates();
     }
 
+    public void validateSize(int version, int overhead)
+    {
+        long totalSize = serializedSize(version) + overhead;
+        if(totalSize > MAX_MUTATION_SIZE)
+        {
+            throw new MutationExceededMaxSizeException(this, version, totalSize);
+        }
+    }
+
     public Mutation getMutation()
     {
         return mutation;
@@ -308,6 +320,31 @@ public class CounterMutation implements IMutation
         return DatabaseDescriptor.getCounterWriteRpcTimeout(unit);
     }
 
+    private int serializedSize30;
+    private int serializedSize3014;
+    private int serializedSize40;
+
+    public int serializedSize(int version)
+    {
+        switch (version)
+        {
+            case VERSION_30:
+                if (serializedSize30 == 0)
+                    serializedSize30 = (int) serializer.serializedSize(this, VERSION_30);
+                return serializedSize30;
+            case VERSION_3014:
+                if (serializedSize3014 == 0)
+                    serializedSize3014 = (int) serializer.serializedSize(this, VERSION_3014);
+                return serializedSize3014;
+            case VERSION_40:
+                if (serializedSize40 == 0)
+                    serializedSize40 = (int) serializer.serializedSize(this, VERSION_40);
+                return serializedSize40;
+            default:
+                throw new IllegalStateException("Unknown serialization version: " + version);
+        }
+    }
+
     @Override
     public String toString()
     {
@@ -336,7 +373,7 @@ public class CounterMutation implements IMutation
 
         public long serializedSize(CounterMutation cm, int version)
         {
-            return Mutation.serializer.serializedSize(cm.mutation, version)
+            return cm.mutation.serializedSize(version)
                  + TypeSizes.sizeof(cm.consistency.name());
         }
     }
diff --git a/src/java/org/apache/cassandra/db/IMutation.java b/src/java/org/apache/cassandra/db/IMutation.java
index 1710cfd..10472c1 100644
--- a/src/java/org/apache/cassandra/db/IMutation.java
+++ b/src/java/org/apache/cassandra/db/IMutation.java
@@ -20,11 +20,14 @@ package org.apache.cassandra.db;
 import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.schema.TableId;
 
 public interface IMutation
 {
+    public long MAX_MUTATION_SIZE = DatabaseDescriptor.getMaxMutationSize();
+
     public void apply();
     public String getKeyspaceName();
     public Collection<TableId> getTableIds();
@@ -40,6 +43,16 @@ public interface IMutation
     }
 
     /**
+     * Validates that the size of the mutation does not exceed {@link DatabaseDescriptor#getMaxMutationSize()}.
+     *
+     * @param version the MessagingService version the mutation is being serialized for.
+     *                see {@link org.apache.cassandra.net.MessagingService#current_version}
+     * @param overhead overhead to add to the mutation size before validating. Pass zero if not required; must not be negative.
+     * @throws MutationExceededMaxSizeException if {@link DatabaseDescriptor#getMaxMutationSize()} is exceeded
+     */
+    public void validateSize(int version, int overhead);
+
+    /**
      * Computes the total data size of the specified mutations.
      * @param mutations the mutations
      * @return the total data size of the specified mutations
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index 3d27ef3..16d20db 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -38,6 +38,9 @@ import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import static org.apache.cassandra.net.MessagingService.VERSION_30;
+import static org.apache.cassandra.net.MessagingService.VERSION_3014;
+import static org.apache.cassandra.net.MessagingService.VERSION_40;
 import static org.apache.cassandra.utils.MonotonicClock.approxTime;
 
 public class Mutation implements IMutation
@@ -119,6 +122,15 @@ public class Mutation implements IMutation
         return modifications.values();
     }
 
+    public void validateSize(int version, int overhead)
+    {
+        long totalSize = serializedSize(version) + overhead;
+        if(totalSize > MAX_MUTATION_SIZE)
+        {
+            throw new MutationExceededMaxSizeException(this, version, totalSize);
+        }
+    }
+
     public PartitionUpdate getPartitionUpdate(TableMetadata table)
     {
         return table == null ? null : modifications.get(table.id);
@@ -256,6 +268,30 @@ public class Mutation implements IMutation
         }
         return buff.append("])").toString();
     }
+    private int serializedSize30;
+    private int serializedSize3014;
+    private int serializedSize40;
+
+    public int serializedSize(int version)
+    {
+        switch (version)
+        {
+            case VERSION_30:
+                if (serializedSize30 == 0)
+                    serializedSize30 = (int) serializer.serializedSize(this, VERSION_30);
+                return serializedSize30;
+            case VERSION_3014:
+                if (serializedSize3014 == 0)
+                    serializedSize3014 = (int) serializer.serializedSize(this, VERSION_3014);
+                return serializedSize3014;
+            case VERSION_40:
+                if (serializedSize40 == 0)
+                    serializedSize40 = (int) serializer.serializedSize(this, VERSION_40);
+                return serializedSize40;
+            default:
+                throw new IllegalStateException("Unknown serialization version: " + version);
+        }
+    }
 
     /**
      * Creates a new simple mutuation builder.
diff --git a/src/java/org/apache/cassandra/db/MutationExceededMaxSizeException.java b/src/java/org/apache/cassandra/db/MutationExceededMaxSizeException.java
new file mode 100644
index 0000000..084c21e
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/MutationExceededMaxSizeException.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+
+import static org.apache.cassandra.db.IMutation.MAX_MUTATION_SIZE;
+
+public class MutationExceededMaxSizeException extends RuntimeException
+{
+    public static final int PARTITION_MESSAGE_LIMIT = 1024;
+
+    public final long mutationSize;
+
+    MutationExceededMaxSizeException(IMutation mutation, int serializationVersion, long totalSize)
+    {
+        super(prepareMessage(mutation, serializationVersion, totalSize));
+        this.mutationSize = totalSize;
+    }
+
+    private static String prepareMessage(IMutation mutation, int version, long totalSize)
+    {
+        List<String> topPartitions = mutation.getPartitionUpdates().stream()
+                                             .sorted((upd1, upd2) ->
+                                                     Long.compare(PartitionUpdate.serializer.serializedSize(upd2, version),
+                                                                  PartitionUpdate.serializer.serializedSize(upd1, version)))
+                                             .map(upd -> String.format("%s.%s",
+                                                                       upd.metadata().name,
+                                                                       upd.metadata().partitionKeyType.getString(upd.partitionKey().getKey())))
+                                             .collect(Collectors.toList());
+
+        String topKeys = makeTopKeysString(topPartitions, PARTITION_MESSAGE_LIMIT);
+        return String.format("Encountered an oversized mutation (%d/%d) for keyspace: %s. Top keys are: %s",
+                             totalSize,
+                             MAX_MUTATION_SIZE,
+                             mutation.getKeyspaceName(),
+                             topKeys);
+    }
+
+    @VisibleForTesting
+    static String makeTopKeysString(List<String> keys, int maxLength) {
+        Iterator<String> iterator = keys.listIterator();
+        StringBuilder stringBuilder = new StringBuilder();
+        while (iterator.hasNext())
+        {
+            String key = iterator.next();
+
+            if (stringBuilder.length() == 0)
+            {
+                stringBuilder.append(key); // ensures at least one key is added
+                iterator.remove();
+            }
+            else if (stringBuilder.length() + key.length() + 2 <= maxLength) // 2 for ", "
+            {
+                stringBuilder.append(", ").append(key);
+                iterator.remove();
+            }
+            else
+                break;
+        }
+
+        if (keys.size() > 0)
+            stringBuilder.append(" and ").append(keys.size()).append(" more.");
+
+        return stringBuilder.toString();
+    }
+}
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index c9e79cd..e7f8743 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -64,10 +64,6 @@ public class CommitLog implements CommitLogMBean
 
     public static final CommitLog instance = CommitLog.construct();
 
-    // we only permit records HALF the size of a commit log, to ensure we don't spin allocating many mostly
-    // empty segments when writing large records
-    final long MAX_MUTATION_SIZE = DatabaseDescriptor.getMaxMutationSize();
-
     final public AbstractCommitLogSegmentManager segmentManager;
 
     public final CommitLogArchiver archiver;
@@ -265,19 +261,13 @@ public class CommitLog implements CommitLogMBean
     {
         assert mutation != null;
 
+        mutation.validateSize(MessagingService.current_version, ENTRY_OVERHEAD_SIZE);
+
         try (DataOutputBuffer dob = DataOutputBuffer.scratchBuffer.get())
         {
             Mutation.serializer.serialize(mutation, dob, MessagingService.current_version);
             int size = dob.getLength();
-
             int totalSize = size + ENTRY_OVERHEAD_SIZE;
-            if (totalSize > MAX_MUTATION_SIZE)
-            {
-                throw new IllegalArgumentException(String.format("Mutation of %s is too large for the maximum size of %s",
-                                                                 FBUtilities.prettyPrintMemory(totalSize),
-                                                                 FBUtilities.prettyPrintMemory(MAX_MUTATION_SIZE)));
-            }
-
             Allocation alloc = segmentManager.allocate(mutation, totalSize);
 
             CRC32 checksum = new CRC32();
diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java b/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java
index 6db0acd..09ac4a6 100644
--- a/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java
+++ b/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java
@@ -109,4 +109,9 @@ public final class VirtualMutation implements IMutation
     {
         // no-op
     }
+
+    public void validateSize(int version, int overhead)
+    {
+        // no-op
+    }
 }
diff --git a/src/java/org/apache/cassandra/hints/Hint.java b/src/java/org/apache/cassandra/hints/Hint.java
index 7e4618c..6c7c5d4 100644
--- a/src/java/org/apache/cassandra/hints/Hint.java
+++ b/src/java/org/apache/cassandra/hints/Hint.java
@@ -149,7 +149,7 @@ public final class Hint
         {
             long size = sizeof(hint.creationTime);
             size += sizeofUnsignedVInt(hint.gcgs);
-            size += Mutation.serializer.serializedSize(hint.mutation, version);
+            size += hint.mutation.serializedSize(version);
             return size;
         }
 
diff --git a/src/java/org/apache/cassandra/schema/MigrationManager.java b/src/java/org/apache/cassandra/schema/MigrationManager.java
index 257634c..4f91d94 100644
--- a/src/java/org/apache/cassandra/schema/MigrationManager.java
+++ b/src/java/org/apache/cassandra/schema/MigrationManager.java
@@ -487,7 +487,7 @@ public class MigrationManager
         {
             int size = TypeSizes.sizeof(schema.size());
             for (Mutation mutation : schema)
-                size += Mutation.serializer.serializedSize(mutation, version);
+                size += mutation.serializedSize(version);
             return size;
         }
     }
diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java
index 3a4978e..68d1b4c 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java
@@ -21,11 +21,11 @@ package org.apache.cassandra.service.reads.repair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.MutationExceededMaxSizeException;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -33,6 +33,8 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.tracing.Tracing;
 
+import static org.apache.cassandra.db.IMutation.MAX_MUTATION_SIZE;
+
 public class BlockingReadRepairs
 {
     private static final Logger logger = LoggerFactory.getLogger(BlockingReadRepairs.class);
@@ -51,43 +53,42 @@ public class BlockingReadRepairs
 
         DecoratedKey key = update.partitionKey();
         Mutation mutation = new Mutation(update);
-        Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
-        TableMetadata metadata = update.metadata();
-
         int messagingVersion = MessagingService.instance().versions.get(destination);
 
-        int    mutationSize = (int) Mutation.serializer.serializedSize(mutation, messagingVersion);
-        int maxMutationSize = DatabaseDescriptor.getMaxMutationSize();
-
-
-        if (mutationSize <= maxMutationSize)
+        try
         {
+            mutation.validateSize(messagingVersion, 0);
             return mutation;
         }
-        else if (DROP_OVERSIZED_READ_REPAIR_MUTATIONS)
-        {
-            logger.debug("Encountered an oversized ({}/{}) read repair mutation for table {}, key {}, node {}",
-                         mutationSize,
-                         maxMutationSize,
-                         metadata,
-                         metadata.partitionKeyType.getString(key.getKey()),
-                         destination);
-            return null;
-        }
-        else
+        catch (MutationExceededMaxSizeException e)
         {
-            logger.warn("Encountered an oversized ({}/{}) read repair mutation for table {}, key {}, node {}",
-                        mutationSize,
-                        maxMutationSize,
-                        metadata,
-                        metadata.partitionKeyType.getString(key.getKey()),
-                        destination);
+            Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
+            TableMetadata metadata = update.metadata();
 
-            if (!suppressException)
+            if (DROP_OVERSIZED_READ_REPAIR_MUTATIONS)
             {
-                int blockFor = consistency.blockFor(keyspace);
-                Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
-                throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true);
+                logger.debug("Encountered an oversized ({}/{}) read repair mutation for table {}, key {}, node {}",
+                             e.mutationSize,
+                             MAX_MUTATION_SIZE,
+                             metadata,
+                             metadata.partitionKeyType.getString(key.getKey()),
+                             destination);
+            }
+            else
+            {
+                logger.warn("Encountered an oversized ({}/{}) read repair mutation for table {}, key {}, node {}",
+                            e.mutationSize,
+                            MAX_MUTATION_SIZE,
+                            metadata,
+                            metadata.partitionKeyType.getString(key.getKey()),
+                            destination);
+
+                if (!suppressException)
+                {
+                    int blockFor = consistency.blockFor(keyspace);
+                    Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
+                    throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true);
+                }
             }
             return null;
         }
diff --git a/test/microbench/org/apache/cassandra/test/microbench/MutationBench.java b/test/microbench/org/apache/cassandra/test/microbench/MutationBench.java
index 074e183..41d6aab 100644
--- a/test/microbench/org/apache/cassandra/test/microbench/MutationBench.java
+++ b/test/microbench/org/apache/cassandra/test/microbench/MutationBench.java
@@ -100,7 +100,7 @@ public class MutationBench
         Schema.instance.load(ksm.withSwapped(ksm.tables.with(metadata)));
 
         mutation = (Mutation)UpdateBuilder.create(metadata, 1L).newRow(1L).add("commentid", 32L).makeMutation();
-        buffer = ByteBuffer.allocate((int) Mutation.serializer.serializedSize(mutation, MessagingService.current_version));
+        buffer = ByteBuffer.allocate(mutation.serializedSize(MessagingService.current_version));
         outputBuffer = new DataOutputBufferFixed(buffer);
         inputBuffer = new DataInputBuffer(buffer, false);
 
diff --git a/test/unit/org/apache/cassandra/db/MutationExceededMaxSizeExceptionTest.java b/test/unit/org/apache/cassandra/db/MutationExceededMaxSizeExceptionTest.java
new file mode 100644
index 0000000..81d9735
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/MutationExceededMaxSizeExceptionTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+
+import static org.apache.cassandra.db.MutationExceededMaxSizeException.makeTopKeysString;
+import static org.junit.Assert.*;
+
+public class MutationExceededMaxSizeExceptionTest
+{
+
+    @Test
+    public void testMakePKString()
+    {
+        List<String> keys = Arrays.asList("aaa", "bbb", "ccc");
+
+        assertEquals(0, makeTopKeysString(new ArrayList<>(), 1024).length());
+        assertEquals("aaa and 2 more.", makeTopKeysString(new ArrayList<>(keys), 0));
+        assertEquals("aaa and 2 more.", makeTopKeysString(new ArrayList<>(keys), 5));
+        assertEquals("aaa, bbb, ccc", makeTopKeysString(new ArrayList<>(keys), 13));
+        assertEquals("aaa, bbb, ccc", makeTopKeysString(new ArrayList<>(keys), 1024));
+        assertEquals("aaa, bbb and 1 more.", makeTopKeysString(new ArrayList<>(keys), 8));
+        assertEquals("aaa, bbb and 1 more.", makeTopKeysString(new ArrayList<>(keys), 10));
+    }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 1e3f622..0e7f30d 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -69,9 +69,13 @@ import org.apache.cassandra.utils.KillerForTests;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.vint.VIntCoding;
 
+import org.junit.After;
+
+import static org.apache.cassandra.db.commitlog.CommitLogSegment.ENTRY_OVERHEAD_SIZE;
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.cassandra.db.marshal.IntegerType;
@@ -420,7 +424,7 @@ public abstract class CommitLogTest
         max -= CommitLogSegment.ENTRY_OVERHEAD_SIZE; // log entry overhead
 
         // Note that the size of the value if vint encoded. So we first compute the ovehead of the mutation without the value and it's size
-        int mutationOverhead = (int)Mutation.serializer.serializedSize(rm, MessagingService.current_version) - (VIntCoding.computeVIntSize(allocSize) + allocSize);
+        int mutationOverhead = rm.serializedSize(MessagingService.current_version) - (VIntCoding.computeVIntSize(allocSize) + allocSize);
         max -= mutationOverhead;
 
         // Now, max is the max for both the value and it's size. But we want to know how much we can allocate, i.e. the size of the value.
@@ -447,7 +451,7 @@ public abstract class CommitLogTest
         CommitLog.instance.add(rm);
     }
 
-    @Test(expected = IllegalArgumentException.class)
+    @Test(expected = MutationExceededMaxSizeException.class)
     public void testExceedRecordLimit() throws Exception
     {
         Keyspace ks = Keyspace.open(KEYSPACE1);
@@ -459,6 +463,47 @@ public abstract class CommitLogTest
         CommitLog.instance.add(rm);
         throw new AssertionError("mutation larger than limit was accepted");
     }
+    @Test
+    public void testExceedRecordLimitWithMultiplePartitions() throws Exception
+    {
+        CommitLog.instance.resetUnsafe(true);
+        List<Mutation> mutations = new ArrayList<>();
+        Keyspace ks = Keyspace.open(KEYSPACE1);
+        char[] keyChars = new char[MutationExceededMaxSizeException.PARTITION_MESSAGE_LIMIT];
+        Arrays.fill(keyChars, 'k');
+        String key = new String(keyChars);
+
+        // large mutation
+        mutations.add(new RowUpdateBuilder(ks.getColumnFamilyStore(STANDARD1).metadata(), 0, key)
+                      .clustering("bytes")
+                      .add("val", ByteBuffer.allocate(1 + getMaxRecordDataSize()))
+                      .build());
+
+        // smaller mutation
+        mutations.add(new RowUpdateBuilder(ks.getColumnFamilyStore(STANDARD2).metadata(), 0, key)
+                      .clustering("bytes")
+                      .add("val", ByteBuffer.allocate(1 + getMaxRecordDataSize() - 1024))
+                      .build());
+
+        Mutation mutation = Mutation.merge(mutations);
+        try
+        {
+            CommitLog.instance.add(Mutation.merge(mutations));
+            throw new AssertionError("mutation larger than limit was accepted");
+        }
+        catch (MutationExceededMaxSizeException exception)
+        {
+            String message = exception.getMessage();
+
+            long mutationSize = mutation.serializedSize(MessagingService.current_version) + ENTRY_OVERHEAD_SIZE;
+            final String expectedMessagePrefix = String.format("Encountered an oversized mutation (%d/%d) for keyspace: %s.",
+                                                               mutationSize,
+                                                               DatabaseDescriptor.getMaxMutationSize(),
+                                                               KEYSPACE1);
+            assertTrue(message.startsWith(expectedMessagePrefix));
+            assertTrue(message.contains(String.format("%s.%s and 1 more.", STANDARD1, key)));
+        }
+    }
 
     protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception
     {
@@ -655,7 +700,7 @@ public abstract class CommitLogTest
         {
             DatabaseDescriptor.setAutoSnapshot(false);
             Keyspace notDurableKs = Keyspace.open(KEYSPACE2);
-            Assert.assertFalse(notDurableKs.getMetadata().params.durableWrites);
+            assertFalse(notDurableKs.getMetadata().params.durableWrites);
 
             ColumnFamilyStore cfs = notDurableKs.getColumnFamilyStore("Standard1");
             new RowUpdateBuilder(cfs.metadata(), 0, "key1")
@@ -699,7 +744,7 @@ public abstract class CommitLogTest
 
         SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, CommitLogPosition.NONE, cfs.metadata());
         List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
-        Assert.assertFalse(activeSegments.isEmpty());
+        assertFalse(activeSegments.isEmpty());
 
         File[] files = new File(CommitLog.instance.segmentManager.storageDirectory).listFiles((file, name) -> activeSegments.contains(name));
         replayer.replayFiles(files);
@@ -736,7 +781,7 @@ public abstract class CommitLogTest
 
         SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, commitLogPosition, cfs.metadata());
         List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
-        Assert.assertFalse(activeSegments.isEmpty());
+        assertFalse(activeSegments.isEmpty());
 
         File[] files = new File(CommitLog.instance.segmentManager.storageDirectory).listFiles((file, name) -> activeSegments.contains(name));
         replayer.replayFiles(files);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org